Coverage for postrfp / model / jobs.py: 96%

47 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1import enum 

2from datetime import datetime 

3from typing import Optional, TYPE_CHECKING, Any 

4 

5import sqids 

6from sqlalchemy import ( 

7 Integer, 

8 String, 

9 DateTime, 

10 ForeignKey, 

11 Text, 

12 JSON, 

13 Enum as SqlaEnum, 

14 CheckConstraint, 

15 func, 

16) 

17from sqlalchemy.orm import Mapped, mapped_column, relationship 

18 

19from postrfp.model.meta import Base, SqidsMixin 

20 

21if TYPE_CHECKING: 

22 from postrfp.model.audit import AuditEvent 

23 from postrfp.model.notify import WebhookSubscription 

24 from postrfp.model.datafeeds import Datafeed 

25 

26 

# Module-level Sqids encoder. The custom alphabet is lowercase letters and
# digits with "0", "1", "i", "l" and "o" left out (presumably to avoid
# look-alike characters - confirm); generated ids are at least 5 characters.
SQID = sqids.Sqids(min_length=5, alphabet="fueamhq89463w7vx5n2dgryzctjskpb")

31 

32 

class JobStatus(enum.Enum):
    """Status values recorded for a job execution.

    Every member's value is the same string as its name, so lookup by
    name (``JobStatus["failed"]``) and by value (``JobStatus("failed")``)
    resolve to the same member.
    """

    pending = "pending"
    running = "running"
    success = "success"
    failed = "failed"
    partial = "partial"
    aborted = "aborted"

40 

41 

class JobExecution(Base, SqidsMixin):
    """
    Unified model for tracking asynchronous job executions (Webhooks, Datafeeds, etc.)

    Each row points at exactly one parent: either a webhook subscription
    (``webhook_id``) or a datafeed (``datafeed_id``). The two check
    constraints in ``__table_args__`` enforce "at least one" and "not both"
    at the database level.
    """

    __tablename__ = "job_executions"
    # Presumably read by SqidsMixin when encoding ids - confirm in
    # postrfp.model.meta.
    _sqids_alphabet = "fueamhq89463w7vx5n2dgryzctjskpb"

    id: Mapped[int] = mapped_column(Integer, primary_key=True)

    # Discriminator / Type
    job_type: Mapped[str] = mapped_column(String(20), nullable=False, index=True)

    # The default is applied when the row is INSERTed, so a freshly
    # constructed, unflushed instance still has ``status is None``.
    status: Mapped[JobStatus] = mapped_column(
        SqlaEnum(JobStatus), default=JobStatus.pending, nullable=False, index=True
    )

    # Optional external run identifier; unique when present.
    dagu_run_id: Mapped[Optional[str]] = mapped_column(
        String(100), nullable=True, unique=True, index=True
    )

    # Populated by the database via the SQL function utc_timestamp() on INSERT.
    # NOTE(review): UTC_TIMESTAMP() is a MySQL/MariaDB function - confirm the
    # target backends support it.
    created_at: Mapped[datetime] = mapped_column(
        DateTime, default=func.utc_timestamp(), nullable=False
    )

    completed_at: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)

    # Unified logging/error message
    log: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Flexible result data (e.g. items_processed, http_status_code)
    result_data: Mapped[Optional[dict[str, Any]]] = mapped_column(JSON, nullable=True)

    # Parent Relationships (Exclusive)
    webhook_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("webhook_subscriptions.id", ondelete="CASCADE"),
        nullable=True,
        index=True,
    )

    datafeed_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("datafeeds.id", ondelete="CASCADE"),
        nullable=True,
        index=True,
    )

    @property
    def datafeed_uid(self) -> Optional[str]:
        """Return the ``uid`` of the related datafeed, or None when there is none."""
        if self.datafeed is None:
            return None
        return self.datafeed.uid

    # Trigger Context
    # SET NULL keeps the execution row when the triggering audit event is deleted.
    trigger_event_id: Mapped[Optional[int]] = mapped_column(
        Integer,
        ForeignKey("audit_events.id", ondelete="SET NULL"),
        nullable=True,
        index=True,
    )

    # Relationships
    webhook: Mapped[Optional["WebhookSubscription"]] = relationship(
        "WebhookSubscription", back_populates="executions"
    )

    datafeed: Mapped[Optional["Datafeed"]] = relationship(
        "Datafeed", back_populates="executions"
    )

    trigger_event: Mapped[Optional["AuditEvent"]] = relationship("AuditEvent")

    __table_args__ = (
        # At least one parent must be set ...
        CheckConstraint(
            "(webhook_id IS NOT NULL) OR (datafeed_id IS NOT NULL)",
            name="check_job_has_parent",
        ),
        # ... and never both at once.
        CheckConstraint(
            "NOT ((webhook_id IS NOT NULL) AND (datafeed_id IS NOT NULL))",
            name="check_job_exclusive_parent",
        ),
    )

    @property
    def event_id(self) -> Optional[int]:
        """Alias for trigger_event_id to maintain compatibility with WebhookExec Pydantic model"""
        return self.trigger_event_id

    @property
    def error_message(self) -> Optional[str]:
        """Alias for log to maintain compatibility with WebhookExec Pydantic model"""
        return self.log

    @error_message.setter
    def error_message(self, value: Optional[str]) -> None:
        # ``log`` is a nullable column, so None is accepted to clear the message.
        self.log = value

    def __repr__(self) -> str:
        """Return a short debug representation.

        ``status`` is None on an instance that has not been flushed yet (the
        column default is only applied at INSERT time), so it is read
        defensively rather than raising AttributeError.
        """
        status_name = self.status.name if self.status is not None else None
        return f"<JobExecution {self.id} ({self.job_type}): {status_name}>"

141 return f"<JobExecution {self.id} ({self.job_type}): {self.status.name}>"