Coverage for postrfp / shared / fetch / view_scoring.py: 88%
86 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1from typing import Dict, List, Optional
2from sqlalchemy import and_, func, case
3from sqlalchemy.orm import Session
4from postrfp.model import Section, QuestionInstance, Score
5from postrfp.model.questionnaire.score_views import QuestionScoreComponent
6from postrfp.model.questionnaire.weightings import TotalWeighting
7from decimal import Decimal
10class DirectSectionScore:
11 """
12 Calculates section scores through direct table queries for optimal performance.
14 This approach replaces the previous section_score_components database view,
15 providing better performance for querying specific parent sections
16 by avoiding full hierarchy materialization.
18 Use via get_child_scores() which automatically uses this approach for section
19 scores while still using the view for question scores (which performs well).
21 Attributes match the former SectionScoreComponent interface for compatibility.
22 """
24 def __init__(self, data: dict):
25 self.section_id = data["section_id"]
26 self.issue_id = data["issue_id"]
27 self.parent_id = data["parent_id"]
28 self.scoreset_id = data["scoreset_id"]
29 self.weighting_set_id = data["weighting_set_id"]
30 self.question_count = data["question_count"]
31 self.questions_scored = data["questions_scored"]
32 self.raw_total = data["raw_total"]
33 self.direct_weighted_total = data["direct_weighted_total"]
34 self.absolute_weighted_total = data["absolute_weighted_total"]
35 # Add these to match SectionScoreComponent exactly
36 self.id = None
38 def get_calculated_score(self, scoring_model: str) -> Decimal:
39 """Calculate total score based on scoring model with consistent precision."""
40 if scoring_model == "Unweighted":
41 if self.raw_total is not None:
42 return Decimal(str(self.raw_total)).quantize(Decimal("0.0001"))
43 return Decimal("0.0000")
44 elif scoring_model == "Direct":
45 if self.direct_weighted_total is not None:
46 return Decimal(str(self.direct_weighted_total)).quantize(
47 Decimal("0.0001")
48 )
49 return Decimal("0.0000")
50 elif scoring_model == "Absolute":
51 if self.absolute_weighted_total is not None:
52 return Decimal(str(self.absolute_weighted_total)).quantize(
53 Decimal("0.0001")
54 )
55 return Decimal("0.0000")
56 else:
57 if self.raw_total is not None:
58 return Decimal(str(self.raw_total)).quantize(Decimal("0.0001"))
59 return Decimal("0.0000")
62def get_child_scores(
63 session: Session,
64 section: Section,
65 target_types: List[str],
66 scoreset_id: str = "",
67 weighting_set_id: Optional[int] = None,
68 scoring_model: str = "Unweighted",
69) -> Dict[str, List]:
70 """
71 Get scores using direct queries for better performance.
72 Falls back to views only when necessary.
73 """
74 if weighting_set_id is None:
75 weighting_set_id = section.project.get_or_create_default_weighting_set_id()
77 results: Dict[str, List] = {
78 "question": [],
79 "section": [],
80 }
82 # Get question scores if requested - use view as it's efficient
83 if "question" in target_types:
84 question_query = session.query(QuestionScoreComponent).filter(
85 QuestionScoreComponent.section_id == section.id,
86 QuestionScoreComponent.scoreset_id == scoreset_id,
87 )
89 # Add weighting filter conditionally
90 if weighting_set_id:
91 question_query = question_query.filter(
92 and_(
93 QuestionScoreComponent.weighting_set_id == weighting_set_id,
94 QuestionScoreComponent.weighting_set_id.isnot(None),
95 )
96 )
98 results["question"] = list(question_query)
100 # Get section scores if requested - use direct query for performance
101 if "section" in target_types:
102 # Use direct query instead of view for better performance
103 direct_scores = _get_immediate_child_section_scores_direct(
104 session, section, scoreset_id, weighting_set_id
105 )
106 results["section"] = [
107 DirectSectionScore(data) for data in direct_scores.values()
108 ]
110 return results
113def _get_immediate_child_section_scores_direct(
114 session: Session,
115 parent_section: Section,
116 scoreset_id: str = "",
117 weighting_set_id: Optional[int] = None,
118) -> dict[tuple[int, int], dict]:
119 """
120 Get aggregated scores for immediate child sections using direct table queries.
121 This bypasses the view for much better performance.
122 """
123 # Get immediate child sections
124 child_sections = (
125 session.query(Section.id, Section.b36_number)
126 .filter(Section.parent_id == parent_section.id)
127 .all()
128 )
130 if not child_sections:
131 return {}
133 # Build section hierarchy mapping in a single query
134 # Get all sections that are descendants of any child
135 child_patterns = [child.b36_number for child in child_sections]
137 # Use OR conditions to find all descendants in one query
138 from sqlalchemy import or_
140 descendant_conditions = [
141 Section.b36_number.like(f"{pattern}%") for pattern in child_patterns
142 ]
144 all_descendants = (
145 session.query(Section.id, Section.b36_number)
146 .filter(
147 Section.project_id == parent_section.project_id, or_(*descendant_conditions)
148 )
149 .all()
150 )
152 # Map each descendant to its immediate child parent
153 section_map = {} # descendant_id -> child_id
154 descendant_ids = []
156 for desc in all_descendants:
157 # Find which child this descendant belongs to
158 # by checking which child's b36_number is a prefix
159 for child in child_sections:
160 if desc.b36_number.startswith(child.b36_number):
161 section_map[desc.id] = child.id
162 descendant_ids.append(desc.id)
163 break
165 if not descendant_ids:
166 return {}
168 # Query scores with weightings for all descendant sections
169 query = (
170 session.query(
171 QuestionInstance.section_id,
172 Score.issue_id,
173 func.count(QuestionInstance.id).label("question_count"),
174 func.count(case((Score.score.isnot(None), 1))).label("questions_scored"),
175 func.sum(Score.score).label("raw_total"),
176 func.sum(Score.score * func.coalesce(TotalWeighting.weight, 1.0)).label(
177 "direct_weighted_total"
178 ),
179 func.sum(
180 Score.score * func.coalesce(TotalWeighting.absolute_weight, 1.0)
181 ).label("absolute_weighted_total"),
182 )
183 .select_from(QuestionInstance)
184 .join(Score, Score.question_instance_id == QuestionInstance.id)
185 .outerjoin(
186 TotalWeighting,
187 and_(
188 TotalWeighting.question_instance_id == QuestionInstance.id,
189 TotalWeighting.section_id == 0,
190 TotalWeighting.weighting_set_id == weighting_set_id,
191 ),
192 )
193 .filter(
194 QuestionInstance.section_id.in_(descendant_ids),
195 Score.scoreset_id == scoreset_id,
196 Score.score.isnot(None),
197 )
198 .group_by(QuestionInstance.section_id, Score.issue_id)
199 )
201 results = query.all()
203 # Aggregate by child section (roll up descendants)
204 aggregated = {}
205 for row in results:
206 desc_section_id = row.section_id
207 issue_id = row.issue_id
208 child_section_id = section_map.get(desc_section_id)
210 if child_section_id is None:
211 continue
213 key = (child_section_id, issue_id)
214 if key not in aggregated:
215 aggregated[key] = {
216 "section_id": child_section_id,
217 "issue_id": issue_id,
218 "parent_id": parent_section.id,
219 "scoreset_id": scoreset_id,
220 "weighting_set_id": weighting_set_id,
221 "question_count": 0,
222 "questions_scored": 0,
223 "raw_total": Decimal("0"),
224 "direct_weighted_total": Decimal("0"),
225 "absolute_weighted_total": Decimal("0"),
226 }
228 agg = aggregated[key]
229 agg["question_count"] += row.question_count or 0
230 agg["questions_scored"] += row.questions_scored or 0
231 agg["raw_total"] += Decimal(str(row.raw_total or 0))
232 agg["direct_weighted_total"] += Decimal(str(row.direct_weighted_total or 0))
233 agg["absolute_weighted_total"] += Decimal(str(row.absolute_weighted_total or 0))
235 return aggregated