Coverage for postrfp/jobs/offload.py: 88%
34 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""Background task implementation.
3``process_event_task`` encapsulates the logic previously executed by the
4spooled function. It is now invoked either directly (InlineExecutor) or via a
5CLI command launched by Dagu (DaguExecutor).
6"""
8import logging
9from sqlalchemy.orm import sessionmaker
11from postrfp.shared.init.dbconfig import build_engine
12from postrfp import conf
13from postrfp.conf.settings import RunMode
14from postrfp.jobs.events.action import handle_event, handler_exists_for
15from postrfp.model.audit.event import AuditEvent
16from postrfp.mail import stub
18log = logging.getLogger(__name__)
def process_event_task(event_id: int):
    """Look up one AuditEvent by primary key and run its handler, if any.

    A new engine and session are created for each call, and the session is
    closed on every exit path.

    Outcomes:

    * No row for ``event_id``: a message is logged (debug level when the
      configured run mode is ``RunMode.test``, error level otherwise) and
      the function returns without doing anything else.
    * ``handler_exists_for`` reports no handler: the event is marked done and
      committed, but only when its status name is ``"pending"``.
    * Otherwise ``handle_event`` is called with the event, the session and
      the ``stub`` mail module, and the session is committed.

    The task is intended to be safe to run more than once for the same
    event. Nothing here enforces that: handlers are expected to notice an
    already-completed event, or database constraints are expected to limit
    repeated side effects such as duplicate emails.

    Args:
        event_id: Primary key of the AuditEvent to process.

    Raises:
        Exception: Whatever ``handle_event`` or the following commit raises.
            The session is rolled back and the failure logged before the
            exception is re-raised.
    """
    db = sessionmaker(build_engine(), future=True)()
    try:
        event = db.get(AuditEvent, event_id)

        if event is None:
            in_test_mode = conf.CONF.run_mode == RunMode.test
            if in_test_mode:
                log.debug(
                    "process_event_task event ID not found - assuming due to session isolation"
                )
            else:
                log.error("Unable to handle event, ID %s not found", event_id)
            return

        if handler_exists_for(event):
            try:
                handle_event(event, db, stub)
                db.commit()
            except Exception:
                # Undo partial work and record the failure before propagating.
                db.rollback()
                log.exception("Failure processing Event #%s", event_id)
                raise
            return

        # No handler is registered: close out the event only if it is pending.
        if event.status.name == "pending":
            event.set_done()
            db.commit()
            log.debug(
                "Marked unhandled event %s (%s) 'done' in offload processor",
                event.id,
                event.event_type,
            )
    finally:
        db.close()