Coverage for postrfp/shared/fetch/autoscoreq.py: 76%

103 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import logging 

2from typing import NamedTuple 

3from collections import defaultdict 

4 

5from sqlalchemy.orm import Session, joinedload, object_session 

6 

7 

8from postrfp.model import Project, Answer, Score, Issue, QElement 

9 

10log = logging.getLogger(__name__) 

11 

12 

13class AutoscoreEntry(NamedTuple): 

14 """Typed structure for a generated autoscore entry. 

15 

16 Matches the keys produced by _create_autoscore_entry and consumed by 

17 callers such as postrfp.buyer.api.update.reset_scores. 

18 """ 

19 

20 issue_id: int 

21 respondent: str 

22 question_id: int 

23 scoreset_id: str 

24 score: int 

25 question_number: str | None 

26 

27 

28def _build_score_key(issue, question, scoreset_id): 

29 """Builds a unique key for a score.""" 

30 

31 return "%s:%s:%s" % (issue.id, question.id, scoreset_id) 

32 

33 

34def _get_multichoice_elements_with_choices(question) -> list[QElement]: 

35 """Extract question elements that contain choices for autoscoring.""" 

36 

37 return [ 

38 el 

39 for el in question.question_def.elements 

40 if el.contains_choices and el.choices 

41 ] 

42 

43 

44def _process_answer(answer: Answer, element) -> int | None: 

45 """Process a single answer to extract autoscore value if available.""" 

46 

47 if not element or not element.choices: 

48 return None 

49 

50 for choice in element.choices: 

51 if ( 

52 answer.answer == choice.get("label") 

53 and "autoscore" in choice 

54 and choice["autoscore"] is not None 

55 ): 

56 try: 

57 return int(choice["autoscore"]) 

58 except (ValueError, TypeError): 

59 log.warning( 

60 f"Invalid autoscore format '{choice['autoscore']}' for " 

61 f"element {element.id}, choice '{choice.get('label')}'" 

62 ) 

63 return None 

64 

65 

66def _fetch_and_process_question_answers( 

67 session: Session, question, element_ids: list[int], scoreable_issue_ids: list[int] 

68) -> defaultdict[int, list[int]]: 

69 """Fetch and process answers for a question, returning scores by issue ID.""" 

70 

71 question_score_map = defaultdict(list) 

72 

73 answers_query = ( 

74 session.query(Answer, Answer.element) 

75 .join(Answer.element) 

76 .filter(Answer.question_instance_id == question.id) 

77 .filter(Answer.element_id.in_(element_ids)) 

78 .filter(Answer.issue_id.in_(scoreable_issue_ids)) 

79 ) 

80 

81 # Process all answers in a single query result 

82 for answer, element in answers_query: 

83 score_value = _process_answer(answer, element) 

84 if score_value is not None: 

85 question_score_map[answer.issue_id].append(score_value) 

86 

87 return question_score_map 

88 

89 

90def _fetch_all_autoscore_answers_batch( 

91 session: Session, autoscore_questions: list, scoreable_issue_ids: list[int] 

92) -> dict[int, defaultdict[int, list[int]]]: 

93 """ 

94 Batch fetch all answers for autoscore questions to minimize database queries. 

95 

96 Returns: {question_id: {issue_id: [scores]}} 

97 """ 

98 question_element_map = {} 

99 all_element_ids = [] 

100 

101 # Collect all element IDs for all autoscore questions 

102 for question in autoscore_questions: 

103 multichoice_elements = _get_multichoice_elements_with_choices(question) 

104 element_ids = [el.id for el in multichoice_elements] 

105 if element_ids: 

106 question_element_map[question.id] = { 

107 "element_ids": element_ids, 

108 "elements": {el.id: el for el in multichoice_elements}, 

109 } 

110 all_element_ids.extend(element_ids) 

111 

112 if not all_element_ids: 

113 return {} 

114 

115 # Single query to fetch all relevant answers 

116 answers_query = ( 

117 session.query(Answer) 

118 .filter(Answer.element_id.in_(all_element_ids)) 

119 .filter(Answer.issue_id.in_(scoreable_issue_ids)) 

120 .options(joinedload(Answer.element)) 

121 ) 

122 

123 # Group results by question_id and issue_id 

124 results: dict[int, defaultdict[int, list[int]]] = {} 

125 for answer in answers_query: 

126 question_id = answer.question_instance_id 

127 if question_id not in question_element_map: 

128 continue 

129 

130 if question_id not in results: 

131 results[question_id] = defaultdict(list) 

132 

133 score_value = _process_answer(answer, answer.element) 

134 if score_value is not None: 

135 results[question_id][answer.issue_id].append(score_value) 

136 

137 return results 

138 

139 

140def _calculate_final_score(score_list: list[int], max_score: int) -> int: 

141 """Calculate the final score, applying any capping rules.""" 

142 

143 return min(sum(score_list), max_score) 

144 

145 

146def _create_autoscore_entry( 

147 issue, question, user_id: str, score: int 

148) -> AutoscoreEntry: 

149 """Create an autoscore dictionary entry.""" 

150 

151 return AutoscoreEntry( 

152 issue_id=issue.id, 

153 respondent=issue.respondent.name if issue.respondent else "N/A", 

154 question_id=question.id, 

155 scoreset_id=user_id, 

156 score=score, 

157 question_number=question.number, 

158 ) 

159 

160 

161def generate_autoscores( 

162 project: Project, session: Session, user 

163) -> dict[str, AutoscoreEntry]: 

164 """Generates a dictionary of autoscores for scoreable issues.""" 

165 

166 if project.status_name != "Live": 

167 return {} 

168 

169 autoscores_dict: dict[str, AutoscoreEntry] = {} 

170 

171 # Pre-fetch scoreable issues to avoid repeated queries inside the loop 

172 scoreable_issue_map = {iss.id: iss for iss in project.scoreable_issues} 

173 if not scoreable_issue_map: 

174 return {} 

175 

176 # Get all autoscore questions 

177 autoscore_questions = [ 

178 q for q in project.query_visible_questions(user) if q.is_autoscored 

179 ] 

180 

181 if not autoscore_questions: 

182 return {} 

183 

184 # Batch fetch all answers for all autoscore questions 

185 all_question_scores = _fetch_all_autoscore_answers_batch( 

186 session, autoscore_questions, list(scoreable_issue_map.keys()) 

187 ) 

188 

189 # Process each question's results 

190 for question in autoscore_questions: 

191 question_score_map: dict = all_question_scores.get(question.id, {}) 

192 

193 # Finalize scores for this question and populate autoscores_dict 

194 for issue_id, score_list in question_score_map.items(): 

195 final_score = _calculate_final_score(score_list, project.maximum_score) 

196 

197 # Get the Issue object from our pre-fetched map 

198 issue = scoreable_issue_map.get(issue_id) 

199 if issue: # Should always be found, but check for safety 

200 key = _build_score_key(issue, question, user.id) 

201 autoscores_dict[key] = _create_autoscore_entry( 

202 issue, question, user.id, final_score 

203 ) 

204 

205 return autoscores_dict 

206 

207 

208def scores_dict(project: Project) -> dict[str, Score]: 

209 """Returns a dictionary of all scores for the project, keyed by build_score_key.""" 

210 

211 if project.status_name != "Live": 

212 # Scoring typically applies to Live projects 

213 return {} 

214 

215 project_scores = {} 

216 for issue in project.scoreable_issues: 

217 for score in issue.scores.all(): 

218 key = _build_score_key(issue, score.question, score.scoreset_id) 

219 project_scores[key] = score 

220 return project_scores 

221 

222 

223def scores_dict_scoreset(project: Project, scoreset_id: str) -> dict[str, Score]: 

224 """ 

225 Optimized version that fetches scores for a specific scoreset with minimal queries. 

226 N.B. This version doesn't check for "scoreable_issues" - that is the responsibility of the 

227 caller. 

228 """ 

229 from sqlalchemy.orm import joinedload 

230 

231 if project.status_name != "Live": 

232 return {} 

233 

234 # Single query to get all relevant scores with proper joins 

235 session = object_session(project) 

236 assert session is not None 

237 scores = ( 

238 session.query(Score) 

239 .options(joinedload(Score.question), joinedload(Score.issue)) 

240 .join(Issue, Score.issue_id == Issue.id) 

241 .filter(Issue.project_id == project.id) 

242 .filter(Score.scoreset_id == scoreset_id) 

243 .all() 

244 ) 

245 

246 project_scores = {} 

247 for score in scores: 

248 key = _build_score_key(score.issue, score.question, score.scoreset_id) 

249 project_scores[key] = score 

250 

251 return project_scores