cellstate/entity/
message.rs1use anyhow::Result;
6use clap::{Arg, ArgMatches, Command};
7
8use cellstate_core::api_types::{ListMessagesResponse, MessageResponse, SendMessageRequest};
9use cellstate_core::{AgentId, AgentTarget, AgentType, MessagePriority, MessageType};
10
11use super::client::ApiClient;
12use super::output::OutputConfig;
13
14const BASE: &str = "/api/v1/messages";
15
16pub fn build_command() -> Command {
17 Command::new("message")
18 .about("Manage inter-agent messages")
19 .subcommand_required(true)
20 .subcommand(
21 Command::new("create")
22 .about("Send a new message")
23 .arg(
24 Arg::new("from-agent-id")
25 .long("from-agent-id")
26 .required(true)
27 .help("Sender agent ID"),
28 )
29 .arg(
30 Arg::new("to-agent-id")
31 .long("to-agent-id")
32 .help("Recipient agent ID (mutually exclusive with --to-agent-type)"),
33 )
34 .arg(
35 Arg::new("to-agent-type")
36 .long("to-agent-type")
37 .help("Recipient agent type (mutually exclusive with --to-agent-id)"),
38 )
39 .arg(
40 Arg::new("message-type")
41 .long("message-type")
42 .required(true)
43 .help("Message type (task_delegation, task_result, context_request, context_share, coordination_signal, handoff, interrupt, heartbeat)"),
44 )
45 .arg(
46 Arg::new("payload")
47 .required(true)
48 .help("Message payload (JSON string)"),
49 )
50 .arg(
51 Arg::new("trajectory-id")
52 .long("trajectory-id")
53 .help("Related trajectory ID"),
54 )
55 .arg(
56 Arg::new("scope-id")
57 .long("scope-id")
58 .help("Related scope ID"),
59 )
60 .arg(
61 Arg::new("priority")
62 .long("priority")
63 .default_value("normal")
64 .help("Priority (low, normal, high, critical)"),
65 )
66 .arg(
67 Arg::new("expires-at")
68 .long("expires-at")
69 .help("Expiration timestamp (ISO 8601)"),
70 ),
71 )
72 .subcommand(
73 Command::new("list")
74 .about("List messages")
75 .arg(
76 Arg::new("message-type")
77 .long("message-type")
78 .help("Filter by message type"),
79 )
80 .arg(
81 Arg::new("from-agent-id")
82 .long("from-agent-id")
83 .help("Filter by sender agent"),
84 )
85 .arg(
86 Arg::new("to-agent-id")
87 .long("to-agent-id")
88 .help("Filter by recipient agent ID"),
89 )
90 .arg(
91 Arg::new("to-agent-type")
92 .long("to-agent-type")
93 .help("Filter by recipient agent type"),
94 )
95 .arg(
96 Arg::new("trajectory-id")
97 .long("trajectory-id")
98 .help("Filter by trajectory"),
99 )
100 .arg(
101 Arg::new("priority")
102 .long("priority")
103 .help("Filter by priority"),
104 )
105 .arg(
106 Arg::new("undelivered-only")
107 .long("undelivered-only")
108 .action(clap::ArgAction::SetTrue)
109 .help("Only return undelivered messages"),
110 )
111 .arg(
112 Arg::new("unacknowledged-only")
113 .long("unacknowledged-only")
114 .action(clap::ArgAction::SetTrue)
115 .help("Only return unacknowledged messages"),
116 )
117 .arg(
118 Arg::new("limit")
119 .long("limit")
120 .default_value("20")
121 .help("Max results"),
122 )
123 .arg(
124 Arg::new("offset")
125 .long("offset")
126 .default_value("0")
127 .help("Offset"),
128 ),
129 )
130 .subcommand(
131 Command::new("get")
132 .about("Get message details")
133 .arg(Arg::new("id").required(true).help("Message ID")),
134 )
135 .subcommand(
136 Command::new("acknowledge")
137 .about("Acknowledge a message")
138 .arg(Arg::new("id").required(true).help("Message ID")),
139 )
140 .subcommand(
141 Command::new("deliver")
142 .about("Mark a message as delivered")
143 .arg(Arg::new("id").required(true).help("Message ID")),
144 )
145}
146
147pub async fn dispatch(
148 matches: &ArgMatches,
149 client: &ApiClient,
150 output: &OutputConfig,
151 _session: &crate::session::CliSession,
152) -> Result<()> {
153 match matches.subcommand() {
154 Some(("create", sub)) => {
155 let from_agent_id: AgentId = sub.get_one::<String>("from-agent-id").unwrap().parse()?;
156
157 let to = if let Some(id_str) = sub.get_one::<String>("to-agent-id") {
158 let agent_id: AgentId = id_str.parse()?;
159 Some(AgentTarget::ById(agent_id))
160 } else if let Some(type_str) = sub.get_one::<String>("to-agent-type") {
161 let agent_type: AgentType = type_str.parse().unwrap(); Some(AgentTarget::ByType(agent_type))
163 } else {
164 None
165 };
166
167 let message_type: MessageType =
168 sub.get_one::<String>("message-type").unwrap().parse()?;
169 let priority: MessagePriority = sub.get_one::<String>("priority").unwrap().parse()?;
170
171 let req = SendMessageRequest {
172 from_agent_id,
173 to,
174 message_type,
175 payload: sub.get_one::<String>("payload").unwrap().clone(),
176 trajectory_id: sub
177 .get_one::<String>("trajectory-id")
178 .map(|s| s.parse())
179 .transpose()?,
180 scope_id: sub
181 .get_one::<String>("scope-id")
182 .map(|s| s.parse())
183 .transpose()?,
184 artifact_ids: vec![],
185 priority,
186 expires_at: sub
187 .get_one::<String>("expires-at")
188 .map(|s| s.parse())
189 .transpose()?,
190 };
191 let resp: MessageResponse = client.post(BASE, &req).await?;
192 output.print(&resp);
193 }
194 Some(("list", sub)) => {
195 let limit: i64 = sub.get_one::<String>("limit").unwrap().parse()?;
196 let offset: i64 = sub.get_one::<String>("offset").unwrap().parse()?;
197 let mut path = format!("{BASE}?limit={limit}&offset={offset}");
198 if let Some(mt) = sub.get_one::<String>("message-type") {
199 path.push_str(&format!("&message_type={mt}"));
200 }
201 if let Some(from) = sub.get_one::<String>("from-agent-id") {
202 path.push_str(&format!("&from_agent_id={from}"));
203 }
204 if let Some(to_id) = sub.get_one::<String>("to-agent-id") {
205 path.push_str(&format!("&to_agent_id={to_id}"));
206 }
207 if let Some(to_type) = sub.get_one::<String>("to-agent-type") {
208 path.push_str(&format!("&to_agent_type={to_type}"));
209 }
210 if let Some(tid) = sub.get_one::<String>("trajectory-id") {
211 path.push_str(&format!("&trajectory_id={tid}"));
212 }
213 if let Some(p) = sub.get_one::<String>("priority") {
214 path.push_str(&format!("&priority={p}"));
215 }
216 if sub.get_flag("undelivered-only") {
217 path.push_str("&undelivered_only=true");
218 }
219 if sub.get_flag("unacknowledged-only") {
220 path.push_str("&unacknowledged_only=true");
221 }
222 let resp: ListMessagesResponse = client.get(&path).await?;
223 output.print_list(&resp.messages, resp.total);
224 }
225 Some(("get", sub)) => {
226 let id = sub.get_one::<String>("id").unwrap();
227 let resp: MessageResponse = client.get(&format!("{BASE}/{id}")).await?;
228 output.print(&resp);
229 }
230 Some(("acknowledge", sub)) => {
231 let id = sub.get_one::<String>("id").unwrap();
232 let resp: serde_json::Value = client
233 .post_no_body_raw(&format!("{BASE}/{id}/acknowledge"))
234 .await?;
235 output.print_value(&resp);
236 }
237 Some(("deliver", sub)) => {
238 let id = sub.get_one::<String>("id").unwrap();
239 let resp: serde_json::Value = client
240 .post_no_body_raw(&format!("{BASE}/{id}/deliver"))
241 .await?;
242 output.print_value(&resp);
243 }
244 _ => unreachable!("subcommand_required(true) prevents this"),
245 }
246 Ok(())
247}