Coverage for postrfp/shared/fetch/scoreq.py: 95%

123 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2Functions which fetch Score objects from the database 

3""" 

4 

5import logging 

6from operator import itemgetter 

7from typing import Any, TYPE_CHECKING, Union, NamedTuple 

8from decimal import Decimal 

9 

10from sqlalchemy.orm.exc import NoResultFound 

11 

12from sqlalchemy import literal, func, case, and_, cast, INTEGER, text, tuple_ 

13from sqlalchemy.orm import Session, Query 

14from sqlalchemy.orm.session import object_session 

15 

16if TYPE_CHECKING: 

17 from sqlalchemy import Alias, Subquery 

18 from decimal import Decimal 

19 

20from postrfp.model.questionnaire.b36 import from_b36 

21from postrfp.model import ( 

22 User, 

23 Project, 

24 Section, 

25 Issue, 

26 Score, 

27 QuestionInstance, 

28 SectionPermission, 

29 Participant, 

30 TotalWeighting, 

31) 

32from postrfp.templates import get_template 

33from postrfp.authorisation import perms 

34 

35 

36log = logging.getLogger(__name__) 

37 

38 

39class ScoreTuple(NamedTuple): 

40 issue_id: int 

41 score_value: Decimal | None = None 

42 scoreset_id: str = "" 

43 comment: str | None = None 

44 

45 

46def scores( 

47 session: Session, 

48 project: Project, 

49 section: Section, 

50 scoreset_id: str, 

51 user: User, 

52 filter_by, 

53) -> list[dict[str, Union[str, int, "Decimal"]]]: 

54 Score.validate_scoreset(scoreset_id, project) 

55 score_perm = get_permission_for_scoreset(user, scoreset_id) 

56 user.check_permission(score_perm) 

57 

58 sq = ( 

59 session.query( 

60 QuestionInstance.id.label("question_id"), 

61 Issue.id.label("issue_id"), 

62 literal(scoreset_id).label("scoreset_id"), 

63 Score.score.label("score"), 

64 ) 

65 .join(Section) 

66 .join(Project, Section.project) 

67 .join(Issue) 

68 .outerjoin( 

69 Score, 

70 and_( 

71 Score.scoreset_id == scoreset_id, 

72 QuestionInstance.id == Score.question_instance_id, 

73 Issue.id == Score.issue_id, 

74 ), 

75 ) 

76 .filter(Project.id == Issue.project_id, Section.id == section.id, filter_by) 

77 .order_by(QuestionInstance.id.asc(), Issue.id.asc()) 

78 ) 

79 return [s._asdict() for s in sq] 

80 

81 

82def or_create_score( 

83 session: Session, 

84 project: Project, 

85 question: QuestionInstance, 

86 score_doc, 

87) -> tuple[Score, bool]: 

88 """ 

89 Get issue and question required to access score 

90 Do required validation and permission checks 

91 """ 

92 issue_id, scoreset_id = score_doc.issue_id, score_doc.scoreset_id 

93 if scoreset_id is None: 

94 scoreset_id = "" 

95 

96 Score.validate_scoreset(scoreset_id, project) 

97 

98 issue = project.get_issue(issue_id) 

99 

100 # Fetch or create score object 

101 created = False 

102 try: 

103 score = ( 

104 session.query(Score) 

105 .filter( 

106 Score.question_instance_id == question.id, 

107 Score.issue_id == issue.id, 

108 Score.scoreset_id == scoreset_id, 

109 ) 

110 .one() 

111 ) 

112 except NoResultFound: 

113 score = Score(issue=issue, question=question, scoreset_id=scoreset_id) 

114 session.add(score) 

115 created = True 

116 

117 return score, created 

118 

119 

120def or_create_scores_batch( 

121 session: Session, 

122 project: Project, 

123 score_requests: list[tuple[QuestionInstance, ScoreTuple]], 

124) -> list[tuple[Score, bool]]: 

125 """ 

126 Batch version of or_create_score for better performance. 

127 

128 Args: 

129 session: Database session 

130 project: Project object 

131 score_requests: List of (question, score_doc) tuples 

132 

133 Returns: 

134 List of (score, created) tuples in the same order as input 

135 """ 

136 if not score_requests: 

137 return [] 

138 

139 # Normalize scoreset_ids and validate 

140 normalized_requests = [] 

141 for question, score_doc in score_requests: 

142 scoreset_id = score_doc.scoreset_id or "" 

143 Score.validate_scoreset(scoreset_id, project) 

144 issue = project.get_issue(score_doc.issue_id) 

145 normalized_requests.append((question, issue, scoreset_id, score_doc)) 

146 

147 # Build keys for a single fetch of existing scores 

148 keys = [ 

149 (question.id, issue.id, scoreset_id) 

150 for (question, issue, scoreset_id, _sd) in normalized_requests 

151 ] 

152 

153 # Fetch all existing scores 

154 existing_scores = {} 

155 if keys: 

156 existing_q = session.query(Score).filter( 

157 tuple_(Score.question_instance_id, Score.issue_id, Score.scoreset_id).in_( 

158 keys 

159 ) 

160 ) 

161 for score in existing_q: 

162 key = (score.question_instance_id, score.issue_id, score.scoreset_id) 

163 existing_scores[key] = score 

164 

165 # Build results, creating new scores where needed 

166 results = [] 

167 new_scores = [] 

168 for question, issue, scoreset_id, score_doc in normalized_requests: 

169 key = (question.id, issue.id, scoreset_id) 

170 if key in existing_scores: 

171 results.append((existing_scores[key], False)) 

172 else: 

173 new_score = Score(issue=issue, question=question, scoreset_id=scoreset_id) 

174 new_scores.append(new_score) 

175 results.append((new_score, True)) 

176 

177 if new_scores: 

178 session.add_all(new_scores) 

179 

180 return results 

181 

182 

183def scoring_data(project: Project, scoreset_id: str = "") -> Query: 

184 """ 

185 Returns a list of a score records for the given project and scoring set. 

186 

187 Each dict has keys: score, number, issue_id 

188 

189 """ 

190 session = object_session(project) 

191 assert session is not None 

192 q = ( 

193 session.query( 

194 Score.score, 

195 QuestionInstance.id.label("question_id"), 

196 QuestionInstance.b36_number, 

197 Score.issue_id, 

198 ) 

199 .join(QuestionInstance) 

200 .filter(QuestionInstance.project == project, Score.scoreset_id == scoreset_id) 

201 ) 

202 

203 return q 

204 

205 

206def score_totals_by_project( 

207 session: Session, org_id: str, project_ids: list[int] 

208) -> Query: 

209 project_ids_q = ( 

210 session.query(Participant.project_id) 

211 .filter(Participant.org_id == org_id) 

212 .filter(Participant.project_id.in_(project_ids)) 

213 .distinct() 

214 ) 

215 

216 return ( 

217 session.query( 

218 func.sum(Score.score * TotalWeighting.absolute_weight).label( 

219 "total_weighted_score" 

220 ), 

221 Project.title.label("project_title"), 

222 Project.org_id.label("project_owner"), 

223 Project.date_published, 

224 Project.id.label("project_id"), 

225 Issue.respondent_id, 

226 ) 

227 .join( 

228 TotalWeighting, 

229 Score.question_instance_id == TotalWeighting.question_instance_id, 

230 ) 

231 .join(Issue) 

232 .join(Project) 

233 .filter( 

234 Score.scoreset_id == "", 

235 TotalWeighting.weighting_set_id == Project.default_weighting_set_id, 

236 TotalWeighting.section_id == 0, 

237 Project.id.in_(project_ids_q), 

238 ) 

239 .group_by(Project.id, Score.issue_id) 

240 .order_by(Project.id.desc(), text("total_weighted_score desc")) 

241 ) 

242 

243 

244def score_gaps( 

245 issue: Issue, 

246 weighting_set_id: int | None = None, 

247 expose_weights: bool = True, 

248 show_gap_value: bool = True, 

249 debug: bool = False, 

250) -> list[dict[str, Any]]: 

251 # Resolve None weightset_id to the actual default weighting set ID 

252 if weighting_set_id is None: 

253 session = object_session(issue) 

254 assert session is not None 

255 project = session.query(Project).filter(Project.id == issue.project_id).one() 

256 weighting_set_id = project.get_or_create_default_weighting_set_id() 

257 

258 tmpl = get_template("sql/score_gap.sql") 

259 sql_script = text( 

260 tmpl.render( 

261 expose_weights=expose_weights, show_gap_value=show_gap_value, debug=debug 

262 ) 

263 ) 

264 

265 params = { 

266 "project_id": issue.project_id, 

267 "issue_id": issue.id, 

268 "weighting_set_id": weighting_set_id, 

269 } 

270 

271 res: list[dict[str, Any]] = [] 

272 session = object_session(issue) 

273 

274 assert session is not None 

275 

276 if show_gap_value: 

277 for q_row in session.execute(sql_script, params): 

278 q_dict = dict(q_row._mapping) 

279 q_dict["number"] = from_b36(q_row.b36_number) 

280 res.append(q_dict) 

281 else: 

282 # Don't show a numerical value for the gap, just =, - or + 

283 for q_row in session.execute(sql_script, params): 

284 q_dict = dict(q_row._mapping) 

285 q_dict["number"] = from_b36(q_row.b36_number) 

286 score_gap = q_dict["score_gap"] 

287 if score_gap is None: 

288 q_dict["score_gap"] = "==" 

289 elif score_gap > 0: 

290 q_dict["score_gap"] = "+" 

291 elif score_gap == 0: 

292 q_dict["score_gap"] = "==" 

293 else: 

294 # score_gap < 1 

295 q_dict["score_gap"] = "-" 

296 res.append(q_dict) 

297 

298 res.sort(key=itemgetter("number")) 

299 

300 return res 

301 

302 

303def question_scoresummary( 

304 session: Session, 

305 user: User, 

306 project: Project, 

307 section: Section, 

308 scoreset_id: str = "", 

309) -> Query: 

310 # Questions 

311 questions = ( 

312 session.query( 

313 QuestionInstance.id.label("question_id"), 

314 Issue.id.label("issue_id"), 

315 literal(scoreset_id).label("scoreset_id"), 

316 ) 

317 .join(Section) 

318 .join(Project, Section.project) 

319 .join(Issue) 

320 .outerjoin( 

321 Score, 

322 and_( 

323 Score.scoreset_id == scoreset_id, 

324 QuestionInstance.id == Score.question_instance_id, 

325 Issue.id == Score.issue_id, 

326 ), 

327 ) 

328 .filter( 

329 Issue.project == project, 

330 Issue.scoreable_filter(project), 

331 Section.id == section.id, 

332 ) 

333 ) 

334 

335 if user.is_restricted: 

336 query = questions.outerjoin( 

337 SectionPermission, 

338 and_( 

339 SectionPermission.user == user, 

340 Section.id == SectionPermission.section_id, 

341 ), 

342 ).add_columns( 

343 case( 

344 (SectionPermission.section_id != None, Score.score), # noqa 

345 else_=literal("N/A"), 

346 ).label("score") 

347 ) 

348 else: 

349 query = questions.add_columns(Score.score) 

350 

351 return query 

352 

353 

354def subsection_scoressummary( 

355 session: Session, 

356 user: User, 

357 project: Project, 

358 section: Section, 

359 scoreset_id: str = "", 

360) -> "Subquery": 

361 """ 

362 Subsections 

363 First create a subquery totalling up the counts of questions 

364 contained immediately in each section. 

365 Exclude all sections the score_user does not have permissions on 

366 """ 

367 sub = ( 

368 session.query( 

369 Section.b36_number.label("b36_number"), 

370 Issue.id.label("issue_id"), 

371 func.count(QuestionInstance.id).label("q_count"), 

372 func.count(Score.score).label("q_scored"), 

373 func.coalesce(func.sum(Score.score), 0).label("score"), 

374 ) 

375 .join(Project, Section.project) 

376 .join(Issue) 

377 .outerjoin( 

378 QuestionInstance, 

379 and_( 

380 QuestionInstance.project_id == Project.id, 

381 QuestionInstance.section_id == Section.id, 

382 ), 

383 ) 

384 .outerjoin( 

385 Score, 

386 and_( 

387 Score.scoreset_id == scoreset_id, 

388 QuestionInstance.id == Score.question_instance_id, 

389 Issue.id == Score.issue_id, 

390 ), 

391 ) 

392 .filter(Issue.project_id == project.id, Issue.scoreable_filter(project)) 

393 .group_by(Section.id, Issue.id) 

394 ) 

395 

396 if user.is_restricted: 

397 sub = sub.join( 

398 SectionPermission, 

399 and_( 

400 SectionPermission.user == user, 

401 Section.id == SectionPermission.section_id, 

402 ), 

403 ) 

404 

405 return sub.subquery() 

406 

407 

408def section_scoresummary( 

409 session: Session, 

410 user: User, 

411 project: Project, 

412 section: Section, 

413 sub: Union["Alias", "Subquery"], 

414) -> Query: 

415 """ 

416 create main query of all immediate subsections in this section 

417 and total up the totals from the subsection and child subsections 

418 from the subquery. No rows from the subquery indicates the score_user 

419 does not have access to any subsections, in this case a score of 'N/A' 

420 is returned. 

421 """ 

422 

423 subsections = ( 

424 session.query( 

425 Section.id.label("section_id"), 

426 Issue.id.label("issue_id"), 

427 cast(func.sum(sub.c.q_count), INTEGER).label("question_count"), 

428 cast(func.sum(sub.c.q_scored), INTEGER).label("questions_scored"), 

429 func.coalesce(cast(func.sum(sub.c.score), INTEGER), literal("N/A")).label( 

430 "score" 

431 ), 

432 ) 

433 .join(Project, Section.project) 

434 .join(Issue) 

435 .outerjoin( 

436 sub, 

437 and_( 

438 sub.c.b36_number.startswith(Section.b36_number), 

439 Issue.id == sub.c.issue_id, 

440 ), 

441 ) 

442 .filter( 

443 Issue.project == project, 

444 Issue.scoreable_filter(project), 

445 Section.parent_id == section.id, 

446 ) 

447 .group_by(Section.id, Issue.id) 

448 ) 

449 

450 return subsections 

451 

452 

453def get_permission_for_scoreset(user, scoreset_id, to_save=False): 

454 if scoreset_id in (None, "") or scoreset_id != user.id: 

455 return ( 

456 perms.ISSUE_SAVE_AGREED_SCORES 

457 if to_save 

458 else perms.ISSUE_VIEW_AGREED_SCORES 

459 ) 

460 else: 

461 return perms.ISSUE_SAVE_SCORES if to_save else perms.ISSUE_VIEW_SCORES