Coverage for postrfp / jobs / dagu.py: 61%
116 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1import logging
3from pydagu.models import Dag
4from pydagu.builder import DagBuilder, StepBuilder
5from pydagu.http import DaguHttpClient
6from pydagu.models.request import StartDagRun
7from pydagu.models.executor import ExecutorConfig, HTTPExecutorConfig
10from postrfp import conf
11from postrfp.model.notify import WebhookSubscription
12from postrfp.model.datafeeds import Datafeed
13from postrfp.jobs.internal.schemas import JobStatusUpdate
16log = logging.getLogger(__name__)
def generate_dag_name(webhook: WebhookSubscription) -> str:
    """Return the unique Dagu DAG name used for this webhook subscription."""
    return "-".join((str(webhook.id), str(webhook.event_type)))
def create_webhook_dag(webhook: WebhookSubscription) -> None | str:
    """
    Build the webhook DAG and register it with the Dagu server.

    Returns the server's response message when one is provided, else None.
    """
    definition = build_webhook_dag(webhook)
    http = DaguHttpClient(definition.name, conf.CONF.dagu_base_url)
    result = http.post_dag(definition)
    return result.message if result is not None else None
def delete_webhook_dag(dag_name: str) -> None:
    """Remove the named webhook DAG from the Dagu server."""
    DaguHttpClient(dag_name, conf.CONF.dagu_base_url).delete_dag()
    return None
def update_webhook_dag(webhook: WebhookSubscription) -> None:
    """Rebuild the webhook's DAG definition and push the update to Dagu."""
    definition = build_webhook_dag(webhook)
    http = DaguHttpClient(definition.name, conf.CONF.dagu_base_url)
    http.update_dag(definition)
    return None
def build_webhook_dag(webhook: WebhookSubscription) -> Dag:
    """
    Create a Dagu DAG definition for the given WebhookSubscription.

    The DAG POSTs the event payload to the subscription's remote_url and then
    reports delivery status back to the PostRFP internal API. An on-failure
    handler reports the failure case.

    Expected DAG parameters when triggered:
    - execution_id: JobExecution record ID
    - event_payload: JSON string of the event data
    """
    # Flatten the list of single-entry header dicts into one mapping
    # (later entries win on duplicate names, same as the dict comprehension).
    header_map: dict[str, str] = {}
    for entry in webhook.http_headers or []:
        header_map.update(entry)

    dag_name = generate_dag_name(webhook)
    callback_url = f"{conf.CONF.app_base_url}/internal/api/jobstatus"

    # Step 1: deliver the webhook payload to the remote URL.
    delivery_step = (
        StepBuilder("deliver-webhook")
        .command(f"POST {webhook.remote_url}")
        .http_executor(
            headers=header_map,
            body="${event_payload}",
            timeout=10,
        )
        .retry(limit=3, interval=5)
        .output("WEBHOOK_RESPONSE")
        .build()
    )

    # Step 2: report delivery status back to PostRFP.
    # model_construct bypasses validation because these fields hold Dagu
    # template strings substituted at runtime, not real values:
    # - execution_id: DAG parameter (int) passed when triggering
    # - WEBHOOK_RESPONSE: output of the deliver-webhook step (string)
    # Plain strings for status avoid Pydantic enum serialization warnings.
    success_body = JobStatusUpdate.model_construct(
        execution_id="${execution_id}",
        status="success",
        job_type="webhook_delivery",
        http_status_code=None,
        response_body="${WEBHOOK_RESPONSE}",
        error_message=None,
    ).model_dump_json()

    status_step = (
        StepBuilder("report-delivery-status")
        .depends_on("deliver-webhook")
        .command(f"POST {callback_url}")
        .http_executor(
            headers={"Content-Type": "application/json"},
            body=success_body,
            timeout=5,
        )
        .retry(limit=2, interval=3)
        .build()
    )

    # Step 3: failure handler - reports when deliver-webhook exhausts retries.
    # ${DAG_RUN_LOG_FILE} is a Dagu runtime variable used as the response body.
    failure_body = JobStatusUpdate.model_construct(
        execution_id="${execution_id}",
        status="failed",
        job_type="webhook_delivery",
        http_status_code=None,
        response_body="${DAG_RUN_LOG_FILE}",
        error_message="Webhook delivery failed after retries",
    ).model_dump_json()

    on_failure_executor = ExecutorConfig(
        type="http",
        config=HTTPExecutorConfig(
            headers={"Content-Type": "application/json"},
            body=failure_body,
            timeout=5,
            silent=None,
            query=None,
            skipTLSVerify=None,
        ),
    )

    return (
        DagBuilder(dag_name)
        .description(f"Webhook DAG for {webhook.event_type} to {webhook.remote_url}")
        .add_tag("webhook")
        .add_param("execution_id", "")  # Parameter - JobExecution record ID
        .add_param("event_payload", "")  # Parameter - JSON event data
        .add_step_models(delivery_step, status_step)
        .on_failure(command=f"POST {callback_url}", executor=on_failure_executor)
        .build()
    )
def trigger_webhook_dag(
    webhook: WebhookSubscription, execution_id: int, event_payload: str
) -> str | None:
    """
    Trigger a webhook DAG execution with the given parameters.

    Args:
        webhook: WebhookSubscription model containing DAG configuration
        execution_id: JobExecution record ID for tracking
        event_payload: JSON-serialized event data to send to webhook

    Returns:
        The Dagu run ID for tracking execution status, or None when the
        server's response carries no run id (a warning is logged).

    Raises:
        Exception: If DAG triggering fails
    """
    dag_name = generate_dag_name(webhook)
    client = DaguHttpClient(dag_name, conf.CONF.dagu_base_url)

    # Build params string: Dagu expects space-separated key=value pairs
    # For complex values like JSON, we need to pass them as environment variables
    # or use Dagu's params format.
    # NOTE(review): event_payload is JSON and will usually contain spaces, so an
    # unquoted value may be split into several params by Dagu — confirm Dagu's
    # quoting/escaping rules for the params field.
    params_str = f"execution_id={execution_id} event_payload={event_payload}"

    start_request = StartDagRun(params=params_str)
    response = client.start_dag_run(start_request)

    # Response can be either DagRunId or DagResponseMessage; probe both
    # attribute spellings rather than relying on the concrete type.
    if hasattr(response, "runId"):
        return response.runId
    elif hasattr(response, "dagRunId"):
        return response.dagRunId
    else:
        # If it's a message, log it and return None
        # (caller should handle a missing run_id appropriately)
        log.warning(f"DAG start response provided no Run Id: {response}")
        return None
def build_process_event_dag() -> Dag:
    """
    Build the Dagu DAG definition used to process AuditEvents.

    Triggered by DaguExecutor.enqueue_event() to process events asynchronously
    via the internal API endpoint. The configured base URL keeps the definition
    valid across deployment environments (dev/staging/production).

    Expected DAG parameters when triggered:
    - event_id: The AuditEvent ID to process
    """
    # ${event_id} stays a Dagu template here - note the escaped f-string braces.
    process_url = f"{conf.CONF.app_base_url}/internal/api/event/${{event_id}}/process"

    step = (
        StepBuilder("process")
        .command(f"POST {process_url}")
        .http_executor(timeout=300)
        .retry(limit=3, interval=30)
        .build()
    )

    return (
        DagBuilder("process_event")
        .description("Process AuditEvent by calling internal API endpoint")
        .add_param("event_id", "")  # Parameter definition - value provided at runtime
        .add_step_models(step)
        .build()
    )
def deploy_process_event_dag() -> None:
    """
    Deploy or update the process_event DAG to the Dagu server.

    This should be called during application startup when using the Dagu executor
    to ensure the DAG definition is up-to-date with the current configuration.

    The DAG is created dynamically to inject the correct base URL from
    configuration, avoiding hardcoded values that may not work across
    environments.

    Raises:
        Exception: propagated when both the update and the create attempt fail.
    """
    # Fix: use the module-level `log` - the original re-imported logging and
    # created a local logger that shadowed it.
    dag = build_process_event_dag()
    client = DaguHttpClient(dag.name, conf.CONF.dagu_base_url)

    try:
        # Try to update first (in case it already exists)
        client.update_dag(dag)
        log.info("Updated process_event DAG with base URL: %s", conf.CONF.app_base_url)
    except Exception as update_error:
        # Update failed - most likely the DAG does not exist yet; record why
        # before falling back to creating it.
        log.debug("update_dag failed (%s); attempting to create the DAG", update_error)
        try:
            response = client.post_dag(dag)
            if response is not None:
                log.info("Created process_event DAG: %s", response.message)
            else:
                log.info(
                    "Created process_event DAG with base URL: %s",
                    conf.CONF.app_base_url,
                )
        except Exception as e:
            log.error("Failed to deploy process_event DAG: %s", e)
            raise
def generate_datafeed_dag_name(datafeed: Datafeed) -> str:
    """Return the unique Dagu DAG name for the given datafeed."""
    return "-".join(("feed", str(datafeed.id), str(datafeed.uid)))
def create_datafeed_dag(datafeed: Datafeed) -> None | str:
    """
    Build the datafeed DAG and register it with the Dagu server.

    Returns the server's response message when one is provided, else None.
    """
    definition = build_datafeed_dag(datafeed)
    http = DaguHttpClient(definition.name, conf.CONF.dagu_base_url)
    result = http.post_dag(definition)
    return result.message if result is not None else None
def delete_datafeed_dag(dag_name: str) -> None:
    """Remove the named datafeed DAG from the Dagu server."""
    DaguHttpClient(dag_name, conf.CONF.dagu_base_url).delete_dag()
    return None
def update_datafeed_dag(datafeed: Datafeed) -> None:
    """Rebuild the datafeed's DAG definition and push the update to Dagu."""
    definition = build_datafeed_dag(datafeed)
    http = DaguHttpClient(definition.name, conf.CONF.dagu_base_url)
    http.update_dag(definition)
    return None
def build_datafeed_dag(datafeed: Datafeed) -> Dag:
    """
    Create a Dagu DAG definition for the given Datafeed.

    The DAG fetches data from the feed's source_url and reports the result
    back to the PostRFP internal API; an on-failure handler reports failures.

    Expected DAG parameters when triggered:
    - execution_id: JobExecution record ID
    """
    # Flatten the list of single-entry header dicts into one mapping
    # (later entries win on duplicate names, same as the dict comprehension).
    header_map: dict[str, str] = {}
    for entry in datafeed.http_headers or []:
        header_map.update(entry)

    dag_name = generate_datafeed_dag_name(datafeed)
    callback_url = f"{conf.CONF.app_base_url}/internal/api/jobstatus"

    # Step 1: fetch data from the source URL (GET by default for fetching).
    fetch_step = (
        StepBuilder("fetch-data")
        .command(f"GET {datafeed.source_url}")
        .http_executor(
            headers=header_map,
            timeout=30,
        )
        .retry(limit=3, interval=5)
        .output("FETCH_RESPONSE")
        .build()
    )

    # Step 2: report status back to PostRFP. model_construct bypasses
    # validation because these fields hold Dagu template strings substituted
    # at runtime, not real values.
    success_body = JobStatusUpdate.model_construct(
        execution_id="${execution_id}",
        status="success",
        job_type="datafeed_fetch",
        http_status_code=None,
        response_body="${FETCH_RESPONSE}",
        error_message=None,
    ).model_dump_json()

    status_step = (
        StepBuilder("report-status")
        .depends_on("fetch-data")
        .command(f"POST {callback_url}")
        .http_executor(
            headers={"Content-Type": "application/json"},
            body=success_body,
            timeout=30,  # Larger timeout for potentially large payloads
        )
        .retry(limit=2, interval=3)
        .build()
    )

    # Step 3: failure handler - reports when fetch-data exhausts retries.
    failure_body = JobStatusUpdate.model_construct(
        execution_id="${execution_id}",
        status="failed",
        job_type="datafeed_fetch",
        http_status_code=None,
        response_body="${DAG_RUN_LOG_FILE}",
        error_message="Datafeed fetch failed after retries",
    ).model_dump_json()

    on_failure_executor = ExecutorConfig(
        type="http",
        config=HTTPExecutorConfig(
            headers={"Content-Type": "application/json"},
            body=failure_body,
            timeout=5,
            silent=None,
            query=None,
            skipTLSVerify=None,
        ),
    )

    return (
        DagBuilder(dag_name)
        .description(f"Datafeed DAG for {datafeed.name}")
        .add_tag("datafeed")
        .add_param("execution_id", "")
        # Add other params if source_url has templates?
        # For now just execution_id
        .add_step_models(fetch_step, status_step)
        .on_failure(command=f"POST {callback_url}", executor=on_failure_executor)
        .build()
    )
def trigger_datafeed_dag(datafeed: Datafeed, execution_id: int) -> str | None:
    """
    Start a run of the datafeed's DAG, passing the execution id as a parameter.

    Returns the Dagu run id when the server reports one, otherwise None
    (after logging a warning).
    """
    client = DaguHttpClient(
        generate_datafeed_dag_name(datafeed), conf.CONF.dagu_base_url
    )
    response = client.start_dag_run(
        StartDagRun(params=f"execution_id={execution_id}")
    )

    # The response may carry the run id under either attribute spelling.
    for attr_name in ("runId", "dagRunId"):
        if hasattr(response, attr_name):
            return getattr(response, attr_name)

    log.warning(f"DAG start response provided no Run Id: {response}")
    return None