Coverage for postrfp/buyer/api/endpoints/reports/yesnoqual.py: 99%

99 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1from io import BytesIO 

2from functools import partial 

3from collections import defaultdict 

4from itertools import groupby 

5from operator import attrgetter 

6 

7import xlsxwriter # type: ignore[import] 

8from sqlalchemy.orm import Session 

9 

10from postrfp.buyer.api import authorise 

11from postrfp.shared import fetch 

12from postrfp.model.humans import User 

13from postrfp.model.questionnaire.answering import Answer 

14from postrfp.model.questionnaire import nodes 

15from postrfp.shared.decorators import http 

16from postrfp.authorisation import perms 

17from postrfp.shared.constants import MimeTypes 

18from postrfp.shared.response import XAccelResponse 

19from postrfp.model import QuestionInstance, Issue, Score 

20from postrfp.model.composite import QuestionMeta 

21 

22from .responses import attachment 

23 

24 

25@http 

26def get_project_report_yesno( 

27 session: Session, user: User, project_id: int 

28) -> XAccelResponse: 

29 """ 

30 Generate a spreadsheet report of answer to questions with a specific structure of 4 rows: 

31 

32 1. Question Body 

33 2. Multiple choice options, e.g. 'Yes; No; Yes, with Qualifications' 

34 3. Comments box header, e.g. 'Qualifications' 

35 4. Text input field 

36 

37 Questions not matching this structure are ignored 

38 """ 

39 

40 project = fetch.project(session, project_id) 

41 authorise.check(user, perms.PROJECT_ACCESS, project=project, deny_restricted=True) 

42 

43 yes_no_signature = "c2367903b883c29f5007efcb1f751d0b" 

44 

45 query = ( 

46 project.questions.join(nodes.QuestionDefinition) 

47 .join(QuestionMeta) 

48 .filter(QuestionMeta.signature == yes_no_signature) 

49 .order_by(QuestionInstance.b36_number) 

50 ) 

51 

52 questions = query.all() 

53 

54 answer_map: dict[int, dict] = defaultdict(dict) 

55 score_map: dict[int, dict] = defaultdict(dict) 

56 

57 if len(questions) > 0: 

58 q_id_set = {q.id for q in questions} 

59 answers = ( 

60 session.query(Answer) 

61 .join(Issue) 

62 .filter(Issue.project == project, Answer.question_instance_id.in_(q_id_set)) 

63 ) 

64 

65 for a in answers: 

66 answer_map[a.issue_id][a.element_id] = a 

67 

68 scores = ( 

69 session.query(Score) 

70 .join(Issue) 

71 .filter( 

72 Issue.project == project, 

73 Score.scoreset_id == "", 

74 Score.question_instance_id.in_(q_id_set), 

75 ) 

76 ) 

77 for s in scores: 

78 score_map[s.issue_id][s.question_instance_id] = s 

79 

80 buff = BytesIO() 

81 workbook = xlsxwriter.Workbook(buff, {"in_memory": True}) 

82 try: 

83 worksheet = workbook.add_worksheet() 

84 

85 bold_format = workbook.add_format() 

86 bold_format.set_bold() 

87 

88 wrap_format = workbook.add_format() 

89 wrap_format.set_text_wrap() 

90 

91 header = partial(worksheet.write, 0) 

92 headings = ( 

93 "Question Number", 

94 "Title", 

95 "Question Body", 

96 "Respondent", 

97 "Yes/No", 

98 "Comments", 

99 "Score", 

100 "Weight", 

101 ) 

102 for col, heading in enumerate(headings): 

103 header(col, heading, bold_format) 

104 

105 if len(questions) > 0: 

106 row = 0 

107 

108 # Get weights using modern CTE API 

109 weightset_id = project.default_weighting_set_id 

110 if weightset_id is None: 

111 # Fallback to all weights being 1 if no default weighting set 

112 question_weights = {} 

113 else: 

114 weights_data = fetch.total_weightings_dict(project, weightset_id) 

115 # Convert to lookup dict: question_instance_id -> weight 

116 question_weights = { 

117 q["question_instance_id"]: q["weight"] 

118 for q in weights_data["questions"] 

119 } 

120 

121 for instance in questions: 

122 qdef = instance.question_def 

123 els = instance.question_def.elements 

124 label = els[0].label 

125 

126 # Get weight from modern API 

127 weight = question_weights.get(instance.id, 1) 

128 

129 for issue in project.scoreable_issues: 

130 row = row + 1 

131 cell = partial(worksheet.write, row) 

132 issue_map = answer_map[issue.id] 

133 yes_no = issue_map.get(els[1].id, None) 

134 if yes_no is not None: 

135 yes_no = yes_no.answer 

136 

137 comments = issue_map.get(els[3].id, None) 

138 if comments is not None: 

139 comments = comments.answer 

140 try: 

141 score = score_map[issue.id][instance.id].score 

142 except KeyError: # pragma: no cover 

143 score = None 

144 

145 cell(0, instance.number) 

146 cell(1, qdef.title) 

147 cell(2, label) 

148 cell(3, issue.respondent.name) 

149 cell(4, yes_no) 

150 cell(5, comments, wrap_format) 

151 cell(6, score) 

152 cell(7, weight) 

153 finally: 

154 workbook.close() 

155 

156 buff.seek(0) 

157 return attachment(buff.read(), MimeTypes.XLSX.value, f"{project.title[:25]}.xlsx") 

158 

159 

160@http 

161def get_scoretotals(session: Session, user: User, project_ids: list): 

162 """ 

163 Get a list of total scores by Project and Respondent for the projects given 

164 by the ids in project_ids 

165 """ 

166 user.check_permission(perms.ISSUE_VIEW_SCORES) 

167 get_pid = attrgetter("project_id") 

168 rows = [] 

169 score_totals = fetch.score_totals_by_project( 

170 session, user.organisation.id, project_ids 

171 ) 

172 for pid, items in groupby(score_totals, get_pid): 

173 record = {"project_id": pid, "scores": []} 

174 for item in items: 

175 if "project_title" not in record: 

176 record["project_title"] = item.project_title 

177 record["owner"] = item.project_owner 

178 record["published"] = item.date_published 

179 record["scores"].append( 

180 { 

181 "score": round(item.total_weighted_score, ndigits=2), 

182 "respondent": item.respondent_id, 

183 } 

184 ) 

185 rows.append(record) 

186 return rows