Coverage for postrfp/jobs/offload.py: 88%

34 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1"""Background task implementation. 

2 

3``process_event_task`` encapsulates the logic previously executed by the 

4spooled function. It is now invoked either directly (InlineExecutor) or via a 

5CLI command launched by Dagu (DaguExecutor). 

6""" 

7 

8import logging 

9from sqlalchemy.orm import sessionmaker 

10 

11from postrfp.shared.init.dbconfig import build_engine 

12from postrfp import conf 

13from postrfp.conf.settings import RunMode 

14from postrfp.jobs.events.action import handle_event, handler_exists_for 

15from postrfp.model.audit.event import AuditEvent 

16from postrfp.mail import stub 

17 

18log = logging.getLogger(__name__) 

19 

20 

21def process_event_task(event_id: int): 

22 """Load an AuditEvent and process it. 

23 

24 Safe to run multiple times (idempotent) – if an event is already marked 

25 done, handlers should detect that state (business logic responsibility) or 

26 side effects (e.g. duplicate emails) should be mitigated by DB constraints. 

27 """ 

28 engine = build_engine() 

29 Session = sessionmaker(engine, future=True) 

30 session = Session() 

31 try: 

32 evt = session.get(AuditEvent, event_id) 

33 if evt is None: 

34 if conf.CONF.run_mode == RunMode.test: 

35 log.debug( 

36 "process_event_task event ID not found - assuming due to session isolation" 

37 ) 

38 else: 

39 log.error("Unable to handle event, ID %s not found", event_id) 

40 return 

41 

42 if not handler_exists_for(evt): 

43 if evt.status.name == "pending": 

44 evt.set_done() 

45 session.commit() 

46 log.debug( 

47 "Marked unhandled event %s (%s) 'done' in offload processor", 

48 evt.id, 

49 evt.event_type, 

50 ) 

51 return 

52 

53 try: 

54 handle_event(evt, session, stub) 

55 session.commit() 

56 except Exception: 

57 session.rollback() 

58 log.exception("Failure processing Event #%s", event_id) 

59 raise 

60 finally: 

61 session.close()