Coverage for postrfp / jobs / executor.py: 93%

45 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1"""Task execution abstraction. 

2 

3Two execution modes are supported: 

4 

51. InlineExecutor (default for tests / local dev) executes the task immediately 

6 in-process. 

72. DaguExecutor triggers an external Dagu workflow via HTTP API. Only the 

8 minimal event id is sent; the workflow container runs the same code path 

9 using the CLI entrypoint. 

10 

11Execution mode is selected via configuration (``conf.CONF.task_executor``) 

12with values ``inline`` (default) or ``dagu``. Additional Dagu settings are 

13``conf.CONF.dagu_base_url`` and ``conf.CONF.dagu_api_token``. The executor 

14interface is intentionally minimal to keep the migration surface small. 

15""" 

16 

from dataclasses import dataclass, field
import logging
from typing import Protocol

from postrfp import conf

23 

# Module-level logger used by the executors and helper functions in this file.
log = logging.getLogger(__name__)

25 

26 

class Executor(Protocol):
    """Structural interface shared by every task executor.

    Any object exposing a compatible ``enqueue_event`` method satisfies this
    protocol; inheriting from it is not required.
    """

    def enqueue_event(self, event_id: int) -> None:  # pragma: no cover - Protocol
        """Accept the event identified by ``event_id`` for processing."""

30 

31 

class InlineExecutor:
    """Runs each task right away, inside the calling process.

    No workflow engine has to be running, which keeps unit tests simple; the
    approach is also adequate for small deployments.
    """

    def enqueue_event(self, event_id: int) -> None:  # pragma: no cover - trivial
        """Process the event identified by ``event_id`` before returning."""
        # Resolved at call time rather than when this module is imported.
        from .offload import process_event_task as run_event_task

        log.debug("InlineExecutor running event %s immediately", event_id)
        run_event_task(event_id)

44 

45 

46@dataclass 

47class DaguExecutor: 

48 base_url: str = "http://dagu:8080" 

49 token: str | None = None 

50 dag_name: str = "process_event" 

51 

52 def __init__(self): 

53 self.base_url = conf.CONF.dagu_base_url or self.base_url 

54 self.token = conf.CONF.dagu_api_token 

55 

56 def enqueue_event(self, event_id: int) -> None: 

57 """Trigger the Dagu workflow using pydagu library. 

58 

59 Uses DaguHttpClient to trigger the process_event DAG with the event_id parameter. 

60 """ 

61 from pydagu.http import DaguHttpClient 

62 from pydagu.models.request import StartDagRun 

63 

64 client = DaguHttpClient(self.dag_name, self.base_url) 

65 # Note: params is a string that Dagu will parse. Pass as space-separated key=value pairs 

66 start_request = StartDagRun(params=f"event_id={event_id}") 

67 

68 try: 

69 response = client.start_dag_run(start_request) 

70 log.info( 

71 "Enqueued event %s via Dagu dag=%s, response=%s", 

72 event_id, 

73 self.dag_name, 

74 response, 

75 ) 

76 except Exception: 

77 # Let the exception propagate – upstream caller may choose to retry. 

78 log.exception("Error communicating with Dagu for event %s", event_id) 

79 raise 

80 

81 

# Process-wide executor instance. It is None until init_jobs_executor() assigns
# it; reset_executor_for_tests() clears it and initialises it again.
_executor: Executor | None = None

83 

84 

def init_jobs_executor() -> None:
    """Create the global jobs executor if it does not exist yet.

    Call this once during application startup, after configuration has been
    loaded. When an executor is already set the function returns without
    changing it.

    Raises:
        RuntimeError: if ``conf.CONF`` is still ``None``.
    """
    global _executor

    settings = conf.CONF
    if settings is None:
        raise RuntimeError("Configuration not initialised")

    # Already initialised: keep the existing instance untouched.
    if _executor is not None:
        return

    # Only the literal value "dagu" selects the Dagu-backed executor; every
    # other setting results in in-process execution.
    use_dagu = settings.task_executor == "dagu"
    _executor = DaguExecutor() if use_dagu else InlineExecutor()

    log.info(
        "Task executor initialised: %s (mode=%s)",
        type(_executor).__name__,
        settings.run_mode,
    )

107 

108 

def get_executor() -> Executor:
    """Return the global jobs executor.

    Returns:
        The executor instance created by ``init_jobs_executor()``.

    Raises:
        RuntimeError: if no executor has been initialised yet.
    """
    # Read-only access to the module global, so no ``global`` statement is
    # required here.
    if _executor is None:
        # The message names the real initialiser, init_jobs_executor().
        raise RuntimeError(
            "Executor not initialised, call init_jobs_executor() first"
        )

    return _executor

115 

116 

def reset_executor_for_tests() -> None:
    """Discard the current global executor and initialise a fresh one.

    Clears the cached instance and then calls ``init_jobs_executor()``, so the
    new executor is chosen from the configuration in effect at call time.

    Raises:
        RuntimeError: propagated from ``init_jobs_executor()`` when the
            configuration has not been initialised.
    """
    global _executor
    _executor = None
    init_jobs_executor()
    # Report through the module logger instead of printing a banner to stdout;
    # only the class name is logged so no executor settings are exposed.
    log.debug("Executor reset for tests: %s", type(get_executor()).__name__)