cellstate_core/
effect.rs

1//! Effect system for error-as-effects pattern.
2//!
3//! This module implements the "errors as effects" pattern where domain errors
4//! are first-class events that can be persisted, replayed, and affect downstream
5//! handlers.
6//!
7//! # Usage Guidelines
8//!
9//! ## Effect at Boundaries, Result Internally
10//!
11//! Internal code should use `Result<T, E>` for normal error handling.
12//! `Effect<T>` should only be used at system boundaries:
13//! - Event handler outputs
14//! - Persistence layer responses
15//! - API response wrappers
16//!
17//! ```rust,ignore
18//! // Internal: use Result
19//! async fn fetch_notes(&self, id: ScopeId) -> Result<Vec<Note>, StorageError> {
20//!     let rows = self.db.query(...).await?;
21//!     Ok(rows)
22//! }
23//!
24//! // Boundary: wrap in Effect
25//! pub async fn handle_request(&self, req: Request) -> Effect<Response> {
26//!     match self.do_work(req).await {
27//!         Ok(resp) => Effect::Ok(resp),
28//!         Err(e) if e.is_transient() => Effect::Retry { ... },
29//!         Err(e) => Effect::Err(ErrorEffect::from(e)),
30//!     }
31//! }
32//! ```
33//!
34//! # Key Distinction
35//!
36//! - **Domain errors**: Persist, replay, affect downstream handlers
37//! - **Operational errors**: Telemetry only, can sample/discard
38//!
39//! Domain errors are part of the business logic and must be tracked.
40//! Operational errors are infrastructure concerns and can be handled separately.
41
42use crate::{DagPosition, EventId};
43use serde::{Deserialize, Serialize};
44use std::fmt;
45use std::time::Duration;
46use uuid::Uuid;
47
48// ============================================================================
49// EFFECT TYPE
50// ============================================================================
51
52/// An effect represents the outcome of an operation.
53///
54/// Effects are more expressive than simple `Result<T, E>` because they can
55/// represent retry conditions, compensation actions, and pending states.
56///
57/// # Variants
58///
59/// - `Ok(T)`: Successful result
60/// - `Err(ErrorEffect)`: Domain-level error (replayable, affects downstream)
61/// - `Retry`: Operation should be retried
62/// - `Compensate`: Compensation action is needed
63/// - `Pending`: Operation is waiting for something
64/// - `Batch`: Multiple effects (for fan-out scenarios)
65#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
68pub enum Effect<T> {
69    /// Successful result
70    Ok(T),
71    /// Domain-level error (must be persisted and handled)
72    Err(ErrorEffect),
73    /// Operation should be retried
74    Retry {
75        /// Duration to wait before retrying
76        #[serde(with = "duration_millis")]
77        after: Duration,
78        /// Current attempt number (1-indexed)
79        attempt: u32,
80        /// Maximum number of attempts
81        max_attempts: u32,
82        /// Reason for retry
83        reason: String,
84    },
85    /// Compensation action is needed
86    Compensate {
87        /// The action to take
88        action: CompensationAction,
89        /// The error that caused compensation
90        cause: Box<ErrorEffect>,
91    },
92    /// Operation is waiting for something
93    Pending {
94        /// What the operation is waiting for
95        waiting_for: WaitCondition,
96        /// Token to resume when condition is met
97        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
98        resume_token: EventId,
99        /// Optimistic progress hint for UI (NOT authoritative state)
100        #[cfg_attr(feature = "openapi", schema(value_type = Object))]
101        progress_hint: Option<serde_json::Value>,
102    },
103    /// Multiple effects (for fan-out scenarios)
104    Batch(Vec<Effect<T>>),
105}
106
107impl<T> Effect<T> {
108    /// Create a successful effect.
109    pub fn ok(value: T) -> Self {
110        Effect::Ok(value)
111    }
112
113    /// Create an error effect.
114    pub fn err(error: ErrorEffect) -> Self {
115        Effect::Err(error)
116    }
117
118    /// Create a domain error effect.
119    pub fn domain_error(error: DomainError, source_event: EventId, position: DagPosition) -> Self {
120        Effect::Err(ErrorEffect::Domain(Box::new(DomainErrorContext {
121            error,
122            source_event,
123            position,
124            correlation_id: source_event, // Default to same as source
125        })))
126    }
127
128    /// Create a retry effect.
129    pub fn retry(
130        after: Duration,
131        attempt: u32,
132        max_attempts: u32,
133        reason: impl Into<String>,
134    ) -> Self {
135        Effect::Retry {
136            after,
137            attempt,
138            max_attempts,
139            reason: reason.into(),
140        }
141    }
142
143    /// Create a pending effect.
144    pub fn pending(waiting_for: WaitCondition, resume_token: EventId) -> Self {
145        Effect::Pending {
146            waiting_for,
147            resume_token,
148            progress_hint: None,
149        }
150    }
151
152    /// Check if this is a successful effect.
153    pub fn is_ok(&self) -> bool {
154        matches!(self, Effect::Ok(_))
155    }
156
157    /// Check if this is an error effect.
158    pub fn is_err(&self) -> bool {
159        matches!(self, Effect::Err(_))
160    }
161
162    /// Check if this effect requires retry.
163    pub fn needs_retry(&self) -> bool {
164        matches!(self, Effect::Retry { .. })
165    }
166
167    /// Check if this effect is pending.
168    pub fn is_pending(&self) -> bool {
169        matches!(self, Effect::Pending { .. })
170    }
171
172    /// Convert to a Result, losing retry/compensation information.
173    pub fn into_result(self) -> Result<T, ErrorEffect> {
174        match self {
175            Effect::Ok(v) => Ok(v),
176            Effect::Err(e) => Err(e),
177            Effect::Retry { reason, .. } => {
178                Err(ErrorEffect::Operational(OperationalError::RetryExhausted {
179                    reason,
180                }))
181            }
182            Effect::Compensate { cause, .. } => Err(*cause),
183            Effect::Pending { waiting_for, .. } => {
184                Err(ErrorEffect::Operational(OperationalError::Timeout {
185                    operation: format!("Pending: {:?}", waiting_for),
186                }))
187            }
188            Effect::Batch(effects) => {
189                let mut last_ok = None;
190                for effect in effects {
191                    last_ok = Some(effect.into_result()?);
192                }
193                last_ok.ok_or_else(|| {
194                    ErrorEffect::Operational(OperationalError::Internal {
195                        message: "Empty batch".to_string(),
196                    })
197                })
198            }
199        }
200    }
201
202    /// Map the success value.
203    pub fn map<U, F: FnOnce(T) -> U + Clone>(self, f: F) -> Effect<U> {
204        match self {
205            Effect::Ok(v) => Effect::Ok(f(v)),
206            Effect::Err(e) => Effect::Err(e),
207            Effect::Retry {
208                after,
209                attempt,
210                max_attempts,
211                reason,
212            } => Effect::Retry {
213                after,
214                attempt,
215                max_attempts,
216                reason,
217            },
218            Effect::Compensate { action, cause } => Effect::Compensate { action, cause },
219            Effect::Pending {
220                waiting_for,
221                resume_token,
222                progress_hint,
223            } => Effect::Pending {
224                waiting_for,
225                resume_token,
226                progress_hint,
227            },
228            Effect::Batch(effects) => {
229                Effect::Batch(effects.into_iter().map(|e| e.map(f.clone())).collect())
230            }
231        }
232    }
233
234    /// Extract the success value, panicking if not Ok.
235    pub fn unwrap(self) -> T {
236        match self {
237            Effect::Ok(v) => v,
238            _ => panic!(
239                "Called unwrap on non-Ok effect: {:?}",
240                std::any::type_name::<Self>()
241            ),
242        }
243    }
244
245    /// Extract the success value, panicking with a custom message if not Ok.
246    pub fn expect(self, msg: &str) -> T {
247        match self {
248            Effect::Ok(v) => v,
249            _ => panic!("{}", msg),
250        }
251    }
252
253    /// Chain a function that returns an Effect on the success value.
254    ///
255    /// If `self` is `Ok(t)`, returns `f(t)`. Otherwise, returns the original
256    /// non-Ok effect unchanged.
257    pub fn and_then<U, F: FnOnce(T) -> Effect<U>>(self, f: F) -> Effect<U> {
258        match self {
259            Effect::Ok(v) => f(v),
260            Effect::Err(e) => Effect::Err(e),
261            Effect::Retry {
262                after,
263                attempt,
264                max_attempts,
265                reason,
266            } => Effect::Retry {
267                after,
268                attempt,
269                max_attempts,
270                reason,
271            },
272            Effect::Compensate { action, cause } => Effect::Compensate { action, cause },
273            Effect::Pending {
274                waiting_for,
275                resume_token,
276                progress_hint,
277            } => Effect::Pending {
278                waiting_for,
279                resume_token,
280                progress_hint,
281            },
282            Effect::Batch(_) => Effect::Err(ErrorEffect::Operational(OperationalError::Internal {
283                message: "Cannot and_then on Batch effect".to_string(),
284            })),
285        }
286    }
287
288    /// Map the error effect using a transformation function.
289    /// Recurses into `Batch` to transform errors within each inner effect.
290    pub fn map_err<F: FnOnce(ErrorEffect) -> ErrorEffect + Clone>(self, f: F) -> Self {
291        match self {
292            Effect::Err(e) => Effect::Err(f(e)),
293            Effect::Batch(effects) => {
294                Effect::Batch(effects.into_iter().map(|e| e.map_err(f.clone())).collect())
295            }
296            other => other,
297        }
298    }
299
300    /// Handle an error by applying a recovery function.
301    /// Recurses into `Batch` to recover errors within each inner effect.
302    pub fn or_else<F: FnOnce(ErrorEffect) -> Effect<T> + Clone>(self, f: F) -> Self {
303        match self {
304            Effect::Err(e) => f(e),
305            Effect::Batch(effects) => {
306                Effect::Batch(effects.into_iter().map(|e| e.or_else(f.clone())).collect())
307            }
308            other => other,
309        }
310    }
311
312    /// Extract the success value or return a default.
313    ///
314    /// If `self` is `Ok(t)`, returns `t`. Otherwise, returns `default`.
315    pub fn unwrap_or(self, default: T) -> T {
316        match self {
317            Effect::Ok(v) => v,
318            _ => default,
319        }
320    }
321
322    /// Extract the success value or compute a default from the error.
323    ///
324    /// If `self` is `Ok(t)`, returns `t`. Otherwise, returns `f(e)` where
325    /// `e` is the error effect (or a synthesized one for non-error variants).
326    pub fn unwrap_or_else<F: FnOnce(ErrorEffect) -> T>(self, f: F) -> T {
327        match self {
328            Effect::Ok(v) => v,
329            Effect::Err(e) => f(e),
330            Effect::Retry { reason, .. } => {
331                f(ErrorEffect::Operational(OperationalError::RetryExhausted {
332                    reason,
333                }))
334            }
335            Effect::Compensate { cause, .. } => f(*cause),
336            Effect::Pending { waiting_for, .. } => {
337                f(ErrorEffect::Operational(OperationalError::Timeout {
338                    operation: format!("Pending: {:?}", waiting_for),
339                }))
340            }
341            Effect::Batch(_) => f(ErrorEffect::Operational(OperationalError::Internal {
342                message: "Cannot unwrap Batch effect".to_string(),
343            })),
344        }
345    }
346}
347
348impl<T, E: Into<ErrorEffect>> From<Result<T, E>> for Effect<T> {
349    fn from(result: Result<T, E>) -> Self {
350        match result {
351            Ok(v) => Effect::Ok(v),
352            Err(e) => Effect::Err(e.into()),
353        }
354    }
355}
356
357// ============================================================================
358// ERROR EFFECT
359// ============================================================================
360
361/// An error effect that can be persisted and replayed.
362///
363/// This distinguishes between:
364/// - Domain errors: Business logic errors that must be tracked
365/// - Operational errors: Infrastructure errors that can be sampled/discarded
366#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
367#[serde(rename_all = "snake_case")]
368#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
369pub enum ErrorEffect {
370    /// Domain-level error (must persist, replay, affect downstream)
371    Domain(Box<DomainErrorContext>),
372    /// Operational error (telemetry only, can sample/discard)
373    Operational(OperationalError),
374}
375
376impl ErrorEffect {
377    /// Check if this is a domain error.
378    pub fn is_domain(&self) -> bool {
379        matches!(self, ErrorEffect::Domain(_))
380    }
381
382    /// Check if this is an operational error.
383    pub fn is_operational(&self) -> bool {
384        matches!(self, ErrorEffect::Operational(_))
385    }
386
387    /// Get the error kind for categorization.
388    pub fn kind(&self) -> ErrorKind {
389        match self {
390            ErrorEffect::Domain(ctx) => ctx.error.kind(),
391            ErrorEffect::Operational(op) => op.kind(),
392        }
393    }
394}
395
396impl fmt::Display for ErrorEffect {
397    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
398        match self {
399            ErrorEffect::Domain(ctx) => write!(f, "Domain error: {}", ctx.error),
400            ErrorEffect::Operational(op) => write!(f, "Operational error: {}", op),
401        }
402    }
403}
404
405impl std::error::Error for ErrorEffect {}
406
407// ============================================================================
408// DOMAIN ERROR CONTEXT
409// ============================================================================
410
411/// Domain error with event context for correlation and replay.
412#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
413#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
414pub struct DomainErrorContext {
415    /// The domain error
416    pub error: DomainError,
417    /// Event that caused this error
418    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
419    pub source_event: EventId,
420    /// Position in the DAG where error occurred
421    pub position: DagPosition,
422    /// Correlation ID for tracing
423    #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
424    pub correlation_id: EventId,
425}
426
427// ============================================================================
428// DOMAIN ERRORS
429// ============================================================================
430
431/// Domain-level errors that affect business logic.
432///
433/// These errors must be persisted and can affect downstream processing.
434#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
435#[serde(rename_all = "snake_case")]
436#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
437pub enum DomainError {
438    // Entity errors
439    EntityNotFound {
440        entity_type: crate::EntityType,
441        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
442        id: Uuid,
443    },
444    EntityAlreadyExists {
445        entity_type: crate::EntityType,
446        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
447        id: Uuid,
448    },
449    EntityConflict {
450        entity_type: crate::EntityType,
451        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
452        id: Uuid,
453        reason: String,
454    },
455
456    // State errors
457    InvalidStateTransition {
458        entity_type: crate::EntityType,
459        from_state: String,
460        to_state: String,
461        reason: String,
462    },
463    StaleData {
464        entity_type: crate::EntityType,
465        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
466        id: Uuid,
467        expected_version: u64,
468        actual_version: u64,
469    },
470
471    // Validation errors
472    ValidationFailed {
473        field: String,
474        reason: String,
475    },
476    ConstraintViolation {
477        constraint: String,
478        reason: String,
479    },
480    CircularReference {
481        entity_type: crate::EntityType,
482        #[cfg_attr(feature = "openapi", schema(value_type = Vec<String>))]
483        ids: Vec<Uuid>,
484    },
485
486    // Business logic errors
487    Contradiction {
488        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
489        artifact_a: Uuid,
490        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
491        artifact_b: Uuid,
492        description: String,
493    },
494    QuotaExceeded {
495        resource: String,
496        limit: u64,
497        requested: u64,
498    },
499    PermissionDenied {
500        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
501        agent_id: Uuid,
502        action: String,
503        resource: String,
504    },
505
506    // Agent coordination errors
507    LockAcquisitionFailed {
508        resource: String,
509        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
510        holder: Uuid,
511    },
512    LockExpired {
513        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
514        lock_id: Uuid,
515    },
516    DelegationFailed {
517        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
518        delegation_id: Uuid,
519        reason: String,
520    },
521    HandoffFailed {
522        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
523        handoff_id: Uuid,
524        reason: String,
525    },
526
527    // Tool security errors
528    ToolPolicyViolation {
529        tool_name: String,
530        policy: String,
531        reason: String,
532    },
533
534    // PII safety errors
535    PiiEgressViolation {
536        field: String,
537        pii_type: String,
538        reason: String,
539    },
540    PiiManifestInvalid {
541        violations: Vec<String>,
542    },
543}
544
545impl DomainError {
546    /// Get the error kind for categorization.
547    pub fn kind(&self) -> ErrorKind {
548        match self {
549            DomainError::EntityNotFound { .. } => ErrorKind::NotFound,
550            DomainError::EntityAlreadyExists { .. } => ErrorKind::AlreadyExists,
551            DomainError::EntityConflict { .. } => ErrorKind::Conflict,
552            DomainError::InvalidStateTransition { .. } => ErrorKind::InvalidState,
553            DomainError::StaleData { .. } => ErrorKind::Conflict,
554            DomainError::ValidationFailed { .. } => ErrorKind::Validation,
555            DomainError::ConstraintViolation { .. } => ErrorKind::Validation,
556            DomainError::CircularReference { .. } => ErrorKind::Validation,
557            DomainError::Contradiction { .. } => ErrorKind::BusinessLogic,
558            DomainError::QuotaExceeded { .. } => ErrorKind::QuotaExceeded,
559            DomainError::PermissionDenied { .. } => ErrorKind::PermissionDenied,
560            DomainError::LockAcquisitionFailed { .. } => ErrorKind::LockFailed,
561            DomainError::LockExpired { .. } => ErrorKind::LockFailed,
562            DomainError::DelegationFailed { .. } => ErrorKind::CoordinationFailed,
563            DomainError::HandoffFailed { .. } => ErrorKind::CoordinationFailed,
564            DomainError::ToolPolicyViolation { .. } => ErrorKind::PermissionDenied,
565            DomainError::PiiEgressViolation { .. } => ErrorKind::PiiViolation,
566            DomainError::PiiManifestInvalid { .. } => ErrorKind::PiiViolation,
567        }
568    }
569}
570
571impl fmt::Display for DomainError {
572    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
573        match self {
574            DomainError::EntityNotFound { entity_type, id } => {
575                write!(f, "{} not found: {}", entity_type, id)
576            }
577            DomainError::EntityAlreadyExists { entity_type, id } => {
578                write!(f, "{} already exists: {}", entity_type, id)
579            }
580            DomainError::EntityConflict {
581                entity_type,
582                id,
583                reason,
584            } => {
585                write!(f, "Conflict on {} {}: {}", entity_type, id, reason)
586            }
587            DomainError::InvalidStateTransition {
588                entity_type,
589                from_state,
590                to_state,
591                reason,
592            } => {
593                write!(
594                    f,
595                    "Invalid {} transition {} -> {}: {}",
596                    entity_type, from_state, to_state, reason
597                )
598            }
599            DomainError::StaleData {
600                entity_type,
601                id,
602                expected_version,
603                actual_version,
604            } => {
605                write!(
606                    f,
607                    "Stale {} {}: expected version {}, got {}",
608                    entity_type, id, expected_version, actual_version
609                )
610            }
611            DomainError::ValidationFailed { field, reason } => {
612                write!(f, "Validation failed for {}: {}", field, reason)
613            }
614            DomainError::ConstraintViolation { constraint, reason } => {
615                write!(f, "Constraint {} violated: {}", constraint, reason)
616            }
617            DomainError::CircularReference { entity_type, ids } => {
618                write!(f, "Circular reference in {}: {:?}", entity_type, ids)
619            }
620            DomainError::Contradiction {
621                artifact_a,
622                artifact_b,
623                description,
624            } => {
625                write!(
626                    f,
627                    "Contradiction between {} and {}: {}",
628                    artifact_a, artifact_b, description
629                )
630            }
631            DomainError::QuotaExceeded {
632                resource,
633                limit,
634                requested,
635            } => {
636                write!(
637                    f,
638                    "Quota exceeded for {}: limit {}, requested {}",
639                    resource, limit, requested
640                )
641            }
642            DomainError::PermissionDenied {
643                agent_id,
644                action,
645                resource,
646            } => {
647                write!(
648                    f,
649                    "Permission denied: agent {} cannot {} on {}",
650                    agent_id, action, resource
651                )
652            }
653            DomainError::LockAcquisitionFailed { resource, holder } => {
654                write!(
655                    f,
656                    "Lock acquisition failed for {}: held by {}",
657                    resource, holder
658                )
659            }
660            DomainError::LockExpired { lock_id } => {
661                write!(f, "Lock expired: {}", lock_id)
662            }
663            DomainError::DelegationFailed {
664                delegation_id,
665                reason,
666            } => {
667                write!(f, "Delegation {} failed: {}", delegation_id, reason)
668            }
669            DomainError::HandoffFailed { handoff_id, reason } => {
670                write!(f, "Handoff {} failed: {}", handoff_id, reason)
671            }
672            DomainError::ToolPolicyViolation {
673                tool_name,
674                policy,
675                reason,
676            } => {
677                write!(
678                    f,
679                    "Tool policy violation for '{}' ({}): {}",
680                    tool_name, policy, reason
681                )
682            }
683            DomainError::PiiEgressViolation {
684                field,
685                pii_type,
686                reason,
687            } => {
688                write!(
689                    f,
690                    "PII egress violation on field '{}' ({}): {}",
691                    field, pii_type, reason
692                )
693            }
694            DomainError::PiiManifestInvalid { violations } => {
695                write!(f, "PII manifest invalid: {}", violations.join("; "))
696            }
697        }
698    }
699}
700
701impl std::error::Error for DomainError {}
702
703// ============================================================================
704// OPERATIONAL ERRORS
705// ============================================================================
706
707/// Operational errors that don't affect business logic.
708///
709/// These errors are for infrastructure concerns and can be sampled/discarded
710/// for telemetry purposes.
711#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
712#[serde(rename_all = "snake_case")]
713#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
714pub enum OperationalError {
715    /// Network or connection error
716    NetworkError { message: String },
717    /// Database connection error
718    DatabaseConnectionError { message: String },
719    /// Operation timed out
720    Timeout { operation: String },
721    /// Rate limited by external service
722    RateLimited {
723        service: String,
724        retry_after_ms: i64,
725    },
726    /// Internal error (unexpected)
727    Internal { message: String },
728    /// Resource temporarily unavailable
729    Unavailable { resource: String },
730    /// Retries exhausted
731    RetryExhausted { reason: String },
732    /// Serialization/deserialization error
733    SerializationError { message: String },
734}
735
736impl OperationalError {
737    /// Get the error kind for categorization.
738    pub fn kind(&self) -> ErrorKind {
739        match self {
740            OperationalError::NetworkError { .. } => ErrorKind::Network,
741            OperationalError::DatabaseConnectionError { .. } => ErrorKind::Database,
742            OperationalError::Timeout { .. } => ErrorKind::Timeout,
743            OperationalError::RateLimited { .. } => ErrorKind::RateLimited,
744            OperationalError::Internal { .. } => ErrorKind::Internal,
745            OperationalError::Unavailable { .. } => ErrorKind::Unavailable,
746            OperationalError::RetryExhausted { .. } => ErrorKind::RetryExhausted,
747            OperationalError::SerializationError { .. } => ErrorKind::Serialization,
748        }
749    }
750}
751
752impl fmt::Display for OperationalError {
753    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
754        match self {
755            OperationalError::NetworkError { message } => write!(f, "Network error: {}", message),
756            OperationalError::DatabaseConnectionError { message } => {
757                write!(f, "Database error: {}", message)
758            }
759            OperationalError::Timeout { operation } => write!(f, "Timeout: {}", operation),
760            OperationalError::RateLimited {
761                service,
762                retry_after_ms,
763            } => {
764                write!(
765                    f,
766                    "Rate limited by {}, retry after {}ms",
767                    service, retry_after_ms
768                )
769            }
770            OperationalError::Internal { message } => write!(f, "Internal error: {}", message),
771            OperationalError::Unavailable { resource } => write!(f, "Unavailable: {}", resource),
772            OperationalError::RetryExhausted { reason } => {
773                write!(f, "Retries exhausted: {}", reason)
774            }
775            OperationalError::SerializationError { message } => {
776                write!(f, "Serialization error: {}", message)
777            }
778        }
779    }
780}
781
782impl std::error::Error for OperationalError {}
783
784// ============================================================================
785// ERROR KIND
786// ============================================================================
787
788/// High-level error categorization for metrics and routing.
789#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
790#[serde(rename_all = "snake_case")]
791#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
792pub enum ErrorKind {
793    // Domain error kinds
794    NotFound,
795    AlreadyExists,
796    Conflict,
797    InvalidState,
798    Validation,
799    BusinessLogic,
800    QuotaExceeded,
801    PermissionDenied,
802    LockFailed,
803    CoordinationFailed,
804    PiiViolation,
805
806    // Operational error kinds
807    Network,
808    Database,
809    Timeout,
810    RateLimited,
811    Internal,
812    Unavailable,
813    RetryExhausted,
814    Serialization,
815}
816
817impl ErrorKind {
818    /// Check if this is a retriable error kind.
819    pub fn is_retriable(&self) -> bool {
820        matches!(
821            self,
822            ErrorKind::Network
823                | ErrorKind::Database
824                | ErrorKind::Timeout
825                | ErrorKind::RateLimited
826                | ErrorKind::Unavailable
827        )
828    }
829
830    /// Check if this is a domain error kind.
831    pub fn is_domain(&self) -> bool {
832        matches!(
833            self,
834            ErrorKind::NotFound
835                | ErrorKind::AlreadyExists
836                | ErrorKind::Conflict
837                | ErrorKind::InvalidState
838                | ErrorKind::Validation
839                | ErrorKind::BusinessLogic
840                | ErrorKind::QuotaExceeded
841                | ErrorKind::PermissionDenied
842                | ErrorKind::LockFailed
843                | ErrorKind::CoordinationFailed
844                | ErrorKind::PiiViolation
845        )
846    }
847}
848
849// ============================================================================
850// ERROR CLASSIFIABLE TRAIT
851// ============================================================================
852
/// Uniform retry / escalation classification shared by all error types.
///
/// This generalizes `ErrorKind::is_retriable()` and adds
/// `requires_user_intervention()` for failures a human must resolve (for
/// example expired credentials, a full disk, or an exhausted quota).
pub trait ErrorClassifiable {
    /// Returns `true` if the failed operation may be retried automatically.
    fn is_retryable(&self) -> bool;

    /// Returns `true` if a person has to act before the operation can succeed.
    fn requires_user_intervention(&self) -> bool;

    /// Suggested delay before the next retry, in milliseconds.
    ///
    /// The default implementation returns `None`, meaning "use the default
    /// backoff".
    fn retry_after_ms(&self) -> Option<u64> {
        None
    }

    /// Upper bound on automatic retries before giving up.
    ///
    /// The default implementation returns 3.
    fn max_retries(&self) -> u32 {
        3
    }
}
874
875impl ErrorClassifiable for ErrorKind {
876    fn is_retryable(&self) -> bool {
877        self.is_retriable()
878    }
879
880    fn requires_user_intervention(&self) -> bool {
881        matches!(self, ErrorKind::PermissionDenied | ErrorKind::QuotaExceeded)
882    }
883}
884
885impl ErrorClassifiable for ErrorEffect {
886    fn is_retryable(&self) -> bool {
887        self.kind().is_retryable()
888    }
889
890    fn requires_user_intervention(&self) -> bool {
891        self.kind().requires_user_intervention()
892    }
893
894    fn retry_after_ms(&self) -> Option<u64> {
895        match self {
896            ErrorEffect::Operational(OperationalError::RateLimited { retry_after_ms, .. }) => {
897                Some((*retry_after_ms).max(0) as u64)
898            }
899            _ => None,
900        }
901    }
902}
903
904impl ErrorClassifiable for OperationalError {
905    fn is_retryable(&self) -> bool {
906        self.kind().is_retryable()
907    }
908
909    fn requires_user_intervention(&self) -> bool {
910        false
911    }
912
913    fn retry_after_ms(&self) -> Option<u64> {
914        match self {
915            OperationalError::RateLimited { retry_after_ms, .. } => {
916                Some((*retry_after_ms).max(0) as u64)
917            }
918            _ => None,
919        }
920    }
921}
922
923// ============================================================================
924// COMPENSATION ACTION
925// ============================================================================
926
/// Action to take for compensating a failed operation.
///
/// Serialized by serde as an externally tagged enum whose variant names are
/// converted to snake_case (e.g. `notify_agent`, `release_resources`). With the
/// `openapi` feature, ID fields are described to the schema as strings.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum CompensationAction {
    /// Rollback changes
    Rollback {
        /// Events to undo
        #[cfg_attr(feature = "openapi", schema(value_type = Vec<String>))]
        events: Vec<EventId>,
    },
    /// Notify an agent about the failure
    NotifyAgent {
        /// Identifier of the agent to notify.
        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
        agent_id: Uuid,
        /// Message to deliver to the agent (free-form text).
        message: String,
    },
    /// Release held resources
    ReleaseResources {
        /// Identifiers of the resources to release.
        #[cfg_attr(feature = "openapi", schema(value_type = Vec<String>))]
        resource_ids: Vec<Uuid>,
    },
    /// Custom compensation
    Custom {
        /// Caller-defined name of the custom action.
        /// NOTE(review): no fixed vocabulary is visible here; confirm with handlers.
        action_type: String,
        /// Arbitrary JSON payload interpreted by whoever handles `action_type`.
        #[cfg_attr(feature = "openapi", schema(value_type = Object))]
        payload: serde_json::Value,
    },
}
956
957// ============================================================================
958// WAIT CONDITION
959// ============================================================================
960
/// Condition that a pending effect is waiting for.
///
/// Serialized by serde as an externally tagged enum whose variant names are
/// converted to snake_case (e.g. `external_input`). With the `openapi` feature,
/// ID fields are described to the schema as UUID strings.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
pub enum WaitCondition {
    /// Waiting for an event
    Event {
        /// The event being waited for.
        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
        event_id: EventId,
    },
    /// Waiting for a lock to be released
    Lock {
        /// The lock that must be released.
        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
        lock_id: Uuid,
    },
    /// Waiting for a delegation to complete
    Delegation {
        /// The delegation that must complete.
        #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
        delegation_id: Uuid,
    },
    /// Waiting for a timeout
    Timeout {
        /// Timestamp in microseconds when to resume
        /// NOTE(review): presumably microseconds since the Unix epoch; confirm.
        resume_at: i64,
    },
    /// Waiting for external input
    ExternalInput {
        /// Label of the external input source (free-form string).
        source: String,
    },
}
989
990// ============================================================================
991// SERDE HELPERS
992// ============================================================================
993
994mod duration_millis {
995    use serde::{Deserialize, Deserializer, Serialize, Serializer};
996    use std::time::Duration;
997
998    pub fn serialize<S: Serializer>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> {
999        duration.as_millis().serialize(serializer)
1000    }
1001
1002    pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Duration, D::Error> {
1003        let millis = u64::deserialize(deserializer)?;
1004        Ok(Duration::from_millis(millis))
1005    }
1006}
1007
#[cfg(test)]
mod tests {
    use super::*;
    use crate::identity::{EntityIdType, EventId};

    /// `ErrorKind` variants exercised by the table-driven tests below.
    ///
    /// NOTE(review): intended to list every variant. The enum is defined
    /// elsewhere, so keep this table in sync when variants are added.
    const ALL_KINDS: &[ErrorKind] = &[
        ErrorKind::NotFound,
        ErrorKind::AlreadyExists,
        ErrorKind::Conflict,
        ErrorKind::InvalidState,
        ErrorKind::Validation,
        ErrorKind::BusinessLogic,
        ErrorKind::QuotaExceeded,
        ErrorKind::PermissionDenied,
        ErrorKind::LockFailed,
        ErrorKind::CoordinationFailed,
        ErrorKind::PiiViolation,
        ErrorKind::Network,
        ErrorKind::Database,
        ErrorKind::Timeout,
        ErrorKind::RateLimited,
        ErrorKind::Internal,
        ErrorKind::Unavailable,
        ErrorKind::RetryExhausted,
        ErrorKind::Serialization,
    ];

    #[test]
    fn test_effect_ok() {
        let effect: Effect<i32> = Effect::ok(42);
        assert!(effect.is_ok());
        assert_eq!(effect.expect("effect should be ok"), 42);
    }

    #[test]
    fn test_effect_err() {
        let effect: Effect<i32> = Effect::domain_error(
            DomainError::EntityNotFound {
                entity_type: crate::EntityType::Trajectory,
                id: EventId::now_v7().as_uuid(),
            },
            EventId::now_v7(),
            DagPosition::root(),
        );
        assert!(effect.is_err());
    }

    #[test]
    fn test_effect_retry() {
        let effect: Effect<i32> = Effect::retry(Duration::from_secs(1), 1, 3, "Temporary failure");
        assert!(effect.needs_retry());
    }

    #[test]
    fn test_effect_map() {
        let effect: Effect<i32> = Effect::ok(42);
        let mapped = effect.map(|n| n * 2);
        assert_eq!(mapped.expect("mapped effect should be ok"), 84);
    }

    #[test]
    fn test_error_kind_retriable() {
        assert!(ErrorKind::Network.is_retriable());
        assert!(ErrorKind::Timeout.is_retriable());
        assert!(!ErrorKind::NotFound.is_retriable());
        assert!(!ErrorKind::PermissionDenied.is_retriable());
    }

    #[test]
    fn test_domain_vs_operational() {
        let domain = ErrorEffect::Domain(Box::new(DomainErrorContext {
            error: DomainError::EntityNotFound {
                entity_type: crate::EntityType::Artifact,
                id: EventId::now_v7().as_uuid(),
            },
            source_event: EventId::now_v7(),
            position: DagPosition::root(),
            correlation_id: EventId::now_v7(),
        }));
        assert!(domain.is_domain());

        let operational = ErrorEffect::Operational(OperationalError::Timeout {
            operation: "test".to_string(),
        });
        assert!(operational.is_operational());
    }

    #[test]
    fn error_classifiable_matches_is_retriable() {
        for kind in ALL_KINDS {
            assert_eq!(
                kind.is_retryable(),
                kind.is_retriable(),
                "ErrorClassifiable::is_retryable() diverged from is_retriable() for {:?}",
                kind
            );
        }
    }

    #[test]
    fn retriable_kinds_are_never_domain() {
        // Transient/infrastructure kinds and business-logic kinds are disjoint families.
        for kind in ALL_KINDS {
            assert!(
                !(kind.is_retriable() && kind.is_domain()),
                "{:?} is classified as both retriable and domain",
                kind
            );
        }
    }

    #[test]
    fn error_classifiable_user_intervention() {
        assert!(ErrorKind::PermissionDenied.requires_user_intervention());
        assert!(ErrorKind::QuotaExceeded.requires_user_intervention());
        assert!(!ErrorKind::Network.requires_user_intervention());
        assert!(!ErrorKind::NotFound.requires_user_intervention());
        assert!(!ErrorKind::Timeout.requires_user_intervention());
    }

    #[test]
    fn error_classifiable_retry_after() {
        let rate_limited = OperationalError::RateLimited {
            service: "openai".to_string(),
            retry_after_ms: 5000,
        };
        assert_eq!(rate_limited.retry_after_ms(), Some(5000));
        assert!(rate_limited.is_retryable());
        assert!(!rate_limited.requires_user_intervention());

        let timeout = OperationalError::Timeout {
            operation: "db_query".to_string(),
        };
        assert_eq!(timeout.retry_after_ms(), None);

        let effect = ErrorEffect::Operational(OperationalError::RateLimited {
            service: "anthropic".to_string(),
            retry_after_ms: 2000,
        });
        assert_eq!(effect.retry_after_ms(), Some(2000));

        // Operational effects that are not rate limited carry no hint.
        let timeout_effect = ErrorEffect::Operational(OperationalError::Timeout {
            operation: "db_query".to_string(),
        });
        assert_eq!(timeout_effect.retry_after_ms(), None);
    }
}