cellstate_core/
event.rs

1//! Event types for the Event DAG system.
2//!
3//! The Event DAG provides unidirectional event flow with upstream signaling
4//! (the "tram car tracks" pattern).
5//!
6//! # Event Header
7//!
//! The `EventHeader` is a 64-byte-aligned (cache-line-aligned) structure for optimal memory access.
9//! It contains all metadata needed to process an event without accessing the payload.
10
11use bitflags::bitflags;
12use serde::{Deserialize, Serialize};
13use std::fmt;
14use uuid::Uuid;
15
16use crate::identity::EntityIdType;
17use crate::EventId;
18
19// ============================================================================
20// EVENT IDENTIFICATION
21// ============================================================================
22
23// ============================================================================
24// DAG POSITION
25// ============================================================================
26
/// Position of an event in the DAG.
///
/// The DAG position uniquely identifies where an event sits in the event graph.
/// - `depth`: Distance from the root event (0 = root)
/// - `lane`: Parallel track for fan-out scenarios
/// - `sequence`: Monotonic counter within a lane
///
/// The struct is `#[repr(C)]` and consists of three `u32` fields, so it is laid
/// out in declaration order (`depth`, `lane`, `sequence`) and occupies 12 bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[repr(C)]
pub struct DagPosition {
    /// Distance from the root event (0 = root event)
    pub depth: u32,
    /// Parallel track number for fan-out (0 = main track)
    pub lane: u32,
    /// Monotonic sequence number within the lane
    pub sequence: u32,
}
44
45impl DagPosition {
46    /// Create a new DAG position.
47    pub const fn new(depth: u32, lane: u32, sequence: u32) -> Self {
48        Self {
49            depth,
50            lane,
51            sequence,
52        }
53    }
54
55    /// Create the root position.
56    pub const fn root() -> Self {
57        Self::new(0, 0, 0)
58    }
59
60    /// Create a child position on the same lane.
61    pub const fn child(&self, sequence: u32) -> Self {
62        Self::new(self.depth + 1, self.lane, sequence)
63    }
64
65    /// Create a position on a new lane (fork).
66    pub const fn fork(&self, new_lane: u32, sequence: u32) -> Self {
67        Self::new(self.depth + 1, new_lane, sequence)
68    }
69
70    /// Check if this position is a potential ancestor of another based on depth and lane.
71    ///
72    /// Returns `true` if `self` is at a shallower depth on the same lane as `other`.
73    /// Note: this is a necessary but not sufficient condition for true ancestry —
74    /// it does not verify sequence continuity along the parent chain.
75    pub const fn is_ancestor_of(&self, other: &Self) -> bool {
76        self.depth < other.depth && self.lane == other.lane
77    }
78
79    /// Check if this is the root position.
80    pub const fn is_root(&self) -> bool {
81        self.depth == 0
82    }
83}
84
85impl Default for DagPosition {
86    fn default() -> Self {
87        Self::root()
88    }
89}
90
91impl fmt::Display for DagPosition {
92    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
93        write!(f, "{}:{}:{}", self.depth, self.lane, self.sequence)
94    }
95}
96
97// ============================================================================
98// EVENT KIND
99// ============================================================================
100
/// Event kind encoded as a 16-bit value.
///
/// The upper 4 bits encode the category, the lower 12 bits encode the specific type.
/// This allows 16 categories with 4096 event types each (65536 total).
///
/// Category allocation:
/// - 0x0xxx: System events (also the generic `DATA` kind and A2UI component events)
/// - 0x1xxx: Trajectory events
/// - 0x2xxx: Scope events
/// - 0x3xxx: Artifact events
/// - 0x4xxx: Note events
/// - 0x5xxx: Turn events
/// - 0x6xxx: Agent events (including BDI goal/plan/step/belief/deliberation
///   and synchronization sub-ranges)
/// - 0x7xxx: Lock events
/// - 0x8xxx: Message events
/// - 0x9xxx: Delegation events
/// - 0xAxxx: Handoff events
/// - 0xBxxx: Edge events
/// - 0xCxxx: Evolution events
/// - 0xDxxx: Effect events (errors, compensations)
/// - 0xExxx: Cache invalidation events
/// - 0xFxxx: Custom/extension events (sub-ranges such as AG-UI, working set,
///   bash, session, instance and security events are assigned in `impl EventKind`)
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[repr(transparent)]
pub struct EventKind(pub u16);
127
128impl EventKind {
    // NOTE: `is_defined` enumerates these constants explicitly, so every
    // constant added here must also be added to that list; otherwise
    // `validate_event_kind` will reject the new kind.

    // Generic data event (neutral/unspecified kind)
    pub const DATA: Self = Self(0x0000);

    // System events (0x0xxx)
    pub const SYSTEM_INIT: Self = Self(0x0001);
    pub const SYSTEM_SHUTDOWN: Self = Self(0x0002);
    pub const SYSTEM_HEARTBEAT: Self = Self(0x0003);
    pub const SYSTEM_CONFIG_CHANGE: Self = Self(0x0004);

    // Trajectory events (0x1xxx)
    pub const TRAJECTORY_CREATED: Self = Self(0x1001);
    pub const TRAJECTORY_UPDATED: Self = Self(0x1002);
    pub const TRAJECTORY_COMPLETED: Self = Self(0x1003);
    pub const TRAJECTORY_FAILED: Self = Self(0x1004);
    pub const TRAJECTORY_SUSPENDED: Self = Self(0x1005);
    pub const TRAJECTORY_RESUMED: Self = Self(0x1006);
    pub const TRAJECTORY_DELETED: Self = Self(0x1007);

    // Scope events (0x2xxx)
    pub const SCOPE_CREATED: Self = Self(0x2001);
    pub const SCOPE_UPDATED: Self = Self(0x2002);
    pub const SCOPE_CLOSED: Self = Self(0x2003);
    pub const SCOPE_CHECKPOINTED: Self = Self(0x2004);
    /// Context window was repaged mid-scope (sections evicted/promoted).
    pub const SCOPE_CONTEXT_PAGED: Self = Self(0x2005);

    // Artifact events (0x3xxx)
    pub const ARTIFACT_CREATED: Self = Self(0x3001);
    pub const ARTIFACT_UPDATED: Self = Self(0x3002);
    pub const ARTIFACT_SUPERSEDED: Self = Self(0x3003);
    pub const ARTIFACT_DELETED: Self = Self(0x3004);

    // Note events (0x4xxx)
    pub const NOTE_CREATED: Self = Self(0x4001);
    pub const NOTE_UPDATED: Self = Self(0x4002);
    pub const NOTE_SUPERSEDED: Self = Self(0x4003);
    pub const NOTE_DELETED: Self = Self(0x4004);
    pub const NOTE_ACCESSED: Self = Self(0x4005);

    // Turn events (0x5xxx)
    pub const TURN_CREATED: Self = Self(0x5001);

    // Agent events (0x6xxx)
    pub const AGENT_REGISTERED: Self = Self(0x6001);
    pub const AGENT_UPDATED: Self = Self(0x6002);
    pub const AGENT_UNREGISTERED: Self = Self(0x6003);
    pub const AGENT_STATUS_CHANGED: Self = Self(0x6004);

    // BDI Goal events (0x6xxx — agent subcategory)
    pub const GOAL_CREATED: Self = Self(0x6010);
    pub const GOAL_ACTIVATED: Self = Self(0x6011);
    pub const GOAL_ACHIEVED: Self = Self(0x6012);
    pub const GOAL_FAILED: Self = Self(0x6013);

    // BDI Plan events (0x6xxx — agent subcategory)
    pub const PLAN_CREATED: Self = Self(0x6020);
    pub const PLAN_STARTED: Self = Self(0x6021);
    pub const PLAN_COMPLETED: Self = Self(0x6022);
    pub const PLAN_FAILED: Self = Self(0x6023);

    // BDI Step events (0x6xxx — agent subcategory)
    pub const STEP_COMPLETED: Self = Self(0x6030);
    pub const STEP_FAILED: Self = Self(0x6031);

    // BDI Belief events (0x6xxx — agent subcategory)
    pub const BELIEF_CREATED: Self = Self(0x6040);
    pub const BELIEF_SUPERSEDED: Self = Self(0x6041);

    // BDI Deliberation events (0x6xxx — agent subcategory)
    pub const DELIBERATION_COMPLETED: Self = Self(0x6050);
    pub const ENGINE_STATE_PERSISTED: Self = Self(0x6051);

    // Synchronization events (0x6xxx — agent subcategory)
    /// SyncPulse emitted when multi-agent drift exceeds threshold.
    pub const SYNC_PULSE_EMITTED: Self = Self(0x6060);
    /// SyncPulse reconciliation completed.
    pub const SYNC_PULSE_RECONCILED: Self = Self(0x6061);

    // Lock events (0x7xxx)
    pub const LOCK_ACQUIRED: Self = Self(0x7001);
    pub const LOCK_EXTENDED: Self = Self(0x7002);
    pub const LOCK_RELEASED: Self = Self(0x7003);
    pub const LOCK_EXPIRED: Self = Self(0x7004);
    pub const LOCK_CONTENTION: Self = Self(0x7005);

    // Message events (0x8xxx)
    pub const MESSAGE_SENT: Self = Self(0x8001);
    pub const MESSAGE_DELIVERED: Self = Self(0x8002);
    pub const MESSAGE_ACKNOWLEDGED: Self = Self(0x8003);
    pub const MESSAGE_EXPIRED: Self = Self(0x8004);

    // Delegation events (0x9xxx)
    pub const DELEGATION_CREATED: Self = Self(0x9001);
    pub const DELEGATION_ACCEPTED: Self = Self(0x9002);
    pub const DELEGATION_REJECTED: Self = Self(0x9003);
    pub const DELEGATION_STARTED: Self = Self(0x9004);
    pub const DELEGATION_COMPLETED: Self = Self(0x9005);
    pub const DELEGATION_FAILED: Self = Self(0x9006);

    // Handoff events (0xAxxx)
    pub const HANDOFF_CREATED: Self = Self(0xA001);
    pub const HANDOFF_ACCEPTED: Self = Self(0xA002);
    pub const HANDOFF_REJECTED: Self = Self(0xA003);
    pub const HANDOFF_COMPLETED: Self = Self(0xA004);

    // Edge events (0xBxxx)
    pub const EDGE_CREATED: Self = Self(0xB001);
    pub const EDGE_UPDATED: Self = Self(0xB002);
    pub const EDGE_DELETED: Self = Self(0xB003);

    // Evolution events (0xCxxx)
    pub const EVOLUTION_SNAPSHOT_CREATED: Self = Self(0xC001);
    pub const EVOLUTION_PHASE_CHANGED: Self = Self(0xC002);

    // Effect events (0xDxxx)
    // `is_effect` reports true for every kind in this category.
    pub const EFFECT_ERROR: Self = Self(0xD001);
    pub const EFFECT_RETRY: Self = Self(0xD002);
    pub const EFFECT_COMPENSATE: Self = Self(0xD003);
    pub const EFFECT_ACK: Self = Self(0xD004);
    pub const EFFECT_BACKPRESSURE: Self = Self(0xD005);
    pub const EFFECT_CANCEL: Self = Self(0xD006);
    pub const EFFECT_CONFLICT: Self = Self(0xD007);

    // Cache invalidation events (0xExxx)
    pub const CACHE_INVALIDATE_TRAJECTORY: Self = Self(0xE001);
    pub const CACHE_INVALIDATE_SCOPE: Self = Self(0xE002);
    pub const CACHE_INVALIDATE_ARTIFACT: Self = Self(0xE003);
    pub const CACHE_INVALIDATE_NOTE: Self = Self(0xE004);

    // Working set events (0xF1xx range)
    pub const WORKING_SET_UPDATED: Self = Self(0xF101);
    pub const WORKING_SET_DELETED: Self = Self(0xF102);

    // Bash execution events (0xF2xx range)
    //
    // Bash commands are IO operations — execution happens in the SDK's
    // TypeScript sandbox (just-bash). Only file writes flow through
    // MutationPipeline. These events record the execution for audit trail.
    //
    // Event flow:
    // 1. BASH_EXECUTION_STARTED → command begins (correlation_id links to scope)
    // 2. BASH_EXECUTION_COMPLETED → command finishes (exit code, duration, files modified)
    // 3. BASH_FILE_WRITTEN → for each file write, a separate mutation event
    pub const BASH_EXECUTION_STARTED: Self = Self(0xF201);
    pub const BASH_EXECUTION_COMPLETED: Self = Self(0xF202);
    pub const BASH_FILE_WRITTEN: Self = Self(0xF203);

    // Browser execution events (0xF4xx range)
    pub const BROWSER_ACTION_COMPLETED: Self = Self(0xF401);

    // WebMCP events (0xF5xx range)
    pub const WEB_MCP_TOOL_EXECUTED: Self = Self(0xF501);
    pub const WEB_MCP_DISCOVERY_SNAPSHOT: Self = Self(0xF502);

    // Proxy / reasoning trace events (0xF3xx range)
    //
    // Captured from upstream LLM providers (e.g., OpenRouter) when models
    // emit extended thinking / reasoning traces alongside their responses.
    // The proxy intercepts these traces and records them as first-class events
    // for observability, audit trails, and real-time MemoryStream subscribers.
    pub const REASONING_TRACE: Self = Self(0xF301);

    // AG-UI protocol events (0xF0xx range - reserved for protocols)
    //
    // CopilotKit/AG-UI protocol compatibility layer for agent lifecycle and state management.
    //
    // Protocol mapping to CELLSTATE concepts:
    // - AG_UI_RUN_STARTED    => Scope created event (trajectory begins)
    // - AG_UI_STEP_STARTED   => Turn started event (agent begins processing)
    // - AG_UI_STEP_FINISHED  => Turn completed event (agent finishes processing)
    // - AG_UI_RUN_FINISHED   => Scope closed event (trajectory completes)
    // - AG_UI_RUN_ERROR      => Trajectory failed event (unrecoverable error)
    // - AG_UI_INTERRUPT      => Effect::Pending checkpoint (pause for user input)
    // - AG_UI_STATE_SNAPSHOT => Working set snapshot event (full state capture)
    // - AG_UI_STATE_DELTA    => Working set delta event (incremental state update)
    pub const AG_UI_RUN_STARTED: Self = Self(0xF001);
    pub const AG_UI_STEP_STARTED: Self = Self(0xF002);
    pub const AG_UI_STEP_FINISHED: Self = Self(0xF003);
    pub const AG_UI_RUN_FINISHED: Self = Self(0xF004);
    pub const AG_UI_RUN_ERROR: Self = Self(0xF005);
    pub const AG_UI_INTERRUPT: Self = Self(0xF010);
    pub const AG_UI_STATE_SNAPSHOT: Self = Self(0xF020);
    pub const AG_UI_STATE_DELTA: Self = Self(0xF021);

    // Session events (0xF6xx range)
    //
    // Stateful LLM session lifecycle. Sessions allow sending only deltas
    // (new messages) instead of full context on each tool loop round.
    pub const SESSION_CREATED: Self = Self(0xF601);
    pub const SESSION_ACTIVATED: Self = Self(0xF602);
    pub const SESSION_DELTA: Self = Self(0xF603);
    pub const SESSION_CLOSED: Self = Self(0xF604);
    pub const SESSION_EXPIRED: Self = Self(0xF605);
    pub const SESSION_FALLBACK: Self = Self(0xF606);

    // Instance events (0xF7xx range)
    //
    // Multi-instance coordination. Tracks API server instances for
    // affinity routing and failover.
    pub const INSTANCE_REGISTERED: Self = Self(0xF701);
    pub const INSTANCE_HEARTBEAT: Self = Self(0xF702);
    pub const INSTANCE_DEREGISTERED: Self = Self(0xF703);
    pub const INSTANCE_STALE: Self = Self(0xF704);

    // A2UI component events (0x00xx system subcategory)
    //
    // Incremental UI updates via JSON Patch for A2UI components.
    // These live in category 0, so `is_system` reports true for them.
    pub const COMPONENT_PATCH: Self = Self(0x0020);
    pub const COMPONENT_SNAPSHOT: Self = Self(0x0021);

    // Security events (0xF9xx range; no 0xF8xx constants are defined in this impl)
    pub const INVARIANT_VIOLATION: Self = Self(0xF901);
    pub const PII_EGRESS_BLOCKED: Self = Self(0xF902);
    pub const PII_REDACTION_APPLIED: Self = Self(0xF903);
    pub const PII_VAULT_WRITE_FAILED: Self = Self(0xF904);
344
345    /// Get the category (upper 4 bits).
346    pub const fn category(&self) -> u8 {
347        (self.0 >> 12) as u8
348    }
349
350    /// Get the type within the category (lower 12 bits).
351    pub const fn type_id(&self) -> u16 {
352        self.0 & 0x0FFF
353    }
354
355    /// Create a custom event kind.
356    pub const fn custom(category: u8, type_id: u16) -> Self {
357        Self(((category as u16) << 12) | (type_id & 0x0FFF))
358    }
359
360    /// Check if this is a system event.
361    pub const fn is_system(&self) -> bool {
362        self.category() == 0
363    }
364
365    /// Check if this is an effect event.
366    pub const fn is_effect(&self) -> bool {
367        self.category() == 0xD
368    }
369
370    /// Check if this event kind matches a known constant.
371    ///
372    /// Returns `false` for undefined kinds (e.g. 0x0FFF). This prevents
373    /// accidental use of unregistered event types in production.
374    pub fn is_defined(&self) -> bool {
375        matches!(
376            *self,
377            Self::DATA
378                | Self::SYSTEM_INIT
379                | Self::SYSTEM_SHUTDOWN
380                | Self::SYSTEM_HEARTBEAT
381                | Self::SYSTEM_CONFIG_CHANGE
382                | Self::TRAJECTORY_CREATED
383                | Self::TRAJECTORY_UPDATED
384                | Self::TRAJECTORY_COMPLETED
385                | Self::TRAJECTORY_FAILED
386                | Self::TRAJECTORY_SUSPENDED
387                | Self::TRAJECTORY_RESUMED
388                | Self::TRAJECTORY_DELETED
389                | Self::SCOPE_CREATED
390                | Self::SCOPE_UPDATED
391                | Self::SCOPE_CLOSED
392                | Self::SCOPE_CHECKPOINTED
393                | Self::SCOPE_CONTEXT_PAGED
394                | Self::ARTIFACT_CREATED
395                | Self::ARTIFACT_UPDATED
396                | Self::ARTIFACT_SUPERSEDED
397                | Self::ARTIFACT_DELETED
398                | Self::NOTE_CREATED
399                | Self::NOTE_UPDATED
400                | Self::NOTE_SUPERSEDED
401                | Self::NOTE_DELETED
402                | Self::NOTE_ACCESSED
403                | Self::TURN_CREATED
404                | Self::AGENT_REGISTERED
405                | Self::AGENT_UPDATED
406                | Self::AGENT_UNREGISTERED
407                | Self::AGENT_STATUS_CHANGED
408                | Self::GOAL_CREATED
409                | Self::GOAL_ACTIVATED
410                | Self::GOAL_ACHIEVED
411                | Self::GOAL_FAILED
412                | Self::PLAN_CREATED
413                | Self::PLAN_STARTED
414                | Self::PLAN_COMPLETED
415                | Self::PLAN_FAILED
416                | Self::STEP_COMPLETED
417                | Self::STEP_FAILED
418                | Self::BELIEF_CREATED
419                | Self::BELIEF_SUPERSEDED
420                | Self::DELIBERATION_COMPLETED
421                | Self::ENGINE_STATE_PERSISTED
422                | Self::SYNC_PULSE_EMITTED
423                | Self::SYNC_PULSE_RECONCILED
424                | Self::LOCK_ACQUIRED
425                | Self::LOCK_EXTENDED
426                | Self::LOCK_RELEASED
427                | Self::LOCK_EXPIRED
428                | Self::LOCK_CONTENTION
429                | Self::MESSAGE_SENT
430                | Self::MESSAGE_DELIVERED
431                | Self::MESSAGE_ACKNOWLEDGED
432                | Self::MESSAGE_EXPIRED
433                | Self::DELEGATION_CREATED
434                | Self::DELEGATION_ACCEPTED
435                | Self::DELEGATION_REJECTED
436                | Self::DELEGATION_STARTED
437                | Self::DELEGATION_COMPLETED
438                | Self::DELEGATION_FAILED
439                | Self::HANDOFF_CREATED
440                | Self::HANDOFF_ACCEPTED
441                | Self::HANDOFF_REJECTED
442                | Self::HANDOFF_COMPLETED
443                | Self::EDGE_CREATED
444                | Self::EDGE_UPDATED
445                | Self::EDGE_DELETED
446                | Self::EVOLUTION_SNAPSHOT_CREATED
447                | Self::EVOLUTION_PHASE_CHANGED
448                | Self::EFFECT_ERROR
449                | Self::EFFECT_RETRY
450                | Self::EFFECT_COMPENSATE
451                | Self::EFFECT_ACK
452                | Self::EFFECT_BACKPRESSURE
453                | Self::EFFECT_CANCEL
454                | Self::EFFECT_CONFLICT
455                | Self::CACHE_INVALIDATE_TRAJECTORY
456                | Self::CACHE_INVALIDATE_SCOPE
457                | Self::CACHE_INVALIDATE_ARTIFACT
458                | Self::CACHE_INVALIDATE_NOTE
459                | Self::WORKING_SET_UPDATED
460                | Self::WORKING_SET_DELETED
461                | Self::BASH_EXECUTION_STARTED
462                | Self::BASH_EXECUTION_COMPLETED
463                | Self::BASH_FILE_WRITTEN
464                | Self::BROWSER_ACTION_COMPLETED
465                | Self::WEB_MCP_TOOL_EXECUTED
466                | Self::WEB_MCP_DISCOVERY_SNAPSHOT
467                | Self::REASONING_TRACE
468                | Self::AG_UI_RUN_STARTED
469                | Self::AG_UI_STEP_STARTED
470                | Self::AG_UI_STEP_FINISHED
471                | Self::AG_UI_RUN_FINISHED
472                | Self::AG_UI_RUN_ERROR
473                | Self::AG_UI_INTERRUPT
474                | Self::AG_UI_STATE_SNAPSHOT
475                | Self::AG_UI_STATE_DELTA
476                | Self::INVARIANT_VIOLATION
477                | Self::PII_EGRESS_BLOCKED
478                | Self::PII_REDACTION_APPLIED
479                | Self::PII_VAULT_WRITE_FAILED
480                | Self::SESSION_CREATED
481                | Self::SESSION_ACTIVATED
482                | Self::SESSION_DELTA
483                | Self::SESSION_CLOSED
484                | Self::SESSION_EXPIRED
485                | Self::SESSION_FALLBACK
486                | Self::INSTANCE_REGISTERED
487                | Self::INSTANCE_HEARTBEAT
488                | Self::INSTANCE_DEREGISTERED
489                | Self::INSTANCE_STALE
490                | Self::COMPONENT_PATCH
491                | Self::COMPONENT_SNAPSHOT
492        )
493    }
494}
495
496/// Validate that an event kind is a recognized, defined variant.
497///
498/// This standalone function is intended to be called before storage operations
499/// to reject events with undefined (unregistered) kinds at the boundary,
500/// preventing garbage data from entering the DAG.
501pub fn validate_event_kind(kind: EventKind) -> Result<(), &'static str> {
502    if !kind.is_defined() {
503        return Err("undefined event kind");
504    }
505    Ok(())
506}
507
508impl fmt::Display for EventKind {
509    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
510        write!(f, "{:#06X}", self.0)
511    }
512}
513
514// ============================================================================
515// EVENT FLAGS
516// ============================================================================
517
bitflags! {
    /// Flags for event processing hints.
    ///
    /// Stored as a single `u8`. All eight bits are assigned below, so every
    /// `u8` value corresponds to a valid combination of flags.
    #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
    pub struct EventFlags: u8 {
        /// Event requires acknowledgment
        const REQUIRES_ACK = 0b0000_0001;
        /// Event is part of a transaction
        const TRANSACTIONAL = 0b0000_0010;
        /// Event payload is compressed
        const COMPRESSED = 0b0000_0100;
        /// Event is a replay (not original)
        const REPLAY = 0b0000_1000;
        /// Event has been acknowledged
        const ACKNOWLEDGED = 0b0001_0000;
        /// Event triggered compensation
        const COMPENSATED = 0b0010_0000;
        /// Event is critical (must be processed)
        const CRITICAL = 0b0100_0000;
        /// Event payload had PII scrubbed (DAG queries can filter)
        const PII_SCRUBBED = 0b1000_0000;
    }
}
540
541impl Default for EventFlags {
542    fn default() -> Self {
543        Self::empty()
544    }
545}
546
547// Manual serde implementation for EventFlags (bitflags 2.x + serde)
548impl Serialize for EventFlags {
549    fn serialize<S: serde::Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
550        self.bits().serialize(serializer)
551    }
552}
553
554impl<'de> Deserialize<'de> for EventFlags {
555    fn deserialize<D: serde::Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
556        let bits = u8::deserialize(deserializer)?;
557        Self::from_bits(bits).ok_or_else(|| {
558            serde::de::Error::custom(format!("invalid EventFlags bits: {:#04x}", bits))
559        })
560    }
561}
562
#[cfg(feature = "openapi")]
impl utoipa::ToSchema for EventFlags {
    /// Schema name under which the flags appear in the OpenAPI document.
    fn name() -> std::borrow::Cow<'static, str> {
        const SCHEMA_NAME: &str = "EventFlags";
        std::borrow::Cow::Borrowed(SCHEMA_NAME)
    }
}
569
#[cfg(feature = "openapi")]
impl utoipa::PartialSchema for EventFlags {
    /// Describe the flags as an integer in `0..=255`, matching the raw `u8`
    /// representation used by the serde implementation.
    fn schema() -> utoipa::openapi::RefOr<utoipa::openapi::schema::Schema> {
        use utoipa::openapi::schema::{SchemaType, Type};
        use utoipa::openapi::ObjectBuilder;

        let integer = SchemaType::Type(Type::Integer);
        let builder = ObjectBuilder::new()
            .schema_type(integer)
            .description(Some("Event processing flags as a u8 bitfield (0-255)"))
            .minimum(Some(0.0))
            .maximum(Some(255.0));
        builder.into()
    }
}
583
584// ============================================================================
585// EVENT HEADER (64-byte cache-aligned)
586// ============================================================================
587
/// Event header with all metadata needed for processing.
///
/// The header is aligned to 64 bytes via `#[repr(C, align(64))]`; a
/// compile-time assertion next to this definition checks the alignment.
/// The payload is stored separately; the header only records its size in
/// `payload_size`.
///
/// NOTE: the header was originally designed to occupy exactly 64 bytes. With
/// `random_seed` and `causality` added it is larger than that, so only the
/// alignment (not the size) is guaranteed.
///
/// Fields in declaration order (`#[repr(C)]`), with the sizes of the original
/// 64-byte layout:
/// - event_id: 16 bytes (UUIDv7)
/// - correlation_id: 16 bytes (UUIDv7)
/// - timestamp: 8 bytes (microseconds since epoch)
/// - position: 12 bytes (DagPosition)
/// - payload_size: 4 bytes (u32)
/// - event_kind: 2 bytes (EventKind)
/// - flags: 1 byte (EventFlags)
/// - random_seed: `Option<u64>` (added later)
/// - causality: `Option<Causality>` (added later; size depends on `Causality`)
/// - _reserved: 5 bytes (reserved; not serialized)
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
#[repr(C, align(64))]
pub struct EventHeader {
    /// Unique event identifier (UUIDv7 for timestamp-sortable IDs)
    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
    pub event_id: EventId,
    /// Correlation ID for tracing related events
    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
    pub correlation_id: EventId,
    /// Timestamp in microseconds since Unix epoch
    pub timestamp: i64,
    /// Position in the event DAG
    pub position: DagPosition,
    /// Size of the payload in bytes
    pub payload_size: u32,
    /// Event kind (category + type)
    pub event_kind: EventKind,
    /// Processing flags
    pub flags: EventFlags,
    /// Random seed captured at emit time for deterministic replay
    pub random_seed: Option<u64>,
    /// Optional causality context for distributed tracing.
    ///
    /// When present, links this event to an OpenTelemetry trace via W3C
    /// Trace Context. Populated by the `CausalitySpanLinker` in the
    /// telemetry layer; omitted from serialization when absent.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub causality: Option<Causality>,
    /// Reserved for future use (padding to alignment).
    ///
    /// Skipped by serde, so it is never written and is filled with its
    /// default (all zeros) on deserialization.
    #[serde(skip)]
    _reserved: [u8; 5],
}
635
// Compile-time check that `EventHeader` is aligned to 64 bytes.
// Note: only the alignment is asserted. The size is no longer 64 bytes; it
// grew to accommodate the `random_seed` field.
const _: () = assert!(std::mem::align_of::<EventHeader>() == 64);
639
640impl EventHeader {
641    /// Create a new event header.
642    #[allow(clippy::too_many_arguments)]
643    pub fn new(
644        event_id: EventId,
645        correlation_id: EventId,
646        timestamp: i64,
647        position: DagPosition,
648        payload_size: u32,
649        event_kind: EventKind,
650        flags: EventFlags,
651        random_seed: Option<u64>,
652    ) -> Self {
653        Self {
654            event_id,
655            correlation_id,
656            timestamp,
657            position,
658            payload_size,
659            event_kind,
660            flags,
661            random_seed,
662            causality: None,
663            _reserved: [0; 5],
664        }
665    }
666
667    /// Attach a causality context to this header for distributed tracing.
668    pub fn with_causality(mut self, causality: Causality) -> Self {
669        self.causality = Some(causality);
670        self
671    }
672
673    /// Check if this event requires acknowledgment.
674    pub fn requires_ack(&self) -> bool {
675        self.flags.contains(EventFlags::REQUIRES_ACK)
676    }
677
678    /// Check if this event is part of a transaction.
679    pub fn is_transactional(&self) -> bool {
680        self.flags.contains(EventFlags::TRANSACTIONAL)
681    }
682
683    /// Check if this event is a replay.
684    pub fn is_replay(&self) -> bool {
685        self.flags.contains(EventFlags::REPLAY)
686    }
687
688    /// Check if this event has been acknowledged.
689    pub fn is_acknowledged(&self) -> bool {
690        self.flags.contains(EventFlags::ACKNOWLEDGED)
691    }
692
693    /// Check if this event is critical.
694    pub fn is_critical(&self) -> bool {
695        self.flags.contains(EventFlags::CRITICAL)
696    }
697
698    /// Mark this event as acknowledged.
699    pub fn acknowledge(&mut self) {
700        self.flags |= EventFlags::ACKNOWLEDGED;
701    }
702
703    /// Get the event age in microseconds from a reference time.
704    pub fn age_micros(&self, now_micros: i64) -> i64 {
705        now_micros - self.timestamp
706    }
707}
708
709// ============================================================================
710// FULL EVENT (Header + Payload)
711// ============================================================================
712
/// A complete event with header and payload.
///
/// The payload is generic to allow different event types to have different
/// payload structures while sharing the same header format.
///
/// `hash_chain` is left out of the serialized form when it is `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub struct Event<P> {
    /// Metadata describing the event (identity, timing, DAG position, flags).
    pub header: EventHeader,
    /// Event-specific payload.
    pub payload: P,
    /// Hash chain for tamper-evident audit trail (Blake3 hash of parent + self).
    /// None for genesis events.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub hash_chain: Option<HashChain>,
}
727
728impl<P> Event<P> {
729    /// Create a new event with the given header and payload (genesis event, no hash chain).
730    pub fn new(header: EventHeader, payload: P) -> Self {
731        Self {
732            header,
733            payload,
734            hash_chain: None,
735        }
736    }
737
738    /// Create a new event with the given header, payload, and hash chain.
739    pub fn with_hash_chain(header: EventHeader, payload: P, hash_chain: HashChain) -> Self {
740        Self {
741            header,
742            payload,
743            hash_chain: Some(hash_chain),
744        }
745    }
746
747    /// Get the event ID.
748    pub fn event_id(&self) -> EventId {
749        self.header.event_id
750    }
751
752    /// Get the event kind.
753    pub fn event_kind(&self) -> EventKind {
754        self.header.event_kind
755    }
756
757    /// Get the correlation ID.
758    pub fn correlation_id(&self) -> EventId {
759        self.header.correlation_id
760    }
761
762    /// Get the DAG position.
763    pub fn position(&self) -> DagPosition {
764        self.header.position
765    }
766
767    /// Map the payload to a different type.
768    pub fn map_payload<Q, F: FnOnce(P) -> Q>(self, f: F) -> Event<Q> {
769        Event {
770            header: self.header,
771            payload: f(self.payload),
772            hash_chain: self.hash_chain,
773        }
774    }
775}
776
/// Borrows header + payload for content-only hashing.
///
/// The hash_chain field is intentionally excluded: `event_hash` is the hash
/// of the event *content*, so including it would create a circular dependency
/// (the hash would need to contain itself).
///
/// Only `Serialize` is derived: this view exists to be serialized for
/// hashing and is never deserialized.
#[derive(Serialize)]
struct EventContentRef<'a, P: Serialize> {
    /// Borrowed event header.
    header: &'a EventHeader,
    /// Borrowed event payload.
    payload: &'a P,
}
787
788impl<P: Serialize> Event<P> {
789    /// Compute Blake3 hash of this event's content (header + payload).
790    ///
791    /// Hashes only header and payload — the hash_chain field is excluded to
792    /// avoid a circular dependency (event_hash cannot include itself).
793    pub fn compute_hash(&self) -> [u8; 32] {
794        let content = EventContentRef {
795            header: &self.header,
796            payload: &self.payload,
797        };
798        let canonical = serde_json::to_vec(&content).unwrap_or_default();
799        blake3::hash(&canonical).into()
800    }
801
802    /// Verify this event against a parent hash using Blake3.
803    /// Returns true for genesis events (no hash chain).
804    pub fn verify(&self, parent_hash: &[u8; 32]) -> bool {
805        Blake3Verifier.verify_chain(self, parent_hash)
806    }
807
808    /// Check if this is a genesis event (no parent).
809    pub fn is_genesis(&self) -> bool {
810        self.hash_chain.is_none()
811    }
812}
813
814// ============================================================================
815// UPSTREAM SIGNALS
816// ============================================================================
817
818/// Signals that can be sent upstream in the DAG ("tram car tracks").
819///
820/// These signals flow in the opposite direction to events, allowing
821/// downstream processors to communicate back to upstream producers.
822#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
823#[serde(rename_all = "snake_case")]
824#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
825pub enum UpstreamSignal {
826    /// Acknowledge receipt/processing of an event
827    Ack {
828        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
829        event_id: EventId,
830    },
831    /// Request backpressure (slow down event production)
832    Backpressure {
833        /// Resume timestamp in microseconds
834        until: i64,
835    },
836    /// Cancel a correlation chain
837    Cancel {
838        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
839        correlation_id: EventId,
840        reason: String,
841    },
842    /// Signal that compensation is complete
843    CompensationComplete {
844        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
845        event_id: EventId,
846    },
847    /// Propagate an error upstream
848    ErrorPropagation {
849        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
850        source_event_id: EventId,
851        error_code: String,
852        message: String,
853    },
854    /// Conflict detected between two concurrent branches in the DAG.
855    /// This is a coordination signal, not a failure — the handler decides resolution.
856    ConflictDetected {
857        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
858        branch_a: EventId,
859        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
860        branch_b: EventId,
861    },
862}
863
864// ============================================================================
865// HASH CHAINING FOR AUDIT INTEGRITY (Phase 1.1)
866// ============================================================================
867
868/// Hash algorithm for event integrity.
869#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
870#[serde(rename_all = "snake_case")]
871#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
872pub enum HashAlgorithm {
873    /// SHA-256 hash algorithm
874    Sha256,
875    /// Blake3 hash algorithm (faster, recommended)
876    #[default]
877    Blake3,
878}
879
880/// Hash chain for tamper-evident event log.
881///
882/// Each event in the chain contains a hash of the previous event,
883/// creating an immutable, verifiable audit trail.
884#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
885#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
886pub struct HashChain {
887    /// Hash of previous event (creates chain). Genesis event has zero prev_hash.
888    #[cfg_attr(feature = "openapi", schema(value_type = String))]
889    pub prev_hash: [u8; 32],
890    /// Hash of this event (canonical serialization)
891    #[cfg_attr(feature = "openapi", schema(value_type = String))]
892    pub event_hash: [u8; 32],
893    /// Algorithm used for hashing
894    pub algorithm: HashAlgorithm,
895}
896
897impl Default for HashChain {
898    fn default() -> Self {
899        Self {
900            prev_hash: [0u8; 32], // Genesis event has zero prev_hash
901            event_hash: [0u8; 32],
902            algorithm: HashAlgorithm::Blake3,
903        }
904    }
905}
906
907// ============================================================================
908// CAUSALITY TRACKING (Phase 1.2)
909// ============================================================================
910
911/// W3C Trace Context compatible distributed tracing.
912///
913/// Provides full causality tracking for events across distributed systems.
914/// Compatible with OpenTelemetry and other distributed tracing systems.
915#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
916#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
917pub struct Causality {
918    /// Stable trace ID across request lifecycle (W3C compatible)
919    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
920    pub trace_id: Uuid,
921    /// Current operation span ID
922    pub span_id: u64,
923    /// Parent span (if not root)
924    pub parent_span_id: Option<u64>,
925    /// Parent event IDs for causality fan-in (multiple parents)
926    #[cfg_attr(feature = "openapi", schema(value_type = Vec<String>))]
927    pub parent_event_ids: Vec<EventId>,
928    /// Root event of this trace tree
929    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
930    pub root_event_id: EventId,
931}
932
933impl Default for Causality {
934    fn default() -> Self {
935        let root_id = EventId::now_v7();
936        Self {
937            trace_id: Uuid::now_v7(),
938            span_id: 0,
939            parent_span_id: None,
940            parent_event_ids: Vec::new(),
941            root_event_id: root_id,
942        }
943    }
944}
945
946impl Causality {
947    /// Create a new causality context with a fresh trace.
948    pub fn new() -> Self {
949        Self::default()
950    }
951
952    /// Create a child span from this causality context.
953    pub fn child(&self, new_span_id: u64) -> Self {
954        Self {
955            trace_id: self.trace_id,
956            span_id: new_span_id,
957            parent_span_id: Some(self.span_id),
958            parent_event_ids: vec![self.root_event_id],
959            root_event_id: self.root_event_id,
960        }
961    }
962
963    /// Create a causality context with multiple parents (fan-in).
964    pub fn merge(parents: &[&Causality], new_span_id: u64) -> Option<Self> {
965        let first = parents.first()?;
966        Some(Self {
967            trace_id: first.trace_id,
968            span_id: new_span_id,
969            parent_span_id: Some(first.span_id),
970            parent_event_ids: parents.iter().map(|p| p.root_event_id).collect(),
971            root_event_id: first.root_event_id,
972        })
973    }
974}
975
976// ============================================================================
977// RICH EVIDENCE REFERENCES (Phase 1.3)
978// ============================================================================
979
980use crate::{AgentId, ArtifactId, ExtractionMethod, NoteId, Timestamp};
981
982/// Evidence reference types for provenance tracking.
983///
984/// Rich references to external sources that support claims or data in an event.
985#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
986#[serde(rename_all = "snake_case")]
987#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
988pub enum EvidenceRef {
989    /// Reference to a document chunk
990    DocChunk {
991        doc_id: String,
992        chunk_id: String,
993        offset: u32,
994        length: u32,
995    },
996    /// Reference to another event
997    Event {
998        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
999        event_id: EventId,
1000        timestamp: i64,
1001    },
1002    /// Reference to a tool call result
1003    ToolResult {
1004        call_id: String,
1005        tool_name: String,
1006        result_index: u32,
1007    },
1008    /// Reference to external URL
1009    Url {
1010        url: String,
1011        accessed_at: i64,
1012        #[cfg_attr(feature = "openapi", schema(value_type = Option<String>))]
1013        hash: Option<[u8; 32]>,
1014    },
1015    /// Reference to knowledge pack section
1016    KnowledgePack { pack_id: String, section: String },
1017    /// Reference to artifact
1018    Artifact { artifact_id: ArtifactId },
1019    /// Reference to note
1020    Note { note_id: NoteId },
1021    /// Manual user-provided evidence
1022    Manual {
1023        user_id: String,
1024        timestamp: i64,
1025        description: String,
1026    },
1027}
1028
1029/// Verification status for evidence.
1030#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
1031#[serde(rename_all = "snake_case")]
1032#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
1033pub enum VerificationStatus {
1034    /// Evidence has not been verified
1035    #[default]
1036    Unverified,
1037    /// Evidence has been verified
1038    Verified,
1039    /// Evidence has been partially verified
1040    PartiallyVerified,
1041    /// Evidence verification failed
1042    Invalid,
1043    /// Evidence has expired
1044    Expired,
1045}
1046
1047/// Enhanced provenance with evidence chains.
1048///
1049/// Extends basic provenance with rich evidence references, chain of custody,
1050/// and verification status for robust audit trails.
1051#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
1052#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
1053pub struct EnhancedProvenance {
1054    /// Source turn number
1055    pub source_turn: i32,
1056    /// How this data was extracted
1057    pub extraction_method: ExtractionMethod,
1058    /// Confidence score (0.0 to 1.0)
1059    pub confidence: Option<f32>,
1060    /// Rich evidence references
1061    pub evidence_refs: Vec<EvidenceRef>,
1062    /// Chain of custody (agent trail) - tuples of (AgentId, Timestamp)
1063    #[cfg_attr(feature = "openapi", schema(value_type = Vec<serde_json::Value>))]
1064    pub chain_of_custody: Vec<(AgentId, Timestamp)>,
1065    /// Verification status
1066    pub verification_status: VerificationStatus,
1067}
1068
1069impl Default for EnhancedProvenance {
1070    fn default() -> Self {
1071        Self {
1072            source_turn: 0,
1073            extraction_method: ExtractionMethod::Unknown,
1074            confidence: None,
1075            evidence_refs: Vec::new(),
1076            chain_of_custody: Vec::new(),
1077            verification_status: VerificationStatus::Unverified,
1078        }
1079    }
1080}
1081
1082impl EnhancedProvenance {
1083    /// Create a new enhanced provenance.
1084    pub fn new(source_turn: i32, extraction_method: ExtractionMethod) -> Self {
1085        Self {
1086            source_turn,
1087            extraction_method,
1088            ..Default::default()
1089        }
1090    }
1091
1092    /// Add an evidence reference.
1093    pub fn with_evidence(mut self, evidence: EvidenceRef) -> Self {
1094        self.evidence_refs.push(evidence);
1095        self
1096    }
1097
1098    /// Add a custody entry.
1099    pub fn with_custody(mut self, agent_id: AgentId, timestamp: Timestamp) -> Self {
1100        self.chain_of_custody.push((agent_id, timestamp));
1101        self
1102    }
1103
1104    /// Set confidence score.
1105    pub fn with_confidence(mut self, confidence: f32) -> Self {
1106        self.confidence = Some(confidence.clamp(0.0, 1.0));
1107        self
1108    }
1109
1110    /// Set verification status.
1111    pub fn with_verification(mut self, status: VerificationStatus) -> Self {
1112        self.verification_status = status;
1113        self
1114    }
1115}
1116
1117// ============================================================================
1118// EVENT VERIFIER TRAIT (Phase 1.4)
1119// ============================================================================
1120
1121/// Trait for cryptographic event verification.
1122///
1123/// Implementations of this trait provide hash computation and verification
1124/// for events, enabling tamper-evident audit logs.
1125pub trait EventVerifier: Send + Sync {
1126    /// Compute hash for an event.
1127    fn compute_hash<P: Serialize>(&self, event: &Event<P>) -> [u8; 32];
1128
1129    /// Verify event integrity using its hash.
1130    fn verify_hash<P: Serialize>(&self, event: &Event<P>, expected: &[u8; 32]) -> bool {
1131        &self.compute_hash(event) == expected
1132    }
1133
1134    /// Verify hash chain (current depends on previous).
1135    fn verify_chain<P: Serialize>(&self, current: &Event<P>, previous_hash: &[u8; 32]) -> bool;
1136
1137    /// Get the algorithm used by this verifier.
1138    fn algorithm(&self) -> HashAlgorithm;
1139}
1140
1141/// Blake3-based event verifier (default, recommended for performance).
1142#[derive(Debug, Clone, Copy, Default)]
1143pub struct Blake3Verifier;
1144
1145impl EventVerifier for Blake3Verifier {
1146    fn compute_hash<P: Serialize>(&self, event: &Event<P>) -> [u8; 32] {
1147        let content = EventContentRef {
1148            header: &event.header,
1149            payload: &event.payload,
1150        };
1151        let canonical = serde_json::to_vec(&content).unwrap_or_default();
1152        blake3::hash(&canonical).into()
1153    }
1154
1155    fn verify_chain<P: Serialize>(&self, current: &Event<P>, previous_hash: &[u8; 32]) -> bool {
1156        let Some(hash_chain) = &current.hash_chain else {
1157            return true; // Genesis events are always valid
1158        };
1159
1160        // Reject events whose hash chain claims a different algorithm.
1161        if hash_chain.algorithm != HashAlgorithm::Blake3 {
1162            return false;
1163        }
1164
1165        if hash_chain.prev_hash != *previous_hash {
1166            return false;
1167        }
1168
1169        let content = EventContentRef {
1170            header: &current.header,
1171            payload: &current.payload,
1172        };
1173        let canonical = serde_json::to_vec(&content).unwrap_or_default();
1174        let computed_hash = blake3::hash(&canonical);
1175        computed_hash.as_bytes() == &hash_chain.event_hash
1176    }
1177
1178    fn algorithm(&self) -> HashAlgorithm {
1179        HashAlgorithm::Blake3
1180    }
1181}
1182
1183/// SHA-256-based event verifier (for compatibility).
1184#[derive(Debug, Clone, Copy, Default)]
1185pub struct Sha256Verifier;
1186
1187impl EventVerifier for Sha256Verifier {
1188    fn compute_hash<P: Serialize>(&self, event: &Event<P>) -> [u8; 32] {
1189        use sha2::{Digest, Sha256};
1190        let content = EventContentRef {
1191            header: &event.header,
1192            payload: &event.payload,
1193        };
1194        let canonical = serde_json::to_vec(&content).unwrap_or_default();
1195        let result = Sha256::digest(&canonical);
1196        let mut hash = [0u8; 32];
1197        hash.copy_from_slice(&result);
1198        hash
1199    }
1200
1201    fn verify_chain<P: Serialize>(&self, current: &Event<P>, previous_hash: &[u8; 32]) -> bool {
1202        let Some(hash_chain) = &current.hash_chain else {
1203            return true; // Genesis events are always valid
1204        };
1205
1206        if hash_chain.prev_hash != *previous_hash {
1207            return false;
1208        }
1209
1210        use sha2::{Digest, Sha256};
1211        let content = EventContentRef {
1212            header: &current.header,
1213            payload: &current.payload,
1214        };
1215        let canonical = serde_json::to_vec(&content).unwrap_or_default();
1216        let computed = Sha256::digest(&canonical);
1217        computed[..] == hash_chain.event_hash
1218    }
1219
1220    fn algorithm(&self) -> HashAlgorithm {
1221        HashAlgorithm::Sha256
1222    }
1223}
1224
1225// ============================================================================
1226// EVENT DAG TRAIT (Async for PG async I/O + LMDB hot path)
1227// ============================================================================
1228
1229use crate::Effect;
1230
1231/// Trait for Event DAG operations.
1232///
1233/// Implementations of this trait provide persistent storage and traversal
1234/// of the event graph. The DAG supports:
1235///
1236/// - Appending new events with automatic position calculation
1237/// - Reading events by ID
1238/// - Walking ancestor chains for context reconstruction
1239/// - Sending upstream signals for coordination
1240/// - Correlation chain traversal for related event discovery
1241///
1242/// # Async Design
1243///
1244/// All methods are async to support:
1245/// - LMDB hot cache (sync but fast - microseconds, safe in async context)
1246/// - PostgreSQL fallback (truly async - milliseconds, non-blocking)
1247///
1248/// # Payload Type
1249///
1250/// The `Payload` associated type determines what data is stored with each event.
1251/// This is typically a serializable enum of all possible event payloads.
1252#[async_trait::async_trait]
1253pub trait EventDag: Send + Sync {
1254    /// The payload type stored with events.
1255    type Payload: Clone + Send + Sync + 'static;
1256
1257    /// Append a new event to the DAG.
1258    ///
1259    /// The event's position should be set by the caller based on the parent event.
1260    /// Returns the assigned event ID on success.
1261    async fn append(&self, event: Event<Self::Payload>) -> Effect<EventId>;
1262
1263    /// Read an event by its ID.
1264    async fn read(&self, event_id: EventId) -> Effect<Event<Self::Payload>>;
1265
1266    /// Walk the ancestor chain from a given event.
1267    ///
1268    /// Returns events from `from` toward the root, limited by `limit`.
1269    /// The events are returned in order from most recent to oldest.
1270    async fn walk_ancestors(
1271        &self,
1272        from: EventId,
1273        limit: usize,
1274    ) -> Effect<Vec<Event<Self::Payload>>>;
1275
1276    /// Walk descendants from a given event.
1277    async fn walk_descendants(
1278        &self,
1279        from: EventId,
1280        limit: usize,
1281    ) -> Effect<Vec<Event<Self::Payload>>>;
1282
1283    /// Send an upstream signal from a downstream event.
1284    ///
1285    /// Signals propagate backward through the DAG to notify upstream
1286    /// producers of acknowledgments, backpressure, or errors.
1287    async fn signal_upstream(&self, from: EventId, signal: UpstreamSignal) -> Effect<()>;
1288
1289    /// Find all events in a correlation chain.
1290    async fn find_correlation_chain(
1291        &self,
1292        tenant_id: uuid::Uuid,
1293        correlation_id: EventId,
1294    ) -> Effect<Vec<Event<Self::Payload>>>;
1295
1296    /// Get the current position for appending a new event.
1297    async fn next_position(&self, parent: Option<EventId>, lane: u32) -> Effect<DagPosition>;
1298
1299    /// Get events by kind within a position range.
1300    async fn find_by_kind(
1301        &self,
1302        kind: EventKind,
1303        min_depth: u32,
1304        max_depth: u32,
1305        limit: usize,
1306    ) -> Effect<Vec<Event<Self::Payload>>>;
1307
1308    /// Get events by kind within a position range for a tenant.
1309    ///
1310    /// Default behavior falls back to `find_by_kind` for implementations that
1311    /// do not support tenant-scoped querying at the storage boundary.
1312    async fn find_by_kind_for_tenant(
1313        &self,
1314        tenant_id: uuid::Uuid,
1315        kind: EventKind,
1316        min_depth: u32,
1317        max_depth: u32,
1318        limit: usize,
1319    ) -> Effect<Vec<Event<Self::Payload>>> {
1320        let _ = tenant_id;
1321        self.find_by_kind(kind, min_depth, max_depth, limit).await
1322    }
1323
1324    /// Acknowledge an event.
1325    async fn acknowledge(&self, event_id: EventId, send_upstream: bool) -> Effect<()>;
1326
1327    /// Get unacknowledged events that require acknowledgment.
1328    async fn unacknowledged(&self, limit: usize) -> Effect<Vec<Event<Self::Payload>>>;
1329}
1330
1331/// Extension trait for EventDag with convenience methods.
1332#[async_trait::async_trait]
1333pub trait EventDagExt: EventDag {
1334    /// Append a new root event.
1335    async fn append_root(&self, payload: Self::Payload) -> Effect<EventId> {
1336        let position = match self.next_position(None, 0).await {
1337            Effect::Ok(pos) => pos,
1338            Effect::Err(e) => return Effect::Err(e),
1339            other => return other.map(|_| unreachable!()),
1340        };
1341
1342        let event_id = EventId::now_v7();
1343        let event = Event {
1344            header: EventHeader::new(
1345                event_id,
1346                event_id,
1347                chrono::Utc::now().timestamp_micros(),
1348                position,
1349                0,
1350                EventKind::DATA,
1351                EventFlags::empty(),
1352                None,
1353            ),
1354            payload,
1355            hash_chain: None,
1356        };
1357        self.append(event).await
1358    }
1359
1360    /// Append a child event to an existing event.
1361    async fn append_child(&self, parent: EventId, payload: Self::Payload) -> Effect<EventId> {
1362        let parent_event = match self.read(parent).await {
1363            Effect::Ok(e) => e,
1364            Effect::Err(e) => return Effect::Err(e),
1365            other => return other.map(|_| unreachable!()),
1366        };
1367
1368        let position = match self
1369            .next_position(Some(parent), parent_event.header.position.lane)
1370            .await
1371        {
1372            Effect::Ok(pos) => pos,
1373            Effect::Err(e) => return Effect::Err(e),
1374            other => return other.map(|_| unreachable!()),
1375        };
1376
1377        let event_id = EventId::now_v7();
1378        let event = Event {
1379            header: EventHeader::new(
1380                event_id,
1381                parent_event.header.correlation_id,
1382                chrono::Utc::now().timestamp_micros(),
1383                position,
1384                0,
1385                EventKind::DATA,
1386                EventFlags::empty(),
1387                None,
1388            ),
1389            payload,
1390            hash_chain: None,
1391        };
1392        self.append(event).await
1393    }
1394
1395    /// Fork a new lane from an existing event.
1396    async fn fork(
1397        &self,
1398        parent: EventId,
1399        new_lane: u32,
1400        payload: Self::Payload,
1401    ) -> Effect<EventId> {
1402        let parent_event = match self.read(parent).await {
1403            Effect::Ok(e) => e,
1404            Effect::Err(e) => return Effect::Err(e),
1405            other => return other.map(|_| unreachable!()),
1406        };
1407
1408        let position = match self.next_position(Some(parent), new_lane).await {
1409            Effect::Ok(pos) => pos,
1410            Effect::Err(e) => return Effect::Err(e),
1411            other => return other.map(|_| unreachable!()),
1412        };
1413
1414        let event_id = EventId::now_v7();
1415        let event = Event {
1416            header: EventHeader::new(
1417                event_id,
1418                parent_event.header.correlation_id,
1419                chrono::Utc::now().timestamp_micros(),
1420                position,
1421                0,
1422                EventKind::DATA,
1423                EventFlags::empty(),
1424                None,
1425            ),
1426            payload,
1427            hash_chain: None,
1428        };
1429        self.append(event).await
1430    }
1431
1432    /// Get the depth of an event in the DAG.
1433    async fn depth(&self, event_id: EventId) -> Effect<u32> {
1434        match self.read(event_id).await {
1435            Effect::Ok(event) => Effect::Ok(event.header.position.depth),
1436            Effect::Err(e) => Effect::Err(e),
1437            other => other.map(|_| unreachable!()),
1438        }
1439    }
1440
1441    /// Check if one event is an ancestor of another.
1442    async fn is_ancestor(&self, ancestor: EventId, descendant: EventId) -> Effect<bool> {
1443        let ancestors = match self.walk_ancestors(descendant, 1000).await {
1444            Effect::Ok(events) => events,
1445            Effect::Err(e) => return Effect::Err(e),
1446            other => return other.map(|_| unreachable!()),
1447        };
1448
1449        for event in ancestors {
1450            if event.header.event_id == ancestor {
1451                return Effect::Ok(true);
1452            }
1453        }
1454        Effect::Ok(false)
1455    }
1456}
1457
1458// Blanket implementation: any type implementing EventDag automatically gets EventDagExt
1459impl<T: EventDag> EventDagExt for T {}
1460
1461/// Builder for creating events with proper positioning.
1462#[derive(Debug, Clone)]
1463pub struct EventBuilder<P> {
1464    parent: Option<EventId>,
1465    lane: u32,
1466    correlation_id: Option<EventId>,
1467    payload: P,
1468    flags: EventFlags,
1469}
1470
1471impl<P> EventBuilder<P> {
1472    /// Create a new event builder with the given payload.
1473    pub fn new(payload: P) -> Self {
1474        Self {
1475            parent: None,
1476            lane: 0,
1477            correlation_id: None,
1478            payload,
1479            flags: EventFlags::empty(),
1480        }
1481    }
1482
1483    /// Set the parent event.
1484    pub fn parent(mut self, parent: EventId) -> Self {
1485        self.parent = Some(parent);
1486        self
1487    }
1488
1489    /// Set the lane.
1490    pub fn lane(mut self, lane: u32) -> Self {
1491        self.lane = lane;
1492        self
1493    }
1494
1495    /// Set the correlation ID.
1496    pub fn correlation(mut self, correlation_id: EventId) -> Self {
1497        self.correlation_id = Some(correlation_id);
1498        self
1499    }
1500
1501    /// Mark this event as requiring acknowledgment.
1502    pub fn requires_ack(mut self) -> Self {
1503        self.flags |= EventFlags::REQUIRES_ACK;
1504        self
1505    }
1506
1507    /// Mark this event as critical.
1508    pub fn critical(mut self) -> Self {
1509        self.flags |= EventFlags::CRITICAL;
1510        self
1511    }
1512
1513    /// Mark this event as transactional.
1514    pub fn transactional(mut self) -> Self {
1515        self.flags |= EventFlags::TRANSACTIONAL;
1516        self
1517    }
1518
1519    /// Get the configured parent.
1520    pub fn get_parent(&self) -> Option<EventId> {
1521        self.parent
1522    }
1523
1524    /// Get the configured lane.
1525    pub fn get_lane(&self) -> u32 {
1526        self.lane
1527    }
1528
1529    /// Get the configured flags.
1530    pub fn get_flags(&self) -> EventFlags {
1531        self.flags
1532    }
1533
1534    /// Get the correlation ID (or None).
1535    pub fn get_correlation_id(&self) -> Option<EventId> {
1536        self.correlation_id
1537    }
1538
1539    /// Consume the builder and return the payload.
1540    pub fn into_payload(self) -> P {
1541        self.payload
1542    }
1543}
1544
1545#[cfg(test)]
1546mod tests {
1547    use super::*;
1548
#[test]
fn test_event_header_size() {
    // Only the alignment is pinned. The header grew past 64 bytes when the
    // random_seed field was added, but it is still cache-aligned.
    let alignment = std::mem::align_of::<EventHeader>();
    assert_eq!(alignment, 64);
}
1555
#[test]
fn test_dag_position() {
    // The root sits at depth 0 and reports itself as root.
    let origin = DagPosition::root();
    assert!(origin.is_root());
    assert_eq!(origin.depth, 0);

    // A child is one level deeper and descends from the root.
    let descendant = origin.child(1);
    assert!(!descendant.is_root());
    assert_eq!(descendant.depth, 1);
    assert!(origin.is_ancestor_of(&descendant));

    // A fork lands on another lane, so the root is not its ancestor.
    let forked = origin.fork(1, 0);
    assert_eq!(forked.lane, 1);
    assert!(!origin.is_ancestor_of(&forked));
}
1571
#[test]
fn test_event_kind_categories() {
    // Built-in kinds report the category they belong to.
    let system_kind = EventKind::SYSTEM_INIT;
    let trajectory_kind = EventKind::TRAJECTORY_CREATED;
    let effect_kind = EventKind::EFFECT_ERROR;
    assert!(system_kind.is_system());
    assert!(!trajectory_kind.is_system());
    assert!(effect_kind.is_effect());

    // A custom kind exposes the category and type ID it was built from.
    let bespoke = EventKind::custom(0xF, 0x123);
    assert_eq!(bespoke.category(), 0xF);
    assert_eq!(bespoke.type_id(), 0x123);
}
1582
#[test]
fn test_event_flags() {
    let mut combined = EventFlags::REQUIRES_ACK | EventFlags::CRITICAL;

    // Both flags that were OR-ed together are present; an unrelated one is not.
    for expected in [EventFlags::REQUIRES_ACK, EventFlags::CRITICAL] {
        assert!(combined.contains(expected));
    }
    assert!(!combined.contains(EventFlags::REPLAY));

    // Flags can be added in place.
    combined |= EventFlags::ACKNOWLEDGED;
    assert!(combined.contains(EventFlags::ACKNOWLEDGED));
}
1593
#[test]
fn test_event_header_creation() {
    let event_id = EventId::now_v7();
    let correlation_id = EventId::now_v7();
    let position = DagPosition::root();
    let kind = EventKind::TRAJECTORY_CREATED;
    let flags = EventFlags::REQUIRES_ACK;

    let header = EventHeader::new(
        event_id,
        correlation_id,
        1234567890,
        position,
        100,
        kind,
        flags,
        None,
    );

    // The ID is stored as given, and the flag helpers reflect REQUIRES_ACK only.
    assert_eq!(header.event_id, event_id);
    assert!(header.requires_ack());
    assert!(!header.is_acknowledged());
}
1613
#[test]
fn test_hash_chain_genesis_event() {
    use serde_json::json;

    let header = EventHeader::new(
        EventId::now_v7(),
        EventId::now_v7(),
        chrono::Utc::now().timestamp_micros(),
        DagPosition::root(),
        0,
        EventKind::DATA,
        EventFlags::empty(),
        None,
    );
    let genesis = Event::new(header, json!({"type": "genesis"}));

    // An event built without a hash chain is a genesis event, and genesis
    // events verify against any parent hash.
    assert!(genesis.is_genesis());
    let zero_parent = [0u8; 32];
    assert!(genesis.verify(&zero_parent));
}
1635
#[test]
fn test_hash_chain_tamper_detection() {
    use serde_json::json;

    let parent = Event::new(
        EventHeader::new(
            EventId::now_v7(),
            EventId::now_v7(),
            chrono::Utc::now().timestamp_micros(),
            DagPosition::root(),
            0,
            EventKind::DATA,
            EventFlags::empty(),
            None,
        ),
        json!({"data": "original"}),
    );
    let parent_hash = parent.compute_hash();

    // Hash the very event that is verified below. The content hash ignores
    // `hash_chain`, so attaching the chain afterwards keeps `event_hash`
    // valid and each assertion isolates exactly one kind of tampering.
    let mut child = Event::new(
        EventHeader::new(
            EventId::now_v7(),
            EventId::now_v7(),
            chrono::Utc::now().timestamp_micros(),
            DagPosition::new(1, 0, 0),
            0,
            EventKind::DATA,
            EventFlags::empty(),
            None,
        ),
        json!({"data": "child"}),
    );
    let intact_chain = HashChain {
        prev_hash: parent_hash,
        event_hash: child.compute_hash(),
        algorithm: HashAlgorithm::Blake3,
    };

    // Control: an untampered chain verifies.
    child.hash_chain = Some(intact_chain);
    assert!(child.verify(&parent_hash));

    // Tampered link: prev_hash does not match the parent.
    child.hash_chain = Some(HashChain {
        prev_hash: [0xFF; 32], // Wrong!
        ..intact_chain
    });
    assert!(!child.verify(&parent_hash)); // Should fail

    // Tampered content: the payload changed after the hash was recorded.
    child.hash_chain = Some(intact_chain);
    child.payload = json!({"data": "tampered"});
    assert!(!child.verify(&parent_hash)); // Should fail
}
1697
1698    #[test]
1699    fn upstream_signal_conflict_detected_roundtrip() {
1700        let signal = UpstreamSignal::ConflictDetected {
1701            branch_a: EventId::now_v7(),
1702            branch_b: EventId::now_v7(),
1703        };
1704        let json = serde_json::to_string(&signal).unwrap();
1705        let deserialized: UpstreamSignal = serde_json::from_str(&json).unwrap();
1706        assert!(matches!(
1707            deserialized,
1708            UpstreamSignal::ConflictDetected { .. }
1709        ));
1710    }
1711
1712    #[test]
1713    fn effect_conflict_event_kind() {
1714        assert!(EventKind::EFFECT_CONFLICT.is_effect());
1715        assert_eq!(EventKind::EFFECT_CONFLICT.0, 0xD007);
1716    }
1717
1718    #[test]
1719    fn is_defined_accepts_known_kinds() {
1720        assert!(EventKind::SCOPE_CREATED.is_defined());
1721        assert!(EventKind::AGENT_STATUS_CHANGED.is_defined());
1722        assert!(EventKind::EFFECT_ERROR.is_defined());
1723        assert!(EventKind::SCOPE_CONTEXT_PAGED.is_defined());
1724        assert!(EventKind::SYNC_PULSE_EMITTED.is_defined());
1725        assert!(EventKind::SYNC_PULSE_RECONCILED.is_defined());
1726    }
1727
1728    #[test]
1729    fn is_defined_rejects_unknown_kinds() {
1730        assert!(!EventKind(0x0FFF).is_defined());
1731        assert!(!EventKind(0xFFFF).is_defined());
1732        assert!(!EventKind(0x1FFF).is_defined());
1733    }
1734
1735    #[test]
1736    fn test_undefined_event_kind_fails_validation() {
1737        assert!(validate_event_kind(EventKind(0x9999)).is_err());
1738    }
1739
1740    #[test]
1741    fn test_defined_event_kinds_pass_validation() {
1742        assert!(validate_event_kind(EventKind::DATA).is_ok());
1743    }
1744}