Coverage for postrfp/jobs/events/action.py: 97%

251 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1from typing import Iterable, Type, Optional, Self, Any, Iterator 

2from contextlib import contextmanager 

3import logging 

4 

5import orjson 

6from sqlalchemy.orm import Session 

7from sqlalchemy.orm.exc import NoResultFound 

8from overrides import overrides, final, EnforceOverrides 

9 

10 

11from postrfp.model.humans import User 

12from postrfp.mail.schemas import ( 

13 EvtSchema, 

14 build_template_model, 

15 ProjNote, 

16 ScoreSchema, 

17 TemplateEmailModel, 

18) 

19from postrfp.mail.protocol import MailerProtocol 

20from postrfp.mail.factory import get_mailer 

21from postrfp.jobs.events.webhooks import ping_webhook 

22from postrfp.model.notify import EmailNotification 

23from postrfp.model.audit.event import AuditEvent, Status as EvtStatus 

24from postrfp.model.audit import evt_types as e 

25from postrfp.model.issue import Issue 

26from postrfp.shared.utils import json_default 

27from postrfp.shared import fetch 

28 

29 

30log = logging.getLogger(__name__) 

31 

32registry: dict[str, Type["EventProcessor"]] = {} 

33 

34 

35def debug_model(model: dict, event_id: int) -> None: 

36 if log.isEnabledFor(logging.DEBUG): 

37 mod_str = orjson.dumps(model, default=json_default, option=orjson.OPT_INDENT_2) 

38 

39 log.debug( 

40 "Data model for user %s, event %s: \n %s", 

41 model.get("To", "no email address given"), 

42 event_id, 

43 mod_str.decode("utf8"), 

44 ) 

45 

46 

47def webhook_evt_types() -> list[str]: 

48 """Get a list of event types which accept webhook subscriptions""" 

49 return [k for k, v in registry.items() if v.webhooks] 

50 

51 

52class handles: 

53 """ 

54 Decorator for associating an EventProcessor subclass with an event type. 

55 """ 

56 

57 def __init__(self, *evt_types: str): 

58 self.evt_types = evt_types 

59 

60 def __call__(self, cls: Type["EventProcessor"]) -> Type["EventProcessor"]: 

61 for evt_type in self.evt_types: 

62 if evt_type in registry: 

63 raise ValueError(f"{evt_type} is already assigned") 

64 registry[evt_type] = cls 

65 return cls 

66 

67 

68def handle_event( 

69 event: AuditEvent, session: Session, mailer: Optional[MailerProtocol] = None 

70) -> Any: 

71 """ 

72 Process email notifications, webhooks and any additional tasks for the given event. 

73 

74 Delegates to a subclass of EventProcessor 

75 """ 

76 if mailer is None: 

77 mailer = get_mailer() 

78 if event.event_type not in registry: 

79 raise ValueError(f"No Handler found for AuditEvent {event.event_type}") 

80 handler_cls = registry[event.event_type] 

81 handler = handler_cls(event, session, mailer) 

82 return handler.process() 

83 

84 

85def handler_exists_for(event: AuditEvent) -> bool: 

86 return event.event_type in registry 

87 

88 

89def run_event_handlers(session: Session) -> None: 

90 """Call handle_event for all events at status pending""" 

91 for evt in session.query(AuditEvent).filter(AuditEvent.status == EvtStatus.pending): 

92 handle_event(evt, session) 

93 

94 

95def logging_context(event: AuditEvent) -> Any: 

96 @contextmanager 

97 def _cm(action: str) -> Iterator[None]: 

98 try: 

99 yield 

100 log.info( 

101 "%s completed for event %s # %s", action, event.event_type, event.id 

102 ) 

103 except Exception: 

104 event.status = EvtStatus.error 

105 log.exception( 

106 "%s failure for event %s # %s", action, event.event_type, event.id 

107 ) 

108 

109 return _cm 

110 

111 

112class EventProcessor(EnforceOverrides): 

113 exclude_initiator = True 

114 issue_watchers = False 

115 project_watchers = False 

116 webhooks = False 

117 

118 def __init__( 

119 self, event: AuditEvent, session: Session, mailer: MailerProtocol 

120 ) -> None: 

121 self.session = session 

122 self.mailer = mailer 

123 self.event = event 

124 self.evt_model: Optional[EvtSchema] = None 

125 self.result = None 

126 

127 @final 

128 def process(self) -> Optional[object]: 

129 """Process an audit event through notification pipeline. 

130 

131 Executes the following steps in order: 

132 1. Generates event model from audit data 

133 2. Moderates event access control list 

134 3. Sends email notifications if configured 

135 4. Triggers webhooks if enabled 

136 5. Runs any extra actions defined by subclasses 

137 

138 All steps are wrapped in error logging contexts. Failures in 

139 individual steps are logged but don't prevent subsequent steps 

140 from executing. 

141 

142 Side Effects: 

143 - Updates event.status to EvtStatus.done on success 

144 - Updates event.status to EvtStatus.error on failure 

145 - Logs completion/failure state 

146 

147 Returns: 

148 None 

149 """ 

150 self.generate_evt_model() 

151 self.moderate_event_acl() 

152 

153 errors_logged = logging_context(self.event) 

154 

155 with errors_logged("Email notifications"): 

156 self.send_emails() 

157 

158 with errors_logged("Webhooks"): 

159 self.ping_webhooks() 

160 

161 with errors_logged("Extra actions"): 

162 self.extra() 

163 

164 if self.event.status != EvtStatus.error: 

165 self.event.status = EvtStatus.done 

166 log.info("Event processing completed for %s", self.event) 

167 else: 

168 log.warning("Event processing failed for %s", self.event) 

169 

170 return self.result 

171 

172 def generate_evt_model(self) -> None: 

173 # Extracted from __init__ method to permit easier testing 

174 self.evt_model = EvtSchema.model_validate(self.event) 

175 

176 def moderate_event_acl(self) -> None: 

177 """Allow subclass handlers to customise the EventOrgACL""" 

178 pass 

179 

180 def _iter_watchers(self) -> Iterable[User]: 

181 self.session.add(self.event) 

182 if self.issue_watchers: 

183 yield from self.event.issue.watchers 

184 if self.project_watchers: 

185 yield from self.event.project.participant_watchers 

186 

187 def recipients(self) -> Iterable[User]: 

188 """ 

189 An iterable of users watching the related Project or Issue and which belong 

190 to an organisation assigned the Event's ACL. If self.initiator is False then 

191 the event author is excluded 

192 """ 

193 acl_org_ids = {acl.org_id for acl in self.event.acl} 

194 

195 for w in self._iter_watchers(): 

196 if w.org_id in acl_org_ids: 

197 if self.exclude_initiator and w is self.event.user: 

198 continue 

199 yield w 

200 

201 def assign_notifications(self) -> list[EmailNotification]: 

202 en_list = [] 

203 for user in self.recipients(): 

204 en = EmailNotification( 

205 user_id=user.id, 

206 org_id=user.org_id, 

207 organisation=user.organisation, 

208 email=user.email, 

209 event_id=self.event.id, 

210 ) 

211 self.session.add(en) 

212 en_list.append(en) 

213 self.session.commit() 

214 return en_list 

215 

216 @final 

217 def get_model(self, user: User, email: str) -> TemplateEmailModel: 

218 assert self.evt_model is not None 

219 pydantic_model = build_template_model(self.evt_model, user, email) 

220 self.augment_model(pydantic_model.TemplateModel.event) 

221 debug_model(pydantic_model.model_dump(), self.event.id) 

222 return pydantic_model 

223 

224 def augment_model(self, model: EvtSchema) -> None: 

225 """ 

226 Subclasses can override this to add data to the 'TemplateModel' of the mail 

227 data package 

228 """ 

229 pass 

230 

231 def send_emails(self) -> None: 

232 for n in self.assign_notifications(): 

233 try: 

234 if n.email is not None: 

235 email_model = self.get_model(n.user, n.email) 

236 n.message_id = self.mailer.send_email(email_model) 

237 log.info( 

238 "Email notification sent to %s for event ID %s - %s", 

239 n.email, 

240 self.event.id, 

241 self.event.event_type, 

242 ) 

243 n.set_sent() 

244 else: 

245 log.error( 

246 "Email notification skipped - no email address provided for user" 

247 ) 

248 n.set_failed() 

249 n.message_id = "No email address" 

250 except Exception as exc: 

251 n.set_failed() 

252 n.message_id = str(exc)[:255] 

253 log.exception( 

254 "Failed to send message to %s for event #%s", n.email, self.event.id 

255 ) 

256 self.session.commit() 

257 

258 def ping_webhooks(self) -> None: 

259 if self.webhooks is False: 

260 return 

261 if self.evt_model is not None: 

262 ev_model_bytes = orjson.dumps( 

263 self.evt_model.model_dump(), default=json_default 

264 ) 

265 for wh in fetch.webhooks_for_event(self.event): 

266 ping_webhook(wh, ev_model_bytes) 

267 self.session.commit() 

268 

269 def extra(self) -> None: 

270 """Custom actions that may be defined by subclasses""" 

271 pass 

272 

273 

274@handles( 

275 e.ISSUE_ACCEPTED, e.ISSUE_DECLINED, e.ISSUE_REVERTED_TO_ACCEPTED, e.ISSUE_SUBMITTED 

276) 

277class VendorStatusAction(EventProcessor): 

278 issue_watchers = True 

279 project_watchers = True 

280 webhooks = True 

281 

282 @overrides 

283 def extra(self) -> None: 

284 if self.event.event_type == e.ISSUE_ACCEPTED: 

285 self.assign_accepting_admin_to_watchlist() 

286 

287 def assign_accepting_admin_to_watchlist(self) -> None: 

288 issue = self.session.query(Issue).filter_by(id=self.event.issue_id).one() 

289 issue.add_watcher(self.event.user) 

290 

291 

292@handles(e.ISSUE_RELEASED, e.ISSUE_RELEASED_UPDATEABLE) 

293class IssueReleased(EventProcessor): 

294 webhooks = True 

295 

296 @overrides 

297 def assign_notifications(self) -> list[EmailNotification]: 

298 issue: Issue = self.event.issue 

299 en_list = [] 

300 if issue.respondent is not None: 

301 for user in (u for u in issue.respondent.users if u.is_administrator()): 

302 en = EmailNotification( 

303 user_id=user.id, 

304 org_id=user.org_id, 

305 organisation=user.organisation, 

306 email=user.email, 

307 event_id=self.event.id, 

308 ) 

309 self.session.add(en) 

310 self.session.commit() 

311 en_list.append(en) 

312 

313 elif issue.respondent_email is not None: 

314 en = EmailNotification(email=issue.respondent_email, event_id=self.event.id) 

315 self.session.add(en) 

316 self.session.commit() 

317 en_list.append(en) 

318 

319 if not en_list: 

320 msg = ( 

321 f"Event #{self.event.id} - cannot send email notifications for Issue Released: " 

322 "respondent_email not set and no respondent user Administrators found" 

323 ) 

324 log.error(msg) 

325 return en_list 

326 

327 

328@handles(e.SECTION_ACCESS_UPDATED) 

329class SectionAccess(EventProcessor): 

330 @overrides 

331 def recipients(self) -> Iterable[User]: 

332 user_id_set = set() 

333 for pc in self.event.changes: 

334 if pc["name"] == "Granted To": 

335 user_id = pc.get("new", "").strip() 

336 if user_id in user_id_set: 

337 continue 

338 user_id_set.add(user_id) 

339 grant_user = self.session.get(User, user_id) 

340 if grant_user is not None: 

341 yield grant_user 

342 

343 

344@handles(e.PROJECT_NOTE_ADDED) 

345class ProjectNoteHandler(EventProcessor): 

346 webhooks = True 

347 

348 @overrides 

349 def moderate_event_acl(self: Self) -> None: 

350 from postrfp.model.notes import ProjectNote, Distribution 

351 from postrfp.model.project import Project 

352 

353 event = self.event 

354 note: ProjectNote = ( 

355 self.session.query(ProjectNote).filter_by(id=self.event.object_id).one() 

356 ) 

357 project: Project = note.project 

358 

359 if note.distribution != Distribution.RESPONDENT_INTERNAL_MEMO: 

360 for participant in project.participants: 

361 if participant.org_id != note.org_id: 

362 self.event.add_to_acl(participant.org_id) 

363 

364 if note.distribution == Distribution.BROADCAST_NOTICE: 

365 for issue in project.published_issues: 

366 if issue.respondent_id is not None: 

367 event.add_to_acl(issue.respondent_id) 

368 elif ( 

369 note.distribution == Distribution.TARGETED 

370 and note.target_org_id is not None 

371 ): 

372 event.add_to_acl(note.target_org_id) 

373 

374 @overrides 

375 def recipients(self) -> Iterable[User]: 

376 acl_org_ids = {a.org_id for a in self.event.acl} 

377 for user in set(self.event.project.iter_all_watchers()): 

378 if user.org_id in acl_org_ids and user.id != self.event.user_id: 

379 yield user 

380 

381 @overrides 

382 def augment_model(self, model: EvtSchema) -> None: 

383 from postrfp.model.notes import ProjectNote 

384 

385 note = self.session.get(ProjectNote, self.event.object_id) 

386 if note is not None: 

387 model.note = ProjNote.model_validate(note) 

388 

389 

390@handles(e.SCORE_COMMENT_ADDED) 

391class ScoreCommentHandler(EventProcessor): 

392 webhooks = True 

393 

394 @overrides 

395 def recipients(self) -> Iterable[User]: 

396 from postrfp.model.issue import ScoreComment, Score 

397 

398 event = self.event 

399 score_comment: ScoreComment = ( 

400 self.session.query(ScoreComment).filter_by(id=event.object_id).one() 

401 ) 

402 score: Score = score_comment.score 

403 users_to_notify = {sc.user for sc in score.comments} 

404 users_to_notify = users_to_notify.union(set(event.project.participant_watchers)) 

405 try: 

406 # Try to add the original scorer to the recipients list 

407 score_created_event = ( 

408 self.session.query(AuditEvent) 

409 .filter(AuditEvent.event_type == e.SCORE_CREATED) 

410 .filter(AuditEvent.object_id == score.id) 

411 .one() 

412 ) 

413 users_to_notify.add(score_created_event.user) 

414 except NoResultFound: 

415 # Ignore - score could be created by autoscore or imported answer 

416 pass 

417 # AuditEvent.object_id is a bit vague. To prevent info leaking to the wrong people 

418 # do a sanity check for all recipients 

419 parti_orgs = {p.org_id for p in score.issue.project.participants} 

420 for user in users_to_notify: 

421 if user.org_id in parti_orgs and user is not event.user: 

422 yield user 

423 

424 @overrides 

425 def augment_model(self, model: EvtSchema) -> None: 

426 from postrfp.model.issue import ScoreComment 

427 

428 sc = self.session.get(ScoreComment, self.event.object_id) 

429 if sc is not None: 

430 score = sc.score 

431 if score.score is None: 

432 sv = None 

433 else: 

434 sv = score.score 

435 

436 model.score = ScoreSchema( 

437 score_value=sv, 

438 comment=sc.comment_text, 

439 question_number=score.question.number, 

440 respondent_name=score.issue.respondent.name, 

441 issue_id=score.issue_id, 

442 ) 

443 

444 

445if __name__ == "__main__": # pragma: no cover 

446 for k, v in registry.items(): 

447 print(f"{k} : {v.__name__}") 

448 print("Can be webhooked:") 

449 for ev in webhook_evt_types(): 

450 print(ev)