Coverage for postrfp / jobs / events / action.py: 98%
280 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1from typing import Iterable, Type, Optional, Self, Any, Iterator
2from contextlib import contextmanager
3import logging
5import orjson
6from sqlalchemy.orm import Session
7from sqlalchemy.orm.exc import NoResultFound
8from overrides import overrides, final, EnforceOverrides
11from postrfp.model.humans import User
12from postrfp.mail.schemas import (
13 EvtSchema,
14 build_template_model,
15 ProjNote,
16 ScoreSchema,
17 TemplateEmailModel,
18)
19from postrfp.mail.protocol import MailerProtocol
20from postrfp.mail.factory import get_mailer
21from postrfp.model.notify import EmailNotification
22from postrfp.model.audit.event import AuditEvent, Status as EvtStatus
23from postrfp.model.audit import evt_types as e
24from postrfp.model.issue import Issue
25from postrfp.shared.utils import json_default
26from postrfp.shared import fetch
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Maps an event type name to the EventProcessor subclass registered for it
# through the `handles` decorator.
registry: dict[str, Type["EventProcessor"]] = {}
def debug_model(model: dict, event_id: int) -> None:
    """
    Log an indented JSON rendering of an email data model at DEBUG level.

    The model is only serialised when DEBUG logging is enabled for this
    module's logger, so the call is cheap otherwise.
    """
    if not log.isEnabledFor(logging.DEBUG):
        return

    rendered = orjson.dumps(model, default=json_default, option=orjson.OPT_INDENT_2)
    recipient = model.get("To", "no email address given")
    log.debug(
        "Data model for user %s, event %s: \n %s",
        recipient,
        event_id,
        rendered.decode("utf8"),
    )
def webhook_evt_types() -> list[str]:
    """Return the registered event types whose handler class has webhooks enabled"""
    webhook_enabled: list[str] = []
    for evt_type, handler_cls in registry.items():
        if handler_cls.webhooks:
            webhook_enabled.append(evt_type)
    return webhook_enabled
class handles:
    """
    Class decorator that registers an EventProcessor subclass as the handler
    for one or more event types.

    Applying the decorator raises ValueError if any of the given event types
    already has a handler in the registry.
    """

    def __init__(self, *evt_types: str):
        self.evt_types = evt_types

    def __call__(self, cls: Type["EventProcessor"]) -> Type["EventProcessor"]:
        for type_name in self.evt_types:
            # Each event type may be claimed by exactly one handler class
            if type_name in registry:
                raise ValueError(f"{type_name} is already assigned")
            registry[type_name] = cls
        return cls
def handle_event(
    event: AuditEvent, session: Session, mailer: Optional[MailerProtocol] = None
) -> Any:
    """
    Run the registered EventProcessor subclass for the given event and return
    whatever its process() method returns.

    If no mailer is supplied, one is obtained from get_mailer(). Raises
    ValueError when no handler is registered for the event's type.
    """
    active_mailer = get_mailer() if mailer is None else mailer

    processor_cls = registry.get(event.event_type)
    if processor_cls is None:
        raise ValueError(f"No Handler found for AuditEvent {event.event_type}")

    processor = processor_cls(event, session, active_mailer)
    return processor.process()
def handler_exists_for(event: AuditEvent) -> bool:
    """Report whether a handler class is registered for the event's type"""
    evt_type = event.event_type
    return evt_type in registry
def run_event_handlers(session: Session) -> None:
    """Pass every AuditEvent whose status is pending to handle_event"""
    pending_events = session.query(AuditEvent).filter(
        AuditEvent.status == EvtStatus.pending
    )
    for pending in pending_events:
        handle_event(pending, session)
def logging_context(event: AuditEvent) -> Any:
    """
    Build a context manager factory bound to the given event.

    The returned callable takes an action label. Code run inside the resulting
    context is logged at INFO level when it completes. If it raises an
    Exception, the event status is set to error, the failure is logged with a
    traceback, and the exception is not re-raised.
    """

    @contextmanager
    def _guarded(action: str) -> Iterator[None]:
        try:
            yield
            log.info(
                "%s completed for event %s # %s", action, event.event_type, event.id
            )
        except Exception:
            # Record the failure on the event before logging it
            event.status = EvtStatus.error
            log.exception(
                "%s failure for event %s # %s", action, event.event_type, event.id
            )

    return _guarded
class EventProcessor(EnforceOverrides):
    """
    Base class for audit event handlers.

    Subclasses are associated with event types through the ``handles``
    decorator. They tune behaviour with the class flags below and by
    overriding the hook methods: ``moderate_event_acl``, ``recipients``,
    ``assign_notifications``, ``augment_model`` and ``extra``.
    """

    # Skip the user who triggered the event when collecting recipients
    exclude_initiator = True
    # Treat watchers of the event's issue as candidate recipients
    issue_watchers = False
    # Treat participant watchers of the event's project as candidate recipients
    project_watchers = False
    # Whether webhook subscriptions are processed for this handler's events
    webhooks = False

    def __init__(
        self, event: AuditEvent, session: Session, mailer: MailerProtocol
    ) -> None:
        self.session = session
        self.mailer = mailer
        self.event = event
        self.evt_model: Optional[EvtSchema] = None
        self.result = None

    @final
    def process(self) -> Optional[object]:
        """Process an audit event through the notification pipeline.

        Executes the following steps in order:
        1. Generates event model from audit data
        2. Moderates event access control list
        3. Sends email notifications if configured
        4. Triggers webhooks if enabled
        5. Runs any extra actions defined by subclasses

        Steps 3 to 5 are each wrapped in an error logging context: a failure
        in one of them is logged and marks the event as errored, but does not
        prevent the following steps from executing. Exceptions raised by
        steps 1 and 2 are not caught here and propagate to the caller.

        Side Effects:
            - Updates event.status to EvtStatus.done on success
            - Updates event.status to EvtStatus.error on failure
            - Logs completion/failure state

        Returns:
            The value of self.result (None unless a subclass sets it)
        """
        self.generate_evt_model()
        self.moderate_event_acl()

        errors_logged = logging_context(self.event)

        with errors_logged("Email notifications"):
            self.send_emails()

        with errors_logged("Webhooks"):
            self.ping_webhooks()

        with errors_logged("Extra actions"):
            self.extra()

        if self.event.status != EvtStatus.error:
            self.event.status = EvtStatus.done
            log.info("Event processing completed for %s", self.event)
        else:
            log.warning("Event processing failed for %s", self.event)

        return self.result

    def generate_evt_model(self) -> None:
        """Validate the audit event into an EvtSchema stored on self.evt_model"""
        # Extracted from __init__ method to permit easier testing
        self.evt_model = EvtSchema.model_validate(self.event)

    def moderate_event_acl(self) -> None:
        """Allow subclass handlers to customise the EventOrgACL"""
        pass

    def _iter_watchers(self) -> Iterable[User]:
        """Yield issue and/or project watchers according to the class flags"""
        self.session.add(self.event)
        if self.issue_watchers:
            yield from self.event.issue.watchers
        if self.project_watchers:
            yield from self.event.project.participant_watchers

    def recipients(self) -> Iterable[User]:
        """
        An iterable of users watching the related Project or Issue and which belong
        to an organisation assigned the Event's ACL. If self.exclude_initiator is
        True then the event author is excluded.

        Each user is yielded at most once, even when they watch both the issue
        and the project, so that nobody is notified twice for the same event.
        """
        acl_org_ids = {acl.org_id for acl in self.event.acl}
        seen: set[User] = set()

        for watcher in self._iter_watchers():
            if watcher in seen:
                continue
            seen.add(watcher)
            if watcher.org_id not in acl_org_ids:
                continue
            if self.exclude_initiator and watcher is self.event.user:
                continue
            yield watcher

    def assign_notifications(self) -> list[EmailNotification]:
        """
        Create, persist and return one EmailNotification per recipient.

        The notifications are added to the session and committed before being
        returned.
        """
        en_list = []
        for user in self.recipients():
            en = EmailNotification(
                user_id=user.id,
                org_id=user.org_id,
                organisation=user.organisation,
                email=user.email,
                event_id=self.event.id,
            )
            self.session.add(en)
            en_list.append(en)
        self.session.commit()
        return en_list

    @final
    def get_model(self, user: User, email: str) -> TemplateEmailModel:
        """
        Build the template email model for one recipient, giving subclasses the
        chance to enrich its event data through augment_model.
        """
        assert self.evt_model is not None
        pydantic_model = build_template_model(self.evt_model, user, email)
        self.augment_model(pydantic_model.TemplateModel.event)
        debug_model(pydantic_model.model_dump(), self.event.id)
        return pydantic_model

    def augment_model(self, model: EvtSchema) -> None:
        """
        Subclasses can override this to add data to the 'TemplateModel' of the mail
        data package
        """
        pass

    def send_emails(self) -> None:
        """
        Send one email per assigned notification and record the outcome on it.

        A notification without an email address, or whose delivery raises, is
        marked as failed with the reason stored in message_id. A failure for
        one recipient does not stop delivery to the others.
        """
        for n in self.assign_notifications():
            try:
                if n.email is not None:
                    email_model = self.get_model(n.user, n.email)
                    n.message_id = self.mailer.send_email(email_model)
                    log.info(
                        "Email notification sent to %s for event ID %s - %s",
                        n.email,
                        self.event.id,
                        self.event.event_type,
                    )
                    n.set_sent()
                else:
                    log.error(
                        "Email notification skipped - no email address provided for user"
                    )
                    n.set_failed()
                    n.message_id = "No email address"
            except Exception as exc:
                n.set_failed()
                # Keep the stored failure reason to at most 255 characters
                n.message_id = str(exc)[:255]
                log.exception(
                    "Failed to send message to %s for event #%s", n.email, self.event.id
                )
        self.session.commit()

    def ping_webhooks(self) -> None:
        """
        Create a JobExecution and trigger a webhook DAG for each webhook
        returned by fetch.webhooks_for_event for this event.

        Does nothing when webhooks are disabled for this handler or when the
        event model has not been generated. Errors for an individual webhook
        are logged and do not stop processing of the remaining webhooks.
        """
        # Truthiness test, matching how webhook_evt_types() reads the flag
        if not self.webhooks:
            return
        if self.evt_model is None:
            log.warning(
                "Cannot ping webhooks: evt_model is None for event %s", self.event.id
            )
            return

        from postrfp.model.jobs import JobExecution, JobStatus
        from postrfp.shared.expression import evaluate_expression, transform_payload
        from postrfp.jobs.dagu import trigger_webhook_dag, generate_dag_name
        from postrfp.shared.utils import utcnow

        # Serialize event model to dict for CEL evaluation and webhook delivery
        event_data = self.evt_model.model_dump()

        for webhook in fetch.webhooks_for_event(self.event):
            try:
                # Step 1: Evaluate guard policy (if configured)
                if webhook.guard_policy:
                    should_fire = evaluate_expression(webhook.guard_policy, event_data)
                    if not should_fire:
                        log.info(
                            "Webhook %s guard policy blocked event %s",
                            generate_dag_name(webhook),
                            self.event.id,
                        )
                        continue

                # Step 2: Transform payload (if configured)
                payload: dict | list = event_data
                if webhook.transform_expression:
                    payload = transform_payload(
                        webhook.transform_expression, event_data
                    )

                # Step 3: Serialize payload to JSON string
                payload_json = orjson.dumps(payload, default=json_default).decode(
                    "utf-8"
                )

                # Step 4: Create JobExecution record
                execution = JobExecution(
                    trigger_event_id=self.event.id,
                    webhook_id=webhook.id,
                    job_type="webhook",
                    status=JobStatus.pending,
                    created_at=utcnow(),
                )
                self.session.add(execution)
                self.session.flush()  # Get execution.id without committing

                # Step 5: Trigger Dagu DAG
                try:
                    run_id = trigger_webhook_dag(webhook, execution.id, payload_json)
                    execution.dagu_run_id = run_id or f"unknown-{execution.id}"
                    execution.status = JobStatus.running
                    log.info(
                        "Triggered webhook DAG %s for event %s, execution %s, run_id %s",
                        generate_dag_name(webhook),
                        self.event.id,
                        execution.id,
                        run_id,
                    )
                except Exception:
                    log.exception(
                        "Failed to trigger webhook DAG %s for event %s",
                        generate_dag_name(webhook),
                        self.event.id,
                    )
                    execution.status = JobStatus.failed
                    execution.dagu_run_id = f"failed-trigger-{execution.id}"
                    # Don't re-raise - continue with other webhooks

            except Exception:
                log.exception(
                    "Error processing webhook %s for event %s",
                    getattr(webhook, "id", "unknown"),
                    self.event.id,
                )
                # Continue with remaining webhooks

        # Commit all execution records
        self.session.commit()

    def extra(self) -> None:
        """Custom actions that may be defined by subclasses"""
        pass
@handles(
    e.ISSUE_ACCEPTED, e.ISSUE_DECLINED, e.ISSUE_REVERTED_TO_ACCEPTED, e.ISSUE_SUBMITTED
)
class VendorStatusAction(EventProcessor):
    """
    Handler for issue accepted / declined / reverted-to-accepted / submitted
    events. Notifies issue and project watchers and enables webhooks.
    """

    issue_watchers = True
    project_watchers = True
    webhooks = True

    @overrides
    def extra(self) -> None:
        # Only an acceptance adds the acting user to the issue's watchers
        if self.event.event_type != e.ISSUE_ACCEPTED:
            return
        self.assign_accepting_admin_to_watchlist()

    def assign_accepting_admin_to_watchlist(self) -> None:
        """Add the event's user as a watcher of the event's issue"""
        issue_query = self.session.query(Issue).filter_by(id=self.event.issue_id)
        accepted_issue = issue_query.one()
        accepted_issue.add_watcher(self.event.user)
@handles(e.ISSUE_RELEASED, e.ISSUE_RELEASED_UPDATEABLE)
class IssueReleased(EventProcessor):
    """
    Handler for issue released events. Notifies the administrators of the
    respondent organisation or, when the issue has no respondent, the
    respondent email address recorded on the issue.
    """

    webhooks = True

    @overrides
    def assign_notifications(self) -> list[EmailNotification]:
        issue: Issue = self.event.issue
        notifications: list[EmailNotification] = []

        def _store(notification: EmailNotification) -> None:
            # Persist each notification immediately, then collect it
            self.session.add(notification)
            self.session.commit()
            notifications.append(notification)

        if issue.respondent is not None:
            for member in issue.respondent.users:
                if not member.is_administrator():
                    continue
                _store(
                    EmailNotification(
                        user_id=member.id,
                        org_id=member.org_id,
                        organisation=member.organisation,
                        email=member.email,
                        event_id=self.event.id,
                    )
                )
        elif issue.respondent_email is not None:
            _store(
                EmailNotification(email=issue.respondent_email, event_id=self.event.id)
            )

        if not notifications:
            log.error(
                f"Event #{self.event.id} - cannot send email notifications for Issue Released: "
                "respondent_email not set and no respondent user Administrators found"
            )
        return notifications
@handles(e.SECTION_ACCESS_UPDATED)
class SectionAccess(EventProcessor):
    """
    Handler for section access updates. The recipients are the users named in
    the event's "Granted To" change entries.
    """

    @overrides
    def recipients(self) -> Iterable[User]:
        """
        Yield each user referenced by a "Granted To" change, at most once.

        Change entries whose "new" value is missing, None or blank are
        skipped, as are ids for which no User is found.
        """
        seen_ids: set[str] = set()
        for change in self.event.changes:
            if change["name"] != "Granted To":
                continue

            # "new" may be absent or explicitly null; neither identifies a user
            raw_id = change.get("new")
            user_id = "" if raw_id is None else str(raw_id).strip()
            if not user_id or user_id in seen_ids:
                continue
            seen_ids.add(user_id)

            grant_user = self.session.get(User, user_id)
            if grant_user is not None:
                yield grant_user
@handles(e.PROJECT_NOTE_ADDED)
class ProjectNoteHandler(EventProcessor):
    """
    Handler for project note events. Extends the event ACL according to the
    note's distribution and notifies project watchers from the ACL's
    organisations.
    """

    webhooks = True

    @overrides
    def moderate_event_acl(self: Self) -> None:
        """Add organisations to the event ACL based on the note's distribution"""
        from postrfp.model.notes import ProjectNote, Distribution
        from postrfp.model.project import Project

        event = self.event
        note_query = self.session.query(ProjectNote).filter_by(id=event.object_id)
        note: ProjectNote = note_query.one()
        project: Project = note.project

        # Anything but an internal memo is visible to the other participants
        if note.distribution != Distribution.RESPONDENT_INTERNAL_MEMO:
            for participant in project.participants:
                if participant.org_id == note.org_id:
                    continue
                event.add_to_acl(participant.org_id)

        if note.distribution == Distribution.BROADCAST_NOTICE:
            for issue in project.published_issues:
                if issue.respondent_id is None:
                    continue
                event.add_to_acl(issue.respondent_id)
        elif (
            note.distribution == Distribution.TARGETED
            and note.target_org_id is not None
        ):
            event.add_to_acl(note.target_org_id)

    @overrides
    def recipients(self) -> Iterable[User]:
        """
        Yield distinct project watchers whose organisation is in the event ACL,
        leaving out the user who raised the event.
        """
        permitted_org_ids = {entry.org_id for entry in self.event.acl}
        for watcher in set(self.event.project.iter_all_watchers()):
            if (
                watcher.org_id not in permitted_org_ids
                or watcher.id == self.event.user_id
            ):
                continue
            yield watcher

    @overrides
    def augment_model(self, model: EvtSchema) -> None:
        """Attach the note referenced by the event to the mail model, if found"""
        from postrfp.model.notes import ProjectNote

        note = self.session.get(ProjectNote, self.event.object_id)
        if note is None:
            return
        model.note = ProjNote.model_validate(note)
@handles(e.SCORE_COMMENT_ADDED)
class ScoreCommentHandler(EventProcessor):
    """
    Handler for score comment events. Notifies previous commenters on the
    score, project participant watchers and, where known, the user who
    created the score.
    """

    webhooks = True

    @overrides
    def recipients(self) -> Iterable[User]:
        """
        Yield the users to notify about a new score comment, restricted to
        members of organisations participating in the score's project and
        excluding the user who raised the event.
        """
        from postrfp.model.issue import ScoreComment, Score

        event = self.event
        comment_query = self.session.query(ScoreComment).filter_by(id=event.object_id)
        score_comment: ScoreComment = comment_query.one()
        score: Score = score_comment.score

        commenters = {sc.user for sc in score.comments}
        candidates = commenters | set(event.project.participant_watchers)

        try:
            # Try to add the original scorer to the recipients list
            created_evt = (
                self.session.query(AuditEvent)
                .filter(AuditEvent.event_type == e.SCORE_CREATED)
                .filter(AuditEvent.object_id == score.id)
                .one()
            )
            candidates.add(created_evt.user)
        except NoResultFound:
            # Ignore - score could be created by autoscore or imported answer
            pass

        # AuditEvent.object_id is a bit vague. To prevent info leaking to the wrong
        # people do a sanity check for all recipients
        participant_org_ids = {p.org_id for p in score.issue.project.participants}
        for candidate in candidates:
            if candidate.org_id not in participant_org_ids or candidate is event.user:
                continue
            yield candidate

    @overrides
    def augment_model(self, model: EvtSchema) -> None:
        """Attach score and comment details to the mail model, if the comment exists"""
        from postrfp.model.issue import ScoreComment

        sc = self.session.get(ScoreComment, self.event.object_id)
        if sc is None:
            return

        score = sc.score
        model.score = ScoreSchema(
            score_value=score.score,
            comment=sc.comment_text,
            question_number=score.question.number,
            respondent_name=score.issue.respondent.name,
            issue_id=score.issue_id,
        )
# Names exported by `from <this module> import *`
__all__ = [
    "EventProcessor",
    "handle_event",
    "handler_exists_for",
    "registry",
    "webhook_evt_types",
]
if __name__ == "__main__":  # pragma: no cover
    # Print the handler registry, then the event types that accept webhooks
    for evt_type, handler_cls in registry.items():
        print(f"{evt_type} : {handler_cls.__name__}")
    print("Can be webhooked:")
    for webhook_type in webhook_evt_types():
        print(webhook_type)