Coverage for postrfp / model / jobs.py: 96%
47 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1import enum
2from datetime import datetime
3from typing import Optional, TYPE_CHECKING, Any
5import sqids
6from sqlalchemy import (
7 Integer,
8 String,
9 DateTime,
10 ForeignKey,
11 Text,
12 JSON,
13 Enum as SqlaEnum,
14 CheckConstraint,
15 func,
16)
17from sqlalchemy.orm import Mapped, mapped_column, relationship
19from postrfp.model.meta import Base, SqidsMixin
21if TYPE_CHECKING:
22 from postrfp.model.audit import AuditEvent
23 from postrfp.model.notify import WebhookSubscription
24 from postrfp.model.datafeeds import Datafeed
# Module-level Sqids encoder: custom alphabet, generated ids padded to at
# least five characters.
# NOTE(review): the same alphabet string is repeated as
# JobExecution._sqids_alphabet below -- keep the two in sync if either changes.
SQID = sqids.Sqids(alphabet="fueamhq89463w7vx5n2dgryzctjskpb", min_length=5)
class JobStatus(enum.Enum):
    """Closed set of states a job execution can be recorded in.

    Every member's value is the same string as its name, so the textual
    form is identical whether a consumer serialises by name or by value.
    """

    # Declaration order is significant: it is the iteration order of the enum.
    pending = "pending"
    running = "running"
    success = "success"
    failed = "failed"
    partial = "partial"
    aborted = "aborted"
class JobExecution(Base, SqidsMixin):
    """
    Unified model for tracking asynchronous job executions (Webhooks, Datafeeds, etc.)

    Every row belongs to exactly one parent: either a webhook subscription
    (``webhook_id``) or a datafeed (``datafeed_id``). The two table-level
    check constraints in ``__table_args__`` enforce that one, and only one,
    of those foreign keys is set.
    """

    __tablename__ = "job_executions"
    # NOTE(review): presumably read by SqidsMixin when encoding ids -- confirm
    # in postrfp.model.meta. Same string as the module-level SQID alphabet.
    _sqids_alphabet = "fueamhq89463w7vx5n2dgryzctjskpb"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)

    # Discriminator / Type
    job_type: Mapped[str] = mapped_column(String(20), nullable=False, index=True)

    # The default is a column-level INSERT default: it is applied when the row
    # is flushed, so a freshly constructed instance reads ``None`` until then.
    status: Mapped[JobStatus] = mapped_column(
        SqlaEnum(JobStatus), default=JobStatus.pending, nullable=False, index=True
    )

    # NOTE(review): presumably the run identifier assigned by the Dagu
    # workflow runner -- confirm against the code that populates it.
    dagu_run_id: Mapped[Optional[str]] = mapped_column(
        String(100), nullable=True, unique=True, index=True
    )

    # The default is a SQL function expression rendered into the INSERT.
    # NOTE(review): utc_timestamp() is a MySQL/MariaDB function -- confirm the
    # target dialect before running against another backend.
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=func.utc_timestamp(), nullable=False
    )

    completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)

    # Unified logging/error message
    log: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Flexible result data (e.g. items_processed, http_status_code)
    result_data: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True)

    # Parent Relationships (Exclusive)
    # Deleting the parent row cascades to its executions (ON DELETE CASCADE).
    webhook_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("webhook_subscriptions.id", ondelete="CASCADE"),
        nullable=True,
        index=True,
    )

    datafeed_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("datafeeds.id", ondelete="CASCADE"),
        nullable=True,
        index=True,
    )

    # Trigger Context
    # Deleting the audit event keeps the execution row and nulls this column
    # (ON DELETE SET NULL).
    trigger_event_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("audit_events.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )

    # Relationships
    webhook: Mapped[Optional["WebhookSubscription"]] = relationship(
        "WebhookSubscription", back_populates="executions"
    )

    datafeed: Mapped[Optional["Datafeed"]] = relationship(
        "Datafeed", back_populates="executions"
    )

    trigger_event: Mapped[Optional["AuditEvent"]] = relationship("AuditEvent")

    __table_args__ = (
        # At least one parent must be set ...
        CheckConstraint(
            "(webhook_id IS NOT NULL) OR (datafeed_id IS NOT NULL)",
            name="check_job_has_parent",
        ),
        # ... and never both at once.
        CheckConstraint(
            "NOT ((webhook_id IS NOT NULL) AND (datafeed_id IS NOT NULL))",
            name="check_job_exclusive_parent",
        ),
    )

    @property
    def datafeed_uid(self) -> Optional[str]:
        """Return the parent datafeed's ``uid``, or ``None`` when there is no datafeed."""
        if self.datafeed is None:
            return None
        return self.datafeed.uid

    @property
    def event_id(self) -> Optional[int]:
        """Alias for trigger_event_id to maintain compatibility with WebhookExec Pydantic model"""
        return self.trigger_event_id

    @property
    def error_message(self) -> Optional[str]:
        """Alias for log to maintain compatibility with WebhookExec Pydantic model"""
        return self.log

    @error_message.setter
    def error_message(self, value: Optional[str]) -> None:
        # Accepts None as well as str: the underlying ``log`` column is
        # nullable and the getter already returns Optional[str].
        self.log = value

    def __repr__(self) -> str:
        """Return a short debug representation: id, job type and status name.

        ``status`` is ``None`` on an instance that has not been flushed yet
        (the column default is only applied at INSERT), so the status name is
        resolved defensively instead of raising ``AttributeError``.
        """
        status_name = self.status.name if self.status is not None else None
        return f"<JobExecution {self.id} ({self.job_type}): {status_name}>"