Coverage for postrfp / buyer / api / endpoints / webhooks.py: 100%
64 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1"""
2Manage Webhook Subscriptions - register external URLs to receive notification of system events.
3"""
5from sqlalchemy.orm import Session
7from postrfp.authorisation import perms
8from postrfp.shared import serial, fetch
9from postrfp.shared.decorators import http
10from postrfp.model.notify import WebhookSubscription
11from postrfp.model.jobs import JobExecution
12from postrfp.model.humans import User
13from postrfp.shared.pager import Pager
14from postrfp.jobs import dagu
16from .. import authorise
19def _check_can_manage(session: Session, webhook_org_id: str, user: User):
20 target_org = user.organisation
21 if webhook_org_id != user.org_id:
22 target_org = fetch.organisation(session, webhook_org_id)
23 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)
26@http
27def post_webhook(
28 session: Session, user: User, webhook_doc: serial.Webhook
29) -> serial.Id:
30 """
31 Save a new Webhook subscribing to the event_type given in the body JSON document.
33 The will sent a POST request to the provided `url` with a JSON encoded body for
34 each event of the given `event_type`.
35 """
36 _check_can_manage(session, webhook_doc.org_id, user)
37 whd = webhook_doc.model_dump()
38 whs = WebhookSubscription(**whd)
39 session.add(whs)
40 session.flush() # Ensure webhook.id is populated before creating DAG
42 # Create corresponding Dagu DAG for this webhook subscription
43 dagu.create_webhook_dag(whs)
45 return serial.Id(id=whs.id)
48@http
49def put_webhook(
50 session: Session, user: User, webhook_id: int, webhook_doc: serial.Webhook
51):
52 """
53 Update the `remote_url` or `http_header` for the Webhook with the given event_type / org_id.
55 delivery_status and retries are reset to default values
56 """
57 wh = session.get_one(WebhookSubscription, webhook_id)
58 _check_can_manage(session, wh.org_id, user)
60 wh.remote_url = str(webhook_doc.remote_url)
61 wh.guard_policy = webhook_doc.guard_policy
62 wh.transform_expression = webhook_doc.transform_expression
63 wh.http_headers = webhook_doc.model_dump()["http_headers"]
65 # Update the corresponding Dagu DAG with the new configuration
66 dagu.update_webhook_dag(wh)
69@http
70def get_webhooks(session: Session, user: User, q_org_id: str) -> list[serial.Webhook]:
71 """
72 Fetch an array of Webhook objects for the current users' organisation, or, if given, the
73 organisation indicated by query param `orgId`.
74 """
75 target_org = user.organisation
76 if q_org_id is not None:
77 target_org = fetch.organisation(session, q_org_id)
78 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)
80 return [
81 serial.Webhook.model_validate(wh, from_attributes=True)
82 for wh in target_org.webhook_subscriptions
83 ]
86@http
87def delete_webhook(session: Session, user: User, webhook_id: int):
88 """
89 Delete the webhook for the given `event_type` - `org_id` combination
90 """
91 whs = session.get_one(WebhookSubscription, webhook_id)
93 _check_can_manage(session, whs.org_id, user)
95 # Delete the corresponding Dagu DAG before removing the subscription
96 dag_name = dagu.generate_dag_name(whs)
97 dagu.delete_webhook_dag(dag_name)
99 session.delete(whs)
102@http
103def get_webhook_events(session: Session, user: User) -> list[str]:
104 """
105 Fetch an array of event types which accept webhook subscriptions
106 """
107 from postrfp.jobs.events.action import webhook_evt_types
109 return webhook_evt_types()
112@http
113def get_webhook_executions(
114 session: Session, user: User, webhook_id: int, pager: Pager | None = None
115) -> serial.WebhookExecList:
116 """
117 Fetch a paginated list of execution runs for the given webhook
118 """
119 whs = session.get_one(WebhookSubscription, webhook_id)
121 _check_can_manage(session, whs.org_id, user)
123 lq = session.query(JobExecution).filter(JobExecution.webhook == whs)
125 if pager is None:
126 pager = Pager(page=1, page_size=50)
128 total_records = lq.count()
129 exec_validate = serial.WebhookExec.model_validate
131 lq = lq.order_by(JobExecution.created_at.desc())
132 lq = lq.slice(pager.startfrom, pager.goto)
134 records = [exec_validate(r, from_attributes=True) for r in lq]
136 return serial.WebhookExecList(
137 data=records, pagination=pager.as_pagination(total_records, len(records))
138 )