Coverage for postrfp/jobs/events/action.py: 97%
251 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1from typing import Iterable, Type, Optional, Self, Any, Iterator
2from contextlib import contextmanager
3import logging
5import orjson
6from sqlalchemy.orm import Session
7from sqlalchemy.orm.exc import NoResultFound
8from overrides import overrides, final, EnforceOverrides
11from postrfp.model.humans import User
12from postrfp.mail.schemas import (
13 EvtSchema,
14 build_template_model,
15 ProjNote,
16 ScoreSchema,
17 TemplateEmailModel,
18)
19from postrfp.mail.protocol import MailerProtocol
20from postrfp.mail.factory import get_mailer
21from postrfp.jobs.events.webhooks import ping_webhook
22from postrfp.model.notify import EmailNotification
23from postrfp.model.audit.event import AuditEvent, Status as EvtStatus
24from postrfp.model.audit import evt_types as e
25from postrfp.model.issue import Issue
26from postrfp.shared.utils import json_default
27from postrfp.shared import fetch
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
# Maps an event type name to the EventProcessor subclass registered for it.
# Entries are written by the `handles` decorator and read by `handle_event`,
# `handler_exists_for` and `webhook_evt_types`.
registry: dict[str, Type["EventProcessor"]] = {}
def debug_model(model: dict, event_id: int) -> None:
    """Log an indented JSON dump of an email data model at DEBUG level.

    Nothing is serialised unless DEBUG logging is enabled for this module's
    logger, so the call is cheap in normal operation.

    Args:
        model: The data model to dump; its "To" entry (if any) is reported
            as the recipient.
        event_id: ID of the event the model was built for.
    """
    if not log.isEnabledFor(logging.DEBUG):
        return

    serialised = orjson.dumps(model, default=json_default, option=orjson.OPT_INDENT_2)
    recipient = model.get("To", "no email address given")
    log.debug(
        "Data model for user %s, event %s: \n %s",
        recipient,
        event_id,
        serialised.decode("utf8"),
    )
def webhook_evt_types() -> list[str]:
    """Return the registered event types whose handler class has `webhooks` set."""
    webhook_types: list[str] = []
    for evt_type, handler_cls in registry.items():
        if handler_cls.webhooks:
            webhook_types.append(evt_type)
    return webhook_types
class handles:
    """
    Decorator for associating an EventProcessor subclass with one or more event types.

    Registration is all-or-nothing: every event type is checked before the
    registry is modified, so a clash on any one of them leaves the registry
    untouched.
    """

    def __init__(self, *evt_types: str):
        self.evt_types = evt_types

    def __call__(self, cls: Type["EventProcessor"]) -> Type["EventProcessor"]:
        """Register `cls` for each configured event type and return it unchanged.

        Raises:
            ValueError: if any event type is already registered, or is listed
                more than once in this decorator call.
        """
        # Validate everything first so a failure cannot leave a partial registration.
        checked: set[str] = set()
        for evt_type in self.evt_types:
            if evt_type in registry or evt_type in checked:
                raise ValueError(f"{evt_type} is already assigned")
            checked.add(evt_type)

        for evt_type in self.evt_types:
            registry[evt_type] = cls
        return cls
def handle_event(
    event: AuditEvent, session: Session, mailer: Optional[MailerProtocol] = None
) -> Any:
    """
    Run email notifications, webhooks and any additional tasks for one event.

    The work is delegated to the EventProcessor subclass registered for the
    event's type.

    Args:
        event: The audit event to process.
        session: Database session handed to the processor.
        mailer: Mailer to send with; when omitted one is obtained from
            `get_mailer()`.

    Returns:
        Whatever the processor's `process()` call returns.

    Raises:
        ValueError: if no processor is registered for the event's type.
    """
    if mailer is None:
        mailer = get_mailer()

    handler_cls = registry.get(event.event_type)
    if handler_cls is None:
        raise ValueError(f"No Handler found for AuditEvent {event.event_type}")

    return handler_cls(event, session, mailer).process()
def handler_exists_for(event: AuditEvent) -> bool:
    """Report whether a processor is registered for the event's type."""
    registered_types = registry.keys()
    return event.event_type in registered_types
def run_event_handlers(session: Session) -> None:
    """Call handle_event for every AuditEvent whose status is pending."""
    is_pending = AuditEvent.status == EvtStatus.pending
    pending_events = session.query(AuditEvent).filter(is_pending)
    for pending_evt in pending_events:
        handle_event(pending_evt, session)
def logging_context(event: AuditEvent) -> Any:
    """Build a context-manager factory that logs the outcome of a processing step.

    The returned callable takes an action label and gives back a context
    manager. On a clean exit it logs completion at INFO level. If the wrapped
    code raises an `Exception`, the event's status is set to
    `EvtStatus.error`, the failure is logged with its traceback, and the
    exception is suppressed so the caller carries on.
    """

    @contextmanager
    def _logged_step(action: str) -> Iterator[None]:
        try:
            yield
            log.info(
                "%s completed for event %s # %s", action, event.event_type, event.id
            )
        except Exception:
            # Flag the event as failed; the exception is deliberately not re-raised.
            event.status = EvtStatus.error
            log.exception(
                "%s failure for event %s # %s", action, event.event_type, event.id
            )

    return _logged_step
class EventProcessor(EnforceOverrides):
    """Base class for handlers that act on a single AuditEvent.

    Subclasses are registered against event types with the `handles`
    decorator and tune behaviour through the class flags below or by
    overriding the hook methods.
    """

    # Skip the user who triggered the event when collecting recipients.
    exclude_initiator = True
    # Include users watching the event's issue among potential recipients.
    issue_watchers = False
    # Include the project's participant watchers among potential recipients.
    project_watchers = False
    # Deliver the event to subscribed webhooks.
    webhooks = False

    def __init__(
        self, event: AuditEvent, session: Session, mailer: MailerProtocol
    ) -> None:
        self.event = event
        self.session = session
        self.mailer = mailer
        # Populated by generate_evt_model().
        self.evt_model: Optional[EvtSchema] = None
        self.result = None

    @final
    def process(self) -> Optional[object]:
        """Run an audit event through the notification pipeline.

        Steps, in order:
            1. Generate the event model from the audit data
            2. Moderate the event access control list
            3. Send email notifications
            4. Ping webhooks, if enabled for this handler
            5. Run any extra actions defined by the subclass

        Steps 1 and 2 run unguarded, so an exception there propagates to the
        caller. Steps 3 to 5 each run inside an error-logging context: a
        failure is logged and marks the event as errored, but does not stop
        the following steps.

        Side Effects:
            - Sets event.status to EvtStatus.done when no guarded step failed
            - Leaves event.status at EvtStatus.error otherwise
            - Logs the completion/failure state

        Returns:
            The value of self.result.
        """
        self.generate_evt_model()
        self.moderate_event_acl()

        errors_logged = logging_context(self.event)
        guarded_steps = (
            ("Email notifications", self.send_emails),
            ("Webhooks", self.ping_webhooks),
            ("Extra actions", self.extra),
        )
        for label, step in guarded_steps:
            with errors_logged(label):
                step()

        if self.event.status == EvtStatus.error:
            log.warning("Event processing failed for %s", self.event)
        else:
            self.event.status = EvtStatus.done
            log.info("Event processing completed for %s", self.event)

        return self.result

    def generate_evt_model(self) -> None:
        """Build self.evt_model from the event (kept out of __init__ to ease testing)."""
        self.evt_model = EvtSchema.model_validate(self.event)

    def moderate_event_acl(self) -> None:
        """Hook allowing subclass handlers to customise the event's ACL."""
        pass

    def _iter_watchers(self) -> Iterable[User]:
        """Yield issue and/or project watchers according to the class flags."""
        evt = self.event
        self.session.add(evt)
        if self.issue_watchers:
            yield from evt.issue.watchers
        if self.project_watchers:
            yield from evt.project.participant_watchers

    def recipients(self) -> Iterable[User]:
        """
        Yield the watchers whose organisation appears in the event's ACL.

        When `exclude_initiator` is true the user who triggered the event is
        left out.
        """
        acl_org_ids = {acl.org_id for acl in self.event.acl}

        for watcher in self._iter_watchers():
            if watcher.org_id not in acl_org_ids:
                continue
            if self.exclude_initiator and watcher is self.event.user:
                continue
            yield watcher

    def assign_notifications(self) -> list[EmailNotification]:
        """Create, persist and return one EmailNotification per recipient."""
        notifications = []
        for user in self.recipients():
            notification = EmailNotification(
                user_id=user.id,
                org_id=user.org_id,
                organisation=user.organisation,
                email=user.email,
                event_id=self.event.id,
            )
            self.session.add(notification)
            notifications.append(notification)
        self.session.commit()
        return notifications

    @final
    def get_model(self, user: User, email: str) -> TemplateEmailModel:
        """Build the template email model for one recipient.

        The event part of the model is passed through `augment_model` so
        subclasses can add data, and the finished model is dumped to the
        debug log.
        """
        assert self.evt_model is not None
        template_email = build_template_model(self.evt_model, user, email)
        self.augment_model(template_email.TemplateModel.event)
        debug_model(template_email.model_dump(), self.event.id)
        return template_email

    def augment_model(self, model: EvtSchema) -> None:
        """
        Hook for subclasses to add data to the 'TemplateModel' of the mail
        data package.
        """
        pass

    def send_emails(self) -> None:
        """Send one email per assigned notification and record each outcome.

        A notification without an address, or whose send raises, is marked
        failed with the reason stored in `message_id`; the remaining
        notifications are still attempted.
        """
        for n in self.assign_notifications():
            try:
                if n.email is None:
                    log.error(
                        "Email notification skipped - no email address provided for user"
                    )
                    n.set_failed()
                    n.message_id = "No email address"
                    continue

                email_model = self.get_model(n.user, n.email)
                n.message_id = self.mailer.send_email(email_model)
                log.info(
                    "Email notification sent to %s for event ID %s - %s",
                    n.email,
                    self.event.id,
                    self.event.event_type,
                )
                n.set_sent()
            except Exception as exc:
                n.set_failed()
                # Store the (truncated) error text in place of a message id.
                n.message_id = str(exc)[:255]
                log.exception(
                    "Failed to send message to %s for event #%s", n.email, self.event.id
                )
        self.session.commit()

    def ping_webhooks(self) -> None:
        """Send the serialised event model to each webhook found for the event.

        Does nothing when the handler has webhooks disabled or no event model
        has been generated.
        """
        if self.webhooks is False:
            return
        if self.evt_model is None:
            return

        payload = orjson.dumps(self.evt_model.model_dump(), default=json_default)
        for wh in fetch.webhooks_for_event(self.event):
            ping_webhook(wh, payload)
        self.session.commit()

    def extra(self) -> None:
        """Custom actions that may be defined by subclasses"""
        pass
@handles(
    e.ISSUE_ACCEPTED, e.ISSUE_DECLINED, e.ISSUE_REVERTED_TO_ACCEPTED, e.ISSUE_SUBMITTED
)
class VendorStatusAction(EventProcessor):
    """Handle issue status events: accepted, declined, reverted to accepted, submitted.

    Both issue watchers and project watchers are notified, and webhooks are
    pinged.
    """

    issue_watchers = True
    project_watchers = True
    webhooks = True

    @overrides
    def extra(self) -> None:
        """For an accepted issue, add the event's user to the issue's watchers."""
        if self.event.event_type != e.ISSUE_ACCEPTED:
            return
        self.assign_accepting_admin_to_watchlist()

    def assign_accepting_admin_to_watchlist(self) -> None:
        """Look up the event's issue and add the event's user as a watcher."""
        issue_query = self.session.query(Issue).filter_by(id=self.event.issue_id)
        accepted_issue = issue_query.one()
        accepted_issue.add_watcher(self.event.user)
@handles(e.ISSUE_RELEASED, e.ISSUE_RELEASED_UPDATEABLE)
class IssueReleased(EventProcessor):
    """Notify the respondent side when an issue is released."""

    webhooks = True

    @overrides
    def assign_notifications(self) -> list[EmailNotification]:
        """Create notifications for the respondent of the released issue.

        If the issue has a respondent organisation, one notification is
        created for each of its administrator users. Otherwise, if the issue
        has a `respondent_email`, a single notification addressed to it is
        created. All notifications are committed together in one transaction,
        matching the base class implementation.

        Returns:
            The notifications created; empty (with an error logged) when
            there was nobody to notify.
        """
        issue: Issue = self.event.issue
        en_list = []

        if issue.respondent is not None:
            for user in issue.respondent.users:
                if not user.is_administrator():
                    continue
                en = EmailNotification(
                    user_id=user.id,
                    org_id=user.org_id,
                    organisation=user.organisation,
                    email=user.email,
                    event_id=self.event.id,
                )
                self.session.add(en)
                en_list.append(en)
        # NOTE(review): respondent_email is only consulted when there is no
        # respondent organisation at all; a respondent with no administrators
        # does not fall back to it, although the error message below reads as
        # if it would. Confirm which behaviour is intended.
        elif issue.respondent_email is not None:
            en = EmailNotification(email=issue.respondent_email, event_id=self.event.id)
            self.session.add(en)
            en_list.append(en)

        if en_list:
            # Single commit for the whole batch rather than one per user.
            self.session.commit()
        else:
            msg = (
                f"Event #{self.event.id} - cannot send email notifications for Issue Released: "
                "respondent_email not set and no respondent user Administrators found"
            )
            log.error(msg)
        return en_list
@handles(e.SECTION_ACCESS_UPDATED)
class SectionAccess(EventProcessor):
    """Notify users who have been granted access to a section."""

    @overrides
    def recipients(self) -> Iterable[User]:
        """Yield each user named in a "Granted To" change of the event.

        The user id is read from the change's "new" value. Each id is looked
        up at most once; ids that match no User are skipped, as are changes
        whose "new" value is missing, None or blank.
        """
        seen_user_ids = set()
        for pc in self.event.changes:
            if pc["name"] != "Granted To":
                continue

            # "new" may be present but None, which .get()'s default does not cover.
            user_id = (pc.get("new") or "").strip()
            if not user_id or user_id in seen_user_ids:
                continue
            seen_user_ids.add(user_id)

            grant_user = self.session.get(User, user_id)
            if grant_user is not None:
                yield grant_user
@handles(e.PROJECT_NOTE_ADDED)
class ProjectNoteHandler(EventProcessor):
    """Handle the addition of a project note."""

    webhooks = True

    @overrides
    def moderate_event_acl(self: Self) -> None:
        """Extend the event's ACL according to the note's distribution.

        Unless the note is a respondent internal memo, every participant
        organisation other than the note's own is added. A broadcast notice
        additionally adds the respondent of each published issue; a targeted
        note adds its target organisation when one is set.
        """
        from postrfp.model.notes import ProjectNote, Distribution
        from postrfp.model.project import Project

        event = self.event
        note: ProjectNote = (
            self.session.query(ProjectNote).filter_by(id=event.object_id).one()
        )
        project: Project = note.project

        if note.distribution != Distribution.RESPONDENT_INTERNAL_MEMO:
            for participant in project.participants:
                if participant.org_id != note.org_id:
                    event.add_to_acl(participant.org_id)

        if note.distribution == Distribution.BROADCAST_NOTICE:
            for issue in project.published_issues:
                if issue.respondent_id is not None:
                    event.add_to_acl(issue.respondent_id)
        elif (
            note.distribution == Distribution.TARGETED
            and note.target_org_id is not None
        ):
            event.add_to_acl(note.target_org_id)

    @overrides
    def recipients(self) -> Iterable[User]:
        """Yield each distinct project watcher in an ACL organisation, except the event's user."""
        acl_org_ids = {a.org_id for a in self.event.acl}
        unique_watchers = set(self.event.project.iter_all_watchers())
        for user in unique_watchers:
            if user.org_id not in acl_org_ids:
                continue
            if user.id == self.event.user_id:
                continue
            yield user

    @overrides
    def augment_model(self, model: EvtSchema) -> None:
        """Attach the note to the model, if the note can still be found."""
        from postrfp.model.notes import ProjectNote

        note = self.session.get(ProjectNote, self.event.object_id)
        if note is None:
            return
        model.note = ProjNote.model_validate(note)
@handles(e.SCORE_COMMENT_ADDED)
class ScoreCommentHandler(EventProcessor):
    """Handle a comment being added to a score."""

    webhooks = True

    @overrides
    def recipients(self) -> Iterable[User]:
        """Yield the users to tell about a new score comment.

        Candidates are everyone who has commented on the score, the project's
        participant watchers and, where a SCORE_CREATED event exists for the
        score, the user of that event. Only candidates from a participant
        organisation of the score's project are yielded, and the event's own
        user is never included.
        """
        from postrfp.model.issue import ScoreComment, Score

        event = self.event
        comment_query = self.session.query(ScoreComment).filter_by(id=event.object_id)
        score_comment: ScoreComment = comment_query.one()
        score: Score = score_comment.score

        commenters = {sc.user for sc in score.comments}
        users_to_notify = commenters | set(event.project.participant_watchers)

        try:
            # Try to add the original scorer to the recipients list
            score_created_event = (
                self.session.query(AuditEvent)
                .filter(
                    AuditEvent.event_type == e.SCORE_CREATED,
                    AuditEvent.object_id == score.id,
                )
                .one()
            )
            users_to_notify.add(score_created_event.user)
        except NoResultFound:
            # Ignore - score could be created by autoscore or imported answer
            pass

        # AuditEvent.object_id is a bit vague. To prevent info leaking to the wrong people
        # do a sanity check for all recipients
        parti_orgs = {p.org_id for p in score.issue.project.participants}
        for user in users_to_notify:
            if user.org_id not in parti_orgs:
                continue
            if user is event.user:
                continue
            yield user

    @overrides
    def augment_model(self, model: EvtSchema) -> None:
        """Attach score details to the model, if the score comment can still be found."""
        from postrfp.model.issue import ScoreComment

        sc = self.session.get(ScoreComment, self.event.object_id)
        if sc is None:
            return

        score = sc.score
        model.score = ScoreSchema(
            score_value=score.score,
            comment=sc.comment_text,
            question_number=score.question.number,
            respondent_name=score.issue.respondent.name,
            issue_id=score.issue_id,
        )
if __name__ == "__main__":  # pragma: no cover
    # Print the handler registry, then the event types open to webhooks.
    for evt_type, handler_cls in registry.items():
        print(f"{evt_type} : {handler_cls.__name__}")
    print("Can be webhooked:")
    for evt_type in webhook_evt_types():
        print(evt_type)