Coverage for postrfp/buyer/api/endpoints/audit.py: 98%

60 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2View and query Audit Event Logs 

3""" 

4 

5from sqlalchemy.orm import Session 

6from sqlalchemy.orm.exc import NoResultFound 

7 

8from ....shared import fetch 

9from postrfp.shared.decorators import http 

10from postrfp.model import ( 

11 QuestionInstance, 

12 Issue, 

13 AuditEvent, 

14 EventOrgACL, 

15 Organisation, 

16 Project, 

17 User, 

18) 

19from postrfp.model.audit import evt_types 

20from postrfp.model.questionnaire.b36 import from_b36 

21from postrfp.authorisation import perms 

22from postrfp.shared import serial 

23from postrfp.shared.pager import Pager 

24 

25from .. import authorise 

26 

27 

@http
def get_project_events(
    session: Session, user: User, project_id: int, event_type: str, pager: Pager
) -> list[serial.SummaryEvent]:
    """
    Paged audit timeline for a single project.

    Loads the project, checks that the caller holds PROJECT_ACCESS on it, then
    reads the project's audit events for the caller's organisation (filtered by
    event_type via fetch.project_audit_events). Only the id, timestamp, user_id
    and event_type columns are selected, and each row is validated into a
    serial.SummaryEvent. The page window is pager.startfrom .. pager.goto.
    Permission: PROJECT_ACCESS.
    """
    project = fetch.project(session, project_id)
    authorise.check(user, perms.PROJECT_ACCESS, project=project)

    all_events = fetch.project_audit_events(user.organisation, project, event_type)

    # Select just the lightweight columns that SummaryEvent is built from.
    summary_columns = (
        AuditEvent.id,
        AuditEvent.timestamp,
        AuditEvent.user_id,
        AuditEvent.event_type,
    )
    page = all_events.slice(pager.startfrom, pager.goto).with_entities(
        *summary_columns
    )

    return [serial.SummaryEvent.model_validate(row) for row in page]

53 

54 

@http
def get_events(
    session: Session, user: User, event_type: str, pager: Pager
) -> list[serial.SummaryEvent]:
    """
    Paged audit feed for the caller's organisation.

    Checks MANAGE_ORGANISATION against the caller's own organisation, then reads
    audit events through fetch.audit_events (filtered by event_type). Only the
    id, timestamp, user_id and event_type columns are selected, and each row is
    validated into a serial.SummaryEvent. The page window is
    pager.startfrom .. pager.goto.
    Permission: MANAGE_ORGANISATION.
    """
    organisation = user.organisation
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=organisation)

    # Select just the lightweight columns that SummaryEvent is built from.
    summary_columns = (
        AuditEvent.id,
        AuditEvent.timestamp,
        AuditEvent.user_id,
        AuditEvent.event_type,
    )
    page = (
        fetch.audit_events(organisation, event_type=event_type)
        .slice(pager.startfrom, pager.goto)
        .with_entities(*summary_columns)
    )

    return [serial.SummaryEvent.model_validate(row) for row in page]

75 

76 

@http
def get_event(session: Session, user: User, event_id: int) -> serial.AuditEvent:
    """
    Full details for a single audit event plus resolved context (issue info, question number,
    section, user, project title when applicable). Returns 404 if not visible to the caller's
    organisation. Permission: MANAGE_ORGANISATION.

    Visibility is decided by the presence of an EventOrgACL row linking the event
    to the caller's organisation; when none exists NoResultFound is raised.

    Context lookups are best-effort: an issue or question that can no longer be
    found does not fail the request. A missing issue is reported as
    ``issue: None``; a missing question simply leaves ``question_number`` and
    ``section_id`` unset.
    """
    authorise.check(user, perms.MANAGE_ORGANISATION, target_org=user.organisation)
    acl = (
        session.query(EventOrgACL)
        .filter(
            EventOrgACL.event_id == event_id,
            EventOrgACL.organisation == user.organisation,
        )
        .first()
    )
    if not acl:
        raise NoResultFound(f"No Event found for ID {event_id}")

    event = acl.event
    event_dict = event.as_dict()

    issue_id = event_dict.get("issue_id")
    if issue_id:
        # Issue might have been deleted (or have no joined organisation), in
        # which case the event is still returned with issue set to None.
        issue_row = (
            session.query(Issue.label, Organisation.id.label("respondent_id"))
            .join(Organisation)
            .filter(Issue.id == issue_id)
            .one_or_none()
        )
        event_dict["issue"] = issue_row._asdict() if issue_row is not None else None

    question_id = event_dict.get("question_id")
    if question_id:
        # Question might have been deleted; a bare .one() here would raise
        # NoResultFound and hide an event the caller is allowed to see.
        question_row = (
            session.query(QuestionInstance.b36_number, QuestionInstance.section_id)
            .filter(QuestionInstance.id == question_id)
            .one_or_none()
        )
        if question_row is not None:
            event_dict.update(
                {
                    "question_number": from_b36(question_row.b36_number),
                    "section_id": question_row.section_id,
                }
            )

    if event.user:
        event_dict["user"] = serial.User.model_validate(event.user)

    if event.project_id:
        project_title = (
            session.query(Project.title)
            .filter(Project.id == event.project_id)
            .scalar()
        )
        event_dict["project_title"] = project_title

    return serial.AuditEvent(**event_dict)

132 

133 

@http
def get_event_types(session: Session) -> list[str]:
    """
    Names of the available audit event types, useful as event_type filter values.

    Returns every attribute name exposed by ``evt_types`` (in ``dir()`` order)
    except those beginning with a double underscore.
    """
    type_names = []
    for attr_name in dir(evt_types):
        if attr_name.startswith("__"):
            # Skip dunder attributes; they are not event type identifiers.
            continue
        type_names.append(attr_name)
    return type_names

140 

141 

@http
def get_question_events(
    session: Session, user: User, question_id: int
) -> list[serial.FullEvent]:
    """
    History of QUESTION and SECTION audit events for a question definition.

    Looks up the question instance with the given id, then collects events
    attached to any question instance that shares its question_def_id. Results
    are ordered by ascending event id and validated into serial.FullEvent.
    The lookup uses ``.one()``, so an unknown question_id raises NoResultFound.
    Permission: PROJECT_VIEW_QUESTIONNAIRE (checked against the instance's
    project and section).
    """
    instance = (
        session.query(QuestionInstance).filter(QuestionInstance.id == question_id).one()
    )
    authorise.check(
        user,
        perms.PROJECT_VIEW_QUESTIONNAIRE,
        project=instance.project,
        section_id=instance.section_id,
    )

    # Match every instance built from the same underlying question definition.
    same_definition = QuestionInstance.question_def_id == instance.question_def_id
    relevant_class = AuditEvent.event_class.in_(("QUESTION", "SECTION"))

    history = (
        session.query(AuditEvent)
        .join(QuestionInstance, AuditEvent.question_id == QuestionInstance.id)
        .filter(same_definition, relevant_class)
        .order_by(AuditEvent.id.asc())
    )

    return [serial.FullEvent.model_validate(event) for event in history]

171 

172 return [serial.FullEvent.model_validate(e) for e in events]