Coverage for postrfp/shared/fetch/answq.py: 100%
42 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1from typing import Dict, Any, List
2from collections import defaultdict
4from sqlalchemy.orm import Session, Query, object_session, joinedload, subqueryload
5from sqlalchemy.sql.elements import ColumnElement
6from sqlalchemy import select, func
8from postrfp.model.questionnaire.answering import Answer, ResponseStatus
9from postrfp.model import (
10 Issue,
11 QuestionInstance,
12 QuestionDefinition,
13 QElement,
14 User,
15 Project,
16 Participant,
17 QuestionResponseState,
18 AAttachment,
19 Section,
20 SectionPermission,
21)
24# Temporarily living with Answer queries until
25# more Issue queries arise
26def issue(session: Session, issue_id: int) -> Issue:
27 return session.query(Issue).filter(Issue.id == issue_id).one()
30def answer(session: Session, answer_id: int) -> Answer:
31 return session.query(Answer).filter(Answer.id == answer_id).one()
34def answers_in_issues_query(
35 session: Session, project_id: int, issue_id_set: set
36) -> Query:
37 return (
38 session.query(Answer)
39 .join(Issue)
40 .filter(Issue.project_id == project_id, Answer.issue_id.in_(issue_id_set))
41 )
44def visible_answers(
45 session: Session, question_instance: QuestionInstance
46) -> Dict[str, Dict[str, str]]:
47 scoreable_filter: ColumnElement[bool] = Issue.status.in_(
48 ("Submitted", "Updateable")
49 )
51 answers_query = (
52 session.query(Answer)
53 .filter(Answer.question_instance_id == question_instance.id)
54 .join(Answer.issue)
55 .filter(scoreable_filter)
56 )
58 qelements: defaultdict[str, dict[str, str]] = defaultdict(dict)
59 for answer in answers_query.all():
60 qelements[str(answer.issue_id)][str(answer.element_id)] = answer.answer
62 return qelements
65def element_answers(session: Session, user: User, element_id: int) -> List[Any]:
66 q = (
67 session.query(
68 Answer.id.label("answer_id"),
69 Answer.answer,
70 Issue.respondent_id,
71 Issue.id.label("issue_id"),
72 Project.id.label("project_id"),
73 Project.title.label("project_title"),
74 Project.date_published,
75 )
76 .join(Issue, Issue.id == Answer.issue_id)
77 .join(Project)
78 .join(Participant)
79 .filter(
80 Answer.element_id == element_id,
81 Participant.organisation == user.organisation,
82 Issue.status.in_(("Submitted", "Updateable")),
83 )
84 )
86 return q.all()
89def importable_answer_lookup(
90 session: Session, source_issue: Issue, target_project: Project, sec_number: str
91) -> Query:
92 from postrfp.shared.fetch import _question_ids_q
94 sel_al = (
95 _question_ids_q(session, target_project.id, sec_number=sec_number)
96 .subquery()
97 .alias()
98 )
99 lq = (
100 session.query(
101 Answer.element_id,
102 Answer.answer,
103 QuestionDefinition.id.label("question_def_id"),
104 )
105 .join(QElement, Answer.element_id == QElement.id)
106 .join(QuestionDefinition)
107 .join(QuestionInstance)
108 .join(QuestionResponseState)
109 .join(sel_al, sel_al.c.question_def_id == QuestionInstance.question_def_id)
110 .filter(
111 Answer.issue == source_issue,
112 QuestionInstance.project_id == source_issue.project_id,
113 QuestionResponseState.issue == source_issue,
114 )
115 .filter(
116 QuestionResponseState.status.in_(
117 [ResponseStatus.ANSWERED, ResponseStatus.APPROVED]
118 )
119 )
120 )
121 return lq
124def answer_attachments_q(project: Project, user: User) -> Query:
125 """
126 Returns a list of AAttachment (Answer Attachment) objects from
127 scoreable issues in the given project
128 """
129 session = object_session(project)
130 assert session is not None
131 issue_filter = Issue.scoreable_filter(project)
132 q = (
133 session.query(AAttachment)
134 .join(Answer)
135 .join(Issue)
136 .filter(issue_filter)
137 .filter(Issue.project_id == project.id)
138 .options(
139 joinedload(AAttachment.answer),
140 joinedload(AAttachment.answer, Answer.question_instance),
141 subqueryload(AAttachment.answer, Answer.issue),
142 joinedload(AAttachment.answer, Answer.issue, Issue.respondent),
143 )
144 )
146 if user.is_restricted:
147 q = (
148 q.join(QuestionInstance)
149 .join(QuestionDefinition)
150 .join(Section)
151 .join(SectionPermission)
152 .filter(SectionPermission.user == user)
153 )
155 return q
158def answering_stats(issue: Issue, section: Section) -> List[Dict[str, Any]]:
159 QR = QuestionResponseState
161 q = (
162 select(QR.allocated_to, QR.status, func.count(QR.id).label("question_count"))
163 .join(Issue)
164 .select_from(QR)
165 .join(QuestionInstance)
166 .join(Section)
167 .filter(Section.b36_number.startswith(section.b36_number))
168 .filter(Issue.id == issue.id)
169 .group_by(QR.allocated_to, QR.status)
170 )
171 session = object_session(issue)
172 assert session is not None
174 return [
175 dict(
176 status=si.status.name,
177 allocated_to=si.allocated_to,
178 question_count=si.question_count,
179 )
180 for si in session.execute(q)
181 ]