Coverage for postrfp/buyer/api/endpoints/reports/yesnoqual.py: 99%
99 statements
from io import BytesIO
from functools import partial
from collections import defaultdict
from itertools import groupby
from operator import attrgetter

import xlsxwriter  # type: ignore[import]
from sqlalchemy.orm import Session

from postrfp.buyer.api import authorise
from postrfp.shared import fetch
from postrfp.model.humans import User
from postrfp.model.questionnaire.answering import Answer
from postrfp.model.questionnaire import nodes
from postrfp.shared.decorators import http
from postrfp.authorisation import perms
from postrfp.shared.constants import MimeTypes
from postrfp.shared.response import XAccelResponse
from postrfp.model import QuestionInstance, Issue, Score
from postrfp.model.composite import QuestionMeta

from .responses import attachment


@http
def get_project_report_yesno(
    session: Session, user: User, project_id: int
) -> XAccelResponse:
    """
    Generate a spreadsheet report of answers to questions with a specific 4-row structure:

    1. Question Body
    2. Multiple choice options, e.g. 'Yes; No; Yes, with Qualifications'
    3. Comments box header, e.g. 'Qualifications'
    4. Text input field

    Questions not matching this structure are ignored.
    """

    project = fetch.project(session, project_id)
    authorise.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True)
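    # The signature is a hash that appears to fingerprint the 4-element question
    # layout described in the docstring; only questions matching it are reported.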
    yes_no_signature = "c2367903b883c29f5007efcb1f751d0b"

    query = (
        project.questions.join(nodes.QuestionDefinition)
        .join(QuestionMeta)
        .filter(QuestionMeta.signature == yes_no_signature)
        .order_by(QuestionInstance.b36_number)
    )

    questions = query.all()
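    # Lookup tables keyed by issue id: answers are indexed by element id,
    # scores by question instance id.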
    answer_map: dict[int, dict] = defaultdict(dict)
    score_map: dict[int, dict] = defaultdict(dict)

    if len(questions) > 0:
        q_id_set = {q.id for q in questions}
        answers = (
            session.query(Answer)
            .join(Issue)
            .filter(Issue.project == project, Answer.question_instance_id.in_(q_id_set))
        )

        for a in answers:
            answer_map[a.issue_id][a.element_id] = a

        scores = (
            session.query(Score)
            .join(Issue)
            .filter(
                Issue.project == project,
                Score.scoreset_id == "",
                Score.question_instance_id.in_(q_id_set),
            )
        )
        for s in scores:
            score_map[s.issue_id][s.question_instance_id] = s
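    # The workbook is built into an in-memory buffer; close() runs in the
    # finally block so the file is always finalised before being returned.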
    buff = BytesIO()
    workbook = xlsxwriter.Workbook(buff, {"in_memory": True})
    try:
        worksheet = workbook.add_worksheet()

        bold_format = workbook.add_format()
        bold_format.set_bold()

        wrap_format = workbook.add_format()
        wrap_format.set_text_wrap()
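        # partial() pre-binds the row index, so header(col, value, fmt) is
        # worksheet.write(0, col, value, fmt); the same trick is used for each
        # data row below.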
        header = partial(worksheet.write, 0)
        headings = (
            "Question Number",
            "Title",
            "Question Body",
            "Respondent",
            "Yes/No",
            "Comments",
            "Score",
            "Weight",
        )
        for col, heading in enumerate(headings):
            header(col, heading, bold_format)

        if len(questions) > 0:
            row = 0

            # Get weights using modern CTE API
            weightset_id = project.default_weighting_set_id
            if weightset_id is None:
                # Fallback to all weights being 1 if no default weighting set
                question_weights = {}
            else:
                weights_data = fetch.total_weightings_dict(project, weightset_id)
                # Convert to lookup dict: question_instance_id -> weight
                question_weights = {
                    q["question_instance_id"]: q["weight"]
                    for q in weights_data["questions"]
                }
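            # One spreadsheet row is written per (question, scoreable issue) pair.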
            for instance in questions:
                qdef = instance.question_def
                els = instance.question_def.elements
                label = els[0].label

                # Get weight from modern API
                weight = question_weights.get(instance.id, 1)

                for issue in project.scoreable_issues:
                    row = row + 1
                    cell = partial(worksheet.write, row)
                    issue_map = answer_map[issue.id]
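                    # Per the 4-element structure in the docstring: els[0] is the
                    # question body, els[1] the Yes/No choice, els[3] the free-text
                    # qualification.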
                    yes_no = issue_map.get(els[1].id, None)
                    if yes_no is not None:
                        yes_no = yes_no.answer

                    comments = issue_map.get(els[3].id, None)
                    if comments is not None:
                        comments = comments.answer
                    try:
                        score = score_map[issue.id][instance.id].score
                    except KeyError:  # pragma: no cover
                        score = None

                    cell(0, instance.number)
                    cell(1, qdef.title)
                    cell(2, label)
                    cell(3, issue.respondent.name)
                    cell(4, yes_no)
                    cell(5, comments, wrap_format)
                    cell(6, score)
                    cell(7, weight)
    finally:
        workbook.close()

    buff.seek(0)
    return attachment(buff.read(), MimeTypes.XLSX.value, f"{project.title[:25]}.xlsx")
@http
def get_scoretotals(session: Session, user: User, project_ids: list):
    """
    Get a list of total scores by Project and Respondent for the projects
    identified by the ids in project_ids.
    """
    user.check_permission(perms.ISSUE_VIEW_SCORES)
    get_pid = attrgetter("project_id")
    rows = []
    score_totals = fetch.score_totals_by_project(
        session, user.organisation.id, project_ids
    )
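    # itertools.groupby only groups consecutive rows, so the totals are assumed
    # to come back from fetch.score_totals_by_project ordered by project_id.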
    for pid, items in groupby(score_totals, get_pid):
        record = {"project_id": pid, "scores": []}
        for item in items:
            if "project_title" not in record:
                record["project_title"] = item.project_title
                record["owner"] = item.project_owner
                record["published"] = item.date_published
            record["scores"].append(
                {
                    "score": round(item.total_weighted_score, ndigits=2),
                    "respondent": item.respondent_id,
                }
            )
        rows.append(record)
    return rows