Coverage for postrfp/shared/fetch/autoscoreq.py: 76%
103 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import logging
2from typing import NamedTuple
3from collections import defaultdict
5from sqlalchemy.orm import Session, joinedload, object_session
8from postrfp.model import Project, Answer, Score, Issue, QElement
10log = logging.getLogger(__name__)
13class AutoscoreEntry(NamedTuple):
14 """Typed structure for a generated autoscore entry.
16 Matches the keys produced by _create_autoscore_entry and consumed by
17 callers such as postrfp.buyer.api.update.reset_scores.
18 """
20 issue_id: int
21 respondent: str
22 question_id: int
23 scoreset_id: str
24 score: int
25 question_number: str | None
28def _build_score_key(issue, question, scoreset_id):
29 """Builds a unique key for a score."""
31 return "%s:%s:%s" % (issue.id, question.id, scoreset_id)
34def _get_multichoice_elements_with_choices(question) -> list[QElement]:
35 """Extract question elements that contain choices for autoscoring."""
37 return [
38 el
39 for el in question.question_def.elements
40 if el.contains_choices and el.choices
41 ]
44def _process_answer(answer: Answer, element) -> int | None:
45 """Process a single answer to extract autoscore value if available."""
47 if not element or not element.choices:
48 return None
50 for choice in element.choices:
51 if (
52 answer.answer == choice.get("label")
53 and "autoscore" in choice
54 and choice["autoscore"] is not None
55 ):
56 try:
57 return int(choice["autoscore"])
58 except (ValueError, TypeError):
59 log.warning(
60 f"Invalid autoscore format '{choice['autoscore']}' for "
61 f"element {element.id}, choice '{choice.get('label')}'"
62 )
63 return None
66def _fetch_and_process_question_answers(
67 session: Session, question, element_ids: list[int], scoreable_issue_ids: list[int]
68) -> defaultdict[int, list[int]]:
69 """Fetch and process answers for a question, returning scores by issue ID."""
71 question_score_map = defaultdict(list)
73 answers_query = (
74 session.query(Answer, Answer.element)
75 .join(Answer.element)
76 .filter(Answer.question_instance_id == question.id)
77 .filter(Answer.element_id.in_(element_ids))
78 .filter(Answer.issue_id.in_(scoreable_issue_ids))
79 )
81 # Process all answers in a single query result
82 for answer, element in answers_query:
83 score_value = _process_answer(answer, element)
84 if score_value is not None:
85 question_score_map[answer.issue_id].append(score_value)
87 return question_score_map
90def _fetch_all_autoscore_answers_batch(
91 session: Session, autoscore_questions: list, scoreable_issue_ids: list[int]
92) -> dict[int, defaultdict[int, list[int]]]:
93 """
94 Batch fetch all answers for autoscore questions to minimize database queries.
96 Returns: {question_id: {issue_id: [scores]}}
97 """
98 question_element_map = {}
99 all_element_ids = []
101 # Collect all element IDs for all autoscore questions
102 for question in autoscore_questions:
103 multichoice_elements = _get_multichoice_elements_with_choices(question)
104 element_ids = [el.id for el in multichoice_elements]
105 if element_ids:
106 question_element_map[question.id] = {
107 "element_ids": element_ids,
108 "elements": {el.id: el for el in multichoice_elements},
109 }
110 all_element_ids.extend(element_ids)
112 if not all_element_ids:
113 return {}
115 # Single query to fetch all relevant answers
116 answers_query = (
117 session.query(Answer)
118 .filter(Answer.element_id.in_(all_element_ids))
119 .filter(Answer.issue_id.in_(scoreable_issue_ids))
120 .options(joinedload(Answer.element))
121 )
123 # Group results by question_id and issue_id
124 results: dict[int, defaultdict[int, list[int]]] = {}
125 for answer in answers_query:
126 question_id = answer.question_instance_id
127 if question_id not in question_element_map:
128 continue
130 if question_id not in results:
131 results[question_id] = defaultdict(list)
133 score_value = _process_answer(answer, answer.element)
134 if score_value is not None:
135 results[question_id][answer.issue_id].append(score_value)
137 return results
140def _calculate_final_score(score_list: list[int], max_score: int) -> int:
141 """Calculate the final score, applying any capping rules."""
143 return min(sum(score_list), max_score)
146def _create_autoscore_entry(
147 issue, question, user_id: str, score: int
148) -> AutoscoreEntry:
149 """Create an autoscore dictionary entry."""
151 return AutoscoreEntry(
152 issue_id=issue.id,
153 respondent=issue.respondent.name if issue.respondent else "N/A",
154 question_id=question.id,
155 scoreset_id=user_id,
156 score=score,
157 question_number=question.number,
158 )
161def generate_autoscores(
162 project: Project, session: Session, user
163) -> dict[str, AutoscoreEntry]:
164 """Generates a dictionary of autoscores for scoreable issues."""
166 if project.status_name != "Live":
167 return {}
169 autoscores_dict: dict[str, AutoscoreEntry] = {}
171 # Pre-fetch scoreable issues to avoid repeated queries inside the loop
172 scoreable_issue_map = {iss.id: iss for iss in project.scoreable_issues}
173 if not scoreable_issue_map:
174 return {}
176 # Get all autoscore questions
177 autoscore_questions = [
178 q for q in project.query_visible_questions(user) if q.is_autoscored
179 ]
181 if not autoscore_questions:
182 return {}
184 # Batch fetch all answers for all autoscore questions
185 all_question_scores = _fetch_all_autoscore_answers_batch(
186 session, autoscore_questions, list(scoreable_issue_map.keys())
187 )
189 # Process each question's results
190 for question in autoscore_questions:
191 question_score_map: dict = all_question_scores.get(question.id, {})
193 # Finalize scores for this question and populate autoscores_dict
194 for issue_id, score_list in question_score_map.items():
195 final_score = _calculate_final_score(score_list, project.maximum_score)
197 # Get the Issue object from our pre-fetched map
198 issue = scoreable_issue_map.get(issue_id)
199 if issue: # Should always be found, but check for safety
200 key = _build_score_key(issue, question, user.id)
201 autoscores_dict[key] = _create_autoscore_entry(
202 issue, question, user.id, final_score
203 )
205 return autoscores_dict
208def scores_dict(project: Project) -> dict[str, Score]:
209 """Returns a dictionary of all scores for the project, keyed by build_score_key."""
211 if project.status_name != "Live":
212 # Scoring typically applies to Live projects
213 return {}
215 project_scores = {}
216 for issue in project.scoreable_issues:
217 for score in issue.scores.all():
218 key = _build_score_key(issue, score.question, score.scoreset_id)
219 project_scores[key] = score
220 return project_scores
223def scores_dict_scoreset(project: Project, scoreset_id: str) -> dict[str, Score]:
224 """
225 Optimized version that fetches scores for a specific scoreset with minimal queries.
226 N.B. This version doesn't check for "scoreable_issues" - that is the responsibility of the
227 caller.
228 """
229 from sqlalchemy.orm import joinedload
231 if project.status_name != "Live":
232 return {}
234 # Single query to get all relevant scores with proper joins
235 session = object_session(project)
236 assert session is not None
237 scores = (
238 session.query(Score)
239 .options(joinedload(Score.question), joinedload(Score.issue))
240 .join(Issue, Score.issue_id == Issue.id)
241 .filter(Issue.project_id == project.id)
242 .filter(Score.scoreset_id == scoreset_id)
243 .all()
244 )
246 project_scores = {}
247 for score in scores:
248 key = _build_score_key(score.issue, score.question, score.scoreset_id)
249 project_scores[key] = score
251 return project_scores