Coverage for postrfp/jobs/events/webhooks.py: 98%
44 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import logging
3import requests
4from retry_requests import retry # type: ignore[import]
5from sqlalchemy import select
6from sqlalchemy.orm import Session as AlchemySession
7from sqlalchemy.orm.exc import NoResultFound
9from postrfp.model.notify import WebhookSubscription, DeliveryStatus
12log = logging.getLogger(__name__)
def fetch_and_ping_webhook(
    session: AlchemySession, org_id: str, event_type: str, ev_model_bytes: bytes
):
    """Find the webhook subscription for an org/event pair and deliver the event to it.

    This is a plain synchronous helper. It can be called directly, or from a
    workflow task (e.g. one enqueued via Dagu) that runs it later.

    Args:
        session: SQLAlchemy session used for the lookup and the commit. It is
            always closed before this function returns, so the caller must not
            reuse it afterwards.
        org_id: Value matched against ``WebhookSubscription.org_id``.
        event_type: Value matched against ``WebhookSubscription.event_type``.
        ev_model_bytes: Serialized event passed through unchanged to
            ``ping_webhook`` as the request body.

    If no subscription matches, the miss is logged at error level and nothing
    is sent. Subscriptions whose delivery status is ``DeliveryStatus.aborted``
    are skipped. Only ``NoResultFound`` is handled here; any other exception
    propagates to the caller after the session has been closed.
    """
    matches_org = WebhookSubscription.org_id == org_id
    matches_event = WebhookSubscription.event_type == event_type
    lookup = select(WebhookSubscription).where(matches_org, matches_event)

    try:
        # .one() requires exactly one matching row; zero rows raises NoResultFound.
        subscription = session.execute(lookup).scalars().one()
        if subscription.delivery_status != DeliveryStatus.aborted:
            ping_webhook(subscription, ev_model_bytes)
        # Persist whatever delivery state ping_webhook recorded on the row.
        # NOTE(review): the listing this was recovered from had lost its
        # indentation; the commit is assumed to sit at try-level rather than
        # inside the `if` - confirm against the original module.
        session.commit()
    except NoResultFound:
        log.error(
            "Ping webhook failed - cannot find WebhookSubscription for org %s, event %s",
            org_id,
            event_type,
        )
    finally:
        session.close()
def ping_webhook(wh: WebhookSubscription, ev_model_bytes: bytes):
    """POST a JSON document of Event details to the external URL configured by a user.

    Retrying is delegated to ``retry_requests.retry``, configured with
    ``wh.MAX_ATTEMPTS`` retries and a backoff factor of 0.5.

    Args:
        wh: Subscription supplying ``remote_url``, optional ``http_headers``
            (an iterable of mappings with ``"header"`` and ``"value"`` keys)
            and ``MAX_ATTEMPTS``. It is mutated in place to record the outcome:
            ``set_delivered()`` on HTTP 200, ``set_failed_attempt(...)`` on any
            other status, and ``delivery_status`` / ``error_message`` when an
            exception occurs. Persisting those changes is left to the caller.
        ev_model_bytes: Request body, sent as-is with a JSON content type.

    Returns:
        None. Exceptions raised while sending or reading the response are
        logged and recorded on ``wh`` rather than propagated. Errors while
        building the header dict (before the request is attempted) do propagate.
    """
    # Upper bound on how much of the response body is read. Only the first 99
    # characters are ever reported, so this just stops a misbehaving endpoint
    # from streaming an unbounded body at us.
    max_body_bytes = 1024 * 10

    headers = {"Content-type": "application/json", "Accept": "text/plain"}

    # User-configured headers are applied last, so they can override the defaults.
    if wh.http_headers:
        for header in wh.http_headers:
            headers[header["header"]] = header["value"]

    http_session = None
    try:
        http_session = retry(
            requests.Session(), retries=wh.MAX_ATTEMPTS, backoff_factor=0.5
        )
        with http_session.post(
            wh.remote_url, ev_model_bytes, timeout=3, headers=headers, stream=True
        ) as resp:
            # Accumulate raw bytes and decode once afterwards. Decoding each
            # chunk separately fails when a multi-byte UTF-8 character straddles
            # a chunk boundary, which used to turn a successful delivery into a
            # recorded failure.
            raw_body = bytearray()
            for chunk in resp.iter_content(chunk_size=1024):
                if chunk is None:
                    break
                raw_body += chunk
                if len(raw_body) > max_body_bytes:
                    log.warning("Webhook response too large, truncating")
                    resp.close()
                    break

            # errors="replace" keeps a non-UTF-8 or truncated body from raising;
            # the text is only used for a short diagnostic message.
            buff = raw_body.decode("utf-8", errors="replace")

            # NOTE(review): only 200 counts as delivered; other 2xx responses
            # (e.g. 202/204) are recorded as failed attempts - confirm intended.
            if resp.status_code == 200:
                wh.set_delivered()
            else:
                msg = buff[:99] if buff else "No response text"
                wh.set_failed_attempt(f"HTTP status code {resp.status_code}, {msg}")
                # Raises for 4xx/5xx so the handler below also logs the failure.
                resp.raise_for_status()
    except Exception as e:
        log.exception("Error sending webhook to %s", wh.remote_url)
        wh.delivery_status = DeliveryStatus.failing  # Ensure status is updated
        wh.error_message = str(e)
    finally:
        # Release the session's pooled connections instead of leaving them to GC.
        if http_session is not None:
            http_session.close()