Coverage for postrfp / shared / fetch / view_scoring.py: 88%

86 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1from typing import Dict, List, Optional 

2from sqlalchemy import and_, func, case 

3from sqlalchemy.orm import Session 

4from postrfp.model import Section, QuestionInstance, Score 

5from postrfp.model.questionnaire.score_views import QuestionScoreComponent 

6from postrfp.model.questionnaire.weightings import TotalWeighting 

7from decimal import Decimal 

8 

9 

class DirectSectionScore:
    """
    Score totals for one section, computed by direct table queries.

    This class stands in for rows of the former section_score_components
    database view. Querying the tables directly for a specific parent section
    avoids materializing the full section hierarchy.

    Instances are normally obtained through get_child_scores(), which builds
    section scores this way while still reading question scores from a view.

    The attribute names follow the former SectionScoreComponent interface so
    existing callers keep working.
    """

    def __init__(self, data: dict):
        # Copy each expected key onto the instance under the same name.
        # A missing key raises KeyError, in the order listed here.
        for field in (
            "section_id",
            "issue_id",
            "parent_id",
            "scoreset_id",
            "weighting_set_id",
            "question_count",
            "questions_scored",
            "raw_total",
            "direct_weighted_total",
            "absolute_weighted_total",
        ):
            setattr(self, field, data[field])
        # Present only so the object exposes the same attributes as
        # SectionScoreComponent; there is no backing row id.
        self.id = None

    def get_calculated_score(self, scoring_model: str) -> Decimal:
        """Return the total for ``scoring_model`` quantized to four decimals.

        "Direct" reads ``direct_weighted_total`` and "Absolute" reads
        ``absolute_weighted_total``. "Unweighted" and any other value read
        ``raw_total``. When the selected total is None the result is
        ``Decimal("0.0000")``.
        """
        if scoring_model == "Direct":
            total = self.direct_weighted_total
        elif scoring_model == "Absolute":
            total = self.absolute_weighted_total
        else:
            # Covers "Unweighted" as well as unrecognised model names.
            total = self.raw_total

        if total is None:
            return Decimal("0.0000")
        # Go through str() so float inputs convert via their repr digits.
        return Decimal(str(total)).quantize(Decimal("0.0001"))

60 

61 

def get_child_scores(
    session: Session,
    section: Section,
    target_types: List[str],
    scoreset_id: str = "",
    weighting_set_id: Optional[int] = None,
    scoring_model: str = "Unweighted",
) -> Dict[str, List]:
    """
    Collect scores beneath ``section``, grouped by target type.

    Question scores are read from QuestionScoreComponent. Section scores are
    computed with direct table queries and wrapped in DirectSectionScore.

    Only the types named in ``target_types`` ("question" and/or "section")
    are fetched; the other list in the result stays empty. When
    ``weighting_set_id`` is None, the project's default weighting set id is
    looked up (or created) via ``section.project``.

    ``scoring_model`` is accepted but not read by this function.

    Returns a dict with exactly two keys, "question" and "section".
    """
    if weighting_set_id is None:
        weighting_set_id = section.project.get_or_create_default_weighting_set_id()

    question_scores: List = []
    section_scores: List = []

    if "question" in target_types:
        rows = session.query(QuestionScoreComponent).filter(
            QuestionScoreComponent.section_id == section.id,
            QuestionScoreComponent.scoreset_id == scoreset_id,
        )
        # A falsy weighting set id (e.g. 0) leaves the query unrestricted
        # by weighting set.
        if weighting_set_id:
            same_weighting_set = and_(
                QuestionScoreComponent.weighting_set_id == weighting_set_id,
                QuestionScoreComponent.weighting_set_id.isnot(None),
            )
            rows = rows.filter(same_weighting_set)
        # Iterating the query executes it.
        question_scores = list(rows)

    if "section" in target_types:
        per_child = _get_immediate_child_section_scores_direct(
            session, section, scoreset_id, weighting_set_id
        )
        section_scores = list(map(DirectSectionScore, per_child.values()))

    return {"question": question_scores, "section": section_scores}

111 

112 

def _get_immediate_child_section_scores_direct(
    session: Session,
    parent_section: Section,
    scoreset_id: str = "",
    weighting_set_id: Optional[int] = None,
) -> dict[tuple[int, int], dict]:
    """
    Roll question scores up to the immediate children of ``parent_section``.

    Works against the tables directly instead of going through a view:

    1. Load the sections whose parent_id is ``parent_section.id``.
    2. Load every section in the same project whose b36_number starts with
       one of those children's b36_number values. Map each one to the first
       child whose b36_number is a prefix of its own.
    3. Aggregate non-null scores of ``scoreset_id`` per (section, issue).
       Each score is multiplied by the joined TotalWeighting weight, or by
       1.0 when that weight is NULL or no weighting row matches.
    4. Add those per-section figures into one record per (child, issue).

    Returns a dict keyed by ``(child_section_id, issue_id)``. Each value is a
    dict with the keys section_id, issue_id, parent_id, scoreset_id,
    weighting_set_id, question_count, questions_scored, raw_total,
    direct_weighted_total and absolute_weighted_total. The result is empty
    when there are no child sections or no matching descendants.
    """
    # Local import: or_ is not among the module-level sqlalchemy imports.
    from sqlalchemy import or_

    children = (
        session.query(Section.id, Section.b36_number)
        .filter(Section.parent_id == parent_section.id)
        .all()
    )
    if not children:
        return {}

    # One LIKE 'prefix%' test per child, OR-ed together, so the whole
    # subtree under every child comes back from a single query.
    prefix_tests = [
        Section.b36_number.like(f"{kid.b36_number}%") for kid in children
    ]
    subtree = (
        session.query(Section.id, Section.b36_number)
        .filter(
            Section.project_id == parent_section.project_id, or_(*prefix_tests)
        )
        .all()
    )

    # descendant section id -> id of the immediate child it sits under
    child_of = {}
    for node in subtree:
        owner = next(
            (kid for kid in children if node.b36_number.startswith(kid.b36_number)),
            None,
        )
        if owner is not None:
            child_of[node.id] = owner.id

    if not child_of:
        return {}
    descendant_ids = list(child_of)

    # Scores multiply by 1.0 when the joined weight is NULL, which includes
    # the case where the outer join finds no weighting row.
    weight = func.coalesce(TotalWeighting.weight, 1.0)
    absolute_weight = func.coalesce(TotalWeighting.absolute_weight, 1.0)
    # NOTE(review): presumably section_id == 0 marks question-level weighting
    # rows; confirm against the TotalWeighting model.
    weighting_match = and_(
        TotalWeighting.question_instance_id == QuestionInstance.id,
        TotalWeighting.section_id == 0,
        TotalWeighting.weighting_set_id == weighting_set_id,
    )

    # NOTE(review): Score is inner-joined and NULL scores are filtered out,
    # so question_count only counts rows that have a score. Confirm that
    # this is intended.
    score_rows = (
        session.query(
            QuestionInstance.section_id,
            Score.issue_id,
            func.count(QuestionInstance.id).label("question_count"),
            func.count(case((Score.score.isnot(None), 1))).label("questions_scored"),
            func.sum(Score.score).label("raw_total"),
            func.sum(Score.score * weight).label("direct_weighted_total"),
            func.sum(Score.score * absolute_weight).label("absolute_weighted_total"),
        )
        .select_from(QuestionInstance)
        .join(Score, Score.question_instance_id == QuestionInstance.id)
        .outerjoin(TotalWeighting, weighting_match)
        .filter(
            QuestionInstance.section_id.in_(descendant_ids),
            Score.scoreset_id == scoreset_id,
            Score.score.isnot(None),
        )
        .group_by(QuestionInstance.section_id, Score.issue_id)
        .all()
    )

    # Fold the per-descendant rows into one record per (child, issue).
    rolled_up = {}
    for row in score_rows:
        child_id = child_of.get(row.section_id)
        if child_id is None:
            continue

        key = (child_id, row.issue_id)
        bucket = rolled_up.get(key)
        if bucket is None:
            bucket = rolled_up[key] = {
                "section_id": child_id,
                "issue_id": row.issue_id,
                "parent_id": parent_section.id,
                "scoreset_id": scoreset_id,
                "weighting_set_id": weighting_set_id,
                "question_count": 0,
                "questions_scored": 0,
                "raw_total": Decimal("0"),
                "direct_weighted_total": Decimal("0"),
                "absolute_weighted_total": Decimal("0"),
            }

        bucket["question_count"] += row.question_count or 0
        bucket["questions_scored"] += row.questions_scored or 0
        # Totals go through str() before Decimal, whatever numeric type the
        # database driver returned; NULL sums count as 0.
        for total in ("raw_total", "direct_weighted_total", "absolute_weighted_total"):
            bucket[total] += Decimal(str(getattr(row, total) or 0))

    return rolled_up