Coverage for postrfp/buyer/api/endpoints/scoring.py: 98%

220 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2Scoring endpoints: manage raw scores, comments, autoscores, summaries and calculated / 

3weighted aggregations. Empty scoreset_id ("") = agreed (consensus) set; non‑empty = individual. 

4""" 

5 

6from datetime import datetime 

7 

8from typing import NamedTuple 

9from decimal import Decimal 

10 

11from sqlalchemy.orm import Session 

12from sqlalchemy.orm.exc import NoResultFound 

13 

14from postrfp.authorisation import perms 

15from postrfp.shared import fetch, update, serial 

16from postrfp.shared.decorators import http 

17from postrfp.model import ( 

18 ScoreComment, 

19 AuditEvent, 

20 Score, 

21 Project, 

22 Issue, 

23 User, 

24 QuestionInstance, 

25) 

26from postrfp.shared.serial.common import ScoringModel 

27from postrfp.model.questionnaire.b36 import from_b36 

28 

29from postrfp.buyer.api import authorise 

30from postrfp.shared.exceptions import AuthorizationFailure 

31 

32 

@http
def get_project_scores(
    session: Session, user: User, project_id: int, scoreset_id: str = "__default__"
) -> serial.ScoringData:
    """
    Export all scores of a project for one score set in a single response.

    The placeholder "__default__" is translated to "" (the agreed set) before fetching.
    Every exported row is the fetched record as a dict plus a "number" entry computed by
    from_b36 from the record's b36_number.
    Permission: ISSUE_VIEW_AGREED_SCORES, checked with deny_restricted=True.
    """
    project = fetch.project(session, project_id)
    authorise.check(
        user, perms.ISSUE_VIEW_AGREED_SCORES, project=project, deny_restricted=True
    )

    if scoreset_id == "__default__":
        scoreset_id = ""

    rows = [
        {**record._asdict(), "number": from_b36(record.b36_number)}
        for record in fetch.scoring_data(project, scoreset_id=scoreset_id)
    ]
    return serial.ScoringData(scores=rows, scoreset_id=scoreset_id)

56 

57 

@http
def get_question_scores(
    session: Session, user: User, question_id: int, scoreset_id: str = ""
):
    """
    Fetch the scores held against one question in the given score set.

    The permission to check is looked up from the score set and verified against the
    question's project and section before any scores are read. The result of fetch.scores,
    filtered to this question, is returned unchanged.
    """
    question = fetch.question(session, question_id, with_project=True)
    parent_section = question.section

    required_permission = fetch.get_permission_for_scoreset(user, scoreset_id)
    authorise.check(
        user,
        required_permission,
        project=question.project,
        section_id=parent_section.id,
    )

    only_this_question = QuestionInstance.id == question.id
    return fetch.scores(
        session,
        parent_section.project,
        parent_section,
        scoreset_id,
        user,
        only_this_question,
    )

79 

80 

@http
def post_question_score(
    session: Session, user: User, question_id: int, score_doc: serial.Score
) -> serial.Id:
    """
    Create or update a single score (question + issue + score set).

    Checks run in this order, all before the Score row is fetched or created, so that a
    rejected request never adds or changes anything in the session:

    1. PROJECT_VIEW_QUESTIONNAIRE on the question's project / section
    2. the save permission looked up for score_doc.scoreset_id
    3. Score.check_score_value on the submitted value

    After the value is assigned the session is flushed and a score audit event is logged
    via update.log_score_event. Returns the ID of the affected Score.
    """
    question = fetch.question(session, question_id, with_project=True)
    project = question.project

    authorise.check(
        user,
        perms.PROJECT_VIEW_QUESTIONNAIRE,
        project=project,
        section_id=question.section_id,
        deny_restricted=False,
    )

    # Authorise the write itself before touching any Score row
    score_perm = fetch.get_permission_for_scoreset(
        user, score_doc.scoreset_id, to_save=True
    )
    authorise.check(
        user,
        score_perm,
        project=project,
        section_id=question.section_id,
        deny_restricted=False,
    )

    # Validation only needs the value and the project, so it can also run up front
    score_value = score_doc.score_value
    Score.check_score_value(score_value, project)

    score, created = fetch.or_create_score(session, project, question, score_doc)
    initial_score_value = score.score
    score.score = score_value

    # Need the Score record's ID for the audit event record, so flush
    session.flush()
    update.log_score_event(session, score, initial_score_value, created, project, user)
    return serial.Id(id=score.id)

119 

120 

@http
def get_project_section_issue_scores(
    session: Session,
    user: User,
    project_id: int,
    section_id: int,
    issue_id: int,
    scoreset_id: str = "",
):
    """
    Fetch the scores of one Issue for a Section of a project, limited to one score set.

    The section is resolved through the project and the issue through project.get_issue.
    The permission looked up for the score set is checked against project, issue and
    section before fetch.scores is called with a filter on the issue.
    """
    project = fetch.project(session, project_id)
    section = fetch.section_of_project(project, section_id)
    issue = project.get_issue(issue_id)

    authorise.check(
        user,
        fetch.get_permission_for_scoreset(user, scoreset_id),
        project=project,
        issue=issue,
        section_id=section.id,
    )

    return fetch.scores(
        session, project, section, scoreset_id, user, Issue.id == issue.id
    )

145 

146 

@http
def post_question_score_comment(
    session: Session, user: User, question_id: int, score_doc: serial.Score
) -> None:
    """
    Attach a comment to a score, optionally changing the score value in the same call.

    The score is fetched or created first. The score set permission is looked up with
    to_save set only when score_doc carries a score value, and is checked against the
    score's issue. A supplied value is validated, assigned, flushed and logged as a score
    audit event; a supplied comment is stored and recorded as SCORE_COMMENT_ADDED.
    """
    question = fetch.question(session, question_id, with_project=True)
    authorise.check(
        user,
        perms.PROJECT_VIEW_QUESTIONNAIRE,
        project=question.project,
        section_id=question.section_id,
    )
    project = question.project

    score, created = fetch.or_create_score(session, project, question, score_doc)

    has_new_value = score_doc.score_value is not None
    score_permission = fetch.get_permission_for_scoreset(
        user, score_doc.scoreset_id, to_save=has_new_value
    )
    authorise.check(
        user,
        score_permission,
        issue=score.issue,
        section_id=question.section_id,
        project=question.project,
    )

    if has_new_value:
        previous_value = score.score
        new_value = score_doc.score_value
        Score.check_score_value(new_value, project)
        score.score = new_value
        # The audit event needs the Score's ID, which is only assigned on flush
        session.flush()
        update.log_score_event(session, score, previous_value, created, project, user)

    if score_doc.comment is None:
        return

    # Persist the comment first so its ID is available to the audit event
    comment = ScoreComment(
        score=score,
        comment_time=datetime.now(),
        user_id=user.id,
        comment_text=score_doc.comment,
    )
    session.add(comment)
    session.flush()

    event = AuditEvent.create(
        session,
        "SCORE_COMMENT_ADDED",
        object_id=comment.id,
        user=user,
        project=project,
        issue_id=score_doc.issue_id,
        question_id=question.id,
    )
    event.add_change("Comment", "", comment.comment_text)
    session.add(event)

213 

214 

def check_autoscore_permissions(
    project: Project, initiating_user: User, target_user: User
):
    """
    Verify that autoscores may be produced for target_user at the request of initiating_user.

    Raises ValueError when the project is not multiscored, and AuthorizationFailure when
    the target user's organisation is not among the project's participants. After that
    target_user is checked for ISSUE_SAVE_SCORES and initiating_user for
    ISSUE_SAVE_AGREED_SCORES.
    """
    if not project.multiscored:
        raise ValueError("Project must be using Multiple Score Sets")

    is_participant = target_user.organisation in project.participants
    if not is_participant:
        raise AuthorizationFailure(
            f"User {target_user.id} not a participant in project {project.id}"
        )

    target_user.check_permission(perms.ISSUE_SAVE_SCORES)
    initiating_user.check_permission(perms.ISSUE_SAVE_AGREED_SCORES)

225 

226 

@http
def get_project_calcautoscores(
    session: Session, user: User, project_id: int, target_user: User
):
    """
    Compute autoscores for a target scorer and return them as a list.

    This function itself adds nothing to the session. Raises ValueError unless the
    project's status is "Live"; the remaining preconditions are enforced by
    check_autoscore_permissions.
    """
    project = fetch.project(session, project_id)
    if project.status_name != "Live":
        raise ValueError("Project must be live to generate autoscores")

    check_autoscore_permissions(project, user, target_user)
    autoscores = fetch.generate_autoscores(project, session, target_user)
    return list(autoscores.values())

243 

244 

@http
def post_project_calcautoscores(
    session: Session, user: User, project_id: int, target_user: User
):
    """
    Generate autoscores for a target scorer and store them in that scorer's score set.

    Existing scores whose value already equals the autoscore are left alone; other existing
    scores are overwritten and scores without an existing record are created. All changes
    are passed to update.log_score_events_batch, attributed to target_user.
    Raises ValueError unless the project's status is "Live"; the remaining preconditions
    are enforced by check_autoscore_permissions.
    """
    project = fetch.project(session, project_id)
    if project.status_name != "Live":
        raise ValueError("Project must be live to generate autoscores")

    check_autoscore_permissions(project, user, target_user)
    existing_scores = fetch.scores_dict_scoreset(project, target_user.id)
    autoscores = fetch.generate_autoscores(project, session, target_user)

    changed_scores = []  # (Score, value before overwrite) pairs
    new_scores = []

    for key, entry in autoscores.items():
        if key not in existing_scores:
            new_scores.append(
                Score(
                    question_instance_id=entry.question_id,
                    scoreset_id=entry.scoreset_id,
                    issue_id=entry.issue_id,
                    score=entry.score,
                )
            )
            continue

        existing = existing_scores[key]
        proposed = entry.score
        # Nothing to do when the stored value already matches the autoscore
        if existing.score is not None and existing.score == proposed:
            continue

        previous_value = existing.score
        existing.score = Decimal(proposed)
        changed_scores.append((existing, previous_value))

    if new_scores:
        session.add_all(new_scores)

    # Flush so that newly created Score objects have their IDs before logging
    session.flush()

    update.log_score_events_batch(
        session, changed_scores, new_scores, project, target_user
    )

297 

298 

@http
def get_section_scoresummaries(
    session: Session, user: User, section_id: int, scoreset_id: str
) -> serial.ScoreSummary:
    """
    Build the score summary of a Section for one score set.

    The response combines two lists: subsection rows from fetch.section_scoresummary
    (fed by fetch.subsection_scoressummary) and question rows from
    fetch.question_scoresummary, each validated into its serial model.
    Permission: the one looked up for the score set, checked on the section's project.
    """
    section = fetch.section(session, section_id)
    project = section.project

    authorise.check(
        user,
        fetch.get_permission_for_scoreset(user, scoreset_id),
        project=project,
        section_id=section.id,
    )

    sub_summary = fetch.subsection_scoressummary(
        session, user, project, section, scoreset_id
    )
    subsection_scores = [
        serial.SectionScore.model_validate(row)
        for row in fetch.section_scoresummary(
            session, user, project, section, sub_summary
        )
    ]
    question_scores = [
        serial.QuestionScore.model_validate(row)
        for row in fetch.question_scoresummary(
            session, user, project, section, scoreset_id
        )
    ]
    return serial.ScoreSummary(subsections=subsection_scores, questions=question_scores)

324 

325 

@http
def get_section_scores(
    session: Session,
    user: User,
    section_id: int,
    scoreset_id: str | None = None,
    weightset_id: int | None = None,
    scoring_model: str | None = None,
) -> serial.CalculatedScores:
    """
    Calculated scores for the immediate child subsections and questions of a Section.

    An optional weighting set and scoring model (default "Unweighted") are applied; a
    missing scoreset_id selects "" (the agreed set). For every Issue found among the child
    components the result holds the calculated score of each child question and subsection
    that has a raw score, plus their total quantized to four decimal places. Components
    without a raw score are omitted from the breakdown, but their Issue still appears.
    Permission: ISSUE_VIEW_AGREED_SCORES, checked with deny_restricted=True.
    """
    from collections import defaultdict

    from postrfp.shared.fetch.view_scoring import get_child_scores
    from postrfp.model.questionnaire.score_views import (
        QuestionScoreComponent,
        SectionScoreComponent,
    )

    section = fetch.section(session, section_id)
    project = section.project
    authorise.check(
        user,
        perms.ISSUE_VIEW_AGREED_SCORES,
        project=project,
        section_id=section.id,
        deny_restricted=True,
    )

    # Set defaults
    scoreset_id = scoreset_id or ""
    scoring_model = scoring_model or "Unweighted"

    # Get scores for immediate child questions and sections
    scores_data = get_child_scores(
        session=session,
        section=section,
        target_types=["question", "section"],
        scoreset_id=scoreset_id,
        weighting_set_id=weightset_id,
        scoring_model=scoring_model,
    )

    # Group the calculated scores by issue in a single pass over the components,
    # instead of re-scanning every component once per issue.
    all_issues = set()
    question_scores_by_issue = defaultdict(dict)
    section_scores_by_issue = defaultdict(dict)

    for q_comp in scores_data.get("question", []):
        if not isinstance(q_comp, QuestionScoreComponent):
            continue
        all_issues.add(q_comp.issue_id)
        if q_comp.raw_score is not None:
            question_scores_by_issue[q_comp.issue_id][q_comp.question_id] = (
                q_comp.get_calculated_score(scoring_model)
            )

    for s_comp in scores_data.get("section", []):
        if not isinstance(s_comp, SectionScoreComponent):
            continue
        all_issues.add(s_comp.issue_id)
        if s_comp.raw_total is not None:
            section_scores_by_issue[s_comp.issue_id][s_comp.section_id] = (
                s_comp.get_calculated_score(scoring_model)
            )

    zero = Decimal("0")
    score_dict = {}

    for issue_id in all_issues:
        # .get() rather than indexing: an issue whose components all lack a raw score
        # has no group, yet must still be reported (with a zero total).
        issue_question_decimals = question_scores_by_issue.get(issue_id, {})
        issue_section_decimals = section_scores_by_issue.get(issue_id, {})

        # Sum with Decimal arithmetic; convert to float only for serialization
        total_decimal = (
            sum(issue_question_decimals.values(), zero)
            + sum(issue_section_decimals.values(), zero)
        ).quantize(Decimal("0.0001"))

        score_dict[issue_id] = serial.IssueScores(
            total_score=float(total_decimal),
            section_scores={
                sid: float(score) for sid, score in issue_section_decimals.items()
            },
            question_scores={
                qid: float(score) for qid, score in issue_question_decimals.items()
            },
        )

    return serial.CalculatedScores(
        scoring_model=ScoringModel(scoring_model), scores=score_dict
    )

430 

431 

@http
def get_project_scoresets(
    session: Session, user: User, project_id: int, scoreset_id: str = ""
) -> list[serial.ScoreSet]:
    """
    List the score sets of a project that the caller may see.

    A user holding ISSUE_VIEW_AGREED_SCORES receives every distinct non-empty score set
    found on the project's scores (with the matching user's full name where one exists),
    followed by a synthetic "__default__" entry for the agreed set. Any other user must
    hold ISSUE_VIEW_SCORES and receives only their own set.
    The scoreset_id parameter is not used by this function.
    """
    project = fetch.project(session, project_id)

    if not user.has_permission(perms.ISSUE_VIEW_AGREED_SCORES):
        user.check_permission(perms.ISSUE_VIEW_SCORES)
        return [serial.ScoreSet(scoreset_id=user.id, fullname=user.fullname)]

    # Holders of the agreed-scores permission may see other users' score sets
    individual_sets = (
        session.query(Score.scoreset_id, User.fullname)
        .join(Issue)
        .outerjoin(User, Score.scoreset_id == User.id)
        .filter(Issue.project == project, Score.scoreset_id != "")
        .distinct()
    )
    score_sets = [serial.ScoreSet.model_validate(row) for row in individual_sets]
    return [
        *score_sets,
        serial.ScoreSet(scoreset_id="__default__", fullname="Agreed Scoring Set"),
    ]

462 

463 

@http
def get_question_issue_comments(
    session: Session, user: User, question_id: int, issue_id: int, scoreset_id: str = ""
):
    """
    Return the comments of one score (question + issue + score set) as dicts.

    Comments come back in the order of the score's comments collection. An empty list is
    returned when no matching score exists. Requires PROJECT_VIEW_QUESTIONNAIRE on the
    question and the permission looked up for the score set on the issue.
    """
    question = fetch.question(session, question_id, with_project=True)
    project = question.project
    authorise.check(
        user,
        perms.PROJECT_VIEW_QUESTIONNAIRE,
        project=project,
        section_id=question.section_id,
    )
    issue = question.project.get_issue(issue_id)

    authorise.check(
        user,
        fetch.get_permission_for_scoreset(user, scoreset_id),
        issue=issue,
        section_id=question.section_id,
        project=question.project,
    )

    matching_score = (
        session.query(Score)
        .filter(
            Score.question_instance_id == question.id,
            Score.issue_id == issue.id,
            Score.scoreset_id == scoreset_id,
        )
        .one_or_none()
    )
    if matching_score is None:
        return []

    return [comment.as_dict() for comment in matching_score.comments]

505 

506 

class ScoreData(NamedTuple):
    """A Score paired with the state needed to log its audit event after a bulk upsert."""

    # The Score record being created or updated
    score: Score
    # Value held by score.score before the new value was assigned (may be None)
    initial_score_value: Decimal | None
    # Creation flag as reported by the fetch-or-create call for this Score
    created: bool

511 

512 

@http
def post_section_scoreset_scores(
    session: Session,
    user: User,
    section_id: int,
    scoreset_id: str,
    section_score_docs: serial.SectionScoreDocs,
) -> list[serial.Id]:
    """
    Bulk upsert of scores for several questions and issues of a Section in one score set.

    Requires PROJECT_VIEW_QUESTIONNAIRE and the save permission looked up for the score
    set. Every submitted value is checked with Score.check_score_value before it is
    assigned. Audit events are written in one batch when more than three scores are
    involved, otherwise one by one. Returns the IDs of the affected scores in submission
    order.
    """
    section = fetch.section(session, section_id)
    project = section.project
    authorise.check(
        user,
        perms.PROJECT_VIEW_QUESTIONNAIRE,
        project=project,
        section_id=section_id,
        deny_restricted=False,
    )

    save_permission = fetch.get_permission_for_scoreset(
        user, scoreset_id, to_save=True
    )
    authorise.check(
        user,
        save_permission,
        project=project,
        section_id=section_id,
        deny_restricted=False,
    )

    # One (question, ScoreTuple) request per submitted score, in document order
    score_requests = []
    for doc in section_score_docs.root:
        question = fetch.question_of_section(session, section_id, doc.question_id)
        for score_doc in doc.scores:
            issue = project.get_issue(score_doc.issue_id)
            requested = fetch.ScoreTuple(
                issue_id=issue.id,
                score_value=score_doc.score_value,
                scoreset_id=scoreset_id,
            )
            score_requests.append((question, requested))

    # Parallel list used to match each batch result back to its requested value
    requested_tuples = [requested for _, requested in score_requests]

    batch_results = fetch.or_create_scores_batch(session, project, score_requests)

    data: list[ScoreData] = []
    for position, (score, created) in enumerate(batch_results):
        requested = requested_tuples[position]
        previous_value = score.score
        new_value = requested.score_value
        Score.check_score_value(new_value, project)
        score.score = new_value
        data.append(ScoreData(score, previous_value, created))

    session.flush()

    # The batch logger takes created and updated scores as separate collections
    scores_to_create = [item.score for item in data if item.created]
    scores_to_update = [
        (item.score, item.initial_score_value) for item in data if not item.created
    ]

    if len(data) > 3:  # Threshold for batch processing
        update.log_score_events_batch(
            session,
            scores_to_update,
            scores_to_create,
            project,
            user,
            add_comments=False,
        )
    else:
        # Small submissions are logged individually
        for item in data:
            update.log_score_event(
                session,
                item.score,
                item.initial_score_value,
                item.created,
                project,
                user,
            )

    return [serial.Id(id=item.score.id) for item in data]