Coverage for postrfp / jobs / events / listeners.py: 92%

26 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1import logging 

2 

3from sqlalchemy import event 

4 

5from postrfp.jobs.events.action import handler_exists_for, registry 

6from postrfp.jobs.executor import get_executor 

7from postrfp.model.audit.event import AuditEvent 

8 

9 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# Guard flag for init_event_listeners(): it is set to True once the listeners
# have been attached and is checked on entry so that repeated calls do not
# register them again.
_listeners_registered = False

14 

15 

def init_event_listeners() -> None:
    """
    Register SQLAlchemy event listeners for automatic event processing.

    Two mapper-level listeners are attached to ``AuditEvent``:

    - ``before_insert`` -> ``set_status_done_if_no_handler``
    - ``after_insert``  -> ``enqueue_event_job``

    Both are registered with ``propagate=True`` so they apply to the same set
    of mappers (``AuditEvent`` and any inheriting mapped classes).

    Must be called during application startup AFTER:
    - Configuration is loaded (conf.CONF exists)
    - Executor is initialized (init_jobs_executor called)
    - Models are imported (AuditEvent class exists)

    This function is idempotent - safe to call multiple times.
    """
    global _listeners_registered
    if _listeners_registered:
        log.debug("Event listeners already registered, skipping")
        return

    # The two listeners are complementary halves of one decision (no handler ->
    # mark done, handler -> enqueue), so they must be registered with the same
    # ``propagate`` setting; otherwise an inheriting mapper would get only one
    # half. AuditEvent is already imported at module level, so no local import
    # is needed here.
    event.listen(
        AuditEvent, "before_insert", set_status_done_if_no_handler, propagate=True
    )
    event.listen(AuditEvent, "after_insert", enqueue_event_job, propagate=True)

    _listeners_registered = True

    # Log a one-off summary of what was just wired up.
    handler_types = list(registry)
    webhook_count = sum(1 for handler in registry.values() if handler.webhooks)
    # Only the first five event types are listed to keep the log line short.
    shown_types = ", ".join(handler_types[:5]) + (
        "..." if len(handler_types) > 5 else ""
    )
    log.info(
        "✓ Event processing system initialized:\n"
        " - Executor: %s\n"
        " - Handlers registered: %d event types\n"
        " - Webhook-enabled: %d event types\n"
        " - Event types: %s",
        get_executor().__class__.__name__,
        len(handler_types),
        webhook_count,
        shown_types,
    )

55 

56 

57# Listener callbacks: plain functions attached via event.listen() in init_event_listeners() (no longer decorators)

def set_status_done_if_no_handler(mapper, connection, target: AuditEvent):
    """Mark an audit event as done when no handler exists for it.

    Registered by ``init_event_listeners`` as the ``before_insert`` listener
    for ``AuditEvent``. ``mapper`` and ``connection`` are part of the listener
    signature and are not used here.

    Args:
        mapper: Mapper passed by SQLAlchemy (unused).
        connection: Connection passed by SQLAlchemy (unused).
        target: The ``AuditEvent`` instance about to be inserted.
    """
    has_handler = handler_exists_for(target)
    if has_handler:
        # A handler exists, so the event is left untouched here.
        return
    target.set_done()

61 

62 

def enqueue_event_job(mapper, connection, target: AuditEvent):
    """Enqueue a processing job for an audit event that has a handler.

    Registered by ``init_event_listeners`` as the ``after_insert`` listener
    for ``AuditEvent``. Events without a handler are ignored. ``mapper`` and
    ``connection`` are part of the listener signature and are not used here.

    Args:
        mapper: Mapper passed by SQLAlchemy (unused).
        connection: Connection passed by SQLAlchemy (unused).
        target: The ``AuditEvent`` instance that was just inserted.
    """
    if not handler_exists_for(target):
        return

    executor = get_executor()
    # The executor is handed the event id as a plain int.
    executor.enqueue_event(int(target.id))
    log.debug("Event %s (%s) enqueued for processing", target.id, target.event_type)

67 

68 

# Only the initializer is exported; the listener callbacks are attached
# through it rather than being imported directly.
__all__ = ["init_event_listeners"]