Coverage for postrfp / jobs / executor.py: 93%
45 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
"""Task execution abstraction.

Two execution modes are supported:

1. InlineExecutor (default for tests / local dev) executes the task immediately
   in-process.
2. DaguExecutor triggers an external Dagu workflow via HTTP API. Only the
   minimal event id is sent; the workflow container runs the same code path
   using the CLI entrypoint.

Execution mode is selected via configuration (``conf.CONF.task_executor``)
with values ``inline`` (default) or ``dagu``. Additional Dagu settings are
``conf.CONF.dagu_base_url`` and ``conf.CONF.dagu_api_token``. The executor
interface is intentionally minimal to keep the migration surface small.
"""
17from dataclasses import dataclass
18import logging
19from typing import Protocol
22from postrfp import conf
# Module-level logger used by the executors and helper functions in this file.
log = logging.getLogger(__name__)
class Executor(Protocol):
    """Structural interface that every task executor satisfies."""

    def enqueue_event(self, event_id: int) -> None:  # pragma: no cover - Protocol
        """Hand the event identified by ``event_id`` over for processing."""
class InlineExecutor:
    """Run each task synchronously inside the current process.

    No workflow engine has to be running, which keeps unit tests simple;
    this mode is also adequate for small deployments.
    """

    def enqueue_event(self, event_id: int) -> None:  # pragma: no cover - trivial
        """Process the event identified by ``event_id`` before returning."""
        # Resolved at call time rather than when this module is imported.
        from .offload import process_event_task as run_event_task

        log.debug("InlineExecutor running event %s immediately", event_id)
        run_event_task(event_id)
46@dataclass
47class DaguExecutor:
48 base_url: str = "http://dagu:8080"
49 token: str | None = None
50 dag_name: str = "process_event"
52 def __init__(self):
53 self.base_url = conf.CONF.dagu_base_url or self.base_url
54 self.token = conf.CONF.dagu_api_token
56 def enqueue_event(self, event_id: int) -> None:
57 """Trigger the Dagu workflow using pydagu library.
59 Uses DaguHttpClient to trigger the process_event DAG with the event_id parameter.
60 """
61 from pydagu.http import DaguHttpClient
62 from pydagu.models.request import StartDagRun
64 client = DaguHttpClient(self.dag_name, self.base_url)
65 # Note: params is a string that Dagu will parse. Pass as space-separated key=value pairs
66 start_request = StartDagRun(params=f"event_id={event_id}")
68 try:
69 response = client.start_dag_run(start_request)
70 log.info(
71 "Enqueued event %s via Dagu dag=%s, response=%s",
72 event_id,
73 self.dag_name,
74 response,
75 )
76 except Exception:
77 # Let the exception propagate – upstream caller may choose to retry.
78 log.exception("Error communicating with Dagu for event %s", event_id)
79 raise
# Module-wide executor instance; stays None until init_jobs_executor() assigns it.
_executor: Executor | None = None
def init_jobs_executor() -> None:
    """Create the module-wide executor if it has not been created yet.

    Meant to be called during application startup, once configuration has
    been loaded. When an executor is already set the call returns without
    doing anything.

    Raises:
        RuntimeError: if ``conf.CONF`` is still ``None``.
    """
    global _executor

    if conf.CONF is None:
        raise RuntimeError("Configuration not initialised")
    if _executor is not None:
        return

    # Any value other than "dagu" selects the in-process executor.
    executor_cls = (
        DaguExecutor if conf.CONF.task_executor == "dagu" else InlineExecutor
    )
    _executor = executor_cls()
    log.info(
        "Task executor initialised: %s (mode=%s)",
        executor_cls.__name__,
        conf.CONF.run_mode,
    )
def get_executor() -> Executor:
    """Return the executor created by ``init_jobs_executor``.

    Raises:
        RuntimeError: if no executor has been initialised yet.
    """
    # Read-only access to the module global, so no ``global`` statement is needed.
    if _executor is None:
        # Name the real initialiser so the message points at something callable.
        raise RuntimeError(
            "Executor not initialised, call init_jobs_executor() first"
        )
    return _executor
def reset_executor_for_tests() -> None:
    """Discard the current executor and initialise a new one.

    Clears the module-level executor and then calls ``init_jobs_executor``
    again, so the executor is re-selected from the current configuration.
    """
    global _executor
    _executor = None
    init_jobs_executor()
    # Report through the module logger rather than print() so this helper
    # does not write banner text to stdout on every call.
    log.debug("Executor reset to: %r", _executor)