Coverage for postrfp/buyer/api/endpoints/webhooks.py: 96%
56 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2Manage Webhook Subscriptions - register external URLs to receive notification of system events.
3"""
5from sqlalchemy.orm import Session
7from postrfp.authorisation import perms
8from ....shared import fetch
9from postrfp.shared import serial
10from postrfp.shared.decorators import http
11from postrfp.model.notify import WebhookSubscription
12from postrfp.model.humans import Organisation, User
14from .. import authorise
@http
def post_webhook(session: Session, user: User, webhook_doc: serial.NewWebhook):
    """
    Register a new Webhook subscription for the `event_type` named in the JSON body.

    The system will send a POST request with a JSON encoded body to the supplied
    URL for each event of the given `event_type`.

    The calling user must hold MANAGE_ORGANISATION permission over the organisation
    identified by `org_id` in the body; that organisation is looked up when it is
    not the user's own.
    """
    # Default to the caller's own organisation; only look up another one when
    # the document explicitly targets a different org.
    organisation = user.organisation
    targets_other_org = webhook_doc.org_id != user.org_id
    if targets_other_org:
        organisation = fetch.organisation(session, webhook_doc.org_id)

    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=organisation)

    # NOTE(review): the dumped document is passed straight to the ORM constructor,
    # whereas put_webhook stringifies remote_url first - confirm this is intended.
    subscription = WebhookSubscription(**webhook_doc.model_dump())
    session.add(subscription)
@http
def put_webhook(session: Session, user: User, webhook_doc: serial.NewWebhook):
    """
    Update the existing Webhook identified by the `org_id` / `event_type` pair.

    The `remote_url`, `guard_policy`, `transform_expression` and `http_headers`
    values are overwritten with those from the body document.

    delivery_status and retries are reset to default values.

    Raises ValueError if no subscription exists for the given org and event type.
    """
    organisation = user.organisation
    targets_other_org = webhook_doc.org_id != user.org_id
    if targets_other_org:
        organisation = fetch.organisation(session, webhook_doc.org_id)

    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=organisation)

    # Subscriptions are looked up by the (org_id, event_type) pair.
    subscription_key = (webhook_doc.org_id, webhook_doc.event_type)
    subscription = session.get(WebhookSubscription, subscription_key)
    if subscription is None:
        raise ValueError(
            f"No Subscription found for org {webhook_doc.org_id} and {webhook_doc.event_type}"
        )

    # remote_url is stored in its string form.
    subscription.remote_url = str(webhook_doc.remote_url)
    subscription.guard_policy = webhook_doc.guard_policy
    subscription.transform_expression = webhook_doc.transform_expression
    # Take the headers from the dumped document rather than the model attribute.
    dumped_doc = webhook_doc.model_dump()
    subscription.http_headers = dumped_doc["http_headers"]

    subscription.reset_status()
@http
def get_webhooks(session: Session, user: User, q_org_id: str) -> list[serial.Webhook]:
    """
    Return the Webhook objects belonging to an organisation.

    The organisation is the current user's own unless query param `orgId` is
    supplied, in which case that organisation is looked up instead. The user must
    hold MANAGE_ORGANISATION permission over whichever organisation is selected.
    """
    organisation = user.organisation
    if q_org_id is not None:
        organisation = fetch.organisation(session, q_org_id)

    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=organisation)

    # Convert each subscription on the organisation into its serial form.
    to_serial = serial.Webhook.model_validate
    return list(map(to_serial, organisation.webhook_subscriptions))
@http
def delete_webhook(session: Session, user: User, webhook_doc: serial.NewWebhook):
    """
    Remove the Webhook subscription for the given `event_type` - `org_id` pair.

    The user must hold MANAGE_ORGANISATION permission over the organisation named
    in the body document.

    Raises ValueError if the organisation or the subscription cannot be found.
    """
    if webhook_doc.org_id == user.org_id:
        organisation = user.organisation
    else:
        # Acting on a different organisation: it has to exist before we can
        # run the permission check against it.
        organisation = session.get(Organisation, webhook_doc.org_id)
        if organisation is None:
            raise ValueError(f"Organisation {webhook_doc.org_id} not found")

    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=organisation)

    # Subscriptions are looked up by the (org_id, event_type) pair.
    subscription_key = (webhook_doc.org_id, webhook_doc.event_type)
    subscription = session.get(WebhookSubscription, subscription_key)
    if subscription is None:
        raise ValueError(
            f"No Subscription found for org {webhook_doc.org_id} and {webhook_doc.event_type}"
        )
    session.delete(subscription)
@http
def get_webhook_events(session: Session, user: User) -> list[str]:
    """
    Return the array of event types that accept webhook subscriptions.
    """
    # Imported at call time rather than module level.
    # NOTE(review): presumably this avoids a circular import - confirm.
    import postrfp.jobs.events.action as event_actions

    return event_actions.webhook_evt_types()