Coverage for postrfp / jobs / internal / endpoints.py: 76%
34 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1import json
2import logging
4from webob.request import Request
5from sqlalchemy.orm import Session
7from postrfp.shared.decorators import http
8from postrfp.jobs.offload import process_event_task
9from postrfp.model.jobs import JobExecution, JobStatus
10from postrfp.shared import utils
11from .schemas import JobStatusUpdate
14log = logging.getLogger(__name__)
@http
def post_webhooklogger(request: Request):
    """Log an incoming webhook payload and echo it back to the caller.

    Diagnostic endpoint for the webhook logger: the JSON request body is
    written to the module logger (pretty-printed) and returned unchanged.

    Args:
        request: The incoming WebOb request; its body is expected to be JSON.

    Returns:
        ``{"received": <parsed body>}`` when the body parses as JSON,
        otherwise ``{"error": "Invalid JSON body"}``.
    """
    try:
        # ``json_body`` re-parses the raw body on every access, so read it
        # exactly once and reuse the result below.
        payload = request.json_body
    except Exception as exc:
        # Deliberately broad: this endpoint is best-effort and must still
        # answer when the body is malformed or wrongly encoded.
        log.warning("Error parsing JSON body: %s", exc)
        return {"error": "Invalid JSON body"}

    log.info(
        "Received webhook logger payload: %s",
        json.dumps(payload, indent=2),
    )
    return {"received": payload}
@http
def post_event_process(event_id: int):
    """Run event processing for one AuditEvent and report completion.

    Called by the Dagu ``process_event`` DAG. The work itself is delegated
    to ``process_event_task``; any exception it raises propagates to the
    caller unchanged.

    Args:
        event_id: Identifier of the AuditEvent to process.

    Returns:
        A dict with ``status`` set to ``"completed"`` and the ``event_id``
        that was processed.
    """
    process_event_task(event_id)

    outcome = dict(status="completed", event_id=event_id)
    return outcome
@http
def post_jobstatus(
    session: Session,
    job_status_update: JobStatusUpdate,
):
    """Record the delivery status reported by a webhook DAG.

    Loads the ``JobExecution`` identified by
    ``job_status_update.execution_id`` and copies the reported status and
    job type onto it. If an error message is reported, it is stored
    truncated to 500 characters. ``completed_at`` is set to the value of
    ``utils.utcnow()``.

    Args:
        session: SQLAlchemy session used to load the execution row.
        job_status_update: Status report sent by the DAG.

    Returns:
        A dict with ``status`` set to ``"recorded"`` and the
        ``execution_id`` that was updated.

    Raises:
        ValueError: If no ``JobExecution`` exists for the given id.
    """
    execution_id = job_status_update.execution_id

    job_execution = session.get(JobExecution, execution_id)
    if job_execution is None:
        message = f"JobExecution with id '{execution_id}' was not found"
        raise ValueError(message)

    # The reported status string is looked up by member name on JobStatus.
    job_execution.status = JobStatus[job_status_update.status]
    job_execution.job_type = job_status_update.job_type

    reported_error = job_status_update.error_message
    if reported_error:
        # Keep at most the first 500 characters of the reported error.
        job_execution.error_message = reported_error[:500]

    job_execution.completed_at = utils.utcnow()

    return {"status": "recorded", "execution_id": execution_id}