Coverage for postrfp / buyer / api / endpoints / datafeeds.py: 96%

81 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1""" 

2Manage Datafeeds - configuration for fetching external data. 

3""" 

4 

5from sqlalchemy.orm import Session, subqueryload 

6 

7from postrfp.authorisation import perms 

8from postrfp.shared import serial, fetch 

9from postrfp.shared.decorators import http 

10from postrfp.model.datafeeds import Datafeed 

11from postrfp.model.jobs import JobExecution, JobStatus 

12from postrfp.model.humans import User 

13from postrfp.shared.pager import Pager 

14from postrfp.jobs import dagu 

15 

16from .. import authorise 

17 

18 

def _check_can_manage(session: Session, feed_org_id: str, user: User):
    """
    Check that `user` holds MANAGE_ORGANISATION for the feed's organisation.

    The user's own organisation is the target when `feed_org_id` equals
    `user.org_id`; otherwise the target is looked up with
    `fetch.organisation`. The decision itself is delegated to
    `authorise.check`.
    """
    own_org = user.organisation
    is_own_org = feed_org_id == user.org_id
    organisation = (
        own_org if is_own_org else fetch.organisation(session, feed_org_id)
    )
    # The permission check runs for the user's own organisation as well.
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=organisation)

24 

25 

@http
def post_feed(session: Session, user: User, feed_doc: serial.Datafeed) -> serial.Id:
    """
    Create a new Datafeed configuration.

    A Datafeed describes an external data source that can be fetched and
    transformed into Content items. The source URL supports templating
    (e.g. {project_id}) and the transform expression uses CEL to map the
    external JSON format to the internal schema.

    The caller must be allowed to manage the organisation named in
    `feed_doc.org_id`. Returns the id of the newly created Datafeed.
    """
    _check_can_manage(session, feed_doc.org_id, user)

    # "id" and "uid" from the incoming document are never passed to the model.
    new_feed = Datafeed(**feed_doc.model_dump(exclude={"id", "uid"}))
    session.add(new_feed)
    # Flush before the DAG is created and before new_feed.id is read below.
    session.flush()

    # Register the matching Dagu DAG for this feed.
    dagu.create_datafeed_dag(new_feed)

    return serial.Id(id=new_feed.id)

46 

47 

@http
def put_feed(
    session: Session, user: User, feed_id: int, feed_doc: serial.Datafeed
) -> None:
    """
    Update an existing Datafeed configuration.

    Overwrites the name, description, source URL, transform expression and
    HTTP headers of the Datafeed identified by `feed_id` with the values in
    `feed_doc`, then updates the underlying execution DAG to match.

    Permission is checked against the organisation of the stored feed, not
    against `feed_doc.org_id`; the feed's organisation is not changed here.
    """
    stored = session.get_one(Datafeed, feed_id)
    _check_can_manage(session, stored.org_id, user)

    for attr in ("name", "description"):
        setattr(stored, attr, getattr(feed_doc, attr))
    # The document's URL value is stored as a plain string.
    stored.source_url = str(feed_doc.source_url)
    stored.transform_expression = feed_doc.transform_expression
    # Headers are taken from the dumped form of the document rather than
    # from the attribute itself.
    dumped_doc = feed_doc.model_dump()
    stored.http_headers = dumped_doc["http_headers"]

    # Keep the Dagu DAG in step with the new configuration.
    dagu.update_datafeed_dag(stored)

70 

71 

@http
def get_feeds(
    session: Session, user: User, q_org_id: str | None
) -> list[serial.Datafeed]:
    """
    Fetch Datafeeds for the organisation.

    Returns a list of all configured Datafeeds for the current user's
    organisation, or for the organisation specified by `q_org_id` when one
    is given. `q_org_id` may be None, in which case the user's own
    organisation is used.

    In both cases the user must hold MANAGE_ORGANISATION for the target
    organisation; the check is delegated to `authorise.check`.
    """
    target_org = user.organisation
    if q_org_id is not None:
        target_org = fetch.organisation(session, q_org_id)
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)

    feeds = session.query(Datafeed).filter(Datafeed.org_id == target_org.id).all()

    return [serial.Datafeed.model_validate(f, from_attributes=True) for f in feeds]

88 

89 

@http
def delete_feed(session: Session, user: User, feed_id: int) -> None:
    """
    Delete a Datafeed.

    Removes the associated execution DAG and then the Datafeed configuration
    itself. Past execution records are preserved but the feed can no longer
    be triggered.

    The caller must be allowed to manage the feed's organisation.
    """
    doomed = session.get_one(Datafeed, feed_id)
    _check_can_manage(session, doomed.org_id, user)

    # The DAG is removed first, addressed by the name derived from the feed.
    dagu.delete_datafeed_dag(dagu.generate_datafeed_dag_name(doomed))

    session.delete(doomed)

106 

107 

@http
def post_feed_trigger(session: Session, user: User, feed_id: int) -> serial.UID:
    """
    Trigger a Datafeed execution.

    Records a pending JobExecution for the feed and asks Dagu to run the
    feed's DAG, which fetches data from the configured source URL
    asynchronously. The execution is marked as running when a run id comes
    back, or as failed (with a log message) when it does not.

    Returns the UID of the execution record, which can be used to track the
    execution status.
    """
    feed = session.get_one(Datafeed, feed_id)
    _check_can_manage(session, feed.org_id, user)

    job = JobExecution(job_type="datafeed", status=JobStatus.pending, datafeed=feed)
    session.add(job)
    # Flush before job.id is handed to the DAG trigger.
    session.flush()

    dag_run = dagu.trigger_datafeed_dag(feed, job.id)
    job.dagu_run_id = dag_run

    if dag_run:
        job.status = JobStatus.running
    else:
        # A falsy run id is treated as a failed trigger.
        job.status = JobStatus.failed
        job.log = "Failed to trigger DAG"

    return serial.UID(uid=job.uid)

140 

141 

@http
def get_feed_executions(
    session: Session, user: User, feed_id: int, pager: Pager | None = None
) -> serial.DatafeedExecList:
    """
    Fetch a paginated list of execution runs for the given datafeed.

    Returns the history of job executions for this Datafeed, newest first
    (ordered by `created_at` descending), together with pagination details.
    When no `pager` is supplied, the first page of 50 records is returned.

    The caller must be allowed to manage the feed's organisation.
    """
    feed = session.get_one(Datafeed, feed_id)
    _check_can_manage(session, feed.org_id, user)

    if pager is None:
        pager = Pager(page=1, page_size=50)

    executions = (
        session.query(JobExecution)
        .filter(JobExecution.datafeed == feed)
        .options(subqueryload(JobExecution.datafeed))
    )
    # Count the full result set before ordering and slicing are applied.
    total_records = executions.count()

    page_query = executions.order_by(JobExecution.created_at.desc()).slice(
        pager.startfrom, pager.goto
    )
    records = [
        serial.DatafeedExec.model_validate(row, from_attributes=True)
        for row in page_query
    ]

    pagination = pager.as_pagination(total_records, len(records))
    return serial.DatafeedExecList(data=records, pagination=pagination)

176 

177 

@http
def get_feed(session: Session, user: User, feed_id: int) -> serial.Datafeed:
    """
    Fetch a single Datafeed configuration.

    Returns the full configuration details for the Datafeed identified by
    `feed_id`. The caller must be allowed to manage the feed's organisation.
    """
    datafeed = session.get_one(Datafeed, feed_id)
    _check_can_manage(session, datafeed.org_id, user)
    return serial.Datafeed.model_validate(datafeed, from_attributes=True)

187 _check_can_manage(session, feed.org_id, user) 

188 

189 return serial.Datafeed.model_validate(feed, from_attributes=True)