Coverage for postrfp / buyer / api / endpoints / webhooks.py: 100%

64 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1""" 

2Manage Webhook Subscriptions - register external URLs to receive notification of system events. 

3""" 

4 

5from sqlalchemy.orm import Session 

6 

7from postrfp.authorisation import perms 

8from postrfp.shared import serial, fetch 

9from postrfp.shared.decorators import http 

10from postrfp.model.notify import WebhookSubscription 

11from postrfp.model.jobs import JobExecution 

12from postrfp.model.humans import User 

13from postrfp.shared.pager import Pager 

14from postrfp.jobs import dagu 

15 

16from .. import authorise 

17 

18 

def _check_can_manage(session: Session, webhook_org_id: str, user: User):
    """
    Verify that `user` is allowed to manage webhooks owned by `webhook_org_id`.

    The organisation checked is the user's own organisation when the ids match;
    otherwise it is looked up with `fetch.organisation`. The permission decision
    is delegated to `authorise.check` using `perms.MANAGE_ORGANISATION`.
    """
    own_org = user.organisation
    target_org = (
        fetch.organisation(session, webhook_org_id)
        if webhook_org_id != user.org_id
        else own_org
    )
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)

24 

25 

@http
def post_webhook(
    session: Session, user: User, webhook_doc: serial.Webhook
) -> serial.Id:
    """
    Save a new Webhook subscribing to the event_type given in the body JSON document.

    A POST request with a JSON encoded body will be sent to the provided
    `remote_url` for each event of the given `event_type`.
    """
    _check_can_manage(session, webhook_doc.org_id, user)

    subscription = WebhookSubscription(**webhook_doc.model_dump())
    session.add(subscription)
    # Flush first so that subscription.id is populated before the DAG is created.
    session.flush()

    # Each webhook subscription gets a corresponding Dagu DAG.
    dagu.create_webhook_dag(subscription)

    return serial.Id(id=subscription.id)

46 

47 

@http
def put_webhook(
    session: Session, user: User, webhook_id: int, webhook_doc: serial.Webhook
):
    """
    Update the Webhook subscription identified by `webhook_id`.

    `remote_url`, `guard_policy`, `transform_expression` and `http_headers` are
    overwritten with the values from the body JSON document.
    """
    # NOTE(review): earlier documentation said delivery_status and retries are
    # reset to default values; nothing in this function does that directly.
    # Confirm whether dagu.update_webhook_dag is responsible for it.
    subscription = session.get_one(WebhookSubscription, webhook_id)
    _check_can_manage(session, subscription.org_id, user)

    updates = {
        "remote_url": str(webhook_doc.remote_url),
        "guard_policy": webhook_doc.guard_policy,
        "transform_expression": webhook_doc.transform_expression,
        "http_headers": webhook_doc.model_dump()["http_headers"],
    }
    for attr_name, new_value in updates.items():
        setattr(subscription, attr_name, new_value)

    # Keep the corresponding Dagu DAG in step with the new configuration.
    dagu.update_webhook_dag(subscription)

67 

68 

@http
def get_webhooks(session: Session, user: User, q_org_id: str) -> list[serial.Webhook]:
    """
    Fetch an array of Webhook objects for the current user's organisation, or, if given,
    the organisation indicated by query param `orgId`.

    The caller must hold `MANAGE_ORGANISATION` on the organisation being listed.
    """
    # NOTE(review): q_org_id is annotated `str` but is compared against None below,
    # so it is evidently optional in practice - confirm and adjust the annotation.
    own_org = user.organisation
    target_org = (
        own_org if q_org_id is None else fetch.organisation(session, q_org_id)
    )
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)

    to_doc = serial.Webhook.model_validate
    return [
        to_doc(subscription, from_attributes=True)
        for subscription in target_org.webhook_subscriptions
    ]

84 

85 

@http
def delete_webhook(session: Session, user: User, webhook_id: int):
    """
    Delete the Webhook subscription identified by `webhook_id`, along with its Dagu DAG.
    """
    subscription = session.get_one(WebhookSubscription, webhook_id)
    _check_can_manage(session, subscription.org_id, user)

    # The DAG name is generated from the subscription, so the DAG is removed
    # first and the subscription row is deleted afterwards.
    dagu.delete_webhook_dag(dagu.generate_dag_name(subscription))

    session.delete(subscription)

100 

101 

@http
def get_webhook_events(session: Session, user: User) -> list[str]:
    """
    Fetch an array of the event types which accept webhook subscriptions.
    """
    # Imported at call time rather than at module level (presumably to avoid
    # a circular import - confirm before hoisting).
    from postrfp.jobs.events.action import webhook_evt_types as list_event_types

    return list_event_types()

110 

111 

@http
def get_webhook_executions(
    session: Session, user: User, webhook_id: int, pager: Pager | None = None
) -> serial.WebhookExecList:
    """
    Fetch a paginated list of execution runs for the given webhook, newest first.

    When no `pager` is supplied, the first page with a page size of 50 is returned.
    """
    subscription = session.get_one(WebhookSubscription, webhook_id)
    _check_can_manage(session, subscription.org_id, user)

    executions = session.query(JobExecution).filter(
        JobExecution.webhook == subscription
    )

    if pager is None:
        pager = Pager(page=1, page_size=50)

    # Count the full result set before the page window is applied.
    total_records = executions.count()

    page_query = executions.order_by(JobExecution.created_at.desc()).slice(
        pager.startfrom, pager.goto
    )
    records = [
        serial.WebhookExec.model_validate(row, from_attributes=True)
        for row in page_query
    ]

    pagination = pager.as_pagination(total_records, len(records))
    return serial.WebhookExecList(data=records, pagination=pagination)

137 data=records, pagination=pager.as_pagination(total_records, len(records)) 

138 )