cellstate_core/
sync_pulse.rs

//! Event-driven synchronization pulse for multi-agent coordination.
//!
//! A `SyncPulse` is emitted when multi-agent drift exceeds a threshold,
//! a conflict is detected, or a watermark lag is observed. It carries
//! the trigger condition and optional reconciliation results.
//!
//! SyncPulse integrates with:
//! - `DriftMeter` (trigger: drift threshold breached)
//! - `UpstreamSignal::ConflictDetected` (trigger: DAG conflict)
//! - `ChangeRelay` watermarks (trigger: watermark lag)
//!
//! Re-export path: `cellstate_core::sync_pulse::*`
13
14use crate::identity::EntityIdType;
15use crate::{AgentId, EventId, TenantId, Timestamp};
16use serde::{Deserialize, Serialize};
17
18/// A synchronization pulse emitted when agents need to reconcile.
19#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
20#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
21pub struct SyncPulse {
22    /// Unique ID for this pulse (correlates with Event DAG entry).
23    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
24    pub pulse_id: EventId,
25    /// Tenant this pulse belongs to.
26    pub tenant_id: TenantId,
27    /// What triggered this synchronization pulse.
28    pub trigger: SyncTrigger,
29    /// Agents affected by this pulse.
30    pub affected_agents: Vec<AgentId>,
31    /// Optional drift measurement that triggered the pulse.
32    #[serde(default, skip_serializing_if = "Option::is_none")]
33    pub drift_meter: Option<crate::drift::DriftMeter>,
34    /// Reconciliation result (populated after resolution).
35    #[serde(default, skip_serializing_if = "Option::is_none")]
36    pub reconciliation: Option<Reconciliation>,
37    /// When this pulse was emitted.
38    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "date-time"))]
39    pub emitted_at: Timestamp,
40}
41
42/// What triggered the synchronization pulse.
43#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
44#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
45#[serde(tag = "type", rename_all = "snake_case")]
46pub enum SyncTrigger {
47    /// DriftMeter composite score exceeded threshold.
48    DriftThresholdBreached,
49
50    /// UpstreamSignal::ConflictDetected received from the Event DAG.
51    ConflictDetected {
52        /// First conflicting event branch.
53        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
54        branch_a: EventId,
55        /// Second conflicting event branch.
56        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
57        branch_b: EventId,
58    },
59
60    /// Operator or API manually requested synchronization.
61    ManualRequest,
62
63    /// Change Relay watermark delta between agents exceeds threshold.
64    WatermarkLag {
65        /// The agent that is lagging behind.
66        lagging_agent: AgentId,
67        /// Watermark delta (number of unprocessed changes).
68        delta: i64,
69    },
70}
71
72/// Result of a reconciliation attempt.
73#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
74#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
75pub struct Reconciliation {
76    /// Strategy used for reconciliation.
77    pub strategy: ReconciliationStrategy,
78    /// Number of events replayed during reconciliation.
79    pub events_replayed: u32,
80    /// Whether reconciliation succeeded.
81    pub success: bool,
82}
83
84/// Strategy used to reconcile diverged agent states.
85#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
86#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
87#[serde(rename_all = "snake_case")]
88pub enum ReconciliationStrategy {
89    /// Last writer wins (simplest, least safe).
90    LastWriterWins,
91    /// Replay both event streams and merge (event-sourced merge).
92    EventSourcedMerge,
93    /// Requires manual resolution by an operator or higher-autonomy agent.
94    ManualResolution,
95}
96
97impl SyncPulse {
98    /// Create a new SyncPulse from a drift threshold breach.
99    pub fn from_drift(tenant_id: TenantId, drift_meter: crate::drift::DriftMeter) -> Self {
100        Self {
101            pulse_id: EventId::now_v7(),
102            tenant_id,
103            trigger: SyncTrigger::DriftThresholdBreached,
104            affected_agents: vec![drift_meter.agent_a, drift_meter.agent_b],
105            drift_meter: Some(drift_meter),
106            reconciliation: None,
107            emitted_at: chrono::Utc::now(),
108        }
109    }
110
111    /// Create a new SyncPulse from a DAG conflict detection.
112    pub fn from_conflict(
113        tenant_id: TenantId,
114        branch_a: EventId,
115        branch_b: EventId,
116        affected_agents: Vec<AgentId>,
117    ) -> Self {
118        Self {
119            pulse_id: EventId::now_v7(),
120            tenant_id,
121            trigger: SyncTrigger::ConflictDetected { branch_a, branch_b },
122            affected_agents,
123            drift_meter: None,
124            reconciliation: None,
125            emitted_at: chrono::Utc::now(),
126        }
127    }
128
129    /// Create a new SyncPulse from a watermark lag detection.
130    pub fn from_watermark_lag(
131        tenant_id: TenantId,
132        lagging_agent: AgentId,
133        delta: i64,
134        affected_agents: Vec<AgentId>,
135    ) -> Self {
136        Self {
137            pulse_id: EventId::now_v7(),
138            tenant_id,
139            trigger: SyncTrigger::WatermarkLag {
140                lagging_agent,
141                delta,
142            },
143            affected_agents,
144            drift_meter: None,
145            reconciliation: None,
146            emitted_at: chrono::Utc::now(),
147        }
148    }
149
150    /// Attach a reconciliation result to this pulse.
151    pub fn with_reconciliation(mut self, reconciliation: Reconciliation) -> Self {
152        self.reconciliation = Some(reconciliation);
153        self
154    }
155}
156
#[cfg(test)]
mod tests {
    //! Unit tests for the `SyncPulse` constructors and the serde
    //! representation of `SyncTrigger`.

    use super::*;
    use crate::drift::{DriftInput, DriftMeter, DriftWeights};
    use crate::DecayParams;

    /// `from_drift` stores the meter and takes the affected agents from it.
    #[test]
    fn test_sync_pulse_from_drift() {
        let tenant = TenantId::now_v7();
        let agent_a = AgentId::now_v7();
        let agent_b = AgentId::now_v7();

        let meter = DriftMeter::compute(DriftInput {
            agent_a,
            agent_b,
            events_a: &[],
            events_b: &[],
            context_items_a: &[],
            context_items_b: &[],
            decay: &DecayParams::exponential(10.0),
            weights: &DriftWeights::default(),
            threshold: 0.85,
        });
        // Capture the agents the meter reports before it is moved into the pulse.
        let expected_agents = vec![meter.agent_a, meter.agent_b];

        let pulse = SyncPulse::from_drift(tenant, meter);
        assert_eq!(pulse.tenant_id, tenant);
        assert_eq!(pulse.affected_agents, expected_agents);
        assert!(pulse.drift_meter.is_some());
        assert!(pulse.reconciliation.is_none());
        assert!(matches!(pulse.trigger, SyncTrigger::DriftThresholdBreached));
    }

    /// `from_conflict` keeps both branches and the caller-supplied agents.
    #[test]
    fn test_sync_pulse_from_conflict() {
        let tenant = TenantId::now_v7();
        let branch_a = EventId::now_v7();
        let branch_b = EventId::now_v7();
        let agents = vec![AgentId::now_v7()];

        let pulse = SyncPulse::from_conflict(tenant, branch_a, branch_b, agents.clone());
        match &pulse.trigger {
            SyncTrigger::ConflictDetected {
                branch_a: ba,
                branch_b: bb,
            } => {
                assert_eq!(*ba, branch_a, "branch_a should be preserved");
                assert_eq!(*bb, branch_b, "branch_b should be preserved");
            }
            other => panic!("expected ConflictDetected, got {:?}", other),
        }
        assert_eq!(pulse.tenant_id, tenant);
        assert_eq!(pulse.affected_agents, agents);
        assert!(pulse.drift_meter.is_none());
        assert!(pulse.reconciliation.is_none());
    }

    /// `from_watermark_lag` keeps the lagging agent, the delta and the agents.
    #[test]
    fn test_sync_pulse_from_watermark_lag() {
        let tenant = TenantId::now_v7();
        let lagging = AgentId::now_v7();
        let agents = vec![lagging, AgentId::now_v7()];

        let pulse = SyncPulse::from_watermark_lag(tenant, lagging, 150, agents.clone());
        match &pulse.trigger {
            SyncTrigger::WatermarkLag {
                lagging_agent,
                delta,
            } => {
                assert_eq!(*lagging_agent, lagging, "lagging_agent should be preserved");
                assert_eq!(*delta, 150, "delta should be preserved");
            }
            other => panic!("expected WatermarkLag, got {:?}", other),
        }
        assert_eq!(pulse.tenant_id, tenant);
        assert_eq!(pulse.affected_agents, agents);
        assert!(pulse.drift_meter.is_none());
        assert!(pulse.reconciliation.is_none());
    }

    /// `with_reconciliation` attaches exactly the result it was given.
    #[test]
    fn test_sync_pulse_with_reconciliation() {
        let tenant = TenantId::now_v7();
        let outcome = Reconciliation {
            strategy: ReconciliationStrategy::LastWriterWins,
            events_replayed: 42,
            success: true,
        };

        let pulse = SyncPulse::from_watermark_lag(tenant, AgentId::now_v7(), 100, vec![])
            .with_reconciliation(outcome.clone());

        assert_eq!(pulse.reconciliation, Some(outcome));
    }

    /// A struct variant survives a JSON roundtrip and carries its `type` tag.
    #[test]
    fn test_sync_trigger_serde_roundtrip() {
        let branch_a = EventId::now_v7();
        let branch_b = EventId::now_v7();
        let trigger = SyncTrigger::ConflictDetected { branch_a, branch_b };
        let json = serde_json::to_string(&trigger).unwrap();
        assert!(
            json.contains(r#""type":"conflict_detected""#),
            "variant should be tagged under `type` in snake_case, got {json}"
        );

        let deserialized: SyncTrigger = serde_json::from_str(&json).unwrap();
        match deserialized {
            SyncTrigger::ConflictDetected {
                branch_a: ba,
                branch_b: bb,
            } => {
                assert_eq!(ba, branch_a, "branch_a should survive serde roundtrip");
                assert_eq!(bb, branch_b, "branch_b should survive serde roundtrip");
            }
            other => panic!("expected ConflictDetected after roundtrip, got {:?}", other),
        }
    }

    /// Unit variants serialize to a bare tag object; all variants roundtrip.
    #[test]
    fn test_sync_trigger_serde_tagging() {
        let json = serde_json::to_string(&SyncTrigger::DriftThresholdBreached).unwrap();
        assert_eq!(json, r#"{"type":"drift_threshold_breached"}"#);

        let json = serde_json::to_string(&SyncTrigger::ManualRequest).unwrap();
        assert_eq!(json, r#"{"type":"manual_request"}"#);

        let triggers = [
            SyncTrigger::DriftThresholdBreached,
            SyncTrigger::ManualRequest,
            SyncTrigger::WatermarkLag {
                lagging_agent: AgentId::now_v7(),
                delta: -3,
            },
        ];
        for trigger in triggers {
            let json = serde_json::to_string(&trigger).unwrap();
            let back: SyncTrigger = serde_json::from_str(&json).unwrap();
            assert_eq!(back, trigger, "trigger should survive serde roundtrip");
        }
    }
}