1use crate::identity::EntityIdType;
15use crate::{AgentId, EventId, TenantId, Timestamp};
16use serde::{Deserialize, Serialize};
17
/// A synchronization event ("pulse") recorded for a tenant.
///
/// A pulse carries the condition that triggered it, the agents it concerns,
/// and — when available — the drift measurement behind it and the outcome of
/// a reconciliation attempt.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct SyncPulse {
    /// Identifier of this pulse. Described as a UUID string in the OpenAPI schema.
    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
    pub pulse_id: EventId,
    /// Tenant the pulse belongs to.
    pub tenant_id: TenantId,
    /// Condition that caused the pulse to be created.
    pub trigger: SyncTrigger,
    /// Agents this pulse concerns.
    pub affected_agents: Vec<AgentId>,
    /// Drift measurement attached to the pulse, if any.
    ///
    /// Left out of the serialized form when `None`; a missing field
    /// deserializes to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub drift_meter: Option<crate::drift::DriftMeter>,
    /// Outcome of a reconciliation attempt, if one has been attached.
    ///
    /// Left out of the serialized form when `None`; a missing field
    /// deserializes to `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub reconciliation: Option<Reconciliation>,
    /// Time the pulse was created. Described as a `date-time` string in the
    /// OpenAPI schema.
    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "date-time"))]
    pub emitted_at: Timestamp,
}
41
/// Condition that caused a [`SyncPulse`] to be created.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// snake_case under the `type` key, next to the variant's own fields.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SyncTrigger {
    /// A drift measurement crossed its threshold.
    DriftThresholdBreached,

    /// Two branches were found to conflict.
    ConflictDetected {
        /// Event identifying the first branch (UUID string in the OpenAPI schema).
        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
        branch_a: EventId,
        /// Event identifying the second branch (UUID string in the OpenAPI schema).
        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
        branch_b: EventId,
    },

    /// Synchronization was requested explicitly.
    ManualRequest,

    /// An agent's watermark has fallen behind.
    WatermarkLag {
        /// Agent that is lagging.
        lagging_agent: AgentId,
        /// Size of the lag. NOTE(review): unit and sign convention are not
        /// visible in this file — confirm against the producer.
        delta: i64,
    },
}
71
72#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
74#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
75pub struct Reconciliation {
76 pub strategy: ReconciliationStrategy,
78 pub events_replayed: u32,
80 pub success: bool,
82}
83
/// Strategy used to reconcile diverged state.
///
/// Serialized as the snake_case variant name (e.g. `last_writer_wins`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum ReconciliationStrategy {
    /// The most recent write takes precedence.
    LastWriterWins,
    /// Merge driven by the event history.
    EventSourcedMerge,
    /// Resolution is left to a manual decision.
    ManualResolution,
}
96
97impl SyncPulse {
98 pub fn from_drift(tenant_id: TenantId, drift_meter: crate::drift::DriftMeter) -> Self {
100 Self {
101 pulse_id: EventId::now_v7(),
102 tenant_id,
103 trigger: SyncTrigger::DriftThresholdBreached,
104 affected_agents: vec![drift_meter.agent_a, drift_meter.agent_b],
105 drift_meter: Some(drift_meter),
106 reconciliation: None,
107 emitted_at: chrono::Utc::now(),
108 }
109 }
110
111 pub fn from_conflict(
113 tenant_id: TenantId,
114 branch_a: EventId,
115 branch_b: EventId,
116 affected_agents: Vec<AgentId>,
117 ) -> Self {
118 Self {
119 pulse_id: EventId::now_v7(),
120 tenant_id,
121 trigger: SyncTrigger::ConflictDetected { branch_a, branch_b },
122 affected_agents,
123 drift_meter: None,
124 reconciliation: None,
125 emitted_at: chrono::Utc::now(),
126 }
127 }
128
129 pub fn from_watermark_lag(
131 tenant_id: TenantId,
132 lagging_agent: AgentId,
133 delta: i64,
134 affected_agents: Vec<AgentId>,
135 ) -> Self {
136 Self {
137 pulse_id: EventId::now_v7(),
138 tenant_id,
139 trigger: SyncTrigger::WatermarkLag {
140 lagging_agent,
141 delta,
142 },
143 affected_agents,
144 drift_meter: None,
145 reconciliation: None,
146 emitted_at: chrono::Utc::now(),
147 }
148 }
149
150 pub fn with_reconciliation(mut self, reconciliation: Reconciliation) -> Self {
152 self.reconciliation = Some(reconciliation);
153 self
154 }
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::drift::{DriftInput, DriftMeter, DriftWeights};
    use crate::DecayParams;

    /// A drift-triggered pulse keeps the tenant, lists two agents, carries the
    /// meter, and starts without a reconciliation.
    #[test]
    fn test_sync_pulse_from_drift() {
        let tenant = TenantId::now_v7();
        let agent_a = AgentId::now_v7();
        let agent_b = AgentId::now_v7();

        let meter = DriftMeter::compute(DriftInput {
            agent_a,
            agent_b,
            events_a: &[],
            events_b: &[],
            context_items_a: &[],
            context_items_b: &[],
            decay: &DecayParams::exponential(10.0),
            weights: &DriftWeights::default(),
            threshold: 0.85,
        });

        let pulse = SyncPulse::from_drift(tenant, meter);
        assert_eq!(pulse.tenant_id, tenant);
        assert_eq!(pulse.affected_agents.len(), 2);
        assert!(pulse.drift_meter.is_some());
        assert!(pulse.reconciliation.is_none());
        assert!(matches!(pulse.trigger, SyncTrigger::DriftThresholdBreached));
    }

    /// A conflict-triggered pulse preserves both branches, the tenant and the
    /// agent list, and attaches neither a meter nor a reconciliation.
    #[test]
    fn test_sync_pulse_from_conflict() {
        let tenant = TenantId::now_v7();
        let branch_a = EventId::now_v7();
        let branch_b = EventId::now_v7();
        let agents = vec![AgentId::now_v7()];

        let pulse = SyncPulse::from_conflict(tenant, branch_a, branch_b, agents.clone());
        match &pulse.trigger {
            SyncTrigger::ConflictDetected {
                branch_a: ba,
                branch_b: bb,
            } => {
                assert_eq!(*ba, branch_a, "branch_a should be preserved");
                assert_eq!(*bb, branch_b, "branch_b should be preserved");
            }
            other => panic!("expected ConflictDetected, got {:?}", other),
        }
        assert_eq!(pulse.tenant_id, tenant);
        assert_eq!(pulse.affected_agents, agents);
        assert!(pulse.drift_meter.is_none());
        assert!(pulse.reconciliation.is_none());
    }

    /// A watermark-lag pulse preserves the lagging agent, the delta, the
    /// tenant and the agent list.
    #[test]
    fn test_sync_pulse_from_watermark_lag() {
        let tenant = TenantId::now_v7();
        let lagging = AgentId::now_v7();
        let agents = vec![lagging, AgentId::now_v7()];

        let pulse = SyncPulse::from_watermark_lag(tenant, lagging, 150, agents.clone());
        match &pulse.trigger {
            SyncTrigger::WatermarkLag {
                lagging_agent,
                delta,
            } => {
                assert_eq!(*lagging_agent, lagging, "lagging_agent should be preserved");
                assert_eq!(*delta, 150, "delta should be preserved");
            }
            other => panic!("expected WatermarkLag, got {:?}", other),
        }
        assert_eq!(pulse.tenant_id, tenant);
        assert_eq!(pulse.affected_agents, agents);
        assert!(pulse.drift_meter.is_none());
    }

    /// `with_reconciliation` attaches the given outcome unchanged.
    #[test]
    fn test_sync_pulse_with_reconciliation() {
        let tenant = TenantId::now_v7();
        let pulse = SyncPulse::from_watermark_lag(tenant, AgentId::now_v7(), 100, vec![])
            .with_reconciliation(Reconciliation {
                strategy: ReconciliationStrategy::LastWriterWins,
                events_replayed: 42,
                success: true,
            });

        // `expect` both asserts presence and yields the value, so a failure
        // reports why the outcome was required instead of a bare unwrap panic.
        let recon = pulse
            .reconciliation
            .expect("with_reconciliation should attach the outcome");
        assert_eq!(recon.strategy, ReconciliationStrategy::LastWriterWins);
        assert_eq!(recon.events_replayed, 42);
        assert!(recon.success);
    }

    /// A trigger survives a JSON roundtrip and is tagged on the wire with a
    /// snake_case `type` field.
    #[test]
    fn test_sync_trigger_serde_roundtrip() {
        let branch_a = EventId::now_v7();
        let branch_b = EventId::now_v7();
        let trigger = SyncTrigger::ConflictDetected { branch_a, branch_b };
        let json = serde_json::to_string(&trigger).unwrap();

        // Pin the wire format: consumers dispatch on the `type` tag.
        let wire: serde_json::Value = serde_json::from_str(&json).unwrap();
        assert_eq!(wire["type"], "conflict_detected");

        let deserialized: SyncTrigger = serde_json::from_str(&json).unwrap();
        match deserialized {
            SyncTrigger::ConflictDetected {
                branch_a: ba,
                branch_b: bb,
            } => {
                assert_eq!(ba, branch_a, "branch_a should survive serde roundtrip");
                assert_eq!(bb, branch_b, "branch_b should survive serde roundtrip");
            }
            other => panic!("expected ConflictDetected after roundtrip, got {:?}", other),
        }
    }
}