Coverage for postrfp/vendor/api/questionnaire.py: 100%
132 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import logging
2from typing import Optional
3from operator import attrgetter
4from itertools import groupby
5from collections import defaultdict
7from sqlalchemy.orm.exc import NoResultFound
8from sqlalchemy.orm import subqueryload, Session
9import webob.exc
11from postrfp.model.questionnaire.answering import Answer
12from postrfp.shared import attachments, fetch
13from postrfp.shared.decorators import http
14from postrfp.shared import update
15from ..validation import validate
16from postrfp.model.exc import ValidationFailure
17from postrfp.authorisation import perms
18from postrfp.shared import serial
19from postrfp.model import Section, QuestionInstance, User, Project
20from postrfp.model.questionnaire.b36 import from_b36
# Module-level logger, named after this module so log records can be traced
# back to the vendor questionnaire API handlers.
log = logging.getLogger(__name__)
@http
def get_issue_tree(
    session: Session, effective_user: User, issue_id: int, q_with_questions: bool
) -> serial.Nodes:
    """
    Questionnaire tree (sections, optionally with their questions) for the root
    of the Issue's Project.
    Pass q_with_questions=true to include the questions under each section node.
    """
    # Load the Issue and check access before any questionnaire data is read.
    current_issue = fetch.issue(session, issue_id)
    validate(effective_user, current_issue)

    # light_tree returns a mapping that is unpacked directly into the schema.
    tree = fetch.light_tree(
        session, current_issue.project_id, with_questions=q_with_questions
    )
    return serial.Nodes(**tree)
@http
def get_issue_question(
    session: Session, effective_user: User, issue_id: int, node_number: str
) -> serial.AnsweredQuestion:
    """
    One question, looked up by its questionnaire number, together with the
    respondent-specific answer state and response workflow metadata
    (response_state).
    """
    current_issue = fetch.issue(session, issue_id)
    # The question number is resolved within the Issue's own Project.
    instance = fetch.question_instance_by_number(
        session, current_issue.project_id, node_number
    )
    validate(effective_user, current_issue, question=instance)

    payload = instance.single_vendor_dict(current_issue)
    state = current_issue.response_state_for_q(instance.id)
    payload["response_state"] = state.as_dict()
    # TODO Fix schemas to permit answered questions.
    return payload  # type: ignore[return-value]
@http
def get_issue_sections(
    session: Session, effective_user: User, issue_id: int
) -> list[serial.TreeNode]:
    """
    Flat listing of every section in the Project (ID, parent ID, human-readable
    number and title), sorted by questionnaire numbering.
    """
    current_issue = fetch.issue(session, issue_id)
    validate(effective_user, current_issue)

    # Select only the columns a TreeNode needs rather than full Section entities.
    columns = (Section.id, Section.parent_id, Section.b36_number, Section.title)
    section_rows = (
        session.query(*columns)
        .filter(Section.project_id == current_issue.project_id)
        .order_by(Section.b36_number)
    )

    # The stored base-36 number is converted to its human-readable form.
    return [
        serial.TreeNode(
            id=row.id,
            parent_id=row.parent_id,
            number=from_b36(row.b36_number),
            title=row.title,
        )
        for row in section_rows
    ]
@http
def get_issue_section(
    session: Session, effective_user: User, issue_id: int, section_id: int
) -> serial.FullSection:
    """
    Full detail for one section: its immediate subsections plus the questions
    with answer data for this Issue.
    """
    current_issue = fetch.issue(session, issue_id)
    target = fetch.section(session, section_id)
    validate(effective_user, current_issue, section=target)

    detail = target.as_dict()
    # Only direct children are serialised; deeper levels are not expanded here.
    detail["subsections"] = [child.as_dict() for child in target.subsections]
    answered = fetch.answered_questions(current_issue, section_id)
    detail["questions"] = list(answered)
    # TODO Fix schemas to permit answered questions.
    return detail  # type: ignore[return-value]
@http
def get_issue_section_stats(
    session: Session, effective_user: User, issue_id: int, section_id: int
) -> list[serial.AnswerStats]:
    """
    Progress metrics for a section: counts grouped by response status for
    this Issue.
    """
    current_issue = fetch.issue(session, issue_id)
    target = fetch.section(session, section_id)
    validate(effective_user, current_issue, section=target)

    # Each stats record is validated into the AnswerStats schema.
    stat_rows = fetch.answering_stats(current_issue, target)
    return [serial.AnswerStats.model_validate(row) for row in stat_rows]
@http
def post_issue_question_answers(
    session: Session,
    effective_user: User,
    issue_id: int,
    question_id: int,
    answers_doc: serial.ElementAnswerList,
) -> None:
    """
    Save or replace the answers for every element of a single question.
    Attachments are not handled here - upload them through the dedicated
    attachment endpoint. An empty submission is rejected.
    """
    # Reject an empty payload up front, before touching the database.
    if not answers_doc.root:
        raise webob.exc.HTTPBadRequest("No answers provided")

    current_issue = fetch.issue(session, issue_id)
    target_question = fetch.question(session, question_id)
    state = current_issue.response_state_for_q(question_id)

    validate(
        effective_user,
        issue=current_issue,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
        question=target_question,
        response_state=state,
    )

    # Map element ID -> answer text; a repeated element ID keeps its last value.
    answers_by_element = {
        item.element_id: item.answer for item in answers_doc.root
    }
    update.save_answers(
        session, effective_user, target_question, answers_by_element, current_issue
    )
@http
def get_issue_answers_search(
    session: Session, effective_user: User, issue_id: int, search_term: str
) -> serial.AnswerSearchList:
    """
    Text search across this Issue's answers (case‑sensitive via utf8mb4_bin collation).
    The search term is matched literally: "%" and "_" are not wildcards.
    Returns matching question numbers & IDs.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS)

    # NB use of utf8mb4_bin collation - all searches case insensitive otherwise.
    # autoescape=True escapes LIKE wildcards ("%" and "_") in the user-supplied
    # term so that e.g. "100%" or "file_name" only match those exact substrings.
    literal_match = Answer.answer.collate("utf8mb4_bin").contains(
        search_term, autoescape=True
    )

    # Grouping by question ID yields one row per question, however many of its
    # element answers match.
    q = (
        session.query(QuestionInstance.id, QuestionInstance.b36_number)
        .join(Answer)
        .filter(Answer.issue_id == issue_id)
        .filter(literal_match)
        .group_by(QuestionInstance.id)
    )

    return serial.AnswerSearchList(
        matches=[
            serial.AnswerSearch(number=from_b36(row.b36_number), question_id=row.id)
            for row in q
        ]
    )
@http
def post_issue_answers_replace(
    session: Session,
    effective_user: User,
    issue_id: int,
    replace_doc: serial.TextReplace,
) -> serial.Count:
    """
    Bulk in‑place text replacement (search_term -> replace_term) across all answers
    in the Issue. The search term is matched literally and case‑sensitively.
    Returns count of questions affected. Honors permissions & bulk_import flag.
    Rejects an empty search_term.
    """
    search_term = replace_doc.search_term
    replace_term = replace_doc.replace_term

    # An empty search term would match every answer, and str.replace("", x)
    # inserts x between every character - refuse rather than corrupt the data.
    if not search_term:
        raise webob.exc.HTTPBadRequest("No search term provided")

    issue = fetch.issue(session, issue_id)
    validate(
        effective_user,
        issue=issue,
        bulk_import=True,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
    )

    # NB use of utf8mb4_bin collation - all searches case insensitive otherwise.
    # autoescape=True makes the SQL match literal, in line with the literal
    # str.replace() below; otherwise "%" / "_" in the term would select answers
    # that the replacement never changes, inflating the returned count.
    matching_answers = (
        session.query(Answer)
        .options(subqueryload(Answer.question_instance))
        .filter(
            Answer.issue_id == issue_id,
            Answer.answer.collate("utf8mb4_bin").contains(
                search_term, autoescape=True
            ),
        )
    )

    # question instance -> {element ID: answer text after replacement}.
    # The query is fully consumed here, before any answers are saved.
    q_lookup: dict[QuestionInstance, dict[int, str]] = defaultdict(dict)
    a: Answer
    for a in matching_answers:
        q_lookup[a.question_instance][a.element_id] = a.answer.replace(
            search_term, replace_term
        )

    # Save once per question so all of its changed elements go in together.
    for qi, answer_lookup in q_lookup.items():
        update.save_answers(session, effective_user, qi, answer_lookup, issue)

    return serial.Count(description="answers_updated_count", count=len(q_lookup))
@http
def delete_issue_answer_element(
    session: Session, effective_user: User, issue_id: int, element_id: int
) -> None:
    """
    Delete the answer (and any associated attachment) for a single question element
    in this Issue.
    """
    issue = fetch.issue(session, issue_id)
    qelement = fetch.qelement(session, element_id)
    # Resolve the element to its question within this Issue's Project so that
    # permissions and response state are checked against the right question.
    question_instance = qelement.get_question_instance(issue.project_id)
    response_state = issue.response_state_for_q(question_instance.id)

    validate(
        effective_user,
        issue=issue,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
        question=question_instance,
        response_state=response_state,
    )

    answer = qelement.get_answer(issue)
    if answer.attachment is not None:
        try:
            session.delete(answer.attachment)
            attachments.delete_from_disc(answer.attachment)
        except FileNotFoundError:
            # Best effort: a file already missing from disc must not block
            # deletion of the answer, so it is logged and otherwise ignored.
            # log.warning replaces the deprecated log.warn alias.
            log.warning(
                "Could not delete attachment id %s filename %s - File Not Found",
                answer.attachment.id,
                answer.attachment.filename,
            )

    session.delete(answer)
@http
def get_issue_answerimport(
    session: Session,
    effective_user: User,
    issue_id: int,
    node_number: Optional[str] = None,
) -> serial.ImportableAnswersList:
    """
    Preview of answers that could be imported from other Issues in the same
    Project, optionally limited to one section (node_number). Counts only;
    nothing is modified.
    """
    target_issue = fetch.issue(session, issue_id)
    validate(effective_user, target_issue, action=perms.ISSUE_VIEW_ANSWERS)

    candidates = fetch.importable_answers(
        session, target_issue, sec_number=node_number
    )
    # Each record is validated into the schema before being wrapped in the list.
    entries = [serial.ImportableAnswers.model_validate(item) for item in candidates]
    return serial.ImportableAnswersList(root=entries)
@http
def post_issue_answerimport(
    session: Session,
    effective_user: User,
    issue_id: int,
    import_answers_doc: serial.ImportAnswers,
) -> serial.AnswerImportResult:
    """
    Import answers from another Issue (optionally section‑scoped). Returns imported,
    unchanged, and error lists per question. Skips invalid or unchanged targets.
    """
    target_issue = fetch.issue(session, issue_id)
    validate(
        effective_user,
        target_issue,
        action=perms.ISSUE_SAVE_QUESTION_RESPONSE,
        bulk_import=True,
    )

    source_issue_id = import_answers_doc.source_issue_id
    try:
        source_issue = fetch.issue(session, source_issue_id)
    except NoResultFound as err:
        # Same exception type as before; chained so the original lookup
        # failure stays visible in tracebacks.
        m = f"Source Issue with ID {source_issue_id} not found"
        raise ValueError(m) from err
    # The user must also be allowed to read the answers being copied.
    validate(effective_user, source_issue, action=perms.ISSUE_VIEW_ANSWERS)

    proj: Project = target_issue.project

    rows = fetch.importable_answer_lookup(
        session, source_issue, proj, import_answers_doc.section_number
    )

    # create an individual answer lookup dict for each question, keyed by q def id.
    # groupby only merges *adjacent* rows with equal keys, so sort by the key
    # first; otherwise non-adjacent rows for one question definition would
    # overwrite each other here and answers would be dropped silently.
    qdef_key = attrgetter("question_def_id")
    by_qid = {
        qdef_id: {a.element_id: a.answer for a in answers}
        for qdef_id, answers in groupby(sorted(rows, key=qdef_key), key=qdef_key)
    }

    imported: list[str] = []
    errors: list[str] = []
    unchanged: list[str] = []

    # Target questions that share a question definition with a source answer,
    # processed in questionnaire order.
    target_questions = proj.questions.filter(
        QuestionInstance.question_def_id.in_(list(by_qid))
    ).order_by(QuestionInstance.b36_number)

    for q in target_questions:
        # Built outside the try block so it is always bound when reporting
        # an error for this question.
        ref = q.number or f"#{q.id} {q.title}"
        lookup = by_qid[q.question_def_id]
        try:
            # save_answers' result is truthy when the import changed something.
            if update.save_answers(
                session, effective_user, q, lookup, target_issue, imported=True
            ):
                imported.append(ref)
            else:
                unchanged.append(ref)
        except ValidationFailure as ve:
            # A failure on one question is recorded and the import continues.
            errors.append(f"{ref}: {ve.message}, {ve.errors_list}")

    return serial.AnswerImportResult(
        imported=imported, errors=errors, unchanged=unchanged
    )