Coverage for postrfp/jobs/executor.py: 95%

62 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1"""Task execution abstraction. 

2 

3Two execution modes are supported: 

4 

51. InlineExecutor (default for tests / local dev) executes the task immediately 

6 in-process. 

72. DaguExecutor triggers an external Dagu workflow via HTTP API. Only the 

8 minimal event id is sent; the workflow container runs the same code path 

9 using the CLI entrypoint. 

10 

11Execution mode is selected via configuration (``conf.CONF.task_executor``) 

12with values ``inline`` (default) or ``dagu``. Additional Dagu settings are 

13``conf.CONF.dagu_base_url`` and ``conf.CONF.dagu_api_token``. The executor 

14interface is intentionally minimal to keep the migration surface small. 

15""" 

16 

17from dataclasses import dataclass 

18import logging 

19from typing import Protocol 

20 

21import requests 

22from sqlalchemy import event 

23 

24from postrfp import conf 

25from postrfp.model.audit.event import AuditEvent 

26from .events.action import handler_exists_for 

27 

28log = logging.getLogger(__name__) 

29 

30 

31class Executor(Protocol): 

32 def enqueue_event(self, event_id: int) -> None: # pragma: no cover - Protocol 

33 ... 

34 

35 

36class InlineExecutor: 

37 """Executes tasks synchronously in-process. 

38 

39 This keeps unit tests simple and avoids the need for a running workflow 

40 engine. It is also adequate for small deployments. 

41 """ 

42 

43 def enqueue_event(self, event_id: int) -> None: # pragma: no cover - trivial 

44 from .offload import process_event_task 

45 

46 log.debug("InlineExecutor running event %s immediately", event_id) 

47 process_event_task(event_id) 

48 

49 

50@dataclass 

51class DaguExecutor: 

52 base_url: str = "http://dagu:8080" 

53 token: str | None = None 

54 dag_name: str = "process_event" 

55 

56 def __init__(self): 

57 self.base_url = conf.CONF.dagu_base_url or self.base_url 

58 self.token = conf.CONF.dagu_api_token 

59 

60 def enqueue_event(self, event_id: int) -> None: 

61 """Trigger the Dagu workflow. 

62 

63 Dagu's API (v1) is expected to accept a POST to: 

64 {base_url}/api/v1/dags/{dag_name}/start 

65 with JSON body containing 'params'. We keep the contract minimal; if 

66 Dagu changes, adapt here without touching business code. 

67 """ 

68 url = f"{self.base_url.rstrip('/')}/api/v1/dags/{self.dag_name}/start" 

69 headers = {"Content-Type": "application/json"} 

70 if self.token: 

71 headers["Authorization"] = f"Bearer {self.token}" 

72 payload = {"params": {"event_id": event_id}} 

73 try: 

74 resp = requests.post(url, json=payload, timeout=5, headers=headers) 

75 if resp.status_code >= 400: 

76 log.error( 

77 "Failed to enqueue event %s with Dagu (%s): %s", 

78 event_id, 

79 resp.status_code, 

80 resp.text[:200], 

81 ) 

82 resp.raise_for_status() 

83 log.info("Enqueued event %s via Dagu dag=%s", event_id, self.dag_name) 

84 except Exception: 

85 # Let the exception propagate – upstream caller may choose to retry. 

86 log.exception("Error communicating with Dagu for event %s", event_id) 

87 raise 

88 

89 

90_executor: Executor | None = None 

91 

92 

93def init_jobs_executor() -> None: 

94 """ 

95 Initialise the global jobs executor instance. 

96 This must be called once during application startup, after configuration 

97 has been loaded. 

98 """ 

99 if conf.CONF is None: 

100 raise RuntimeError("Configuration not initialised") 

101 

102 global _executor 

103 if _executor is not None: 

104 return 

105 

106 if conf.CONF.task_executor == "dagu": 

107 _executor = DaguExecutor() 

108 else: 

109 _executor = InlineExecutor() 

110 log.info( 

111 "Task executor initialised: %s (mode=%s)", 

112 _executor.__class__.__name__, 

113 conf.CONF.run_mode, 

114 ) 

115 

116 

117def get_executor() -> Executor: 

118 global _executor 

119 if _executor is None: 

120 raise RuntimeError("Executor not initialised, call init_executor() first") 

121 

122 return _executor 

123 

124 

125def reset_executor_for_tests(): 

126 global _executor 

127 _executor = None 

128 init_jobs_executor() 

129 print("\n\n ***************executor reset to: ", get_executor()) 

130 print("\n\n") 

131 

132 

133@event.listens_for(AuditEvent, "before_insert", propagate=True) 

134def set_status_done_if_no_handler(mapper, connection, target: AuditEvent): 

135 if not handler_exists_for(target): 

136 target.set_done() 

137 

138 

139@event.listens_for(AuditEvent, "after_insert") 

140def enqueue_event_job(mapper, connection, target: AuditEvent): 

141 if handler_exists_for(target): 

142 get_executor().enqueue_event(int(target.id)) 

143 log.info("Event %s enqueued for async processing", target.id)