Coverage for postrfp / buyer / api / endpoints / datafeeds.py: 96%
81 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1"""
2Manage Datafeeds - configuration for fetching external data.
3"""
5from sqlalchemy.orm import Session, subqueryload
7from postrfp.authorisation import perms
8from postrfp.shared import serial, fetch
9from postrfp.shared.decorators import http
10from postrfp.model.datafeeds import Datafeed
11from postrfp.model.jobs import JobExecution, JobStatus
12from postrfp.model.humans import User
13from postrfp.shared.pager import Pager
14from postrfp.jobs import dagu
16from .. import authorise
def _check_can_manage(session: Session, feed_org_id: str, user: User):
    """Ensure *user* may manage Datafeeds belonging to *feed_org_id*.

    Resolves the target organisation (fetching it only when it differs from
    the user's own), then enforces the MANAGE_ORGANISATION permission.
    """
    if feed_org_id == user.org_id:
        target_org = user.organisation
    else:
        target_org = fetch.organisation(session, feed_org_id)
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)
@http
def post_feed(session: Session, user: User, feed_doc: serial.Datafeed) -> serial.Id:
    """
    Create a new Datafeed configuration.

    Configures an external data source that can be fetched and transformed into Content items.
    The source URL supports templating (e.g. {project_id}) and the transform expression
    uses CEL to map the external JSON format to the internal schema.
    """
    _check_can_manage(session, feed_doc.org_id, user)

    # id/uid are database-assigned, so they are stripped from the payload.
    new_feed = Datafeed(**feed_doc.model_dump(exclude={"id", "uid"}))
    session.add(new_feed)
    session.flush()  # populates new_feed.id before the DAG is created

    # Register the corresponding Dagu DAG for this feed.
    dagu.create_datafeed_dag(new_feed)

    return serial.Id(id=new_feed.id)
@http
def put_feed(
    session: Session, user: User, feed_id: int, feed_doc: serial.Datafeed
) -> None:
    """
    Update an existing Datafeed configuration.

    Modifies the source URL, transform expression, or other settings for the specified Datafeed.
    Updates the underlying execution DAG to reflect the new configuration.
    """
    feed = session.get_one(Datafeed, feed_id)

    _check_can_manage(session, feed.org_id, user)

    # Plain fields copy across verbatim from the incoming document.
    for attr in ("name", "description", "transform_expression"):
        setattr(feed, attr, getattr(feed_doc, attr))
    # source_url may be a pydantic URL type; persist it as a string.
    feed.source_url = str(feed_doc.source_url)
    # model_dump() yields http_headers in its serialised (JSON-safe) form.
    feed.http_headers = feed_doc.model_dump()["http_headers"]

    # Keep the Dagu DAG in sync with the new configuration.
    dagu.update_datafeed_dag(feed)
@http
def get_feeds(
    session: Session, user: User, q_org_id: str | None
) -> list[serial.Datafeed]:
    """
    Fetch Datafeeds for the organisation.

    Returns a list of all configured Datafeeds for the current user's organisation,
    or the organisation specified by `q_org_id` if the user has permission.
    """
    # Default to the caller's own organisation; an explicit q_org_id overrides
    # it, subject to the permission check below.
    # NOTE: annotation fixed to `str | None` — the body explicitly handles
    # q_org_id being None, so the parameter is optional.
    target_org = user.organisation
    if q_org_id is not None:
        target_org = fetch.organisation(session, q_org_id)
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=target_org)

    feeds = session.query(Datafeed).filter(Datafeed.org_id == target_org.id).all()

    return [serial.Datafeed.model_validate(f, from_attributes=True) for f in feeds]
@http
def delete_feed(session: Session, user: User, feed_id: int) -> None:
    """
    Delete a Datafeed.

    Removes the Datafeed configuration and its associated execution DAG.
    Past execution records are preserved but the feed can no longer be triggered.
    """
    doomed = session.get_one(Datafeed, feed_id)

    _check_can_manage(session, doomed.org_id, user)

    # Tear down the Dagu DAG first, then drop the configuration row itself.
    dagu.delete_datafeed_dag(dagu.generate_datafeed_dag_name(doomed))
    session.delete(doomed)
@http
def post_feed_trigger(session: Session, user: User, feed_id: int) -> serial.UID:
    """
    Trigger a Datafeed execution.

    Initiates an asynchronous job to fetch data from the configured source URL.
    Returns the Job ID and UID which can be used to track the execution status.
    """
    feed = session.get_one(Datafeed, feed_id)

    _check_can_manage(session, feed.org_id, user)

    # Record the execution up front so an audit row exists regardless of outcome.
    job = JobExecution(job_type="datafeed", status=JobStatus.pending, datafeed=feed)
    session.add(job)
    session.flush()  # assigns job.id for use when triggering the DAG

    # Kick off the DAG; a falsy run id means the trigger call failed.
    job.dagu_run_id = dagu.trigger_datafeed_dag(feed, job.id)
    if job.dagu_run_id:
        job.status = JobStatus.running
    else:
        job.status = JobStatus.failed
        job.log = "Failed to trigger DAG"

    return serial.UID(uid=job.uid)
@http
def get_feed_executions(
    session: Session, user: User, feed_id: int, pager: Pager | None = None
) -> serial.DatafeedExecList:
    """
    Fetch a paginated list of execution runs for the given datafeed.

    Returns the history of job executions for this Datafeed, including status,
    timestamps, and any error messages or results.
    """
    feed = session.get_one(Datafeed, feed_id)

    _check_can_manage(session, feed.org_id, user)

    if pager is None:
        pager = Pager(page=1, page_size=50)

    query = (
        session.query(JobExecution)
        .filter(JobExecution.datafeed == feed)
        # Eager-load the feed relationship to avoid per-row lazy loads.
        .options(subqueryload(JobExecution.datafeed))
    )
    total_records = query.count()

    # Newest first, windowed to the requested page.
    page_rows = query.order_by(JobExecution.created_at.desc()).slice(
        pager.startfrom, pager.goto
    )
    records = [
        serial.DatafeedExec.model_validate(row, from_attributes=True)
        for row in page_rows
    ]

    return serial.DatafeedExecList(
        data=records, pagination=pager.as_pagination(total_records, len(records))
    )
@http
def get_feed(session: Session, user: User, feed_id: int) -> serial.Datafeed:
    """
    Fetch a single Datafeed configuration.

    Returns the full configuration details for the specified Datafeed.
    """
    record = session.get_one(Datafeed, feed_id)

    _check_can_manage(session, record.org_id, user)

    doc = serial.Datafeed.model_validate(record, from_attributes=True)
    return doc