Coverage for postrfp/vendor/api/questionnaire.py: 100%

132 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import logging 

2from typing import Optional 

3from operator import attrgetter 

4from itertools import groupby 

5from collections import defaultdict 

6 

7from sqlalchemy.orm.exc import NoResultFound 

8from sqlalchemy.orm import subqueryload, Session 

9import webob.exc 

10 

11from postrfp.model.questionnaire.answering import Answer 

12from postrfp.shared import attachments, fetch 

13from postrfp.shared.decorators import http 

14from postrfp.shared import update 

15from ..validation import validate 

16from postrfp.model.exc import ValidationFailure 

17from postrfp.authorisation import perms 

18from postrfp.shared import serial 

19from postrfp.model import Section, QuestionInstance, User, Project 

20from postrfp.model.questionnaire.b36 import from_b36 

21 

22log = logging.getLogger(__name__) 

23 

24 

25@http 

26def get_issue_tree( 

27 session: Session, effective_user: User, issue_id: int, q_with_questions: bool 

28) -> serial.Nodes: 

29 """ 

30 Questionnaire structure (section + optional question hierarchy) for the Issue's Project root. 

31 Set q_with_questions=true to embed questions at each section node. 

32 """ 

33 issue = fetch.issue(session, issue_id) 

34 validate(effective_user, issue) 

35 root = fetch.light_tree(session, issue.project_id, with_questions=q_with_questions) 

36 return serial.Nodes(**root) 

37 

38 

39@http 

40def get_issue_question( 

41 session: Session, effective_user: User, issue_id: int, node_number: str 

42) -> serial.AnsweredQuestion: 

43 """ 

44 A single question (by questionnaire number) with respondent-specific answer state 

45 and response workflow metadata (response_state). 

46 """ 

47 issue = fetch.issue(session, issue_id) 

48 question = fetch.question_instance_by_number(session, issue.project_id, node_number) 

49 validate(effective_user, issue, question=question) 

50 qdict = question.single_vendor_dict(issue) 

51 qdict["response_state"] = issue.response_state_for_q(question.id).as_dict() 

52 # TODO Fix schemas to permit answered questions. 

53 return qdict # type: ignore[return-value] 

54 

55 

56@http 

57def get_issue_sections( 

58 session: Session, effective_user: User, issue_id: int 

59) -> list[serial.TreeNode]: 

60 """ 

61 Flat list of all sections in the Project (IDs, parent IDs, human numbers, titles) 

62 ordered by questionnaire numbering. 

63 """ 

64 issue = fetch.issue(session, issue_id) 

65 validate(effective_user, issue) 

66 

67 secq = ( 

68 session.query(Section.id, Section.parent_id, Section.b36_number, Section.title) 

69 .filter(Section.project_id == issue.project_id) 

70 .order_by(Section.b36_number) 

71 ) 

72 

73 return [ 

74 serial.TreeNode( 

75 id=s.id, parent_id=s.parent_id, number=from_b36(s.b36_number), title=s.title 

76 ) 

77 for s in secq 

78 ] 

79 

80 

81@http 

82def get_issue_section( 

83 session: Session, effective_user: User, issue_id: int, section_id: int 

84) -> serial.FullSection: 

85 """ 

86 Full section detail including immediate subsections and answered question data for this Issue. 

87 """ 

88 issue = fetch.issue(session, issue_id) 

89 section = fetch.section(session, section_id) 

90 validate(effective_user, issue, section=section) 

91 sec_dict = section.as_dict() 

92 sec_dict["subsections"] = [s.as_dict() for s in section.subsections] 

93 sec_dict["questions"] = list(fetch.answered_questions(issue, section_id)) 

94 # TODO Fix schemas to permit answered questions. 

95 return sec_dict # type: ignore[return-value] 

96 

97 

98@http 

99def get_issue_section_stats( 

100 session: Session, effective_user: User, issue_id: int, section_id: int 

101) -> list[serial.AnswerStats]: 

102 """ 

103 Progress metrics for a section: counts grouped by response status for this Issue. 

104 """ 

105 issue = fetch.issue(session, issue_id) 

106 section = fetch.section(session, section_id) 

107 validate(effective_user, issue, section=section) 

108 return [ 

109 serial.AnswerStats.model_validate(x) 

110 for x in fetch.answering_stats(issue, section) 

111 ] 

112 

113 

114@http 

115def post_issue_question_answers( 

116 session: Session, 

117 effective_user: User, 

118 issue_id: int, 

119 question_id: int, 

120 answers_doc: serial.ElementAnswerList, 

121) -> None: 

122 """ 

123 Save/replace answers for all elements of one question. Attachments must be uploaded 

124 via the dedicated attachment endpoint (not here). Rejects empty submission. 

125 """ 

126 

127 if len(answers_doc.root) == 0: 

128 raise webob.exc.HTTPBadRequest("No answers provided") 

129 

130 issue = fetch.issue(session, issue_id) 

131 question = fetch.question(session, question_id) 

132 response_state = issue.response_state_for_q(question_id) 

133 

134 validate( 

135 effective_user, 

136 issue=issue, 

137 action=perms.ISSUE_SAVE_QUESTION_RESPONSE, 

138 question=question, 

139 response_state=response_state, 

140 ) 

141 

142 answer_lookup = {a.element_id: a.answer for a in answers_doc.root} 

143 

144 update.save_answers(session, effective_user, question, answer_lookup, issue) 

145 

146 

147@http 

148def get_issue_answers_search( 

149 session: Session, effective_user: User, issue_id: int, search_term: str 

150) -> serial.AnswerSearchList: 

151 """ 

152 Text search across this Issue's answers (case‑sensitive via utf8mb4_bin collation). 

153 Returns matching question numbers & IDs. 

154 """ 

155 issue = fetch.issue(session, issue_id) 

156 validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS) 

157 

158 # NB use of tf8mb4_bin collation - all searches case insensitive otherwise 

159 

160 q = ( 

161 session.query(QuestionInstance.id, QuestionInstance.b36_number) 

162 .join(Answer) 

163 .filter(Answer.issue_id == issue_id) 

164 .filter(Answer.answer.collate("utf8mb4_bin").like(f"%{search_term}%")) 

165 .group_by(QuestionInstance.id) 

166 ) 

167 

168 return serial.AnswerSearchList( 

169 matches=[ 

170 serial.AnswerSearch(number=from_b36(row.b36_number), question_id=row.id) 

171 for row in q 

172 ] 

173 ) 

174 

175 

176@http 

177def post_issue_answers_replace( 

178 session: Session, 

179 effective_user: User, 

180 issue_id: int, 

181 replace_doc: serial.TextReplace, 

182) -> serial.Count: 

183 """ 

184 Bulk in‑place text replacement (search_term -> replace_term) across all answers 

185 in the Issue. Returns count of questions affected. Honors permissions & bulk_import flag. 

186 """ 

187 issue = fetch.issue(session, issue_id) 

188 validate( 

189 effective_user, 

190 issue=issue, 

191 bulk_import=True, 

192 action=perms.ISSUE_SAVE_QUESTION_RESPONSE, 

193 ) 

194 

195 replace_term = replace_doc.replace_term 

196 search_term = replace_doc.search_term 

197 

198 q_lookup: dict[QuestionInstance, dict[int, str]] = defaultdict(dict) 

199 a: Answer 

200 

201 # NB use of tf8mb4_bin collation - all searches case insensitive otherwise 

202 for a in ( 

203 session.query(Answer) 

204 .options(subqueryload(Answer.question_instance)) 

205 .filter( 

206 Answer.issue_id == issue_id, 

207 Answer.answer.collate("utf8mb4_bin").like(f"%{search_term}%"), 

208 ) 

209 ): 

210 q_lookup[a.question_instance][a.element_id] = a.answer 

211 

212 for qi, answer_lookup in q_lookup.items(): 

213 for el_id in answer_lookup: 

214 answer_lookup[el_id] = answer_lookup[el_id].replace( 

215 search_term, replace_term 

216 ) 

217 

218 update.save_answers(session, effective_user, qi, answer_lookup, issue) 

219 return serial.Count(description="answers_updated_count", count=len(q_lookup)) 

220 

221 

222@http 

223def delete_issue_answer_element( 

224 session: Session, effective_user: User, issue_id: int, element_id: int 

225) -> None: 

226 """ 

227 Delete the answer (and any associated attachment) for a single question element 

228 in this Issue. 

229 """ 

230 issue = fetch.issue(session, issue_id) 

231 qelement = fetch.qelement(session, element_id) 

232 question_instance = qelement.get_question_instance(issue.project_id) 

233 response_state = issue.response_state_for_q(question_instance.id) 

234 

235 validate( 

236 effective_user, 

237 issue=issue, 

238 action=perms.ISSUE_SAVE_QUESTION_RESPONSE, 

239 question=question_instance, 

240 response_state=response_state, 

241 ) 

242 

243 answer = qelement.get_answer(issue) 

244 if answer.attachment is not None: 

245 try: 

246 session.delete(answer.attachment) 

247 attachments.delete_from_disc(answer.attachment) 

248 except FileNotFoundError: 

249 m = ( 

250 f"Could not delete attachment id {answer.attachment.id}" 

251 f" filename {answer.attachment.filename} - File Not Found" 

252 ) 

253 log.warn(m) 

254 

255 session.delete(answer) 

256 

257 

258@http 

259def get_issue_answerimport( 

260 session: Session, 

261 effective_user: User, 

262 issue_id: int, 

263 node_number: Optional[str] = None, 

264) -> serial.ImportableAnswersList: 

265 """ 

266 Preview importable answers from other Issues in the same Project, optionally 

267 restricted to a section (node_number). Counts only; no data mutation. 

268 """ 

269 target_issue = fetch.issue(session, issue_id) 

270 validate(effective_user, target_issue, action=perms.ISSUE_VIEW_ANSWERS) 

271 

272 res = fetch.importable_answers(session, target_issue, sec_number=node_number) 

273 doc = serial.ImportableAnswersList( 

274 root=[serial.ImportableAnswers.model_validate(x) for x in res] 

275 ) 

276 return doc 

277 

278 

279@http 

280def post_issue_answerimport( 

281 session: Session, 

282 effective_user: User, 

283 issue_id: int, 

284 import_answers_doc: serial.ImportAnswers, 

285) -> serial.AnswerImportResult: 

286 """ 

287 Import answers from another Issue (optionally section‑scoped). Returns imported, 

288 unchanged, and error lists per question. Skips invalid or unchanged targets. 

289 """ 

290 

291 target_issue = fetch.issue(session, issue_id) 

292 validate( 

293 effective_user, 

294 target_issue, 

295 action=perms.ISSUE_SAVE_QUESTION_RESPONSE, 

296 bulk_import=True, 

297 ) 

298 

299 try: 

300 source_issue = fetch.issue(session, import_answers_doc.source_issue_id) 

301 except NoResultFound: 

302 m = f"Source Issue with ID {import_answers_doc.source_issue_id} not found" 

303 raise ValueError(m) 

304 validate(effective_user, source_issue, action=perms.ISSUE_VIEW_ANSWERS) 

305 

306 proj: Project = target_issue.project 

307 

308 rows = fetch.importable_answer_lookup( 

309 session, source_issue, proj, import_answers_doc.section_number 

310 ) 

311 

312 # create an individual answer lookup dict for each question, keyed by q def id 

313 by_qid = { 

314 qdef_id: {a.element_id: a.answer for a in answers} 

315 for qdef_id, answers in groupby(rows, key=attrgetter("question_def_id")) 

316 } 

317 src_id_set = by_qid.keys() 

318 

319 imported, errors, unchanged = [], [], [] 

320 for q in proj.questions.filter( 

321 QuestionInstance.question_def_id.in_(src_id_set) 

322 ).order_by(QuestionInstance.b36_number): 

323 try: 

324 ref = q.number or f"#{q.id} {q.title}" 

325 lookup = by_qid[q.question_def_id] 

326 if update.save_answers( 

327 session, effective_user, q, lookup, target_issue, imported=True 

328 ): 

329 imported.append(ref) 

330 else: 

331 unchanged.append(ref) 

332 except ValidationFailure as ve: 

333 errors.append(f"{ref}: {ve.message}, {ve.errors_list}") 

334 

335 return serial.AnswerImportResult( 

336 imported=imported, errors=errors, unchanged=unchanged 

337 )