Coverage for postrfp/shared/fetch/quesq.py: 99%
121 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import logging
2from typing import (
3 Iterable,
4 List,
5 Optional,
6 Dict,
7 Any,
8)
10from sqlalchemy import (
11 select,
12 func,
13 or_,
14 and_,
15 desc,
16)
17from sqlalchemy.orm import (
18 joinedload,
19 lazyload,
20 Bundle,
21 Query,
22 Session,
23)
24from sqlalchemy.orm.exc import NoResultFound
25from sqlalchemy.orm.session import object_session
28from postrfp.model.questionnaire.answering import (
29 QuestionResponseState,
30 ResponseStatus,
31)
32from postrfp.model import (
33 Section,
34 QuestionInstance,
35 QuestionDefinition,
36 QElement,
37 Issue,
38 Project,
39)
40from postrfp.model.questionnaire.qelements import QAttachment
41from postrfp.model.questionnaire.b36 import from_b36
44log = logging.getLogger(__name__)
def qelement(session: Session, q_element_id: int) -> QElement:
    """
    Look up a single QElement by its primary key.

    Raises
    ------
    NoResultFound
        If the session has no QElement with the given ID.
    """
    found = session.get(QElement, q_element_id)
    if found is not None:
        return found
    raise NoResultFound(f"No QElement found with id {q_element_id}")
def question(
    session: Session, question_id: int, with_project=False
) -> QuestionInstance:
    """
    Fetch a question instance by ID.

    When ``with_project`` is true the ``project`` relationship is eager
    loaded with a joined load as part of the same query; otherwise the
    instance is fetched with a plain primary key lookup.

    Raises
    ------
    NoResultFound
        If there is no QuestionInstance with the given ID.
    """
    if not with_project:
        return session.get_one(QuestionInstance, question_id)

    # The joinedload option is applied exactly once; repeating it adds nothing.
    return (
        session.query(QuestionInstance)
        .options(joinedload(QuestionInstance.project))
        .filter(QuestionInstance.id == question_id)
        .one()
    )
def question_of_project(project: Project, question_id: int) -> QuestionInstance:
    """
    Look up a question instance by ID among the questions of the given project.

    No permission check is made here: the caller is assumed to have
    established that the user may access the project.

    Raises
    ------
    NoResultFound
    """
    matching = project.questions.filter(QuestionInstance.id == question_id)
    return matching.one()
def question_of_section(
    session: Session, section_id: int, question_id: int
) -> QuestionInstance:
    """
    Fetch a question instance by ID, but only if it belongs to the section
    with the given ID.

    Useful when permissions have already been checked for the section.
    """
    has_id = QuestionInstance.id == question_id
    in_section = QuestionInstance.section_id == section_id
    return session.query(QuestionInstance).filter(has_id, in_section).one()
def question_instance_by_number(
    session: Session, project_id: int, qnode_number: str
) -> QuestionInstance:
    """
    Fetch the question instance of the given project whose b36_number
    equals qnode_number.
    """
    has_number = QuestionInstance.b36_number == qnode_number
    in_project = QuestionInstance.project_id == project_id
    return session.query(QuestionInstance).filter(has_number, in_project).one()
# Columns fetched for each element row: the element's own fields followed by
# the id and number of its question instance and the question title.
_EL_BUNDLE_COLUMNS = (
    QElement.id,
    QElement.label,
    QElement.row,
    QElement.col,
    QElement.colspan,
    QElement.mandatory,
    QElement.height,
    QElement.width,
    QElement.regexp,
    QElement.el_type.label("el_type"),
    QElement.rowspan,
    QElement.choices,
    QuestionInstance.id.label("question_id"),
    QuestionInstance.b36_number,
    QuestionDefinition.title,
)

# single_entity=True lets a query on this bundle alone return each bundle
# directly rather than wrapped in an enclosing row tuple.
ElBundle: Bundle = Bundle(
    "element_bundle", *_EL_BUNDLE_COLUMNS, single_entity=True
)
# Columns fetched for each question attachment row.
_QATT_BUNDLE_COLUMNS = (
    QAttachment.id,
    QAttachment.element_id,
    QAttachment.size_bytes,
    QAttachment.filename,
    QAttachment.mimetype,
)

# single_entity=True lets a query on this bundle alone return each bundle
# directly rather than wrapped in an enclosing row tuple.
QAttBundle: Bundle = Bundle(
    "qatt_bundle", *_QATT_BUNDLE_COLUMNS, single_entity=True
)
def response_states(
    issue: Issue, section_id: Optional[int] = None
) -> Dict[int, QuestionResponseState]:
    """
    Map question_instance_id to the QuestionResponseState of the given issue.

    If section_id is provided only questions in that section are included.
    The query runs on the session the issue is attached to.
    """
    session = object_session(issue)
    assert session is not None

    state_query = (
        session.query(QuestionResponseState)
        .join(QuestionInstance)
        .filter(QuestionResponseState.issue == issue)
    )
    if section_id is not None:
        state_query = state_query.filter(QuestionInstance.section_id == section_id)

    by_question: Dict[int, QuestionResponseState] = {}
    for state in state_query:
        by_question[state.question_instance_id] = state
    return by_question
def answered_questions(issue: Issue, section_id: int) -> Iterable[Dict[str, Any]]:
    """
    Iterate over question dictionaries for every question in the given
    section, each populated with the issue's answers, any question
    attachments and the question's response state.

    The answer, attachment and response state lookups are loaded up front;
    the element rows themselves are consumed lazily by iter_quick_questions.
    """
    session = object_session(issue)
    assert session is not None

    in_section = QuestionInstance.section_id == section_id

    # Answers of this issue keyed by element id
    answer_lookup = {}
    for ans in issue.answers.join(QuestionInstance).filter(in_section):
        answer_lookup[ans.element_id] = ans.answer

    # Attachment details keyed by element id
    attachment_rows = (
        session.query(QAttBundle)
        .join(QElement)
        .join(QuestionDefinition)
        .join(QuestionInstance)
        .filter(in_section)
    )
    qatt_lookup = {}
    for att in attachment_rows:
        qatt_lookup[att.element_id] = att._asdict()

    response_state_lookup = response_states(issue, section_id=section_id)

    # Ordering keeps the rows of one question together, in row/column order,
    # which is what iter_quick_questions relies on to group them.
    element_rows = (
        session.query(ElBundle)
        .join(QuestionDefinition)
        .join(QuestionInstance)
        .filter(in_section)
        .order_by(
            QuestionInstance.b36_number,
            QuestionInstance.position,
            QElement.row,
            QElement.col,
        )
    )

    return iter_quick_questions(
        element_rows, answer_lookup, qatt_lookup, response_state_lookup
    )
def iter_quick_questions(
    elbundle_query: Query,
    answer_lookup: Dict[int, Any],
    qatt_lookup: Dict[int, Any],
    response_state_lookup: Dict[int, QuestionResponseState],
) -> Iterable[Dict[str, Any]]:
    """
    Generate one dictionary per question from a stream of ElBundle rows.

    A new question dictionary is started whenever the question id of the
    incoming row differs from the previous one, so rows of the same question
    must arrive together. Within a question a new row list is opened each
    time the element's row number increases.

    Working from bundle rows is about twice as fast as approaches going via
    the ORM.
    """
    types_with_answer = {"TX", "CB", "CR", "CC", "AT"}
    types_with_label = {"LB", "QA", "CB"}

    question = None
    question_id = None
    last_row = None

    for el in elbundle_query:
        kind = el.el_type

        if el.question_id != question_id:
            # A different question begins here: hand over the finished one
            if question is not None:
                yield question

            question_id = el.question_id
            rows: list[list[dict]] = []
            last_row = -1
            question = {
                "title": el.title,
                "id": question_id,
                "number": from_b36(el.b36_number),
                "elements": rows,
            }
            state = response_state_lookup[question_id]
            question["response_state"] = state.as_dict()

        if el.row > last_row:
            rows.append([])
        last_row = el.row

        entry = {
            "id": el.id,
            "el_type": el.el_type,
            "colspan": el.colspan,
            "rowspan": el.rowspan,
        }

        if kind in types_with_label:
            entry["label"] = el.label
        if kind == "QA" and el.id in qatt_lookup:
            entry["attachment"] = qatt_lookup[el.id]

        # NOTE(review): the nesting of the checks below under the answerable
        # test was reconstructed from a flattened listing - confirm against
        # the original file.
        if kind in types_with_answer:
            entry["answer"] = answer_lookup.get(el.id, None)

            if kind in ("CR", "CC"):
                entry["choices"] = [{"label": c["label"]} for c in el.choices]
            elif kind == "TX":
                entry["height"] = el.height
                entry["width"] = el.width
                entry["regexp"] = el.regexp

            if kind != "CB":
                entry["mandatory"] = el.mandatory

        rows[-1].append(entry)

    # The final question is still pending once the rows run out
    if question is not None:
        yield question
def questionnaire_stats(session: Session, project_id: int) -> Dict[str, Any]:
    """
    Summarise the questionnaire of a project.

    Returns a dictionary with the number of "questions" and "sections" in the
    project and, under "elements", the number of elements per element type
    plus an "answerable_elements" total covering the types listed in
    QElement.answerable_types.
    """
    type_count_rows = (
        session.query(QElement.el_type, func.count(QElement.id))
        .join(QuestionDefinition)
        .join(QuestionInstance)
        .filter(QuestionInstance.project_id == project_id)
        .group_by(QElement.el_type)
    )

    el_counts: dict[str, int] = {}
    for el_type, count in type_count_rows:
        el_counts[el_type] = count

    # Types absent from the project contribute nothing to the total
    el_counts["answerable_elements"] = sum(
        el_counts.get(answerable_type, 0)
        for answerable_type in QElement.answerable_types
    )

    question_ids = session.query(QuestionInstance.id).filter_by(project_id=project_id)
    qi_count = question_ids.count()

    section_ids = session.query(Section.id).filter_by(project_id=project_id)
    sec_count = section_ids.count()

    return {"questions": qi_count, "sections": sec_count, "elements": el_counts}
def unanswered_mandatory(issue: Issue) -> Query:
    """
    Build a query for the distinct (id, b36_number) pairs of question
    instances in the issue's project that have a mandatory element and whose
    response state for this issue is NOT_ANSWERED.
    """
    session = object_session(issue)
    assert session is not None

    joined = (
        session.query(QuestionInstance.id, QuestionInstance.b36_number)
        .join(QuestionDefinition)
        .join(QElement)
        .join(QuestionResponseState)
    )
    unanswered = joined.filter(
        QElement.mandatory == 1,
        QuestionInstance.project == issue.project,
        QuestionResponseState.status == ResponseStatus.NOT_ANSWERED,
        QuestionResponseState.issue_id == issue.id,
    )
    return unanswered.distinct()
def _question_ids_q(
    session: Session, target_project_id: int, sec_number: Optional[str] = None
) -> Query:
    """
    Build a query for the question definition IDs of all question instances
    in the target (destination) project, optionally limited to instances
    whose b36_number starts with sec_number.
    """
    in_target_project = QuestionInstance.project_id == target_project_id
    def_ids = session.query(QuestionInstance.question_def_id).filter(
        in_target_project
    )
    if sec_number is None:
        return def_ids
    return def_ids.filter(QuestionInstance.b36_number.startswith(str(sec_number)))
def importable_answers(
    session: Session, target_issue: Issue, sec_number: Optional[str] = None
) -> Query:
    """
    Build a query listing the issues from which answers could be imported
    into target_issue.

    Each row holds issue_id, issue_date, submitted_date, the project title
    and question_count: the number of ANSWERED or APPROVED response states
    for questions whose definition is also used in target_issue's project
    (limited to question numbers starting with sec_number if given).

    Only issues of the same respondent, other than target_issue itself, with
    status Accepted, Submitted or Updateable are included. Rows are ordered
    by question_count, highest first.
    """
    target_def_ids = _question_ids_q(
        session, target_issue.project_id, sec_number=sec_number
    )
    shared_defs = target_def_ids.subquery().alias()

    columns = (
        Issue.id.label("issue_id"),
        Issue.issue_date,
        Issue.submitted_date,
        Project.title,
        func.count(QuestionResponseState.id).label("question_count"),
    )
    criteria = (
        Issue.respondent_id == target_issue.respondent_id,
        Issue.id != target_issue.id,
        Issue.status.in_(["Accepted", "Submitted", "Updateable"]),
        QuestionResponseState.status.in_(
            [ResponseStatus.ANSWERED, ResponseStatus.APPROVED]
        ),
    )

    return (
        session.query(*columns)
        .join(QuestionResponseState.question_instance)
        .join(Issue)
        .join(Project)
        .join(
            shared_defs,
            shared_defs.c.question_def_id == QuestionInstance.question_def_id,
        )
        .filter(*criteria)
        .group_by(Issue.id)
        .order_by(desc("question_count"))
    )
def duplicated_qdefs(
    session: Session,
    destination_project_id: int,
    source_project_id: int,
    src_sections: List[Section],
    src_questions: List[QuestionInstance],
) -> List[QuestionInstance]:
    """
    Returns a list of QuestionInstance objects belonging to the Project given by
    destination_project_id and whose QuestionDefinition is shared with
    QuestionInstances in src_questions or src_sections.

    Source questions are selected either by ID (src_questions) or by having a
    b36_number in the source project that starts with the b36_number of one of
    src_sections. If both lists are empty an empty list is returned without
    querying the database.
    """
    # Find QuestionDefinition ids for all questions to be imported from the source project
    conditions = []
    if src_sections:
        # One anchored alternative per section number, e.g. "^AB|^CD"
        regex = "|".join(f"^{s.b36_number}" for s in src_sections)
        conditions.append(
            and_(
                QuestionInstance.b36_number.op("REGEXP")(regex),
                QuestionInstance.project_id == source_project_id,
            )
        )

    if src_questions:
        conditions.append(QuestionInstance.id.in_([q.id for q in src_questions]))

    if not conditions:
        # Nothing was selected in the source, so nothing can be duplicated.
        # This also avoids calling or_() with no arguments, which SQLAlchemy
        # deprecates and which does not express "match nothing".
        return []

    source_qids = select(QuestionInstance.question_def_id).where(or_(*conditions))

    # Find QuestionInstances in the destination project that share the same
    # QuestionDefinitions as in the source project
    qi_query = (
        session.query(QuestionInstance)
        .filter(QuestionInstance.question_def_id.in_(source_qids))
        .filter(QuestionInstance.project_id == destination_project_id)
        .options(lazyload(QuestionInstance.question_def))
    )

    return qi_query.all()