Coverage for postrfp/jobs/events/webhooks.py: 98%

44 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import logging 

2 

3import requests 

4from retry_requests import retry # type: ignore[import] 

5from sqlalchemy import select 

6from sqlalchemy.orm import Session as AlchemySession 

7from sqlalchemy.orm.exc import NoResultFound 

8 

9from postrfp.model.notify import WebhookSubscription, DeliveryStatus 

10 

11 

12log = logging.getLogger(__name__) 

13 

14 

15def fetch_and_ping_webhook( 

16 session: AlchemySession, org_id: str, event_type: str, ev_model_bytes: bytes 

17): 

18 """Convenience helper retained (non-async) to look up a subscription and invoke ping. 

19 

20 Callers can either: 

21 * Invoke directly (synchronous) 

22 * Enqueue a workflow task (e.g. via Dagu) that subsequently calls this helper 

23 """ 

24 stmt = select(WebhookSubscription).where( 

25 WebhookSubscription.org_id == org_id, 

26 WebhookSubscription.event_type == event_type, 

27 ) 

28 try: 

29 wh = session.execute(stmt).scalars().one() 

30 if wh.delivery_status != DeliveryStatus.aborted: 

31 ping_webhook(wh, ev_model_bytes) 

32 session.commit() 

33 except NoResultFound: 

34 log.error( 

35 "Ping webhook failed - cannot find WebhookSubscription for org %s, event %s", 

36 org_id, 

37 event_type, 

38 ) 

39 finally: 

40 session.close() 

41 

42 

43def ping_webhook(wh: WebhookSubscription, ev_model_bytes: bytes): 

44 """ 

45 POST a json document of Event details to an external url configured by a user of the system 

46 

47 On failure retry until the maximum number of tries defined in WebhookSubscription has been 

48 reached 

49 """ 

50 headers = {"Content-type": "application/json", "Accept": "text/plain"} 

51 

52 if wh.http_headers: 

53 for header in wh.http_headers: 

54 headers[header["header"]] = header["value"] 

55 

56 try: 

57 http_session = retry( 

58 requests.Session(), retries=wh.MAX_ATTEMPTS, backoff_factor=0.5 

59 ) 

60 with http_session.post( 

61 wh.remote_url, ev_model_bytes, timeout=3, headers=headers, stream=True 

62 ) as resp: 

63 buff = "" 

64 for chunk in resp.iter_content(chunk_size=1024): 

65 if chunk is None: 

66 break 

67 buff += chunk.decode("utf-8") 

68 if len(buff) > 1024 * 10: 

69 log.warning("Webhook response too large, truncating") 

70 resp.close() 

71 break 

72 if resp.status_code == 200: 

73 wh.set_delivered() 

74 else: 

75 msg = buff[:99] if buff else "No response text" 

76 wh.set_failed_attempt(f"HTTP status code {resp.status_code}, {msg}") 

77 resp.raise_for_status() 

78 except Exception as e: 

79 log.exception("Error sending webhook to %s", wh.remote_url) 

80 wh.delivery_status = DeliveryStatus.failing # Ensure status is updated 

81 wh.error_message = str(e)