Coverage for postrfp/buyer/api/endpoints/webhooks.py: 96%

56 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2Manage Webhook Subscriptions - register external URLs to receive notification of system events. 

3""" 

4 

5from sqlalchemy.orm import Session 

6 

7from postrfp.authorisation import perms 

8from ....shared import fetch 

9from postrfp.shared import serial 

10from postrfp.shared.decorators import http 

11from postrfp.model.notify import WebhookSubscription 

12from postrfp.model.humans import Organisation, User 

13 

14from .. import authorise 

15 

16 

17@http 

18def post_webhook(session: Session, user: User, webhook_doc: serial.NewWebhook): 

19 """ 

20 Save a new Webhook subscribing to the event_type given in the body JSON document. 

21 

22 The will sent a POST request to the provided `url` with a JSON encoded body for each event 

23 of the given `event_type`. 

24 """ 

25 target_org = user.organisation 

26 if webhook_doc.org_id != user.org_id: 

27 target_org = fetch.organisation(session, webhook_doc.org_id) 

28 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org) 

29 whd = webhook_doc.model_dump() 

30 whs = WebhookSubscription(**whd) 

31 session.add(whs) 

32 

33 

34@http 

35def put_webhook(session: Session, user: User, webhook_doc: serial.NewWebhook): 

36 """ 

37 Update the `remote_url` or `http_header` for the Webhook with the given event_type / org_id. 

38 

39 delivery_status and retries are reset to default values 

40 """ 

41 target_org = user.organisation 

42 if webhook_doc.org_id != user.org_id: 

43 target_org = fetch.organisation(session, webhook_doc.org_id) 

44 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org) 

45 whd = webhook_doc 

46 wh = session.get(WebhookSubscription, (whd.org_id, whd.event_type)) 

47 if wh is None: 

48 raise ValueError( 

49 f"No Subscription found for org {whd.org_id} and {whd.event_type}" 

50 ) 

51 wh.remote_url = str(webhook_doc.remote_url) 

52 wh.guard_policy = webhook_doc.guard_policy 

53 wh.transform_expression = webhook_doc.transform_expression 

54 wh.http_headers = webhook_doc.model_dump()["http_headers"] 

55 

56 wh.reset_status() 

57 

58 

59@http 

60def get_webhooks(session: Session, user: User, q_org_id: str) -> list[serial.Webhook]: 

61 """ 

62 Fetch an array of Webhook objects for the current users' organisation, or, if given, the 

63 organisation indicated by query param `orgId`. 

64 """ 

65 target_org = user.organisation 

66 if q_org_id is not None: 

67 target_org = fetch.organisation(session, q_org_id) 

68 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org) 

69 return [ 

70 serial.Webhook.model_validate(wh) for wh in target_org.webhook_subscriptions 

71 ] 

72 

73 

74@http 

75def delete_webhook(session: Session, user: User, webhook_doc: serial.NewWebhook): 

76 """ 

77 Delete the webhook for the given `event_type` - `org_id` combination 

78 """ 

79 if webhook_doc.org_id != user.org_id: 

80 target_org = session.get(Organisation, webhook_doc.org_id) 

81 if target_org is None: 

82 raise ValueError(f"Organisation {webhook_doc.org_id} not found") 

83 else: 

84 target_org = user.organisation 

85 

86 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org) 

87 

88 whd = webhook_doc 

89 whs = session.get(WebhookSubscription, (whd.org_id, whd.event_type)) 

90 if whs is None: 

91 raise ValueError( 

92 f"No Subscription found for org {whd.org_id} and {whd.event_type}" 

93 ) 

94 session.delete(whs) 

95 

96 

97@http 

98def get_webhook_events(session: Session, user: User) -> list[str]: 

99 """ 

100 Fetch an array of event types which accept webhook subscriptions 

101 """ 

102 from postrfp.jobs.events.action import webhook_evt_types 

103 

104 return webhook_evt_types()