Coverage for postrfp / vendor / api / issue.py: 98%

58 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1""" 

2Respondent Issue endpoints. 

3 

4Exposes a limited Issue view for respondents: list/retrieve Issues, perform 

5allowed status transitions, toggle workflow usage, and create/list respondent 

6notes (with audit logging where applicable). 

7""" 

8 

9from sqlalchemy import not_, and_ 

10from sqlalchemy.orm import Session, Query 

11 

12from postrfp.model import ( 

13 Issue, 

14 Project, 

15 IssueWatchList, 

16 Organisation, 

17 ProjectNote, 

18 AuditEvent, 

19 User, 

20) 

21from postrfp.authorisation import perms 

22from postrfp.shared import serial, fetch 

23from postrfp.shared.decorators import http 

24from postrfp.shared.issue_transition import change_issue_status 

25from postrfp.vendor.validation import validate 

26 

27# columns exposed to respondent users 

28respondent_columns = ( 

29 Issue.id, 

30 Issue.project_id, 

31 Issue.status, 

32 Issue.issue_date, 

33 Issue.accepted_date, 

34 Issue.deadline, 

35 Issue.label, 

36 Issue.winloss_exposed, 

37 Issue.winloss_expiry, 

38 Issue.submitted_date, 

39 Issue.respondent_id, 

40 Issue.use_workflow, 

41 Project.title, 

42 Project.org_id, 

43 Organisation.name.label("owner_org_name"), 

44 IssueWatchList.date_created.isnot(None).label("is_watched"), 

45) 

46 

47 

48def _issue_q(session: Session, user: User) -> Query: 

49 return ( 

50 session.query(Issue) 

51 .join(Project) 

52 .join(Organisation) 

53 .outerjoin( 

54 IssueWatchList, 

55 and_( 

56 IssueWatchList.user_id == user.id, IssueWatchList.issue_id == Issue.id 

57 ), 

58 ) 

59 .with_entities(*respondent_columns) 

60 ) 

61 

62 

63@http 

64def get_issue( 

65 session: Session, effective_user: User, issue_id: int 

66) -> serial.VendorIssue: 

67 """ 

68 Retrieve a single Issue visible to the respondent (restricted field set including 

69 project title and watch status). Visibility validated against respondent org. 

70 """ 

71 # Using subscript notation here because we want to 

72 # reuse the list of columns but this won't work for returning 

73 # a single value 

74 issue = _issue_q(session, effective_user).filter(Issue.id == issue_id).one() 

75 

76 validate(effective_user, issue) 

77 

78 return serial.VendorIssue(**issue._asdict()) 

79 

80 

81@http 

82def get_issues(session: Session, effective_user: User) -> list[serial.VendorIssue]: 

83 """ 

84 List respondent Issues (excludes Not Sent / Retracted). Ordered by most recent 

85 issue_date descending. 

86 """ 

87 issues = ( 

88 _issue_q(session, effective_user) 

89 .filter( 

90 Issue.respondent_id == effective_user.organisation.id, 

91 not_(Issue.status.in_(("Not Sent", "Retracted"))), 

92 ) 

93 .order_by(Issue.issue_date.desc()) 

94 ) 

95 return [serial.VendorIssue(**i._asdict()) for i in issues] 

96 

97 

98""" Status Changes """ 

99 

100 

101@http 

102def post_issue_status( 

103 session: "Session", 

104 effective_user: "User", 

105 issue_id: int, 

106 issue_status_doc: serial.IssueStatus, 

107) -> None: 

108 """ 

109 Perform an allowed respondent status transition: Accepted, Submitted, Declined. 

110 Enforces mandatory answer / deadline rules on submission. Emits audit events. 

111 """ 

112 issue = fetch.issue(session, issue_id) 

113 status = issue_status_doc.new_status 

114 

115 # This check is already effectively performed in Issue.changed_status 

116 # but double checking here 

117 permitted_statuses = ("Accepted", "Submitted", "Declined") 

118 if status not in permitted_statuses: 

119 raise ValueError("new status must be one of %s" % str(permitted_statuses)) 

120 status_perms = { 

121 "Accepted": perms.ISSUE_ACCEPT, 

122 "Submitted": perms.ISSUE_SUBMIT, 

123 "Declined": perms.ISSUE_DECLINE, 

124 } 

125 action = status_perms[status] 

126 validate(effective_user, issue=issue, action=action) 

127 change_issue_status(issue, effective_user, status) 

128 

129 

130@http 

131def post_issue_workflow( 

132 session: Session, 

133 effective_user: User, 

134 issue_id: int, 

135 issue_workflow_doc: serial.IssueUseWorkflow, 

136) -> None: 

137 """ 

138 Toggle workflow usage flag for this Issue (no audit event). Permission: 

139 ISSUE_UPDATE_WORKFLOW. 

140 """ 

141 issue = fetch.issue(session, issue_id) 

142 validate(effective_user, issue=issue, action=perms.ISSUE_UPDATE_WORKFLOW) 

143 issue.use_workflow = issue_workflow_doc.use_workflow 

144 

145 

146""" NOTES """ 

147 

148 

149@http 

150def post_issue_note( 

151 session: Session, 

152 effective_user: User, 

153 issue_id: int, 

154 respondent_note_doc: serial.RespondentNote, 

155) -> serial.Id: 

156 """ 

157 Create a respondent note (kind=RespondentNote). Private flag restricts visibility 

158 to respondent org. Emits PROJECT_NOTE_ADDED audit event with field changes. 

159 """ 

160 issue = fetch.issue(session, issue_id) 

161 validate(effective_user, issue, action=perms.PROJECT_ADD_NOTE) 

162 project = issue.project 

163 

164 note = ProjectNote( 

165 kind="RespondentNote", 

166 project=project, 

167 note_text=respondent_note_doc.note_text, 

168 private=respondent_note_doc.private, 

169 user_id=effective_user.id, 

170 org_id=effective_user.org_id, 

171 target_org_id=None, # RespondentNotes don't set target_org_id 

172 ) 

173 

174 session.add(note) 

175 session.flush() 

176 

177 evt = AuditEvent.create( 

178 session, 

179 "PROJECT_NOTE_ADDED", 

180 project=project, 

181 object_id=note.id, 

182 user_id=note.user_id, 

183 org_id=note.org_id, 

184 ) 

185 evt.add_change("note_text", "", note.note_text) 

186 evt.add_change("private", "", note.private) 

187 session.add(evt) 

188 return serial.Id(id=note.id) 

189 

190 

191@http 

192def get_issue_notes( 

193 session: Session, effective_user: User, issue_id: int 

194) -> list[serial.ReadNote]: 

195 """ 

196 List notes visible to the respondent for this Issue (privacy / distribution 

197 filtering applied). 

198 """ 

199 issue = fetch.issue(session, issue_id) 

200 validate(effective_user, issue, action=perms.ISSUE_VIEW_ANSWERS) 

201 nq = fetch.vendor_notes(issue, effective_user) 

202 rn = serial.ReadNote 

203 return [rn.model_validate(note) for note in nq]