Coverage for postrfp/buyer/api/endpoints/audit.py: 98%
60 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2View and query Audit Event Logs
3"""
5from sqlalchemy.orm import Session
6from sqlalchemy.orm.exc import NoResultFound
8from ....shared import fetch
9from postrfp.shared.decorators import http
10from postrfp.model import (
11 QuestionInstance,
12 Issue,
13 AuditEvent,
14 EventOrgACL,
15 Organisation,
16 Project,
17 User,
18)
19from postrfp.model.audit import evt_types
20from postrfp.model.questionnaire.b36 import from_b36
21from postrfp.authorisation import perms
22from postrfp.shared import serial
23from postrfp.shared.pager import Pager
25from .. import authorise
28@http
29def get_project_events(
30 session: Session, user: User, project_id: int, event_type: str, pager: Pager
31) -> list[serial.SummaryEvent]:
32 """
33 Project‑scoped audit timeline. Returns lightweight events (id, timestamp, user_id, type)
34 visible to the caller’s organisation. Optional filtering by event_type. Supports paging.
35 Permission: PROJECT_ACCESS.
36 """
37 project = fetch.project(session, project_id)
38 authorise.check(user, perms.PROJECT_ACCESS, project=project)
40 ev_query = fetch.project_audit_events(user.organisation, project, event_type)
42 cols = (
43 AuditEvent.id,
44 AuditEvent.timestamp,
45 AuditEvent.user_id,
46 AuditEvent.event_type,
47 )
49 return [
50 serial.SummaryEvent.model_validate(ae)
51 for ae in ev_query.slice(pager.startfrom, pager.goto).with_entities(*cols)
52 ]
55@http
56def get_events(
57 session: Session, user: User, event_type: str, pager: Pager
58) -> list[serial.SummaryEvent]:
59 """
60 Organisation‑wide audit feed across all accessible projects. Optional event_type filter,
61 paged. Permission: MANAGE_ORGANISATION.
62 """
63 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
64 q = fetch.audit_events(user.organisation, event_type=event_type)
65 cols = (
66 AuditEvent.id,
67 AuditEvent.timestamp,
68 AuditEvent.user_id,
69 AuditEvent.event_type,
70 )
71 return [
72 serial.SummaryEvent.model_validate(e)
73 for e in q.slice(pager.startfrom, pager.goto).with_entities(*cols)
74 ]
77@http
78def get_event(session: Session, user: User, event_id: int) -> serial.AuditEvent:
79 """
80 Full details for a single audit event plus resolved context (issue info, question number,
81 section, user, project title when applicable). Returns 404 if not visible to the caller’s
82 organisation. Permission: MANAGE_ORGANISATION.
83 """
84 authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
85 acl = (
86 session.query(EventOrgACL)
87 .filter(
88 EventOrgACL.event_id == event_id,
89 EventOrgACL.organisation == user.organisation,
90 )
91 .first()
92 )
93 if not acl:
94 raise NoResultFound(f"No Event found for ID {event_id}")
96 event_dict = acl.event.as_dict()
97 issue_id = event_dict.get("issue_id", False)
98 if issue_id and issue_id is not None:
99 try:
100 issue = (
101 session.query(Issue.label, Organisation.id.label("respondent_id"))
102 .join(Organisation)
103 .filter(Issue.id == issue_id)
104 .one()
105 ._asdict()
106 )
107 except NoResultFound: # Issue might have been deleted
108 issue = None
110 event_dict["issue"] = issue
112 question_id = event_dict.get("question_id", False)
113 if question_id:
114 q = (
115 session.query(QuestionInstance.b36_number, QuestionInstance.section_id)
116 .filter(QuestionInstance.id == question_id)
117 .one()
118 )
119 event_dict.update(
120 {"question_number": from_b36(q.b36_number), "section_id": q.section_id}
121 )
122 if acl.event.user:
123 ev_user = acl.event.user
124 event_dict["user"] = serial.User.model_validate(ev_user)
126 if acl.event.project_id:
127 pid = acl.event.project_id
128 pt = session.query(Project.title).filter(Project.id == pid).scalar()
129 event_dict["project_title"] = pt
131 return serial.AuditEvent(**event_dict)
134@http
135def get_event_types(session: Session) -> list[str]:
136 """
137 List of available audit event type identifiers (public constants). Useful for filtering.
138 """
139 return [a for a in dir(evt_types) if not a.startswith("__")]
142@http
143def get_question_events(
144 session: Session, user: User, question_id: int
145) -> list[serial.FullEvent]:
146 """
147 Chronological history of QUESTION and SECTION events for all instances sharing the same
148 underlying question definition as the supplied question instance. Ordered oldest first.
149 Permission: PROJECT_VIEW_QUESTIONNAIRE (section scope).
150 """
151 qi = (
152 session.query(QuestionInstance).filter(QuestionInstance.id == question_id).one()
153 )
154 project = qi.project
155 authorise.check(
156 user,
157 perms.PROJECT_VIEW_QUESTIONNAIRE,
158 project=project,
159 section_id=qi.section_id,
160 )
162 events = (
163 session.query(AuditEvent)
164 .join(QuestionInstance, AuditEvent.question_id == QuestionInstance.id)
165 .filter(
166 QuestionInstance.question_def_id == qi.question_def_id,
167 AuditEvent.event_class.in_(("QUESTION", "SECTION")),
168 )
169 .order_by(AuditEvent.id.asc())
170 )
172 return [serial.FullEvent.model_validate(e) for e in events]