Coverage for postrfp / jobs / events / listeners.py: 92%
26 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1import logging
3from sqlalchemy import event
5from postrfp.jobs.events.action import handler_exists_for, registry
6from postrfp.jobs.executor import get_executor
7from postrfp.model.audit.event import AuditEvent
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Set to True by init_event_listeners() once the listeners are attached;
# later calls see the flag and return without registering again.
_listeners_registered = False
def init_event_listeners() -> None:
    """
    Register SQLAlchemy event listeners for automatic event processing.

    Must be called during application startup AFTER:
    - Configuration is loaded (conf.CONF exists)
    - Executor is initialized (init_jobs_executor called)
    - Models are imported (AuditEvent class exists)

    This function is idempotent - safe to call multiple times. The
    module-level ``_listeners_registered`` flag records the first
    registration and every later call returns early.
    """
    global _listeners_registered
    if _listeners_registered:
        log.debug("Event listeners already registered, skipping")
        return

    # Register both hooks with propagate=True so they always fire as a pair,
    # including for any mapped subclass of AuditEvent. If only before_insert
    # propagated, a subclass row with a handler would be left pending but
    # never handed to the executor.
    event.listen(
        AuditEvent, "before_insert", set_status_done_if_no_handler, propagate=True
    )
    event.listen(AuditEvent, "after_insert", enqueue_event_job, propagate=True)

    # Flip the flag as soon as the listeners are attached, before the summary
    # log below, so a failure while logging cannot lead to double registration.
    _listeners_registered = True

    # Log what we just did
    handler_count = len(registry)
    handler_types = list(registry.keys())
    webhook_count = sum(1 for _, entry in registry.items() if entry.webhooks)

    # Show at most five event type names; "..." signals that more exist.
    preview = ", ".join(handler_types[:5])
    if len(handler_types) > 5:
        preview += "..."

    log.info(
        "✓ Event processing system initialized:\n"
        " - Executor: %s\n"
        " - Handlers registered: %d event types\n"
        " - Webhook-enabled: %d event types\n"
        " - Event types: %s",
        get_executor().__class__.__name__,
        handler_count,
        webhook_count,
        preview,
    )
# Listener callbacks, attached explicitly by init_event_listeners() via event.listen() rather than by decorators.
def set_status_done_if_no_handler(mapper, connection, target: AuditEvent):
    """
    ``before_insert`` listener: mark an event as done when nothing handles it.

    ``mapper`` and ``connection`` are part of the listener signature but are
    not used here. ``target`` is the AuditEvent instance about to be inserted.
    """
    if handler_exists_for(target):
        # A handler exists, so leave the event's status untouched.
        return
    target.set_done()
def enqueue_event_job(mapper, connection, target: AuditEvent):
    """
    ``after_insert`` listener: hand a newly inserted event to the executor.

    Does nothing when no handler exists for ``target``. Otherwise passes the
    event's id (coerced to ``int``) to the executor's ``enqueue_event`` and
    writes a debug log line. ``mapper`` and ``connection`` are part of the
    listener signature but are not used here.
    """
    if not handler_exists_for(target):
        return

    executor = get_executor()
    executor.enqueue_event(int(target.id))
    log.debug("Event %s (%s) enqueued for processing", target.id, target.event_type)
# Only the initializer is exported by `from ... import *`; the listener
# callbacks are left out of the star-import surface.
__all__ = ["init_event_listeners"]