Coverage for postrfp / shared / fetch / scoreq.py: 95%

123 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1""" 

2Functions which fetch Score objects from the database 

3""" 

4 

5import logging 

6from operator import itemgetter 

7from typing import Any, TYPE_CHECKING, Union, NamedTuple 

8from decimal import Decimal 

9 

10from sqlalchemy.orm.exc import NoResultFound 

11 

12from sqlalchemy import literal, func, case, and_, cast, INTEGER, VARCHAR, text, tuple_ 

13from sqlalchemy.orm import Session, Query 

14from sqlalchemy.orm.session import object_session 

15 

16if TYPE_CHECKING: 

17 from sqlalchemy import Alias, Subquery 

18 from decimal import Decimal 

19 

20from postrfp.model.questionnaire.b36 import from_b36 

21from postrfp.model import ( 

22 User, 

23 Project, 

24 Section, 

25 Issue, 

26 Score, 

27 QuestionInstance, 

28 SectionPermission, 

29 Participant, 

30 TotalWeighting, 

31) 

32from postrfp.templates import get_template 

33from postrfp.authorisation import perms 

34 

35 

36log = logging.getLogger(__name__) 

37 

38 

39class ScoreTuple(NamedTuple): 

40 issue_id: int 

41 score_value: Decimal | None = None 

42 scoreset_id: str = "" 

43 comment: str | None = None 

44 

45 

def scores(
    session: Session,
    project: Project,
    section: Section,
    scoreset_id: str,
    user: User,
    filter_by,
) -> list[dict[str, Union[str, int, "Decimal"]]]:
    """
    List the score for each question/issue pairing found in a section.

    The scoring set is validated against the project and the user must hold
    the permission returned by ``get_permission_for_scoreset`` before any
    query is run.

    Args:
        session: Database session used to run the query
        project: Project the scoring set is validated against
        section: Section whose questions are listed
        scoreset_id: Scoring set to read scores from
        user: User whose permission is checked
        filter_by: Extra SQL criterion added to the WHERE clause

    Returns:
        One dict per row with keys ``question_id``, ``issue_id``,
        ``scoreset_id`` and ``score``, ordered by question ID then issue ID.
        ``score`` is None for pairings that have no Score row in this set.
    """
    Score.validate_scoreset(scoreset_id, project)
    required_permission = get_permission_for_scoreset(user, scoreset_id)
    user.check_permission(required_permission)

    # Outer-join condition: a Score only attaches when it belongs to this
    # scoring set, this question and this issue, so unscored pairings are
    # still returned (with a NULL score).
    score_matches_row = and_(
        Score.scoreset_id == scoreset_id,
        QuestionInstance.id == Score.question_instance_id,
        Issue.id == Score.issue_id,
    )

    selected_columns = (
        QuestionInstance.id.label("question_id"),
        Issue.id.label("issue_id"),
        literal(scoreset_id).label("scoreset_id"),
        Score.score.label("score"),
    )

    rows = (
        session.query(*selected_columns)
        .join(Section)
        .join(Project, Section.project)
        .join(Issue)
        .outerjoin(Score, score_matches_row)
        .filter(Project.id == Issue.project_id, Section.id == section.id, filter_by)
        .order_by(QuestionInstance.id.asc(), Issue.id.asc())
    )

    result = []
    for row in rows:
        result.append(row._asdict())
    return result

80 

81 

def or_create_score(
    session: Session,
    project: Project,
    question: QuestionInstance,
    score_doc,
) -> tuple[Score, bool]:
    """
    Fetch the Score for a question/issue/scoring-set, creating it if absent.

    The scoring set named by ``score_doc`` is validated against the project
    and the issue is resolved through ``project.get_issue``. No permission
    check is performed here.

    Args:
        session: Database session
        project: Project used for validation and issue lookup
        question: Question the score belongs to
        score_doc: Object exposing ``issue_id`` and ``scoreset_id``

    Returns:
        ``(score, created)``; ``created`` is True when a new Score was
        added to the session because no stored row matched.
    """
    issue_id = score_doc.issue_id
    scoreset_id = score_doc.scoreset_id
    # A missing scoring set means the default (empty string) set.
    scoreset_id = "" if scoreset_id is None else scoreset_id

    Score.validate_scoreset(scoreset_id, project)
    issue = project.get_issue(issue_id)

    stored = (
        session.query(Score)
        .filter(
            Score.question_instance_id == question.id,
            Score.issue_id == issue.id,
            Score.scoreset_id == scoreset_id,
        )
        .one_or_none()
    )
    if stored is not None:
        return stored, False

    # Nothing stored yet: register a new Score with the session.
    fresh = Score(issue=issue, question=question, scoreset_id=scoreset_id)
    session.add(fresh)
    return fresh, True

118 

119 

def or_create_scores_batch(
    session: Session,
    project: Project,
    score_requests: list[tuple[QuestionInstance, ScoreTuple]],
) -> list[tuple[Score, bool]]:
    """
    Batch version of or_create_score for better performance.

    All existing scores are loaded with a single query. A request whose
    (question, issue, scoring set) combination was already requested earlier
    in the same batch receives the Score object handed out for the earlier
    request, with ``created`` False, instead of a second new Score for the
    same combination.

    Args:
        session: Database session
        project: Project object
        score_requests: List of (question, score_doc) tuples

    Returns:
        List of (score, created) tuples in the same order as input
    """
    if not score_requests:
        return []

    # Normalize scoreset_ids, validate them and resolve each issue
    normalized_requests = []
    for question, score_doc in score_requests:
        scoreset_id = score_doc.scoreset_id or ""
        Score.validate_scoreset(scoreset_id, project)
        issue = project.get_issue(score_doc.issue_id)
        normalized_requests.append((question, issue, scoreset_id))

    # Distinct lookup keys, in first-seen order, for a single fetch
    keys = list(
        dict.fromkeys(
            (question.id, issue.id, scoreset_id)
            for question, issue, scoreset_id in normalized_requests
        )
    )

    # Fetch all existing scores in one round trip
    existing_q = session.query(Score).filter(
        tuple_(Score.question_instance_id, Score.issue_id, Score.scoreset_id).in_(keys)
    )
    scores_by_key = {
        (score.question_instance_id, score.issue_id, score.scoreset_id): score
        for score in existing_q
    }

    # Build results, creating new scores where needed
    results = []
    new_scores = []
    for question, issue, scoreset_id in normalized_requests:
        key = (question.id, issue.id, scoreset_id)
        score = scores_by_key.get(key)
        created = score is None
        if created:
            score = Score(issue=issue, question=question, scoreset_id=scoreset_id)
            new_scores.append(score)
            # Remember the new score so a repeat of this key later in the
            # batch reuses it. Keys containing an unassigned (None) ID do not
            # identify a row, so those are never shared.
            if question.id is not None and issue.id is not None:
                scores_by_key[key] = score
        results.append((score, created))

    if new_scores:
        session.add_all(new_scores)

    return results

181 

182 

def scoring_data(project: Project, scoreset_id: str = "") -> Query:
    """
    Build a query of score records for the given project and scoring set.

    The query is returned unexecuted. Each row carries four columns:
    score, question_id, b36_number and issue_id.

    The session is taken from ``project``; an AssertionError is raised when
    the project is not attached to one.
    """
    session = object_session(project)
    assert session is not None

    selected_columns = (
        Score.score,
        QuestionInstance.id.label("question_id"),
        QuestionInstance.b36_number,
        Score.issue_id,
    )
    return (
        session.query(*selected_columns)
        .join(QuestionInstance)
        .filter(QuestionInstance.project == project, Score.scoreset_id == scoreset_id)
    )

204 

205 

def score_totals_by_project(
    session: Session, org_id: str, project_ids: list[int]
) -> Query:
    """
    Build a query of total weighted scores per project and issue.

    Only projects listed in ``project_ids`` that have a Participant row for
    ``org_id`` are included. Scores come from the empty-string scoring set
    and are multiplied by the absolute weight from each project's default
    weighting set (TotalWeighting rows with ``section_id == 0``).

    Rows are ordered by project ID descending, then by total weighted score
    descending. The query is returned unexecuted.
    """
    # Subquery: the requested projects in which the organisation participates
    participating_project_ids = (
        session.query(Participant.project_id)
        .filter(
            Participant.org_id == org_id,
            Participant.project_id.in_(project_ids),
        )
        .distinct()
    )

    weighted_total = func.sum(Score.score * TotalWeighting.absolute_weight).label(
        "total_weighted_score"
    )
    weighting_join = Score.question_instance_id == TotalWeighting.question_instance_id

    totals = (
        session.query(
            weighted_total,
            Project.title.label("project_title"),
            Project.org_id.label("project_owner"),
            Project.date_published,
            Project.id.label("project_id"),
            Issue.respondent_id,
        )
        .join(TotalWeighting, weighting_join)
        .join(Issue)
        .join(Project)
        .filter(
            Score.scoreset_id == "",
            TotalWeighting.weighting_set_id == Project.default_weighting_set_id,
            TotalWeighting.section_id == 0,
            Project.id.in_(participating_project_ids),
        )
        .group_by(
            Project.id,
            Score.issue_id,
            Issue.respondent_id,
            Project.title,
            Project.org_id,
            Project.date_published,
        )
        .order_by(Project.id.desc(), text("total_weighted_score desc"))
    )
    return totals

249 

250 

def _score_gap_symbol(score_gap) -> str:
    """Reduce a numeric gap to "+", "-" or "==" (None counts as "==")."""
    if score_gap is None or score_gap == 0:
        return "=="
    return "+" if score_gap > 0 else "-"


def score_gaps(
    issue: Issue,
    weighting_set_id: int | None = None,
    expose_weights: bool = True,
    show_gap_value: bool = True,
    debug: bool = False,
) -> list[dict[str, Any]]:
    """
    Run the ``sql/score_gap.sql`` template for an issue and return its rows.

    Args:
        issue: Issue to report on; its session is used to run the query
        weighting_set_id: Weighting set to apply. None resolves to the
            project's default weighting set
        expose_weights: Passed through to the SQL template
        show_gap_value: Passed through to the SQL template. When False the
            ``score_gap`` value of each row is replaced by "+", "-" or "=="
        debug: Passed through to the SQL template

    Returns:
        One dict per result row, with an added ``number`` key decoded from
        the row's ``b36_number``, sorted by ``number``.

    Raises:
        AssertionError: if ``issue`` is not attached to a session
    """
    session = object_session(issue)
    assert session is not None

    # Resolve a None weighting_set_id to the actual default weighting set ID
    if weighting_set_id is None:
        project = session.query(Project).filter(Project.id == issue.project_id).one()
        weighting_set_id = project.get_or_create_default_weighting_set_id()

    tmpl = get_template("sql/score_gap.sql")
    sql_script = text(
        tmpl.render(
            expose_weights=expose_weights, show_gap_value=show_gap_value, debug=debug
        )
    )

    params = {
        "project_id": issue.project_id,
        "issue_id": issue.id,
        "weighting_set_id": weighting_set_id,
    }

    res: list[dict[str, Any]] = []
    for q_row in session.execute(sql_script, params):
        q_dict = dict(q_row._mapping)
        q_dict["number"] = from_b36(q_row.b36_number)
        if not show_gap_value:
            # Don't show a numerical value for the gap, just its direction
            q_dict["score_gap"] = _score_gap_symbol(q_dict["score_gap"])
        res.append(q_dict)

    res.sort(key=itemgetter("number"))

    return res

308 

309 

def question_scoresummary(
    session: Session,
    user: User,
    project: Project,
    section: Section,
    scoreset_id: str = "",
) -> Query:
    """
    Build a query of scores for each question/issue pairing in a section.

    Every row has ``question_id``, ``issue_id`` and ``scoreset_id`` columns
    plus a score column. Only issues of ``project`` that pass
    ``Issue.scoreable_filter`` are included.

    For a restricted user the score is rendered as text, and is the literal
    "N/A" unless a SectionPermission row links that user to the section.
    For any other user the raw ``Score.score`` column is returned.
    """
    # A Score attaches only when it matches the scoring set, question and issue
    score_matches_row = and_(
        Score.scoreset_id == scoreset_id,
        QuestionInstance.id == Score.question_instance_id,
        Issue.id == Score.issue_id,
    )

    # Questions
    questions = (
        session.query(
            QuestionInstance.id.label("question_id"),
            Issue.id.label("issue_id"),
            literal(scoreset_id).label("scoreset_id"),
        )
        .join(Section)
        .join(Project, Section.project)
        .join(Issue)
        .outerjoin(Score, score_matches_row)
        .filter(
            Issue.project == project,
            Issue.scoreable_filter(project),
            Section.id == section.id,
        )
    )

    if not user.is_restricted:
        return questions.add_columns(Score.score)

    permission_for_user = and_(
        SectionPermission.user == user,
        Section.id == SectionPermission.section_id,
    )
    # The outer join leaves section_id NULL when the user has no permission
    # row for the section; those rows show "N/A" instead of the score.
    visible_score = case(
        (SectionPermission.section_id.is_not(None), cast(Score.score, VARCHAR)),
        else_=literal("N/A"),
    ).label("score")

    return questions.outerjoin(SectionPermission, permission_for_user).add_columns(
        visible_score
    )

359 

360 

def subsection_scoressummary(
    session: Session,
    user: User,
    project: Project,
    section: Section,
    scoreset_id: str = "",
) -> "Subquery":
    """
    Build a subquery of per-section, per-issue question and score totals.

    For each section/issue pairing the subquery exposes ``b36_number``,
    ``issue_id``, ``q_count`` (questions contained immediately in the
    section), ``q_scored`` (those with a score in this scoring set) and
    ``score`` (sum of those scores, 0 when there are none).

    For a restricted user, sections without a SectionPermission row for that
    user are excluded. The ``section`` argument is not used by the query.
    """
    # Questions that sit directly in the section, within the same project
    question_in_section = and_(
        QuestionInstance.project_id == Project.id,
        QuestionInstance.section_id == Section.id,
    )
    score_matches_row = and_(
        Score.scoreset_id == scoreset_id,
        QuestionInstance.id == Score.question_instance_id,
        Issue.id == Score.issue_id,
    )

    totals = (
        session.query(
            Section.b36_number.label("b36_number"),
            Issue.id.label("issue_id"),
            func.count(QuestionInstance.id).label("q_count"),
            func.count(Score.score).label("q_scored"),
            func.coalesce(func.sum(Score.score), 0).label("score"),
        )
        .join(Project, Section.project)
        .join(Issue)
        .outerjoin(QuestionInstance, question_in_section)
        .outerjoin(Score, score_matches_row)
        .filter(Issue.project_id == project.id, Issue.scoreable_filter(project))
        .group_by(Section.id, Issue.id, Section.b36_number)
    )

    if user.is_restricted:
        # Inner join: only sections the user has a permission row for remain
        permission_for_user = and_(
            SectionPermission.user == user,
            Section.id == SectionPermission.section_id,
        )
        totals = totals.join(SectionPermission, permission_for_user)

    return totals.subquery()

413 

414 

def section_scoresummary(
    session: Session,
    user: User,
    project: Project,
    section: Section,
    sub: Union["Alias", "Subquery"],
) -> Query:
    """
    Build a query of score totals for the immediate subsections of a section.

    Each subsection/issue row sums the ``q_count``, ``q_scored`` and
    ``score`` columns of every ``sub`` row whose ``b36_number`` starts with
    the subsection's own ``b36_number`` and whose issue matches. When no
    ``sub`` row matches, the sum is NULL and the score is reported as the
    literal "N/A".

    The ``user`` argument is not used by this query.
    """
    question_count = cast(func.sum(sub.c.q_count), INTEGER).label("question_count")
    questions_scored = cast(func.sum(sub.c.q_scored), INTEGER).label(
        "questions_scored"
    )
    # Total as integer text, falling back to "N/A" when the sum is NULL
    score_text = func.coalesce(
        cast(cast(func.sum(sub.c.score), INTEGER), VARCHAR), literal("N/A")
    ).label("score")

    sub_row_belongs = and_(
        sub.c.b36_number.startswith(Section.b36_number),
        Issue.id == sub.c.issue_id,
    )

    return (
        session.query(
            Section.id.label("section_id"),
            Issue.id.label("issue_id"),
            question_count,
            questions_scored,
            score_text,
        )
        .join(Project, Section.project)
        .join(Issue)
        .outerjoin(sub, sub_row_belongs)
        .filter(
            Issue.project == project,
            Issue.scoreable_filter(project),
            Section.parent_id == section.id,
        )
        .group_by(Section.id, Issue.id)
    )

458 

459 

def get_permission_for_scoreset(user, scoreset_id, to_save=False):
    """
    Pick the permission needed to view or save scores in a scoring set.

    When ``scoreset_id`` is non-empty and equals ``user.id``, the plain
    score permissions apply. In every other case (None, empty string, or a
    different ID) the *agreed scores* permissions apply.

    ``to_save`` selects the SAVE variant instead of the VIEW variant.
    """
    is_own_scoreset = scoreset_id not in (None, "") and scoreset_id == user.id

    if is_own_scoreset:
        if to_save:
            return perms.ISSUE_SAVE_SCORES
        return perms.ISSUE_VIEW_SCORES

    if to_save:
        return perms.ISSUE_SAVE_AGREED_SCORES
    return perms.ISSUE_VIEW_AGREED_SCORES