Coverage for postrfp/model/notify.py: 81%
118 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import enum # Don't import Enum here - shadows the SQLAlchemy type
2from datetime import datetime
3from typing import Optional, TYPE_CHECKING
5from sqlalchemy import (
6 JSON,
7 DateTime,
8 text,
9 Integer,
10 ForeignKey,
11 func,
12 UniqueConstraint,
13)
14from sqlalchemy.types import INTEGER, VARCHAR, Enum, TEXT, SMALLINT
15from sqlalchemy.orm import Mapped, mapped_column, relationship, validates
17from postrfp.model.meta import Base
19if TYPE_CHECKING:
20 from postrfp.model.project import Project
21 from postrfp.model.issue import Issue
22 from postrfp.model.humans import User, Organisation
23 from postrfp.model.audit import AuditEvent
26class ProjectWatchList(Base):
27 __tablename__ = "project_watch_list"
28 __table_args__ = (
29 UniqueConstraint(
30 "user_id", "project_id"
31 ), # Removed name "_user_project_wl_uc" from Index
32 )
34 project_id: Mapped[Optional[int]] = mapped_column(
35 INTEGER, ForeignKey("projects.id", ondelete="CASCADE"), nullable=True
36 )
37 user_id: Mapped[Optional[str]] = mapped_column(
38 VARCHAR(length=50), ForeignKey("users.id", ondelete="CASCADE"), nullable=True
39 )
40 date_created: Mapped[Optional[datetime]] = mapped_column(
41 DateTime, default=func.utc_timestamp(), nullable=True
42 )
44 project: Mapped["Project"] = relationship("Project", back_populates="watch_list")
45 user: Mapped["User"] = relationship(
46 "User", lazy="joined", back_populates="project_watches"
47 )
49 def __repr__(self):
50 return f"<ProjectWatchList: [{self.user_id}] watching {self.project}>"
53class IssueWatchList(Base):
54 __tablename__ = "issue_watch_list"
55 __table_args__ = (
56 UniqueConstraint(
57 "user_id", "issue_id"
58 ), # Removed name "_user_issue_wl_uc" from Index
59 )
61 issue_id: Mapped[int] = mapped_column(
62 INTEGER, ForeignKey("issues.id", ondelete="CASCADE")
63 )
64 user_id: Mapped[str] = mapped_column(
65 VARCHAR(length=50), ForeignKey("users.id", ondelete="CASCADE")
66 )
67 date_created: Mapped[datetime] = mapped_column(
68 DateTime, default=func.utc_timestamp()
69 )
71 issue: Mapped["Issue"] = relationship(
72 "Issue",
73 back_populates="watch_list",
74 )
75 user: Mapped["User"] = relationship(
76 "User",
77 lazy="joined",
78 back_populates="issue_watches",
79 )
81 def __repr__(self) -> str:
82 return "Issue WatchList: [%s] watching %s" % (self.user_id, self.issue)
85class EmailNotification(Base):
86 __tablename__ = "notification_queue"
87 __table_args__ = (
88 UniqueConstraint(
89 "user_id", "event_id"
90 ), # Removed name "_user_event_em_uc" from Index
91 )
93 class Status(enum.Enum):
94 enqueued = 1
95 sent = 2
96 delivered = 3
97 opened = 4
98 bounced = 5
99 failed = 6
101 template_name: Mapped[str] = mapped_column(
102 VARCHAR(length=100), nullable=False, default=text("''")
103 )
104 user_id: Mapped[Optional[str]] = mapped_column(
105 VARCHAR(length=50),
106 ForeignKey("users.id", ondelete="SET NULL", onupdate="CASCADE"),
107 nullable=True,
108 )
109 org_id: Mapped[Optional[str]] = mapped_column(
110 VARCHAR(length=50),
111 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
112 nullable=True,
113 )
114 event_id: Mapped[int] = mapped_column(
115 Integer,
116 ForeignKey("audit_events.id", ondelete="CASCADE"),
117 nullable=False,
118 index=True,
119 server_default=text("'0'"),
120 )
121 email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=50))
122 updated_date: Mapped[datetime] = mapped_column(
123 DateTime, nullable=False, default=func.utc_timestamp()
124 )
125 sent_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
126 delivered_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
127 opened_date: Mapped[Optional[datetime]] = mapped_column(DateTime)
128 error: Mapped[Optional[str]] = mapped_column(TEXT())
129 message_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), index=True)
130 delivery_status: Mapped[Status] = mapped_column(
131 Enum(Status), default=Status.enqueued.name, nullable=True
132 )
134 organisation: Mapped["Organisation"] = relationship("Organisation")
135 user: Mapped["User"] = relationship("User")
136 event: Mapped["AuditEvent"] = relationship(
137 "AuditEvent", back_populates="notifications"
138 )
140 def __repr__(self):
141 return (
142 f"<EmailNotification #{self.id} ({self.delivery_status}) "
143 f"for {self.email} Evt {self.event_id}>"
144 )
146 def set_sent(self):
147 self.delivery_status = self.Status.sent
148 self.sent_date = datetime.now()
150 def set_delivered(self):
151 self.delivery_status = self.Status.delivered
152 self.delivered_date = datetime.now()
153 # self.last_delivery = datetime.now() # Assuming this was a typo/leftover, removed
154 # self.retries = 0 # Assuming this was a typo/leftover, removed
156 def set_bounced(self):
157 self.delivery_status = self.Status.bounced
159 def set_failed(self):
160 self.delivery_status = self.Status.failed
163class DeliveryStatus(enum.Enum):
164 untried = "untried"
165 delivered = "delivered"
166 failing = "failing"
167 aborted = "aborted"
170class WebhookSubscription(Base):
171 """
172 Represents a webhook subscription for an organisation. Each subscription
173 is for a specific event type and remote URL. The delivery status is used
174 to track the success of the last delivery attempt.
175 """
177 __tablename__ = "webhook_subscriptions"
178 MAX_ATTEMPTS = 4
180 id = None # type: ignore
182 org_id: Mapped[str] = mapped_column(
183 VARCHAR(length=50),
184 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"),
185 primary_key=True,
186 nullable=False,
187 )
188 event_type: Mapped[str] = mapped_column(
189 VARCHAR(length=50), nullable=False, primary_key=True
190 )
191 remote_url: Mapped[str] = mapped_column(VARCHAR(length=150), nullable=False)
192 delivery_status: Mapped[DeliveryStatus] = mapped_column(
193 Enum(DeliveryStatus), default=DeliveryStatus.untried, nullable=False
194 )
195 http_headers: Mapped[Optional[list[dict[str, str]]]] = mapped_column(
196 JSON(),
197 nullable=True,
198 comment="Optional HTTP headers to include in the webhook request, as a list of header/value JSON objects",
199 )
200 last_delivery: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True)
202 error_message: Mapped[Optional[str]] = mapped_column(
203 VARCHAR(length=100), nullable=True
204 )
205 retries: Mapped[int] = mapped_column(SMALLINT(), default=0, nullable=False)
207 guard_policy: Mapped[Optional[str]] = mapped_column(
208 VARCHAR(length=4000),
209 nullable=True,
210 comment="Optional CEL guard policy to determine if webhook should be sent",
211 )
212 transform_expression: Mapped[Optional[str]] = mapped_column(
213 VARCHAR(length=4000),
214 nullable=True,
215 comment="Optional CEL expression to transform the payload before sending",
216 )
218 organisation: Mapped["Organisation"] = relationship(
219 "Organisation", back_populates="webhook_subscriptions"
220 )
222 def __repr__(self):
223 return f"<WebhookSubscription for {self.org_id}: {self.remote_url}>" # Removed #self.id as id is None
225 def set_delivered(self):
226 self.delivery_status = DeliveryStatus.delivered
227 self.last_delivery = datetime.now()
228 self.retries = 0
230 def set_failed_attempt(self, msg: str):
231 self.delivery_status = DeliveryStatus.failing
232 self.error_message = msg
233 self.retries += 1
234 if self.retries >= self.MAX_ATTEMPTS:
235 self.set_aborted()
237 def set_aborted(self):
238 self.delivery_status = DeliveryStatus.aborted
239 if not self.error_message:
240 m = f"Given up trying after {self.MAX_ATTEMPTS} attempts"
241 self.error_message = m
243 def reset_status(self):
244 self.delivery_status = DeliveryStatus.untried
245 self.retries = 0
246 self.error_message = None
248 @property
249 def max_tries_exceeded(self) -> bool:
250 return self.retries >= self.MAX_ATTEMPTS
252 @property
253 def is_aborted(self) -> bool:
254 return self.delivery_status == DeliveryStatus.aborted
256 @validates("http_headers")
257 def validate_headers(self, key, value):
258 if value is not None:
259 if not isinstance(value, list):
260 raise ValueError(
261 "DB Validation: http_headers must be a list of JSON objects"
262 )
263 if len(value) > 5:
264 raise ValueError(
265 "DB Validation: http_headers cannot have more than 5 entries"
266 )
267 for item in value:
268 if not isinstance(item, dict):
269 raise ValueError(
270 "DB Validation: http_headers must be a list of JSON objects"
271 )
272 if "header" not in item or "value" not in item:
273 raise ValueError(
274 "DB Validation: http_headers must contain 'header' and 'value' keys"
275 )
276 if not isinstance(item["header"], str) or not isinstance(
277 item["value"], str
278 ):
279 raise ValueError(
280 "DB Validation: http_headers keys and values must be strings"
281 )
282 return value