Coverage for postrfp/model/notify.py: 81%

118 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import enum # Don't import Enum here - shadows the SQLAlchemy type 

2from datetime import datetime 

3from typing import Optional, TYPE_CHECKING 

4 

5from sqlalchemy import ( 

6 JSON, 

7 DateTime, 

8 text, 

9 Integer, 

10 ForeignKey, 

11 func, 

12 UniqueConstraint, 

13) 

14from sqlalchemy.types import INTEGER, VARCHAR, Enum, TEXT, SMALLINT 

15from sqlalchemy.orm import Mapped, mapped_column, relationship, validates 

16 

17from postrfp.model.meta import Base 

18 

19if TYPE_CHECKING: 

20 from postrfp.model.project import Project 

21 from postrfp.model.issue import Issue 

22 from postrfp.model.humans import User, Organisation 

23 from postrfp.model.audit import AuditEvent 

24 

25 

26class ProjectWatchList(Base): 

27 __tablename__ = "project_watch_list" 

28 __table_args__ = ( 

29 UniqueConstraint( 

30 "user_id", "project_id" 

31 ), # Removed name "_user_project_wl_uc" from Index 

32 ) 

33 

34 project_id: Mapped[Optional[int]] = mapped_column( 

35 INTEGER, ForeignKey("projects.id", ondelete="CASCADE"), nullable=True 

36 ) 

37 user_id: Mapped[Optional[str]] = mapped_column( 

38 VARCHAR(length=50), ForeignKey("users.id", ondelete="CASCADE"), nullable=True 

39 ) 

40 date_created: Mapped[Optional[datetime]] = mapped_column( 

41 DateTime, default=func.utc_timestamp(), nullable=True 

42 ) 

43 

44 project: Mapped["Project"] = relationship("Project", back_populates="watch_list") 

45 user: Mapped["User"] = relationship( 

46 "User", lazy="joined", back_populates="project_watches" 

47 ) 

48 

49 def __repr__(self): 

50 return f"<ProjectWatchList: [{self.user_id}] watching {self.project}>" 

51 

52 

53class IssueWatchList(Base): 

54 __tablename__ = "issue_watch_list" 

55 __table_args__ = ( 

56 UniqueConstraint( 

57 "user_id", "issue_id" 

58 ), # Removed name "_user_issue_wl_uc" from Index 

59 ) 

60 

61 issue_id: Mapped[int] = mapped_column( 

62 INTEGER, ForeignKey("issues.id", ondelete="CASCADE") 

63 ) 

64 user_id: Mapped[str] = mapped_column( 

65 VARCHAR(length=50), ForeignKey("users.id", ondelete="CASCADE") 

66 ) 

67 date_created: Mapped[datetime] = mapped_column( 

68 DateTime, default=func.utc_timestamp() 

69 ) 

70 

71 issue: Mapped["Issue"] = relationship( 

72 "Issue", 

73 back_populates="watch_list", 

74 ) 

75 user: Mapped["User"] = relationship( 

76 "User", 

77 lazy="joined", 

78 back_populates="issue_watches", 

79 ) 

80 

81 def __repr__(self) -> str: 

82 return "Issue WatchList: [%s] watching %s" % (self.user_id, self.issue) 

83 

84 

85class EmailNotification(Base): 

86 __tablename__ = "notification_queue" 

87 __table_args__ = ( 

88 UniqueConstraint( 

89 "user_id", "event_id" 

90 ), # Removed name "_user_event_em_uc" from Index 

91 ) 

92 

93 class Status(enum.Enum): 

94 enqueued = 1 

95 sent = 2 

96 delivered = 3 

97 opened = 4 

98 bounced = 5 

99 failed = 6 

100 

101 template_name: Mapped[str] = mapped_column( 

102 VARCHAR(length=100), nullable=False, default=text("''") 

103 ) 

104 user_id: Mapped[Optional[str]] = mapped_column( 

105 VARCHAR(length=50), 

106 ForeignKey("users.id", ondelete="SET NULL", onupdate="CASCADE"), 

107 nullable=True, 

108 ) 

109 org_id: Mapped[Optional[str]] = mapped_column( 

110 VARCHAR(length=50), 

111 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"), 

112 nullable=True, 

113 ) 

114 event_id: Mapped[int] = mapped_column( 

115 Integer, 

116 ForeignKey("audit_events.id", ondelete="CASCADE"), 

117 nullable=False, 

118 index=True, 

119 server_default=text("'0'"), 

120 ) 

121 email: Mapped[Optional[str]] = mapped_column(VARCHAR(length=50)) 

122 updated_date: Mapped[datetime] = mapped_column( 

123 DateTime, nullable=False, default=func.utc_timestamp() 

124 ) 

125 sent_date: Mapped[Optional[datetime]] = mapped_column(DateTime) 

126 delivered_date: Mapped[Optional[datetime]] = mapped_column(DateTime) 

127 opened_date: Mapped[Optional[datetime]] = mapped_column(DateTime) 

128 error: Mapped[Optional[str]] = mapped_column(TEXT()) 

129 message_id: Mapped[Optional[str]] = mapped_column(VARCHAR(length=255), index=True) 

130 delivery_status: Mapped[Status] = mapped_column( 

131 Enum(Status), default=Status.enqueued.name, nullable=True 

132 ) 

133 

134 organisation: Mapped["Organisation"] = relationship("Organisation") 

135 user: Mapped["User"] = relationship("User") 

136 event: Mapped["AuditEvent"] = relationship( 

137 "AuditEvent", back_populates="notifications" 

138 ) 

139 

140 def __repr__(self): 

141 return ( 

142 f"<EmailNotification #{self.id} ({self.delivery_status}) " 

143 f"for {self.email} Evt {self.event_id}>" 

144 ) 

145 

146 def set_sent(self): 

147 self.delivery_status = self.Status.sent 

148 self.sent_date = datetime.now() 

149 

150 def set_delivered(self): 

151 self.delivery_status = self.Status.delivered 

152 self.delivered_date = datetime.now() 

153 # self.last_delivery = datetime.now() # Assuming this was a typo/leftover, removed 

154 # self.retries = 0 # Assuming this was a typo/leftover, removed 

155 

156 def set_bounced(self): 

157 self.delivery_status = self.Status.bounced 

158 

159 def set_failed(self): 

160 self.delivery_status = self.Status.failed 

161 

162 

163class DeliveryStatus(enum.Enum): 

164 untried = "untried" 

165 delivered = "delivered" 

166 failing = "failing" 

167 aborted = "aborted" 

168 

169 

170class WebhookSubscription(Base): 

171 """ 

172 Represents a webhook subscription for an organisation. Each subscription 

173 is for a specific event type and remote URL. The delivery status is used 

174 to track the success of the last delivery attempt. 

175 """ 

176 

177 __tablename__ = "webhook_subscriptions" 

178 MAX_ATTEMPTS = 4 

179 

180 id = None # type: ignore 

181 

182 org_id: Mapped[str] = mapped_column( 

183 VARCHAR(length=50), 

184 ForeignKey("organisations.id", ondelete="CASCADE", onupdate="CASCADE"), 

185 primary_key=True, 

186 nullable=False, 

187 ) 

188 event_type: Mapped[str] = mapped_column( 

189 VARCHAR(length=50), nullable=False, primary_key=True 

190 ) 

191 remote_url: Mapped[str] = mapped_column(VARCHAR(length=150), nullable=False) 

192 delivery_status: Mapped[DeliveryStatus] = mapped_column( 

193 Enum(DeliveryStatus), default=DeliveryStatus.untried, nullable=False 

194 ) 

195 http_headers: Mapped[Optional[list[dict[str, str]]]] = mapped_column( 

196 JSON(), 

197 nullable=True, 

198 comment="Optional HTTP headers to include in the webhook request, as a list of header/value JSON objects", 

199 ) 

200 last_delivery: Mapped[Optional[datetime]] = mapped_column(DateTime, nullable=True) 

201 

202 error_message: Mapped[Optional[str]] = mapped_column( 

203 VARCHAR(length=100), nullable=True 

204 ) 

205 retries: Mapped[int] = mapped_column(SMALLINT(), default=0, nullable=False) 

206 

207 guard_policy: Mapped[Optional[str]] = mapped_column( 

208 VARCHAR(length=4000), 

209 nullable=True, 

210 comment="Optional CEL guard policy to determine if webhook should be sent", 

211 ) 

212 transform_expression: Mapped[Optional[str]] = mapped_column( 

213 VARCHAR(length=4000), 

214 nullable=True, 

215 comment="Optional CEL expression to transform the payload before sending", 

216 ) 

217 

218 organisation: Mapped["Organisation"] = relationship( 

219 "Organisation", back_populates="webhook_subscriptions" 

220 ) 

221 

222 def __repr__(self): 

223 return f"<WebhookSubscription for {self.org_id}: {self.remote_url}>" # Removed #self.id as id is None 

224 

225 def set_delivered(self): 

226 self.delivery_status = DeliveryStatus.delivered 

227 self.last_delivery = datetime.now() 

228 self.retries = 0 

229 

230 def set_failed_attempt(self, msg: str): 

231 self.delivery_status = DeliveryStatus.failing 

232 self.error_message = msg 

233 self.retries += 1 

234 if self.retries >= self.MAX_ATTEMPTS: 

235 self.set_aborted() 

236 

237 def set_aborted(self): 

238 self.delivery_status = DeliveryStatus.aborted 

239 if not self.error_message: 

240 m = f"Given up trying after {self.MAX_ATTEMPTS} attempts" 

241 self.error_message = m 

242 

243 def reset_status(self): 

244 self.delivery_status = DeliveryStatus.untried 

245 self.retries = 0 

246 self.error_message = None 

247 

248 @property 

249 def max_tries_exceeded(self) -> bool: 

250 return self.retries >= self.MAX_ATTEMPTS 

251 

252 @property 

253 def is_aborted(self) -> bool: 

254 return self.delivery_status == DeliveryStatus.aborted 

255 

256 @validates("http_headers") 

257 def validate_headers(self, key, value): 

258 if value is not None: 

259 if not isinstance(value, list): 

260 raise ValueError( 

261 "DB Validation: http_headers must be a list of JSON objects" 

262 ) 

263 if len(value) > 5: 

264 raise ValueError( 

265 "DB Validation: http_headers cannot have more than 5 entries" 

266 ) 

267 for item in value: 

268 if not isinstance(item, dict): 

269 raise ValueError( 

270 "DB Validation: http_headers must be a list of JSON objects" 

271 ) 

272 if "header" not in item or "value" not in item: 

273 raise ValueError( 

274 "DB Validation: http_headers must contain 'header' and 'value' keys" 

275 ) 

276 if not isinstance(item["header"], str) or not isinstance( 

277 item["value"], str 

278 ): 

279 raise ValueError( 

280 "DB Validation: http_headers keys and values must be strings" 

281 ) 

282 return value