Coverage for postrfp/jobs/executor.py: 95%
62 statements
coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""Task execution abstraction.
3Two execution modes are supported:
51. InlineExecutor (default for tests / local dev) executes the task immediately
6 in-process.
72. DaguExecutor triggers an external Dagu workflow via HTTP API. Only the
8 minimal event id is sent; the workflow container runs the same code path
9 using the CLI entrypoint.
11Execution mode is selected via configuration (``conf.CONF.task_executor``)
12with values ``inline`` (default) or ``dagu``. Additional Dagu settings are
13``conf.CONF.dagu_base_url`` and ``conf.CONF.dagu_api_token``. The executor
14interface is intentionally minimal to keep the migration surface small.
15"""

from dataclasses import dataclass
import logging
from typing import Protocol

import requests
from sqlalchemy import event

from postrfp import conf
from postrfp.model.audit.event import AuditEvent
from .events.action import handler_exists_for

log = logging.getLogger(__name__)


class Executor(Protocol):
    def enqueue_event(self, event_id: int) -> None:  # pragma: no cover - Protocol
        ...
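
# Any class with a matching ``enqueue_event`` method satisfies ``Executor``
# structurally -- no inheritance required. Illustrative only; ``NullExecutor``
# is not part of this module:
#
#     class NullExecutor:
#         def enqueue_event(self, event_id: int) -> None:
#             log.debug("Dropping event %s", event_id)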


class InlineExecutor:
    """Executes tasks synchronously in-process.

    This keeps unit tests simple and avoids the need for a running workflow
    engine. It is also adequate for small deployments.
    """

    def enqueue_event(self, event_id: int) -> None:  # pragma: no cover - trivial
        from .offload import process_event_task

        log.debug("InlineExecutor running event %s immediately", event_id)
        process_event_task(event_id)


@dataclass
class DaguExecutor:
    base_url: str = "http://dagu:8080"
    token: str | None = None
    dag_name: str = "process_event"

    def __init__(self):
        # Config values, when set, override the class defaults above.
        self.base_url = conf.CONF.dagu_base_url or self.base_url
        self.token = conf.CONF.dagu_api_token

    def enqueue_event(self, event_id: int) -> None:
        """Trigger the Dagu workflow.

        Dagu's API (v1) is expected to accept a POST to:
            {base_url}/api/v1/dags/{dag_name}/start
        with a JSON body containing 'params'. We keep the contract minimal; if
        Dagu changes, adapt here without touching business code.
        """
        url = f"{self.base_url.rstrip('/')}/api/v1/dags/{self.dag_name}/start"
        headers = {"Content-Type": "application/json"}
        if self.token:
            headers["Authorization"] = f"Bearer {self.token}"
        payload = {"params": {"event_id": event_id}}
        try:
            resp = requests.post(url, json=payload, timeout=5, headers=headers)
            if resp.status_code >= 400:
                log.error(
                    "Failed to enqueue event %s with Dagu (%s): %s",
                    event_id,
                    resp.status_code,
                    resp.text[:200],
                )
            resp.raise_for_status()
            log.info("Enqueued event %s via Dagu dag=%s", event_id, self.dag_name)
        except Exception:
            # Let the exception propagate – upstream caller may choose to retry.
            log.exception("Error communicating with Dagu for event %s", event_id)
            raise


_executor: Executor | None = None


def init_jobs_executor() -> None:
    """
    Initialise the global jobs executor instance.

    This must be called once during application startup, after configuration
    has been loaded.
    """
    if conf.CONF is None:
        raise RuntimeError("Configuration not initialised")

    global _executor
    if _executor is not None:
        return

    if conf.CONF.task_executor == "dagu":
        _executor = DaguExecutor()
    else:
        _executor = InlineExecutor()
    log.info(
        "Task executor initialised: %s (mode=%s)",
        _executor.__class__.__name__,
        conf.CONF.run_mode,
    )


def get_executor() -> Executor:
    global _executor
    if _executor is None:
        raise RuntimeError("Executor not initialised, call init_jobs_executor() first")

    return _executor
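
# Typical wiring -- a sketch, not the application's actual startup code
# (how configuration gets loaded beforehand is assumed):
#
#     init_jobs_executor()                    # once, after config is loaded
#     get_executor().enqueue_event(event_id)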


def reset_executor_for_tests():
    global _executor
    _executor = None
    init_jobs_executor()
    log.debug("Executor reset to %s", get_executor())
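
# SQLAlchemy mapper-event wiring: every AuditEvent insert is inspected. Rows
# with no registered handler are marked done up front; rows with a handler are
# handed to the configured executor once the row has been inserted.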


@event.listens_for(AuditEvent, "before_insert", propagate=True)
def set_status_done_if_no_handler(mapper, connection, target: AuditEvent):
    if not handler_exists_for(target):
        target.set_done()


@event.listens_for(AuditEvent, "after_insert")
def enqueue_event_job(mapper, connection, target: AuditEvent):
    if handler_exists_for(target):
        get_executor().enqueue_event(int(target.id))
        log.info("Event %s enqueued for async processing", target.id)