Coverage for postrfp / jobs / events / action.py: 98%

280 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1from typing import Iterable, Type, Optional, Self, Any, Iterator 

2from contextlib import contextmanager 

3import logging 

4 

5import orjson 

6from sqlalchemy.orm import Session 

7from sqlalchemy.orm.exc import NoResultFound 

8from overrides import overrides, final, EnforceOverrides 

9 

10 

11from postrfp.model.humans import User 

12from postrfp.mail.schemas import ( 

13 EvtSchema, 

14 build_template_model, 

15 ProjNote, 

16 ScoreSchema, 

17 TemplateEmailModel, 

18) 

19from postrfp.mail.protocol import MailerProtocol 

20from postrfp.mail.factory import get_mailer 

21from postrfp.model.notify import EmailNotification 

22from postrfp.model.audit.event import AuditEvent, Status as EvtStatus 

23from postrfp.model.audit import evt_types as e 

24from postrfp.model.issue import Issue 

25from postrfp.shared.utils import json_default 

26from postrfp.shared import fetch 

27 

28 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Maps an audit event type to the EventProcessor subclass that handles it.
# Entries are added by the `handles` class decorator.
registry: dict[str, Type["EventProcessor"]] = dict()

32 

33 

def debug_model(model: dict, event_id: int) -> None:
    """Log an indented JSON dump of an email data model at DEBUG level.

    Nothing is serialised unless DEBUG is enabled for this module's logger.
    """
    if not log.isEnabledFor(logging.DEBUG):
        return

    serialised = orjson.dumps(
        model, default=json_default, option=orjson.OPT_INDENT_2
    )
    recipient = model.get("To", "no email address given")
    log.debug(
        "Data model for user %s, event %s: \n %s",
        recipient,
        event_id,
        serialised.decode("utf8"),
    )

44 

45 

def webhook_evt_types() -> list[str]:
    """Return the registered event types whose handler has webhooks enabled."""
    accepted: list[str] = []
    for evt_type, handler_cls in registry.items():
        if handler_cls.webhooks:
            accepted.append(evt_type)
    return accepted

49 

50 

class handles:
    """
    Class decorator that records an EventProcessor subclass in the module
    registry as the handler for each of the given event types.

    Raises ValueError when one of the event types already has a handler.
    """

    def __init__(self, *evt_types: str):
        self.evt_types = evt_types

    def __call__(self, cls: Type["EventProcessor"]) -> Type["EventProcessor"]:
        for name in self.evt_types:
            if name not in registry:
                registry[name] = cls
                continue
            raise ValueError(f"{name} is already assigned")
        return cls

65 

66 

def handle_event(
    event: AuditEvent, session: Session, mailer: Optional[MailerProtocol] = None
) -> Any:
    """
    Run email notifications, webhooks and any extra tasks for one event.

    The work is delegated to the EventProcessor subclass registered for the
    event's type. When no mailer is passed, one is obtained from get_mailer().

    Raises ValueError if no handler is registered for the event type.
    Returns whatever the handler's process() returns.
    """
    active_mailer = get_mailer() if mailer is None else mailer
    if event.event_type not in registry:
        raise ValueError(f"No Handler found for AuditEvent {event.event_type}")
    return registry[event.event_type](event, session, active_mailer).process()

82 

83 

def handler_exists_for(event: AuditEvent) -> bool:
    """Report whether a handler class is registered for the event's type."""
    evt_type = event.event_type
    return evt_type in registry

86 

87 

def run_event_handlers(session: Session) -> None:
    """Invoke handle_event for every AuditEvent whose status is pending."""
    pending_events = session.query(AuditEvent).filter(
        AuditEvent.status == EvtStatus.pending
    )
    for pending in pending_events:
        handle_event(pending, session)

92 

93 

def logging_context(event: AuditEvent) -> Any:
    """
    Build a context-manager factory bound to the given event.

    The returned callable takes an action label and yields a context manager.
    On a clean exit it logs completion at INFO level. If the wrapped code
    raises an Exception, the exception is swallowed, the event's status is set
    to EvtStatus.error and the traceback is logged.
    """

    @contextmanager
    def _logged_step(action: str) -> Iterator[None]:
        try:
            yield
            log.info(
                "%s completed for event %s # %s", action, event.event_type, event.id
            )
        except Exception:
            # Not re-raised: the caller carries on with its next step
            event.status = EvtStatus.error
            log.exception(
                "%s failure for event %s # %s", action, event.event_type, event.id
            )

    return _logged_step

109 

110 

class EventProcessor(EnforceOverrides):
    """
    Base handler that turns an AuditEvent into email notifications, webhook
    executions and optional extra actions.

    Subclasses are bound to event types with the `handles` decorator and tune
    behaviour through the class attributes below or by overriding the hook
    methods (moderate_event_acl, recipients, assign_notifications,
    augment_model, extra).
    """

    # When True the user attached to the event is not notified about it
    exclude_initiator = True
    # When True the watchers of the event's issue are candidate recipients
    issue_watchers = False
    # When True the participant watchers of the event's project are candidates
    project_watchers = False
    # When True webhooks subscribed to the event are triggered
    webhooks = False

    def __init__(
        self, event: AuditEvent, session: Session, mailer: MailerProtocol
    ) -> None:
        self.session = session
        self.mailer = mailer
        self.event = event
        # Populated by generate_evt_model(), which process() calls first
        self.evt_model: Optional[EvtSchema] = None
        # Returned by process(); nothing in this base class assigns it
        self.result = None

    @final
    def process(self) -> Optional[object]:
        """Process an audit event through the notification pipeline.

        Executes the following steps in order:
        1. Generates event model from audit data
        2. Moderates event access control list
        3. Sends email notifications if configured
        4. Triggers webhooks if enabled
        5. Runs any extra actions defined by subclasses

        Steps 1 and 2 are not guarded: an exception raised there propagates
        to the caller and the event status is left untouched. Steps 3 to 5
        each run inside an error logging context, so a failure in one of
        them is logged, marks the event as failed and does not prevent the
        following steps from executing.

        Side Effects:
            - Updates event.status to EvtStatus.done on success
            - Updates event.status to EvtStatus.error on failure
            - Logs completion/failure state

        Returns:
            self.result, which is None unless something has assigned it.
        """
        self.generate_evt_model()
        self.moderate_event_acl()

        errors_logged = logging_context(self.event)

        with errors_logged("Email notifications"):
            self.send_emails()

        with errors_logged("Webhooks"):
            self.ping_webhooks()

        with errors_logged("Extra actions"):
            self.extra()

        if self.event.status != EvtStatus.error:
            self.event.status = EvtStatus.done
            log.info("Event processing completed for %s", self.event)
        else:
            log.warning("Event processing failed for %s", self.event)

        return self.result

    def generate_evt_model(self) -> None:
        """Validate the audit event into an EvtSchema stored on self.evt_model."""
        # Extracted from __init__ method to permit easier testing
        self.evt_model = EvtSchema.model_validate(self.event)

    def moderate_event_acl(self) -> None:
        """Allow subclass handlers to customise the EventOrgACL"""
        pass

    def _iter_watchers(self) -> Iterable[User]:
        """
        Yield issue watchers and/or project participant watchers, depending on
        the issue_watchers and project_watchers class flags. The same user may
        be yielded more than once.
        """
        self.session.add(self.event)
        if self.issue_watchers:
            yield from self.event.issue.watchers
        if self.project_watchers:
            yield from self.event.project.participant_watchers

    def recipients(self) -> Iterable[User]:
        """
        An iterable of users watching the related Project or Issue and which
        belong to an organisation assigned the Event's ACL. If
        self.exclude_initiator is True then the user attached to the event is
        excluded.

        Each user is yielded at most once, even when they watch both the issue
        and the project, so that they do not receive duplicate notifications.
        """
        acl_org_ids = {acl.org_id for acl in self.event.acl}
        already_yielded: set[User] = set()

        for w in self._iter_watchers():
            if w.org_id not in acl_org_ids or w in already_yielded:
                continue
            if self.exclude_initiator and w is self.event.user:
                continue
            already_yielded.add(w)
            yield w

    def assign_notifications(self) -> list[EmailNotification]:
        """
        Create, persist and return one EmailNotification per recipient.

        All notifications are committed in a single transaction after the loop.
        """
        en_list = []
        for user in self.recipients():
            en = EmailNotification(
                user_id=user.id,
                org_id=user.org_id,
                organisation=user.organisation,
                email=user.email,
                event_id=self.event.id,
            )
            self.session.add(en)
            en_list.append(en)
        self.session.commit()
        return en_list

    @final
    def get_model(self, user: User, email: str) -> TemplateEmailModel:
        """
        Build the template data package for one recipient, let the subclass
        augment its event section, and dump it to the debug log.
        """
        assert self.evt_model is not None
        pydantic_model = build_template_model(self.evt_model, user, email)
        self.augment_model(pydantic_model.TemplateModel.event)
        debug_model(pydantic_model.model_dump(), self.event.id)
        return pydantic_model

    def augment_model(self, model: EvtSchema) -> None:
        """
        Subclasses can override this to add data to the 'TemplateModel' of the mail
        data package
        """
        pass

    def send_emails(self) -> None:
        """
        Send one email per assigned notification and record the outcome.

        A failure for one notification is logged and stored on that
        notification (truncated to 255 characters in message_id); it does not
        stop the remaining notifications. Outcomes are committed at the end.
        """
        for n in self.assign_notifications():
            try:
                if n.email is not None:
                    email_model = self.get_model(n.user, n.email)
                    n.message_id = self.mailer.send_email(email_model)
                    log.info(
                        "Email notification sent to %s for event ID %s - %s",
                        n.email,
                        self.event.id,
                        self.event.event_type,
                    )
                    n.set_sent()
                else:
                    log.error(
                        "Email notification skipped - no email address provided for user"
                    )
                    n.set_failed()
                    n.message_id = "No email address"
            except Exception as exc:
                n.set_failed()
                n.message_id = str(exc)[:255]
                log.exception(
                    "Failed to send message to %s for event #%s", n.email, self.event.id
                )
        self.session.commit()

    def ping_webhooks(self) -> None:
        """
        Trigger a webhook DAG for every webhook subscribed to this event.

        Does nothing when the class has webhooks disabled or when the event
        model has not been generated. For each webhook the optional guard
        policy is evaluated, the payload is optionally transformed, a
        JobExecution row is created and the DAG is triggered. Errors for one
        webhook are logged and do not stop the others; they are not re-raised,
        so they do not mark the event as failed. All execution rows are
        committed at the end.
        """
        if self.webhooks is False:
            return
        if self.evt_model is None:
            log.warning(
                "Cannot ping webhooks: evt_model is None for event %s", self.event.id
            )
            return

        # Imported locally rather than at module level
        # (presumably to avoid circular imports - TODO confirm)
        from postrfp.model.jobs import JobExecution, JobStatus
        from postrfp.shared.expression import evaluate_expression, transform_payload
        from postrfp.jobs.dagu import trigger_webhook_dag, generate_dag_name
        from postrfp.shared.utils import utcnow

        # Serialize event model to dict for CEL evaluation and webhook delivery
        event_data = self.evt_model.model_dump()

        for webhook in fetch.webhooks_for_event(self.event):
            try:
                # Step 1: Evaluate guard policy (if configured)
                if webhook.guard_policy:
                    should_fire = evaluate_expression(webhook.guard_policy, event_data)
                    if not should_fire:
                        log.info(
                            "Webhook %s guard policy blocked event %s",
                            generate_dag_name(webhook),
                            self.event.id,
                        )
                        continue

                # Step 2: Transform payload (if configured)
                payload: dict | list = event_data
                if webhook.transform_expression:
                    payload = transform_payload(
                        webhook.transform_expression, event_data
                    )

                # Step 3: Serialize payload to JSON string
                payload_json = orjson.dumps(payload, default=json_default).decode(
                    "utf-8"
                )

                # Step 4: Create JobExecution record
                execution = JobExecution(
                    trigger_event_id=self.event.id,
                    webhook_id=webhook.id,
                    job_type="webhook",
                    status=JobStatus.pending,
                    created_at=utcnow(),
                )
                self.session.add(execution)
                self.session.flush()  # Get execution.id without committing

                # Step 5: Trigger Dagu DAG
                try:
                    run_id = trigger_webhook_dag(webhook, execution.id, payload_json)
                    execution.dagu_run_id = run_id or f"unknown-{execution.id}"
                    execution.status = JobStatus.running
                    log.info(
                        "Triggered webhook DAG %s for event %s, execution %s, run_id %s",
                        generate_dag_name(webhook),
                        self.event.id,
                        execution.id,
                        run_id,
                    )
                except Exception:
                    log.exception(
                        "Failed to trigger webhook DAG %s for event %s",
                        generate_dag_name(webhook),
                        self.event.id,
                    )
                    execution.status = JobStatus.failed
                    execution.dagu_run_id = f"failed-trigger-{execution.id}"
                    # Don't re-raise - continue with other webhooks

            except Exception:
                # NOTE(review): if the failure came from session.flush() the
                # session may need a rollback before it can be used again -
                # confirm how this interacts with the commit below.
                log.exception(
                    "Error processing webhook %s for event %s",
                    getattr(webhook, "id", "unknown"),
                    self.event.id,
                )
                # Continue with remaining webhooks

        # Commit all execution records
        self.session.commit()

    def extra(self) -> None:
        """Custom actions that may be defined by subclasses"""
        pass

346 

347 

@handles(
    e.ISSUE_ACCEPTED, e.ISSUE_DECLINED, e.ISSUE_REVERTED_TO_ACCEPTED, e.ISSUE_SUBMITTED
)
class VendorStatusAction(EventProcessor):
    """
    Handler for issue accepted / declined / reverted-to-accepted / submitted
    events. Issue and project watchers are candidate recipients and webhooks
    are enabled.
    """

    issue_watchers = True
    project_watchers = True
    webhooks = True

    @overrides
    def extra(self) -> None:
        # Only an acceptance has a follow-up action
        if self.event.event_type != e.ISSUE_ACCEPTED:
            return
        self.assign_accepting_admin_to_watchlist()

    def assign_accepting_admin_to_watchlist(self) -> None:
        """Add the user attached to the event as a watcher of the event's issue."""
        accepted_issue = (
            self.session.query(Issue).filter_by(id=self.event.issue_id).one()
        )
        accepted_issue.add_watcher(self.event.user)

364 

365 

@handles(e.ISSUE_RELEASED, e.ISSUE_RELEASED_UPDATEABLE)
class IssueReleased(EventProcessor):
    """
    Handler for issue released events.

    Notifications go to the administrators of the issue's respondent
    organisation, or to the issue's respondent_email when no respondent
    organisation is set.
    """

    webhooks = True

    @overrides
    def assign_notifications(self) -> list[EmailNotification]:
        """
        Create, persist and return the notifications for the released issue.

        An error is logged, and an empty list returned, when there is neither
        an administrator user to notify nor a respondent_email on the issue.
        """
        issue: Issue = self.event.issue
        en_list: list[EmailNotification] = []
        if issue.respondent is not None:
            for user in issue.respondent.users:
                if not user.is_administrator():
                    continue
                en_list.append(
                    EmailNotification(
                        user_id=user.id,
                        org_id=user.org_id,
                        organisation=user.organisation,
                        email=user.email,
                        event_id=self.event.id,
                    )
                )
        elif issue.respondent_email is not None:
            en_list.append(
                EmailNotification(email=issue.respondent_email, event_id=self.event.id)
            )

        if en_list:
            # Commit once for the whole batch instead of once per user
            self.session.add_all(en_list)
            self.session.commit()
        else:
            msg = (
                f"Event #{self.event.id} - cannot send email notifications for Issue Released: "
                "respondent_email not set and no respondent user Administrators found"
            )
            log.error(msg)
        return en_list

400 

401 

@handles(e.SECTION_ACCESS_UPDATED)
class SectionAccess(EventProcessor):
    """Handler for section access updates: notifies the users granted access."""

    @overrides
    def recipients(self) -> Iterable[User]:
        """
        Yield each user named as the new value of a "Granted To" change on the
        event, at most once per user id.

        Changes whose new value is missing, None or blank are skipped, as are
        ids for which no User row exists.
        """
        seen_ids: set[str] = set()
        for pc in self.event.changes:
            if pc["name"] != "Granted To":
                continue
            raw_id = pc.get("new")
            if raw_id is None:
                # Key present but null: nothing to look up
                continue
            user_id = str(raw_id).strip()
            if not user_id or user_id in seen_ids:
                continue
            seen_ids.add(user_id)
            grant_user = self.session.get(User, user_id)
            if grant_user is not None:
                yield grant_user

416 

417 

@handles(e.PROJECT_NOTE_ADDED)
class ProjectNoteHandler(EventProcessor):
    """Handler for notes added to a project; webhooks are enabled."""

    webhooks = True

    @overrides
    def moderate_event_acl(self: Self) -> None:
        """
        Extend the event's ACL according to the note's distribution:
        other participant organisations unless the note is a respondent
        internal memo, respondents of published issues for a broadcast
        notice, or the target organisation for a targeted note.
        """
        from postrfp.model.notes import ProjectNote, Distribution
        from postrfp.model.project import Project

        evt = self.event
        note: ProjectNote = (
            self.session.query(ProjectNote).filter_by(id=evt.object_id).one()
        )
        project: Project = note.project

        if note.distribution != Distribution.RESPONDENT_INTERNAL_MEMO:
            for participant in project.participants:
                if participant.org_id == note.org_id:
                    continue
                evt.add_to_acl(participant.org_id)

        if note.distribution == Distribution.BROADCAST_NOTICE:
            for published in project.published_issues:
                if published.respondent_id is None:
                    continue
                evt.add_to_acl(published.respondent_id)
        elif note.distribution == Distribution.TARGETED:
            if note.target_org_id is not None:
                evt.add_to_acl(note.target_org_id)

    @overrides
    def recipients(self) -> Iterable[User]:
        """
        Yield every distinct project watcher whose organisation is on the
        event's ACL, excluding the user who raised the event.
        """
        permitted_orgs = {entry.org_id for entry in self.event.acl}
        watchers = set(self.event.project.iter_all_watchers())
        for watcher in watchers:
            if watcher.org_id not in permitted_orgs:
                continue
            if watcher.id == self.event.user_id:
                continue
            yield watcher

    @overrides
    def augment_model(self, model: EvtSchema) -> None:
        """Attach the note, when it can be loaded, to the template model."""
        from postrfp.model.notes import ProjectNote

        note = self.session.get(ProjectNote, self.event.object_id)
        if note is None:
            return
        model.note = ProjNote.model_validate(note)

462 

463 

@handles(e.SCORE_COMMENT_ADDED)
class ScoreCommentHandler(EventProcessor):
    """Handler for comments added to a score; webhooks are enabled."""

    webhooks = True

    @overrides
    def recipients(self) -> Iterable[User]:
        """
        Yield the users to notify about a new score comment: authors of
        comments on the score, the project's participant watchers and, when a
        SCORE_CREATED event exists for the score, that event's user. Only
        users from organisations participating in the project are yielded,
        and never the user attached to this event.
        """
        from postrfp.model.issue import ScoreComment, Score

        event = self.event
        new_comment: ScoreComment = (
            self.session.query(ScoreComment).filter_by(id=event.object_id).one()
        )
        score: Score = new_comment.score
        candidates = {sc.user for sc in score.comments} | set(
            event.project.participant_watchers
        )
        try:
            # Try to add the original scorer to the recipients list
            creation_evt = (
                self.session.query(AuditEvent)
                .filter(
                    AuditEvent.event_type == e.SCORE_CREATED,
                    AuditEvent.object_id == score.id,
                )
                .one()
            )
            candidates.add(creation_evt.user)
        except NoResultFound:
            # Ignore - score could be created by autoscore or imported answer
            pass
        # AuditEvent.object_id is a bit vague. To prevent info leaking to the wrong people
        # do a sanity check for all recipients
        participant_orgs = {p.org_id for p in score.issue.project.participants}
        for candidate in candidates:
            if candidate.org_id not in participant_orgs:
                continue
            if candidate is event.user:
                continue
            yield candidate

    @overrides
    def augment_model(self, model: EvtSchema) -> None:
        """Attach score and comment details to the template model, if loadable."""
        from postrfp.model.issue import ScoreComment

        sc = self.session.get(ScoreComment, self.event.object_id)
        if sc is None:
            return

        score = sc.score
        # score.score may be None; it is passed through unchanged
        score_value = score.score
        model.score = ScoreSchema(
            score_value=score_value,
            comment=sc.comment_text,
            question_number=score.question.number,
            respondent_name=score.issue.respondent.name,
            issue_id=score.issue_id,
        )

517 

518 

# Names exported by `from ... import *`
__all__ = ["EventProcessor", "handle_event", "handler_exists_for", "registry", "webhook_evt_types"]

526 

527 

if __name__ == "__main__":  # pragma: no cover
    # Print the event-type -> handler mapping, then the webhook-capable types
    for evt_type, handler_cls in registry.items():
        print(f"{evt_type} : {handler_cls.__name__}")
    print("Can be webhooked:")
    webhookable = webhook_evt_types()
    for evt_type in webhookable:
        print(evt_type)