1use crate::{DagPosition, EventId};
43use serde::{Deserialize, Serialize};
44use std::fmt;
45use std::time::Duration;
46use uuid::Uuid;
47
48#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
66#[serde(rename_all = "snake_case")]
67#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
68pub enum Effect<T> {
69 Ok(T),
71 Err(ErrorEffect),
73 Retry {
75 #[serde(with = "duration_millis")]
77 after: Duration,
78 attempt: u32,
80 max_attempts: u32,
82 reason: String,
84 },
85 Compensate {
87 action: CompensationAction,
89 cause: Box<ErrorEffect>,
91 },
92 Pending {
94 waiting_for: WaitCondition,
96 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
98 resume_token: EventId,
99 #[cfg_attr(feature = "openapi", schema(value_type = Object))]
101 progress_hint: Option<serde_json::Value>,
102 },
103 Batch(Vec<Effect<T>>),
105}
106
107impl<T> Effect<T> {
108 pub fn ok(value: T) -> Self {
110 Effect::Ok(value)
111 }
112
113 pub fn err(error: ErrorEffect) -> Self {
115 Effect::Err(error)
116 }
117
118 pub fn domain_error(error: DomainError, source_event: EventId, position: DagPosition) -> Self {
120 Effect::Err(ErrorEffect::Domain(Box::new(DomainErrorContext {
121 error,
122 source_event,
123 position,
124 correlation_id: source_event, })))
126 }
127
128 pub fn retry(
130 after: Duration,
131 attempt: u32,
132 max_attempts: u32,
133 reason: impl Into<String>,
134 ) -> Self {
135 Effect::Retry {
136 after,
137 attempt,
138 max_attempts,
139 reason: reason.into(),
140 }
141 }
142
143 pub fn pending(waiting_for: WaitCondition, resume_token: EventId) -> Self {
145 Effect::Pending {
146 waiting_for,
147 resume_token,
148 progress_hint: None,
149 }
150 }
151
152 pub fn is_ok(&self) -> bool {
154 matches!(self, Effect::Ok(_))
155 }
156
157 pub fn is_err(&self) -> bool {
159 matches!(self, Effect::Err(_))
160 }
161
162 pub fn needs_retry(&self) -> bool {
164 matches!(self, Effect::Retry { .. })
165 }
166
167 pub fn is_pending(&self) -> bool {
169 matches!(self, Effect::Pending { .. })
170 }
171
172 pub fn into_result(self) -> Result<T, ErrorEffect> {
174 match self {
175 Effect::Ok(v) => Ok(v),
176 Effect::Err(e) => Err(e),
177 Effect::Retry { reason, .. } => {
178 Err(ErrorEffect::Operational(OperationalError::RetryExhausted {
179 reason,
180 }))
181 }
182 Effect::Compensate { cause, .. } => Err(*cause),
183 Effect::Pending { waiting_for, .. } => {
184 Err(ErrorEffect::Operational(OperationalError::Timeout {
185 operation: format!("Pending: {:?}", waiting_for),
186 }))
187 }
188 Effect::Batch(effects) => {
189 let mut last_ok = None;
190 for effect in effects {
191 last_ok = Some(effect.into_result()?);
192 }
193 last_ok.ok_or_else(|| {
194 ErrorEffect::Operational(OperationalError::Internal {
195 message: "Empty batch".to_string(),
196 })
197 })
198 }
199 }
200 }
201
202 pub fn map<U, F: FnOnce(T) -> U + Clone>(self, f: F) -> Effect<U> {
204 match self {
205 Effect::Ok(v) => Effect::Ok(f(v)),
206 Effect::Err(e) => Effect::Err(e),
207 Effect::Retry {
208 after,
209 attempt,
210 max_attempts,
211 reason,
212 } => Effect::Retry {
213 after,
214 attempt,
215 max_attempts,
216 reason,
217 },
218 Effect::Compensate { action, cause } => Effect::Compensate { action, cause },
219 Effect::Pending {
220 waiting_for,
221 resume_token,
222 progress_hint,
223 } => Effect::Pending {
224 waiting_for,
225 resume_token,
226 progress_hint,
227 },
228 Effect::Batch(effects) => {
229 Effect::Batch(effects.into_iter().map(|e| e.map(f.clone())).collect())
230 }
231 }
232 }
233
234 pub fn unwrap(self) -> T {
236 match self {
237 Effect::Ok(v) => v,
238 _ => panic!(
239 "Called unwrap on non-Ok effect: {:?}",
240 std::any::type_name::<Self>()
241 ),
242 }
243 }
244
245 pub fn expect(self, msg: &str) -> T {
247 match self {
248 Effect::Ok(v) => v,
249 _ => panic!("{}", msg),
250 }
251 }
252
253 pub fn and_then<U, F: FnOnce(T) -> Effect<U>>(self, f: F) -> Effect<U> {
258 match self {
259 Effect::Ok(v) => f(v),
260 Effect::Err(e) => Effect::Err(e),
261 Effect::Retry {
262 after,
263 attempt,
264 max_attempts,
265 reason,
266 } => Effect::Retry {
267 after,
268 attempt,
269 max_attempts,
270 reason,
271 },
272 Effect::Compensate { action, cause } => Effect::Compensate { action, cause },
273 Effect::Pending {
274 waiting_for,
275 resume_token,
276 progress_hint,
277 } => Effect::Pending {
278 waiting_for,
279 resume_token,
280 progress_hint,
281 },
282 Effect::Batch(_) => Effect::Err(ErrorEffect::Operational(OperationalError::Internal {
283 message: "Cannot and_then on Batch effect".to_string(),
284 })),
285 }
286 }
287
288 pub fn map_err<F: FnOnce(ErrorEffect) -> ErrorEffect + Clone>(self, f: F) -> Self {
291 match self {
292 Effect::Err(e) => Effect::Err(f(e)),
293 Effect::Batch(effects) => {
294 Effect::Batch(effects.into_iter().map(|e| e.map_err(f.clone())).collect())
295 }
296 other => other,
297 }
298 }
299
300 pub fn or_else<F: FnOnce(ErrorEffect) -> Effect<T> + Clone>(self, f: F) -> Self {
303 match self {
304 Effect::Err(e) => f(e),
305 Effect::Batch(effects) => {
306 Effect::Batch(effects.into_iter().map(|e| e.or_else(f.clone())).collect())
307 }
308 other => other,
309 }
310 }
311
312 pub fn unwrap_or(self, default: T) -> T {
316 match self {
317 Effect::Ok(v) => v,
318 _ => default,
319 }
320 }
321
322 pub fn unwrap_or_else<F: FnOnce(ErrorEffect) -> T>(self, f: F) -> T {
327 match self {
328 Effect::Ok(v) => v,
329 Effect::Err(e) => f(e),
330 Effect::Retry { reason, .. } => {
331 f(ErrorEffect::Operational(OperationalError::RetryExhausted {
332 reason,
333 }))
334 }
335 Effect::Compensate { cause, .. } => f(*cause),
336 Effect::Pending { waiting_for, .. } => {
337 f(ErrorEffect::Operational(OperationalError::Timeout {
338 operation: format!("Pending: {:?}", waiting_for),
339 }))
340 }
341 Effect::Batch(_) => f(ErrorEffect::Operational(OperationalError::Internal {
342 message: "Cannot unwrap Batch effect".to_string(),
343 })),
344 }
345 }
346}
347
348impl<T, E: Into<ErrorEffect>> From<Result<T, E>> for Effect<T> {
349 fn from(result: Result<T, E>) -> Self {
350 match result {
351 Ok(v) => Effect::Ok(v),
352 Err(e) => Effect::Err(e.into()),
353 }
354 }
355}
356
357#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
367#[serde(rename_all = "snake_case")]
368#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
369pub enum ErrorEffect {
370 Domain(Box<DomainErrorContext>),
372 Operational(OperationalError),
374}
375
376impl ErrorEffect {
377 pub fn is_domain(&self) -> bool {
379 matches!(self, ErrorEffect::Domain(_))
380 }
381
382 pub fn is_operational(&self) -> bool {
384 matches!(self, ErrorEffect::Operational(_))
385 }
386
387 pub fn kind(&self) -> ErrorKind {
389 match self {
390 ErrorEffect::Domain(ctx) => ctx.error.kind(),
391 ErrorEffect::Operational(op) => op.kind(),
392 }
393 }
394}
395
396impl fmt::Display for ErrorEffect {
397 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
398 match self {
399 ErrorEffect::Domain(ctx) => write!(f, "Domain error: {}", ctx.error),
400 ErrorEffect::Operational(op) => write!(f, "Operational error: {}", op),
401 }
402 }
403}
404
405impl std::error::Error for ErrorEffect {}
406
407#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
413#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
414pub struct DomainErrorContext {
415 pub error: DomainError,
417 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
419 pub source_event: EventId,
420 pub position: DagPosition,
422 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
424 pub correlation_id: EventId,
425}
426
427#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
435#[serde(rename_all = "snake_case")]
436#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
437pub enum DomainError {
438 EntityNotFound {
440 entity_type: crate::EntityType,
441 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
442 id: Uuid,
443 },
444 EntityAlreadyExists {
445 entity_type: crate::EntityType,
446 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
447 id: Uuid,
448 },
449 EntityConflict {
450 entity_type: crate::EntityType,
451 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
452 id: Uuid,
453 reason: String,
454 },
455
456 InvalidStateTransition {
458 entity_type: crate::EntityType,
459 from_state: String,
460 to_state: String,
461 reason: String,
462 },
463 StaleData {
464 entity_type: crate::EntityType,
465 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
466 id: Uuid,
467 expected_version: u64,
468 actual_version: u64,
469 },
470
471 ValidationFailed {
473 field: String,
474 reason: String,
475 },
476 ConstraintViolation {
477 constraint: String,
478 reason: String,
479 },
480 CircularReference {
481 entity_type: crate::EntityType,
482 #[cfg_attr(feature = "openapi", schema(value_type = Vec<String>))]
483 ids: Vec<Uuid>,
484 },
485
486 Contradiction {
488 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
489 artifact_a: Uuid,
490 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
491 artifact_b: Uuid,
492 description: String,
493 },
494 QuotaExceeded {
495 resource: String,
496 limit: u64,
497 requested: u64,
498 },
499 PermissionDenied {
500 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
501 agent_id: Uuid,
502 action: String,
503 resource: String,
504 },
505
506 LockAcquisitionFailed {
508 resource: String,
509 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
510 holder: Uuid,
511 },
512 LockExpired {
513 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
514 lock_id: Uuid,
515 },
516 DelegationFailed {
517 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
518 delegation_id: Uuid,
519 reason: String,
520 },
521 HandoffFailed {
522 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
523 handoff_id: Uuid,
524 reason: String,
525 },
526
527 ToolPolicyViolation {
529 tool_name: String,
530 policy: String,
531 reason: String,
532 },
533
534 PiiEgressViolation {
536 field: String,
537 pii_type: String,
538 reason: String,
539 },
540 PiiManifestInvalid {
541 violations: Vec<String>,
542 },
543}
544
545impl DomainError {
546 pub fn kind(&self) -> ErrorKind {
548 match self {
549 DomainError::EntityNotFound { .. } => ErrorKind::NotFound,
550 DomainError::EntityAlreadyExists { .. } => ErrorKind::AlreadyExists,
551 DomainError::EntityConflict { .. } => ErrorKind::Conflict,
552 DomainError::InvalidStateTransition { .. } => ErrorKind::InvalidState,
553 DomainError::StaleData { .. } => ErrorKind::Conflict,
554 DomainError::ValidationFailed { .. } => ErrorKind::Validation,
555 DomainError::ConstraintViolation { .. } => ErrorKind::Validation,
556 DomainError::CircularReference { .. } => ErrorKind::Validation,
557 DomainError::Contradiction { .. } => ErrorKind::BusinessLogic,
558 DomainError::QuotaExceeded { .. } => ErrorKind::QuotaExceeded,
559 DomainError::PermissionDenied { .. } => ErrorKind::PermissionDenied,
560 DomainError::LockAcquisitionFailed { .. } => ErrorKind::LockFailed,
561 DomainError::LockExpired { .. } => ErrorKind::LockFailed,
562 DomainError::DelegationFailed { .. } => ErrorKind::CoordinationFailed,
563 DomainError::HandoffFailed { .. } => ErrorKind::CoordinationFailed,
564 DomainError::ToolPolicyViolation { .. } => ErrorKind::PermissionDenied,
565 DomainError::PiiEgressViolation { .. } => ErrorKind::PiiViolation,
566 DomainError::PiiManifestInvalid { .. } => ErrorKind::PiiViolation,
567 }
568 }
569}
570
571impl fmt::Display for DomainError {
572 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
573 match self {
574 DomainError::EntityNotFound { entity_type, id } => {
575 write!(f, "{} not found: {}", entity_type, id)
576 }
577 DomainError::EntityAlreadyExists { entity_type, id } => {
578 write!(f, "{} already exists: {}", entity_type, id)
579 }
580 DomainError::EntityConflict {
581 entity_type,
582 id,
583 reason,
584 } => {
585 write!(f, "Conflict on {} {}: {}", entity_type, id, reason)
586 }
587 DomainError::InvalidStateTransition {
588 entity_type,
589 from_state,
590 to_state,
591 reason,
592 } => {
593 write!(
594 f,
595 "Invalid {} transition {} -> {}: {}",
596 entity_type, from_state, to_state, reason
597 )
598 }
599 DomainError::StaleData {
600 entity_type,
601 id,
602 expected_version,
603 actual_version,
604 } => {
605 write!(
606 f,
607 "Stale {} {}: expected version {}, got {}",
608 entity_type, id, expected_version, actual_version
609 )
610 }
611 DomainError::ValidationFailed { field, reason } => {
612 write!(f, "Validation failed for {}: {}", field, reason)
613 }
614 DomainError::ConstraintViolation { constraint, reason } => {
615 write!(f, "Constraint {} violated: {}", constraint, reason)
616 }
617 DomainError::CircularReference { entity_type, ids } => {
618 write!(f, "Circular reference in {}: {:?}", entity_type, ids)
619 }
620 DomainError::Contradiction {
621 artifact_a,
622 artifact_b,
623 description,
624 } => {
625 write!(
626 f,
627 "Contradiction between {} and {}: {}",
628 artifact_a, artifact_b, description
629 )
630 }
631 DomainError::QuotaExceeded {
632 resource,
633 limit,
634 requested,
635 } => {
636 write!(
637 f,
638 "Quota exceeded for {}: limit {}, requested {}",
639 resource, limit, requested
640 )
641 }
642 DomainError::PermissionDenied {
643 agent_id,
644 action,
645 resource,
646 } => {
647 write!(
648 f,
649 "Permission denied: agent {} cannot {} on {}",
650 agent_id, action, resource
651 )
652 }
653 DomainError::LockAcquisitionFailed { resource, holder } => {
654 write!(
655 f,
656 "Lock acquisition failed for {}: held by {}",
657 resource, holder
658 )
659 }
660 DomainError::LockExpired { lock_id } => {
661 write!(f, "Lock expired: {}", lock_id)
662 }
663 DomainError::DelegationFailed {
664 delegation_id,
665 reason,
666 } => {
667 write!(f, "Delegation {} failed: {}", delegation_id, reason)
668 }
669 DomainError::HandoffFailed { handoff_id, reason } => {
670 write!(f, "Handoff {} failed: {}", handoff_id, reason)
671 }
672 DomainError::ToolPolicyViolation {
673 tool_name,
674 policy,
675 reason,
676 } => {
677 write!(
678 f,
679 "Tool policy violation for '{}' ({}): {}",
680 tool_name, policy, reason
681 )
682 }
683 DomainError::PiiEgressViolation {
684 field,
685 pii_type,
686 reason,
687 } => {
688 write!(
689 f,
690 "PII egress violation on field '{}' ({}): {}",
691 field, pii_type, reason
692 )
693 }
694 DomainError::PiiManifestInvalid { violations } => {
695 write!(f, "PII manifest invalid: {}", violations.join("; "))
696 }
697 }
698 }
699}
700
701impl std::error::Error for DomainError {}
702
703#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
712#[serde(rename_all = "snake_case")]
713#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
714pub enum OperationalError {
715 NetworkError { message: String },
717 DatabaseConnectionError { message: String },
719 Timeout { operation: String },
721 RateLimited {
723 service: String,
724 retry_after_ms: i64,
725 },
726 Internal { message: String },
728 Unavailable { resource: String },
730 RetryExhausted { reason: String },
732 SerializationError { message: String },
734}
735
736impl OperationalError {
737 pub fn kind(&self) -> ErrorKind {
739 match self {
740 OperationalError::NetworkError { .. } => ErrorKind::Network,
741 OperationalError::DatabaseConnectionError { .. } => ErrorKind::Database,
742 OperationalError::Timeout { .. } => ErrorKind::Timeout,
743 OperationalError::RateLimited { .. } => ErrorKind::RateLimited,
744 OperationalError::Internal { .. } => ErrorKind::Internal,
745 OperationalError::Unavailable { .. } => ErrorKind::Unavailable,
746 OperationalError::RetryExhausted { .. } => ErrorKind::RetryExhausted,
747 OperationalError::SerializationError { .. } => ErrorKind::Serialization,
748 }
749 }
750}
751
752impl fmt::Display for OperationalError {
753 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
754 match self {
755 OperationalError::NetworkError { message } => write!(f, "Network error: {}", message),
756 OperationalError::DatabaseConnectionError { message } => {
757 write!(f, "Database error: {}", message)
758 }
759 OperationalError::Timeout { operation } => write!(f, "Timeout: {}", operation),
760 OperationalError::RateLimited {
761 service,
762 retry_after_ms,
763 } => {
764 write!(
765 f,
766 "Rate limited by {}, retry after {}ms",
767 service, retry_after_ms
768 )
769 }
770 OperationalError::Internal { message } => write!(f, "Internal error: {}", message),
771 OperationalError::Unavailable { resource } => write!(f, "Unavailable: {}", resource),
772 OperationalError::RetryExhausted { reason } => {
773 write!(f, "Retries exhausted: {}", reason)
774 }
775 OperationalError::SerializationError { message } => {
776 write!(f, "Serialization error: {}", message)
777 }
778 }
779 }
780}
781
782impl std::error::Error for OperationalError {}
783
784#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
790#[serde(rename_all = "snake_case")]
791#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
792pub enum ErrorKind {
793 NotFound,
795 AlreadyExists,
796 Conflict,
797 InvalidState,
798 Validation,
799 BusinessLogic,
800 QuotaExceeded,
801 PermissionDenied,
802 LockFailed,
803 CoordinationFailed,
804 PiiViolation,
805
806 Network,
808 Database,
809 Timeout,
810 RateLimited,
811 Internal,
812 Unavailable,
813 RetryExhausted,
814 Serialization,
815}
816
817impl ErrorKind {
818 pub fn is_retriable(&self) -> bool {
820 matches!(
821 self,
822 ErrorKind::Network
823 | ErrorKind::Database
824 | ErrorKind::Timeout
825 | ErrorKind::RateLimited
826 | ErrorKind::Unavailable
827 )
828 }
829
830 pub fn is_domain(&self) -> bool {
832 matches!(
833 self,
834 ErrorKind::NotFound
835 | ErrorKind::AlreadyExists
836 | ErrorKind::Conflict
837 | ErrorKind::InvalidState
838 | ErrorKind::Validation
839 | ErrorKind::BusinessLogic
840 | ErrorKind::QuotaExceeded
841 | ErrorKind::PermissionDenied
842 | ErrorKind::LockFailed
843 | ErrorKind::CoordinationFailed
844 | ErrorKind::PiiViolation
845 )
846 }
847}
848
849pub trait ErrorClassifiable {
858 fn is_retryable(&self) -> bool;
860
861 fn requires_user_intervention(&self) -> bool;
863
864 fn retry_after_ms(&self) -> Option<u64> {
866 None
867 }
868
869 fn max_retries(&self) -> u32 {
871 3
872 }
873}
874
875impl ErrorClassifiable for ErrorKind {
876 fn is_retryable(&self) -> bool {
877 self.is_retriable()
878 }
879
880 fn requires_user_intervention(&self) -> bool {
881 matches!(self, ErrorKind::PermissionDenied | ErrorKind::QuotaExceeded)
882 }
883}
884
885impl ErrorClassifiable for ErrorEffect {
886 fn is_retryable(&self) -> bool {
887 self.kind().is_retryable()
888 }
889
890 fn requires_user_intervention(&self) -> bool {
891 self.kind().requires_user_intervention()
892 }
893
894 fn retry_after_ms(&self) -> Option<u64> {
895 match self {
896 ErrorEffect::Operational(OperationalError::RateLimited { retry_after_ms, .. }) => {
897 Some((*retry_after_ms).max(0) as u64)
898 }
899 _ => None,
900 }
901 }
902}
903
904impl ErrorClassifiable for OperationalError {
905 fn is_retryable(&self) -> bool {
906 self.kind().is_retryable()
907 }
908
909 fn requires_user_intervention(&self) -> bool {
910 false
911 }
912
913 fn retry_after_ms(&self) -> Option<u64> {
914 match self {
915 OperationalError::RateLimited { retry_after_ms, .. } => {
916 Some((*retry_after_ms).max(0) as u64)
917 }
918 _ => None,
919 }
920 }
921}
922
923#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
929#[serde(rename_all = "snake_case")]
930#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
931pub enum CompensationAction {
932 Rollback {
934 #[cfg_attr(feature = "openapi", schema(value_type = Vec<String>))]
936 events: Vec<EventId>,
937 },
938 NotifyAgent {
940 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
941 agent_id: Uuid,
942 message: String,
943 },
944 ReleaseResources {
946 #[cfg_attr(feature = "openapi", schema(value_type = Vec<String>))]
947 resource_ids: Vec<Uuid>,
948 },
949 Custom {
951 action_type: String,
952 #[cfg_attr(feature = "openapi", schema(value_type = Object))]
953 payload: serde_json::Value,
954 },
955}
956
957#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
963#[serde(rename_all = "snake_case")]
964#[cfg_attr(feature = "openapi", derive(utoipa::ToSchema))]
965pub enum WaitCondition {
966 Event {
968 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
969 event_id: EventId,
970 },
971 Lock {
973 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
974 lock_id: Uuid,
975 },
976 Delegation {
978 #[cfg_attr(feature = "openapi", schema(value_type = String, format = "uuid"))]
979 delegation_id: Uuid,
980 },
981 Timeout {
983 resume_at: i64,
985 },
986 ExternalInput { source: String },
988}
989
990mod duration_millis {
995 use serde::{Deserialize, Deserializer, Serialize, Serializer};
996 use std::time::Duration;
997
998 pub fn serialize<S: Serializer>(duration: &Duration, serializer: S) -> Result<S::Ok, S::Error> {
999 duration.as_millis().serialize(serializer)
1000 }
1001
1002 pub fn deserialize<'de, D: Deserializer<'de>>(deserializer: D) -> Result<Duration, D::Error> {
1003 let millis = u64::deserialize(deserializer)?;
1004 Ok(Duration::from_millis(millis))
1005 }
1006}
1007
1008#[cfg(test)]
1009mod tests {
1010 use super::*;
1011 use crate::identity::{EntityIdType, EventId};
1012
1013 #[test]
1014 fn test_effect_ok() {
1015 let effect: Effect<i32> = Effect::ok(42);
1016 assert!(effect.is_ok());
1017 assert_eq!(effect.expect("effect should be ok"), 42);
1018 }
1019
1020 #[test]
1021 fn test_effect_err() {
1022 let effect: Effect<i32> = Effect::domain_error(
1023 DomainError::EntityNotFound {
1024 entity_type: crate::EntityType::Trajectory,
1025 id: EventId::now_v7().as_uuid(),
1026 },
1027 EventId::now_v7(),
1028 DagPosition::root(),
1029 );
1030 assert!(effect.is_err());
1031 }
1032
1033 #[test]
1034 fn test_effect_retry() {
1035 let effect: Effect<i32> = Effect::retry(Duration::from_secs(1), 1, 3, "Temporary failure");
1036 assert!(effect.needs_retry());
1037 }
1038
1039 #[test]
1040 fn test_effect_map() {
1041 let effect: Effect<i32> = Effect::ok(42);
1042 let mapped = effect.map(|n| n * 2);
1043 assert_eq!(mapped.expect("mapped effect should be ok"), 84);
1044 }
1045
1046 #[test]
1047 fn test_error_kind_retriable() {
1048 assert!(ErrorKind::Network.is_retriable());
1049 assert!(ErrorKind::Timeout.is_retriable());
1050 assert!(!ErrorKind::NotFound.is_retriable());
1051 assert!(!ErrorKind::PermissionDenied.is_retriable());
1052 }
1053
1054 #[test]
1055 fn test_domain_vs_operational() {
1056 let domain = ErrorEffect::Domain(Box::new(DomainErrorContext {
1057 error: DomainError::EntityNotFound {
1058 entity_type: crate::EntityType::Artifact,
1059 id: EventId::now_v7().as_uuid(),
1060 },
1061 source_event: EventId::now_v7(),
1062 position: DagPosition::root(),
1063 correlation_id: EventId::now_v7(),
1064 }));
1065 assert!(domain.is_domain());
1066
1067 let operational = ErrorEffect::Operational(OperationalError::Timeout {
1068 operation: "test".to_string(),
1069 });
1070 assert!(operational.is_operational());
1071 }
1072
1073 #[test]
1074 fn error_classifiable_matches_is_retriable() {
1075 let all_kinds = [
1076 ErrorKind::NotFound,
1077 ErrorKind::AlreadyExists,
1078 ErrorKind::Conflict,
1079 ErrorKind::InvalidState,
1080 ErrorKind::Validation,
1081 ErrorKind::BusinessLogic,
1082 ErrorKind::QuotaExceeded,
1083 ErrorKind::PermissionDenied,
1084 ErrorKind::LockFailed,
1085 ErrorKind::CoordinationFailed,
1086 ErrorKind::Network,
1087 ErrorKind::Database,
1088 ErrorKind::Timeout,
1089 ErrorKind::RateLimited,
1090 ErrorKind::Internal,
1091 ErrorKind::Unavailable,
1092 ErrorKind::RetryExhausted,
1093 ErrorKind::Serialization,
1094 ];
1095 for kind in &all_kinds {
1096 assert_eq!(
1097 kind.is_retryable(),
1098 kind.is_retriable(),
1099 "ErrorClassifiable::is_retryable() diverged from is_retriable() for {:?}",
1100 kind
1101 );
1102 }
1103 }
1104
1105 #[test]
1106 fn error_classifiable_user_intervention() {
1107 assert!(ErrorKind::PermissionDenied.requires_user_intervention());
1108 assert!(ErrorKind::QuotaExceeded.requires_user_intervention());
1109 assert!(!ErrorKind::Network.requires_user_intervention());
1110 assert!(!ErrorKind::NotFound.requires_user_intervention());
1111 assert!(!ErrorKind::Timeout.requires_user_intervention());
1112 }
1113
1114 #[test]
1115 fn error_classifiable_retry_after() {
1116 let rate_limited = OperationalError::RateLimited {
1117 service: "openai".to_string(),
1118 retry_after_ms: 5000,
1119 };
1120 assert_eq!(rate_limited.retry_after_ms(), Some(5000));
1121 assert!(rate_limited.is_retryable());
1122 assert!(!rate_limited.requires_user_intervention());
1123
1124 let timeout = OperationalError::Timeout {
1125 operation: "db_query".to_string(),
1126 };
1127 assert_eq!(timeout.retry_after_ms(), None);
1128
1129 let effect = ErrorEffect::Operational(OperationalError::RateLimited {
1130 service: "anthropic".to_string(),
1131 retry_after_ms: 2000,
1132 });
1133 assert_eq!(effect.retry_after_ms(), Some(2000));
1134 }
1135}