Coverage for postrfp / jobs / dagu.py: 61%

116 statements  

coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

import logging

from pydagu.models import Dag
from pydagu.builder import DagBuilder, StepBuilder
from pydagu.http import DaguHttpClient
from pydagu.models.request import StartDagRun
from pydagu.models.executor import ExecutorConfig, HTTPExecutorConfig


from postrfp import conf
from postrfp.model.notify import WebhookSubscription
from postrfp.model.datafeeds import Datafeed
from postrfp.jobs.internal.schemas import JobStatusUpdate


log = logging.getLogger(__name__)


def generate_dag_name(webhook: WebhookSubscription) -> str:
    """Generate a unique DAG name for the given webhook subscription."""
    return f"{webhook.id}-{webhook.event_type}"
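    # Illustrative example (attribute values assumed): a subscription with
    # id=42 and event_type "proposal.created" yields "42-proposal.created".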



def create_webhook_dag(webhook: WebhookSubscription) -> None | str:
    """
    Create the Dagu DAG for the given webhook subscription and save it to the Dagu server.
    """

    dag = build_webhook_dag(webhook)
    client = DaguHttpClient(dag.name, conf.CONF.dagu_base_url)
    response = client.post_dag(dag)
    if response is not None:
        return response.message
    return None



def delete_webhook_dag(dag_name: str) -> None:
    """
    Delete the Dagu DAG with the given name from the Dagu server.
    """

    client = DaguHttpClient(dag_name, conf.CONF.dagu_base_url)
    client.delete_dag()
    return None


def update_webhook_dag(webhook: WebhookSubscription) -> None:
    """
    Update the Dagu DAG for the given webhook subscription.
    """
    dag = build_webhook_dag(webhook)
    client = DaguHttpClient(dag.name, conf.CONF.dagu_base_url)
    client.update_dag(dag)
    return None


def build_webhook_dag(webhook: WebhookSubscription) -> Dag:
    """
    Create a Dagu DAG definition for the given WebhookSubscription.

    The DAG will execute a webhook POST request to the configured remote_url
    with the event payload whenever triggered.

    Expected DAG parameters when triggered:
    - execution_id: JobExecution record ID
    - event_payload: JSON string of the event data
    """

    headers = {
        header_name: header_value
        for header in (webhook.http_headers or [])
        for header_name, header_value in header.items()
    }
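    # Assumed shape of http_headers (illustrative): a list of single-pair
    # dicts, e.g. [{"Authorization": "Bearer <token>"}, {"X-Source": "postrfp"}],
    # flattened above into one {name: value} mapping.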


    # Step 1: Deliver webhook to remote URL
    ping_step = (
        StepBuilder("deliver-webhook")
        .command(f"POST {webhook.remote_url}")
        .http_executor(
            headers=headers,
            body="${event_payload}",
            timeout=10,
        )
        .retry(limit=3, interval=5)
        .output("WEBHOOK_RESPONSE")
        .build()
    )

    # Step 2: Report delivery status back to PostRFP
    callback_url = f"{conf.CONF.app_base_url}/internal/api/jobstatus"

    # Build the callback body using the JobStatusUpdate model for type safety.
    # Note: we use model_construct to bypass validation, since these are template
    # strings that Dagu replaces at runtime, not actual values:
    # - execution_id: DAG parameter (int) passed when triggering
    # - DAG_RUN_ID: Dagu runtime environment variable (string)
    # - WEBHOOK_RESPONSE: output from the deliver-webhook step (string)
    # Use plain strings for status to avoid Pydantic serialization warnings.

    success_result = JobStatusUpdate.model_construct(
        execution_id="${execution_id}",  # Template: will be int at runtime
        status="success",  # Plain string avoids enum serialization warning
        job_type="webhook_delivery",
        http_status_code=None,
        response_body="${WEBHOOK_RESPONSE}",
        error_message=None,
    )

    # Serialize to JSON for the HTTP body
    callback_body = success_result.model_dump_json()
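    # For illustration, callback_body serializes to roughly:
    #   {"execution_id": "${execution_id}", "status": "success",
    #    "job_type": "webhook_delivery", "http_status_code": null,
    #    "response_body": "${WEBHOOK_RESPONSE}", "error_message": null}
    # Dagu substitutes the ${...} placeholders when the step runs.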


    report_step = (
        StepBuilder("report-delivery-status")
        .depends_on("deliver-webhook")
        .command(f"POST {callback_url}")
        .http_executor(
            headers={"Content-Type": "application/json"},
            body=callback_body,
            timeout=5,
        )
        .retry(limit=2, interval=3)
        .build()
    )

    # Step 3: Handle failure case - report when webhook delivery fails
    # This step runs conditionally when deliver-webhook fails after all retries
    failure_result = JobStatusUpdate.model_construct(
        execution_id="${execution_id}",
        status="failed",  # Plain string avoids enum serialization warning
        job_type="webhook_delivery",
        http_status_code=None,
        response_body="${DAG_RUN_LOG_FILE}",
        error_message="Webhook delivery failed after retries",
    )

    failure_callback_body = failure_result.model_dump_json()

    # Create HTTP executor config for the failure handler
    failure_executor = ExecutorConfig(
        type="http",
        config=HTTPExecutorConfig(
            headers={"Content-Type": "application/json"},
            body=failure_callback_body,
            timeout=5,
            silent=None,
            query=None,
            skipTLSVerify=None,
        ),
    )

    dag_name = generate_dag_name(webhook)

    return (
        DagBuilder(dag_name)
        .description(f"Webhook DAG for {webhook.event_type} to {webhook.remote_url}")
        .add_tag("webhook")
        .add_param("execution_id", "")  # Parameter - JobExecution record ID
        .add_param("event_payload", "")  # Parameter - JSON event data
        .add_step_models(ping_step, report_step)
        .on_failure(command=f"POST {callback_url}", executor=failure_executor)
        .build()
    )



def trigger_webhook_dag(
    webhook: WebhookSubscription, execution_id: int, event_payload: str
) -> str | None:
    """
    Trigger a webhook DAG execution with the given parameters.

    Args:
        webhook: WebhookSubscription model containing DAG configuration
        execution_id: JobExecution record ID for tracking
        event_payload: JSON-serialized event data to send to webhook

    Returns:
        The Dagu run ID for tracking execution status, or None if the
        response contained no run ID.

    Raises:
        Exception: Propagates any error raised by the Dagu client if
        triggering fails.
    """

    dag_name = generate_dag_name(webhook)
    client = DaguHttpClient(dag_name, conf.CONF.dagu_base_url)

    # Build the params string: Dagu expects space-separated key=value pairs.
    # Caveat: a JSON event_payload containing spaces is subject to that
    # splitting, so complex payloads may need quoting or a different channel
    # (e.g. an environment variable).
    params_str = f"execution_id={execution_id} event_payload={event_payload}"
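    # Illustrative result (values assumed): execution_id=17 and
    # event_payload='{"id":1}' produce 'execution_id=17 event_payload={"id":1}'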


    start_request = StartDagRun(params=params_str)
    response = client.start_dag_run(start_request)

    # Response can be either DagRunId or DagResponseMessage;
    # DagRunId has a .runId attribute we can return
    if hasattr(response, "runId"):
        return response.runId
    elif hasattr(response, "dagRunId"):
        return response.dagRunId
    else:
        # If it's a message, log it and return None
        # (caller should handle a missing run ID appropriately)
        log.warning(f"DAG start response provided no Run Id: {response}")
        return None
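# Illustrative call (values assumed):
#   run_id = trigger_webhook_dag(webhook, execution_id=101,
#                                event_payload='{"event": "proposal.created"}')
#   if run_id is None:
#       ...  # record the failure against the JobExecution record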



def build_process_event_dag() -> Dag:
    """
    Create the Dagu DAG definition for processing AuditEvents.

    This DAG is triggered by DaguExecutor.enqueue_event() to process
    events asynchronously via the internal API endpoint.

    The DAG uses the configured base URL to ensure it works across
    different deployment environments (dev/staging/production).

    Expected DAG parameters when triggered:
    - event_id: The AuditEvent ID to process
    """
    # Use configuration for base URL - ensures consistency across environments
    process_url = f"{conf.CONF.app_base_url}/internal/api/event/${{event_id}}/process"
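    # The ${event_id} placeholder survives the f-string for Dagu to substitute
    # at run time; e.g. with event_id=123 and an app_base_url of
    # https://app.example.com (both illustrative), the step POSTs to
    # https://app.example.com/internal/api/event/123/process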


    process_step = (
        StepBuilder("process")
        .command(f"POST {process_url}")
        .http_executor(timeout=300)
        .retry(limit=3, interval=30)
        .build()
    )

    return (
        DagBuilder("process_event")
        .description("Process AuditEvent by calling internal API endpoint")
        .add_param("event_id", "")  # Parameter definition - value provided at runtime
        .add_step_models(process_step)
        .build()
    )


def deploy_process_event_dag() -> None:
    """
    Deploy or update the process_event DAG to the Dagu server.

    This should be called during application startup when using the Dagu executor
    to ensure the DAG definition is up-to-date with the current configuration.

    The DAG is created dynamically to inject the correct base URL from configuration,
    avoiding hardcoded values that may not work across environments.
    """

    dag = build_process_event_dag()
    client = DaguHttpClient(dag.name, conf.CONF.dagu_base_url)

    try:
        # Try to update first (in case it already exists)
        client.update_dag(dag)
        log.info("Updated process_event DAG with base URL: %s", conf.CONF.app_base_url)
    except Exception:
        # If update fails, try to create it
        try:
            response = client.post_dag(dag)
            if response is not None:
                log.info("Created process_event DAG: %s", response.message)
            else:
                log.info(
                    "Created process_event DAG with base URL: %s",
                    conf.CONF.app_base_url,
                )
        except Exception as e:
            log.error("Failed to deploy process_event DAG: %s", e)
            raise
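# Typically wired into application startup; a minimal sketch (the config
# flag and call site are assumptions, not part of this module):
#   if conf.CONF.executor == "dagu":
#       deploy_process_event_dag()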



def generate_datafeed_dag_name(datafeed: Datafeed) -> str:
    """Generate a unique DAG name for the given datafeed."""
    return f"feed-{datafeed.id}-{datafeed.uid}"
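    # Illustrative example (attribute values assumed): a datafeed with id=7
    # and uid "a1b2c3" yields "feed-7-a1b2c3".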



def create_datafeed_dag(datafeed: Datafeed) -> None | str:
    """
    Create the Dagu DAG for the given datafeed and save it to the Dagu server.
    """

    dag = build_datafeed_dag(datafeed)
    client = DaguHttpClient(dag.name, conf.CONF.dagu_base_url)
    response = client.post_dag(dag)
    if response is not None:
        return response.message
    return None



def delete_datafeed_dag(dag_name: str) -> None:
    """
    Delete the Dagu DAG with the given name from the Dagu server.
    """

    client = DaguHttpClient(dag_name, conf.CONF.dagu_base_url)
    client.delete_dag()
    return None


def update_datafeed_dag(datafeed: Datafeed) -> None:
    """
    Update the Dagu DAG for the given datafeed.
    """
    dag = build_datafeed_dag(datafeed)
    client = DaguHttpClient(dag.name, conf.CONF.dagu_base_url)
    client.update_dag(dag)
    return None


def build_datafeed_dag(datafeed: Datafeed) -> Dag:
    """
    Create a Dagu DAG definition for the given Datafeed.
    """
    headers = {
        header_name: header_value
        for header in (datafeed.http_headers or [])
        for header_name, header_value in header.items()
    }


    # Step 1: Fetch data from source URL
    # We use GET by default for fetching data
    fetch_step = (
        StepBuilder("fetch-data")
        .command(f"GET {datafeed.source_url}")
        .http_executor(
            headers=headers,
            timeout=30,
        )
        .retry(limit=3, interval=5)
        .output("FETCH_RESPONSE")
        .build()
    )

    # Step 2: Report status back to PostRFP
    callback_url = f"{conf.CONF.app_base_url}/internal/api/jobstatus"

    success_result = JobStatusUpdate.model_construct(
        execution_id="${execution_id}",
        status="success",
        job_type="datafeed_fetch",
        http_status_code=None,
        response_body="${FETCH_RESPONSE}",
        error_message=None,
    )

    callback_body = success_result.model_dump_json()

    report_step = (
        StepBuilder("report-status")
        .depends_on("fetch-data")
        .command(f"POST {callback_url}")
        .http_executor(
            headers={"Content-Type": "application/json"},
            body=callback_body,
            timeout=30,  # Larger timeout for potentially large payloads
        )
        .retry(limit=2, interval=3)
        .build()
    )

    # Step 3: Handle failure
    failure_result = JobStatusUpdate.model_construct(
        execution_id="${execution_id}",
        status="failed",
        job_type="datafeed_fetch",
        http_status_code=None,
        response_body="${DAG_RUN_LOG_FILE}",
        error_message="Datafeed fetch failed after retries",
    )

    failure_callback_body = failure_result.model_dump_json()

    failure_executor = ExecutorConfig(
        type="http",
        config=HTTPExecutorConfig(
            headers={"Content-Type": "application/json"},
            body=failure_callback_body,
            timeout=5,
            silent=None,
            query=None,
            skipTLSVerify=None,
        ),
    )

    dag_name = generate_datafeed_dag_name(datafeed)

    return (
        DagBuilder(dag_name)
        .description(f"Datafeed DAG for {datafeed.name}")
        .add_tag("datafeed")
        .add_param("execution_id", "")

        # TODO: add further params if source_url ever needs templated values;
        # for now only execution_id is required
        .add_step_models(fetch_step, report_step)
        .on_failure(command=f"POST {callback_url}", executor=failure_executor)
        .build()
    )



def trigger_datafeed_dag(datafeed: Datafeed, execution_id: int) -> str | None:
    """
    Trigger a datafeed DAG execution.
    """
    dag_name = generate_datafeed_dag_name(datafeed)
    client = DaguHttpClient(dag_name, conf.CONF.dagu_base_url)

    params_str = f"execution_id={execution_id}"

    start_request = StartDagRun(params=params_str)
    response = client.start_dag_run(start_request)

    if hasattr(response, "runId"):
        return response.runId
    elif hasattr(response, "dagRunId"):
        return response.dagRunId
    else:
        log.warning(f"DAG start response provided no Run Id: {response}")
        return None