Coverage for postrfp/shared/update.py: 96%

310 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2Functions that perform SQL Update queries 

3""" 

4 

5from copy import deepcopy 

6from decimal import Decimal 

7from datetime import datetime 

8from typing import TYPE_CHECKING 

9 

10if TYPE_CHECKING: 

11 from sqlalchemy.orm import Session 

12 

13from sqlalchemy.sql.elements import literal 

14from sqlalchemy.sql.expression import insert, select 

15from sqlalchemy import delete 

16 

17from sqlalchemy import func 

18from sqlalchemy.orm import ( 

19 object_session, 

20 noload, 

21) 

22from sqlalchemy.orm.exc import NoResultFound 

23 

24from postrfp.model import ( 

25 Answer, 

26 QuestionResponseState, 

27 ResponseStatus, 

28 Participant, 

29 ProjectPermission, 

30 SectionPermission, 

31 User, 

32 Issue, 

33 Project, 

34 QElement, 

35 AuditEvent, 

36 ScoreComment, 

37 QuestionInstance, 

38 Section, 

39 Score, 

40 QuestionDefinition, 

41 ImportType, 

42 Weighting, 

43 WeightingSet, 

44) 

45from postrfp.model.audit import evt_types, Status as EventStatus 

46 

47from postrfp.model.exc import CosmeticQuestionEditViolation 

48from postrfp.shared.exceptions import AuthorizationFailure 

49from postrfp.shared import serial 

50from postrfp.model.questionnaire.b36 import from_b36 

51from .fetch import generate_autoscores 

52 

53 

def grant_project_permission(session: "Session", project: Project, user: User):
    """
    Grant a (restricted) User permissions for the given project.

    Looks up the Participant row matching the project and the user's
    organisation, creates a ProjectPermission linking that participant to
    the user and adds it to the session.

    Returns the newly created ProjectPermission instance.

    Raises:
        ValueError: if ``user.is_restricted`` is false.
        AuthorizationFailure: if no matching Participant exists.
    """
    if not user.is_restricted:
        raise ValueError(
            "Assigning ProjectPermission to a non-restricted user has no effect"
        )

    # Only the lookup can raise NoResultFound, so keep the try body minimal.
    try:
        participant = (
            session.query(Participant)
            .filter_by(project_id=project.id, organisation=user.organisation)
            .one()
        )
    except NoResultFound as exc:
        # The message previously lacked the f-prefix, so the placeholders
        # were emitted literally instead of identifying the user/project.
        m = f"User {user.id} is not a Participant in project {project.id}"
        raise AuthorizationFailure(message=m) from exc

    pp = ProjectPermission()
    pp.participant = participant
    pp.user = user
    session.add(pp)
    return pp

77 

78 

def grant_section_permissions(
    session: "Session", project: Project, user: User, section_id_list: list[int]
):
    """
    Give ``user`` access to each section id in ``section_id_list``.

    The user's existing ProjectPermission for ``project`` is reused when one
    is found; otherwise one is created via ``grant_project_permission``.
    One SectionPermission per section id is added to the session.
    """
    try:
        permission = project.permissions.filter_by(user_id=user.id).one()
    except NoResultFound:
        permission = grant_project_permission(session, project, user)

    session.add_all(
        SectionPermission(section_id=section_id, user=user, project_permission=permission)
        for section_id in section_id_list
    )

96 

97 

98def _update_or_create_weighting( 

99 session: "Session", 

100 entity_id: int, 

101 weight_value: Decimal, 

102 lookup: dict, 

103 valid_id_set: set, 

104 save_func, 

105 project_id: int, 

106 entity_type: str, 

107 weighting_kwargs: dict, 

108): 

109 """ 

110 Helper function to update or create a weighting with sparse storage logic. 

111 

112 Args: 

113 session: Database session 

114 entity_id: ID of the question or section 

115 weight_value: The weight value to set 

116 lookup: Dict mapping entity IDs to existing Weighting objects 

117 valid_id_set: Set of valid entity IDs for this project 

118 save_func: Function to save new Weighting objects 

119 project_id: ID of the project 

120 entity_type: Type of entity ("Question" or "Section") for error messages 

121 weighting_kwargs: Dict of kwargs to pass to Weighting constructor 

122 """ 

123 if entity_id in lookup: 

124 # Existing weight record found 

125 if weight_value == 1: 

126 # Delete record for sparse storage (weight = 1 is default) 

127 session.delete(lookup[entity_id]) 

128 else: 

129 # Update to non-default weight 

130 lookup[entity_id].value = weight_value 

131 elif entity_id in valid_id_set: 

132 # No existing record, only create if non-default weight 

133 if weight_value != 1: 

134 save_func(Weighting(**weighting_kwargs, value=weight_value)) 

135 # If weight_value == 1, do nothing (sparse: no record needed for default) 

136 else: 

137 raise ValueError( 

138 f"{entity_type} ID {entity_id} does not belong to project {project_id}" 

139 ) 

140 

141 

def save_weightset_weightings(
    session: "Session", weightset: WeightingSet, weights_doc: serial.WeightingsDoc
):
    """
    Persist the weights in ``weights_doc`` into ``weightset`` (sparse storage).

    Only non-default weights (anything != 1) are kept as records:

    - existing record, new value != 1: the record is updated
    - existing record, new value == 1: the record is deleted
    - no record, new value != 1: a record is appended to the weighting set
    - no record, new value == 1: nothing happens

    A ValueError is raised (by the helper) for a question or section ID that
    does not belong to the weighting set's project.
    """
    project_id = weightset.project_id

    # Index the existing records by the entity they apply to
    section_records = {}
    question_records = {}
    for existing in weightset.weightings:
        if existing.section_id:
            section_records[existing.section_id] = existing
        else:
            question_records[existing.question_instance_id] = existing

    append_weighting = weightset.weightings.append  # bound method, reused below

    question_rows = session.query(QuestionInstance.id).filter(
        QuestionInstance.project_id == project_id
    )
    valid_question_ids = {row.id for row in question_rows}

    for item in weights_doc.questions:
        _update_or_create_weighting(
            session=session,
            entity_id=item.question_instance_id,
            weight_value=Decimal(str(item.weight)),
            lookup=question_records,
            valid_id_set=valid_question_ids,
            save_func=append_weighting,
            project_id=project_id,
            entity_type="Question",
            weighting_kwargs={"question_instance_id": item.question_instance_id},
        )

    section_rows = session.query(Section).filter_by(project_id=project_id)
    valid_section_ids = {sec.id for sec in section_rows}

    for item in weights_doc.sections:
        _update_or_create_weighting(
            session=session,
            entity_id=item.section_id,
            weight_value=Decimal(str(item.weight)),
            lookup=section_records,
            valid_id_set=valid_section_ids,
            save_func=append_weighting,
            project_id=project_id,
            entity_type="Section",
            weighting_kwargs={"section_id": item.section_id},
        )

210 

211 

def save_default_weightings(
    session, project: Project, weights_doc: serial.WeightingsDoc
):
    """
    Save ``weights_doc`` into the project's default weighting set.

    The default weighting set is obtained (and created if missing) through
    ``project.get_or_create_default_weighting_set_id()``. A weighting set with
    no Weighting rows is first populated with a value of 1 for every question
    and section, then the save is delegated to ``save_weightset_weightings``.
    """
    default_set = session.get(
        WeightingSet, project.get_or_create_default_weighting_set_id()
    )

    has_no_weightings = default_set.weightings.count() == 0
    if has_no_weightings:
        # Brand new (or empty) set: seed it before applying the document
        set_initial_weightings(default_set, Decimal(1))
        session.flush()

    save_weightset_weightings(session, default_set, weights_doc)

232 

233 

def set_initial_weightings(weighting_set: WeightingSet, initial_value: "Decimal"):
    """
    Insert a Weighting row, valued ``initial_value``, for every QuestionInstance
    and every Section in ``weighting_set.project_id``.

    Rows are created with two INSERT ... FROM SELECT statements (questions
    first, then sections) executed on the session that owns ``weighting_set``.
    """
    session = object_session(weighting_set)
    assert session is not None

    # (Weighting column receiving the entity id, model providing the ids)
    targets = (
        ("question_instance_id", QuestionInstance),
        ("section_id", Section),
    )
    for id_column, model in targets:
        rows = select(
            literal(weighting_set.id), model.id, literal(initial_value)
        ).where(model.project_id == weighting_set.project_id)
        statement = insert(Weighting).from_select(
            ["weighting_set_id", id_column, "value"], rows
        )
        session.execute(statement)

257 

258 

def copy_weightings(
    source_weighting_set: WeightingSet, destination_weighting_set: WeightingSet
):
    """
    Duplicate every Weighting of the source set into the destination set.

    A single INSERT ... FROM SELECT copies section_id, question_instance_id
    and value, substituting the destination set's id as weighting_set_id.
    The statement runs on the session owning ``destination_weighting_set``.
    """
    session = object_session(destination_weighting_set)

    source_rows = select(
        literal(destination_weighting_set.id),
        Weighting.section_id,
        Weighting.question_instance_id,
        Weighting.value,
    ).where(Weighting.weighting_set_id == source_weighting_set.id)

    target_columns = ["weighting_set_id", "section_id", "question_instance_id", "value"]
    copy_statement = insert(Weighting).from_select(target_columns, source_rows)

    assert session is not None
    session.execute(copy_statement)

278 

279 

def reset_scores(session: "Session", project: Project, user: User):
    """
    Delete all scores and score comments for the project, then recreate the
    autoscores produced by ``generate_autoscores``.

    Returns a dict keyed by issue id. Each value holds ``respondent_id`` and
    ``deleted`` (number of scores removed); issues that received autoscores
    also get an ``added`` count.
    """
    project_score_ids = (
        select(Score.id).join(Issue).where(Issue.project_id == project.id)
    )
    session.execute(
        delete(ScoreComment).where(ScoreComment.score_id.in_(project_score_ids))
    )

    issue_map = {}
    for issue in project.scoreable_issues:
        removed = issue.scores.delete()
        issue_map[issue.id] = dict(respondent_id=issue.respondent_id, deleted=removed)

    added_per_issue: dict[int, int] = {}
    for autoscore in generate_autoscores(project, session, user).values():
        target_issue_id = autoscore.issue_id
        new_score = Score(
            question_instance_id=autoscore.question_id,
            issue_id=target_issue_id,
            score=autoscore.score,
        )
        added_per_issue[target_issue_id] = added_per_issue.get(target_issue_id, 0) + 1
        session.add(new_score)

    for target_issue_id, added in added_per_issue.items():
        issue_map[target_issue_id]["added"] = added
    return issue_map

315 

316 

def save_answers(
    session: "Session",
    user: User,
    question: QuestionInstance,
    answer_lookup: dict,
    issue: Issue,
    imported: bool = False,
    set_done: bool = False,
):
    """
    Validate and save answers for ``question`` on ``issue`` and audit the change.

    Returns False, doing nothing further, when saving produced no changes.
    Otherwise the question's response state is stamped with the current time
    and user, its status set according to whether mandatory elements remain
    unanswered, an AuditEvent recording each change is added to the session,
    and True is returned.

    ``imported`` switches the event type to ANSWER_IMPORTED and records the
    issue's project title as the import source; ``set_done`` marks the event
    status as done.
    """
    result = question.validate_and_save_answers(answer_lookup, issue)
    changes = result.change_list
    if changes is None or len(changes) == 0:
        return False

    state: QuestionResponseState = issue.response_state_for_q(question.id)
    state.date_updated = datetime.now()
    state.updated_by = user.id
    state.status = (
        ResponseStatus.NOT_ANSWERED
        if result.unanswered_mandatory
        else ResponseStatus.ANSWERED
    )

    if imported:
        event_type = evt_types.ANSWER_IMPORTED
    elif result.is_new:
        event_type = evt_types.ANSWER_CREATED
    else:
        event_type = evt_types.ANSWER_UPDATED

    event = AuditEvent.create(
        session,
        event_type,
        object_id=state.id,
        user=user,
        project_id=issue.project_id,
        issue=issue,
        question_id=question.id,
    )
    if set_done:
        event.status = EventStatus.done
    if imported:
        event.add_change("Import Source", issue.project.title, "")
    for previous, current in changes:
        event.add_change("Answer", previous, current)
    session.add(event)

    return True

361 

362 

def import_section(
    session: "Session", src_sec: Section, des_sec: Section, type: ImportType
) -> tuple[list[Section], list[QuestionInstance]]:
    """
    Import a Section, with its questions and subsections, from another Project.

    A new Section copying the source title and description is appended to
    ``des_sec.subsections``; questions are imported with ``import_q_instance``
    and subsections recursively. Returns every Section and QuestionInstance
    created, the new top-level section first.
    """
    copied_section = Section(
        title=src_sec.title,
        description=src_sec.description,
        project_id=des_sec.project_id,
    )
    des_sec.subsections.append(copied_section)

    imp_secs: list[Section] = [copied_section]
    imp_qis: list[QuestionInstance] = [
        import_q_instance(question, copied_section, type)
        for question in src_sec.questions
    ]

    for child in src_sec.subsections:
        child_secs, child_qis = import_section(session, child, copied_section, type)
        imp_secs.extend(child_secs)
        imp_qis.extend(child_qis)

    return imp_secs, imp_qis

387 

388 

def import_q_instance(
    src_qi: QuestionInstance, des_sec: Section, type: ImportType
) -> QuestionInstance:
    """
    Import a question instance from another Project into ``des_sec``.

    - ImportType.COPY: a new QuestionDefinition (refcount 1, parent set to the
      source definition) is built with a copy of every source element.
    - ImportType.SHARE: the source definition is reused and its refcount
      incremented.
    - any other type: the source definition is reused unchanged.

    The new QuestionInstance is appended to ``des_sec.questions`` and returned.
    """
    copied_element_attrs = (
        "row",
        "col",
        "colspan",
        "rowspan",
        "el_type",
        "label",
        "mandatory",
        "width",
        "height",
        "multitopic",
        "regexp",
        "choices",
    )

    source_def = src_qi.question_def
    target_def = source_def
    target_questions = des_sec.questions

    if type == ImportType.COPY:
        target_def = QuestionDefinition(
            title=source_def.title,
            refcount=1,
            parent_id=source_def.id,
        )
        for source_element in source_def.elements:
            element_values = {
                name: getattr(source_element, name) for name in copied_element_attrs
            }
            target_def.elements.append(QElement(**element_values))
    elif type == ImportType.SHARE:
        target_def.refcount += 1

    new_instance: QuestionInstance = QuestionInstance(
        project_id=des_sec.project_id,
        section_id=des_sec.id,
        question_def=target_def,
    )
    target_questions.append(new_instance)
    return new_instance

430 

431 

def copy_q_definition(original_qdef: QuestionDefinition, session: "Session"):
    """
    Return a new QuestionDefinition duplicating ``original_qdef``.

    The copy has the same title, a refcount of 1, ``parent_id`` pointing at
    the original, and a fresh QElement for each original element. The copy is
    not added to ``session`` here; ``session`` is not used by this function.
    """
    copied_element_attrs = (
        "row",
        "col",
        "colspan",
        "rowspan",
        "el_type",
        "label",
        "mandatory",
        "width",
        "height",
        "multitopic",
        "regexp",
        "choices",
    )

    duplicate = QuestionDefinition()
    duplicate.title = original_qdef.title
    duplicate.refcount = 1
    duplicate.parent_id = original_qdef.id

    for element in original_qdef.elements:
        element_values = {name: getattr(element, name) for name in copied_element_attrs}
        duplicate.elements.append(QElement(**element_values))

    return duplicate

458 

459 

def delete_qinstances_update_def_refcounts(
    session: "Session", project_id, section_id: int | None = None
):
    """
    Bulk-delete the project's question instances (optionally only those in
    ``section_id``) and tidy up the question definitions they referenced:
    definitions with a refcount of 1 are deleted, definitions with a higher
    refcount have it decremented by one.

    All statements run with ``synchronize_session=False``.
    """
    instance_filters = [QuestionInstance.project_id == project_id]
    if section_id is not None:
        instance_filters.append(QuestionInstance.section_id == section_id)

    # Collect the definition ids first: instances must be deleted before the
    # definitions they point at, and the ids are gone once they are deleted.
    def_id_rows = session.query(QuestionInstance.question_def_id.label("id")).filter(
        *instance_filters
    )
    affected_def_ids = {row.id for row in def_id_rows}

    session.query(QuestionInstance).filter(*instance_filters).delete(
        synchronize_session=False
    )

    is_affected = QuestionDefinition.id.in_(affected_def_ids)

    # Unshared definitions (refcount of one) are removed outright
    session.query(QuestionDefinition).filter(
        is_affected, QuestionDefinition.refcount == 1
    ).delete(synchronize_session=False)

    # Shared definitions survive with one reference fewer
    session.query(QuestionDefinition).filter(
        is_affected, QuestionDefinition.refcount > 1
    ).update({"refcount": QuestionDefinition.refcount - 1}, synchronize_session=False)

491 

492 

def log_score_event(
    session: "Session",
    score: Score,
    initial_score_value,
    is_new: bool,
    project: "Project",
    user: User,
    autoscore=False,
):
    """
    Add a private SCORE_CREATED / SCORE_UPDATED audit event for ``score``.

    The event records the change from ``initial_score_value`` to the current
    ``score.score``. When ``autoscore`` is true a ScoreComment explaining the
    autoscore is also added, the session is flushed so the comment has an id,
    and a SCORE_COMMENT_ADDED audit event is added for it.
    """

    def audit_kwargs(object_id):
        # Keyword arguments shared by both audit events
        return dict(
            project=project,
            issue_id=score.issue_id,
            user_id=user.id,
            org_id=user.organisation.id,
            object_id=object_id,
            private=True,
            question_id=score.question_id,
        )

    score_event_type = "SCORE_CREATED" if is_new else "SCORE_UPDATED"
    score_event = AuditEvent.create(session, score_event_type, **audit_kwargs(score.id))
    score_event.add_change("Score", initial_score_value, score.score)
    session.add(score_event)

    if not autoscore:
        return

    msg = "Autoscore Calculated for Multiple Choice Question"
    comment = ScoreComment(
        score=score,
        user=user,
        comment_time=datetime.now(),
        comment_text=msg,
    )
    session.add(comment)
    session.flush()  # comment.id is needed for the audit event below

    comment_event = AuditEvent.create(
        session, "SCORE_COMMENT_ADDED", **audit_kwargs(comment.id)
    )
    comment_event.add_change("Comment", "", msg)
    session.add(comment_event)

542 

543 

def label_text(
    project: Project, search_term: str, replace_term: str = "", dry_run=True
):
    """
    Search and replace text in the labels of "LB" and "CB" question elements.

    This is a generator: one dict per matching element is yielded with keys
    ``change_type``, ``question_number``, ``new`` and ``old``. Labels are only
    modified when ``dry_run`` is false, and only as the generator is consumed.

    Matching uses SQL LIKE on ``%search_term%`` with the utf8mb4_bin collation
    (presumably to make the match case-sensitive); LIKE wildcards contained in
    ``search_term`` are not escaped.
    """
    like_pattern = f"%{search_term}%"
    matches = (
        project.qelements.filter(QElement.el_type.in_(("LB", "CB")))
        .filter(QElement.label.collate("utf8mb4_bin").like(like_pattern))
        .add_columns(QuestionInstance.b36_number)
    )

    for element, b36_number in matches:
        before = element.label
        after = before.replace(search_term, replace_term)
        if not dry_run:
            element.label = after
        yield {
            "change_type": "label",
            "question_number": from_b36(b36_number),
            "new": after,
            "old": before,
        }

564 

565 

def question_titles(
    project: Project, search_term: str, replace_term: str = "", dry_run=True
):
    """
    Search and replace text in the titles of the project's questions.

    This is a generator: one dict per matching question instance is yielded
    with keys ``change_type``, ``question_number``, ``new`` and ``old``. Titles
    are only modified when ``dry_run`` is false, and only as the generator is
    consumed.

    Matching uses SQL LIKE on ``%search_term%`` with the utf8mb4_bin collation;
    LIKE wildcards contained in ``search_term`` are not escaped.
    """
    session = object_session(project)
    assert session is not None

    like_pattern = f"%{search_term}%"
    title_matches = QuestionDefinition.title.collate("utf8mb4_bin").like(like_pattern)

    matches = (
        session.query(QuestionInstance)
        .join(QuestionDefinition)
        .options(noload(QuestionInstance.question_def, QuestionDefinition.elements))
        .filter(QuestionInstance.project_id == project.id)
        .filter(title_matches)
    )

    for instance in matches:
        definition = instance.question_def
        before = definition.title
        after = before.replace(search_term, replace_term)
        if not dry_run:
            definition.title = after
        yield {
            "change_type": "title",
            "question_number": instance.number,
            "new": after,
            "old": before,
        }

594 

595 

def choices_text(
    project: Project, search_term: str, replace_term: str = "", dry_run=True
):
    """
    Search and replace text in the choice labels of "CR" and "CC" elements.

    Elements are selected with the database's ``json_search`` over the choices
    column. This is a generator: one dict per selected element is yielded with
    keys ``change_type``, ``question_number``, ``old`` and ``new`` (lists of
    labels). The element's choices are only replaced when ``dry_run`` is false,
    and only as the generator is consumed.
    """
    has_match = func.json_search(QElement.choices, "one", search_term) != None  # noqa: E711
    matches = (
        project.qelements.filter(QElement.el_type.in_(("CR", "CC")))
        .filter(has_match)
        .add_columns(QuestionInstance.b36_number)
    )

    for element, b36_number in matches:
        # Work on a copy so a dry run leaves the stored choices untouched
        updated_choices = deepcopy(element.choices)
        before_labels = [choice["label"] for choice in element.choices]
        for choice in updated_choices:
            choice["label"] = choice["label"].replace(search_term, replace_term)
        after_labels = [choice["label"] for choice in updated_choices]

        if not dry_run:
            element.choices = updated_choices

        yield {
            "change_type": "choice",
            "question_number": from_b36(b36_number),
            "old": before_labels,
            "new": after_labels,
        }

621 

622 

def pretty_choices(choices: list[dict]) -> str:
    """
    Render a list of choice dicts as text, one " - label" line per choice.

    A truthy ``autoscore`` value is appended to its line in angle brackets.
    Anything that is not a list is returned as ``str(choices)``.
    """
    if not isinstance(choices, list):
        return str(choices)

    lines = []
    for choice in choices:
        if choice.get("autoscore", False):
            suffix = f" <{choice['autoscore']}>"
        else:
            suffix = ""
        lines.append(f" - {choice['label']}{suffix}\n")
    return "".join(lines)

631 

632 

def update_create_qdef(
    qdef: QuestionDefinition, evt: AuditEvent, el_map: dict, el_dict: dict
):
    """
    Apply one element dict to a question definition, recording changes on ``evt``.

    - ``el_dict`` has a non-None "id": the matching element is popped from
      ``el_map`` and each mutable attribute present in ``el_dict`` whose value
      differs is updated, with one audit change per attribute.
    - otherwise: "el_type" is popped from ``el_dict`` and a new element is
      added to ``qdef`` from the remaining items.

    Because updated ids are popped from ``el_map``, whatever is left in it
    after all element dicts are processed are the elements to delete.
    """
    if el_dict.get("id", None) is None:
        # No id supplied: this is a new element
        el_type = el_dict.pop("el_type")
        created = qdef.add_element(el_type, **el_dict)
        evt.add_change(f"{created.__class__.__name__} Added", None, created.summary)
        return

    mutable_attrs = {
        "colspan",
        "rowspan",
        "label",
        "mandatory",
        "regexp",
        "height",
        "width",
        "row",
        "col",
        "choices",
    }
    existing = el_map.pop(el_dict["id"])
    for attr in mutable_attrs & el_dict.keys():
        incoming = el_dict[attr]
        current = getattr(existing, attr)
        if incoming == current:
            continue
        setattr(existing, attr, incoming)
        change_name = f"{existing.__class__.__name__} #{existing.id}, {attr.title()}"
        if attr == "choices" and el_dict["el_type"] in ("CR", "CC"):
            # Log choice lists in a readable form
            current = pretty_choices(current)
            incoming = pretty_choices(incoming)
        evt.add_change(change_name, current, incoming)

672 

673 

def check_for_saved_answers(session: "Session", qdef: QuestionDefinition, el_map: dict):
    """
    Refuse to delete elements of a shared question that already have answers.

    Nothing is checked unless ``qdef.is_shared`` is true and ``el_map`` (the
    elements to delete, keyed by element id) is non-empty.

    @raises CosmeticQuestionEditViolation if any Answer references one of
    the element ids in ``el_map``
    """
    if not (qdef.is_shared and len(el_map) != 0):
        return

    answered = session.query(Answer).filter(Answer.element_id.in_(el_map.keys()))
    if answered.count() > 0:
        del_ids = ", ".join(map(str, el_map.keys()))
        m = f"Cannot delete question elements that have associated answers. Element IDs: {del_ids}"
        raise CosmeticQuestionEditViolation(m)

688 

689 

def delete_project_section(
    session: "Session", user: User, project: Project, section: Section
):
    """
    Delete ``section`` together with its descendant sections and all the
    questions they contain, then add a private SECTION_DELETED audit event.

    Descendant sections are processed first, then the section itself. For
    each one, question instances are removed (and definition refcounts
    updated) before the section is deleted.
    """

    def purge(target: Section):
        # Questions go first, then the section that held them
        delete_qinstances_update_def_refcounts(
            session, project.id, section_id=target.id
        )
        session.delete(target)

    for descendant in section.descendants:
        purge(descendant)
    purge(section)

    deletion_event = AuditEvent.create(
        session,
        evt_types.SECTION_DELETED,
        project=project,
        user_id=user.id,
        org_id=user.organisation.id,
        object_id=section.id,
        private=True,
    )
    session.add(deletion_event)

717 

718 

def delete_project_section_question(
    session: "Session",
    user: User,
    project: Project,
    section: Section,
    qi: QuestionInstance,
):
    """
    Delete the given question instance and record a QUESTION_DELETED event.

    The question definition's refcount is decremented; when it reaches zero
    the definition is deleted as well. The section is renumbered afterwards.

    Returns a list of dicts (``project_id``, ``number``, ``id``) describing
    the remaining instances of the same question definition, which may live
    in other projects. The list is empty when the definition was deleted.
    """
    evt = AuditEvent.create(
        session,
        "QUESTION_DELETED",
        project=project,
        user_id=user.id,
        org_id=user.organisation.id,
        object_id=qi.id,
        private=True,
        question_id=qi.id,
    )
    evt.add_change("Title", qi.title, None)
    evt.add_change("Number", qi.number, None)
    session.add(evt)

    instances_remaining = []
    with session.no_autoflush:
        qdef = qi.question_def
        session.delete(qi)

        qdef.refcount -= 1
        if qdef.refcount == 0:
            session.delete(qdef)
        else:
            # Report each remaining instance with its OWN project id (it may
            # belong to another project), and leave out the instance being
            # deleted: with autoflush disabled the pending delete may not be
            # reflected in qdef.instances yet.
            instances_remaining = [
                dict(project_id=other.project_id, number=other.number, id=other.id)
                for other in qdef.instances
                if other is not qi
            ]
    section.renumber()
    return instances_remaining

763 

764 

def batch_create_audit_events(
    session: "Session",
    event_specs: list[dict],
    related_objects: list | None = None,
):
    """
    Create several audit events from specifications and add them to the session.

    Args:
        session: Database session
        event_specs: One dict per event to create:
            {
                'event_type': str,  # e.g., 'SCORE_CREATED', 'ANSWER_UPDATED'
                'changes': list[tuple[str, old_value, new_value]],  # Optional
                'kwargs': dict  # Keyword arguments for AuditEvent.create
            }
        related_objects: Optional extra objects (e.g., comments) to add to
            the session after the events

    Returns:
        list of the created AuditEvent objects, in ``event_specs`` order

    Example usage:
        event_specs = [
            {
                'event_type': 'ANSWER_UPDATED',
                'changes': [('Answer', 'old_value', 'new_value')],
                'kwargs': {
                    'project': project,
                    'user_id': user.id,
                    'object_id': answer.id,
                    'question_id': question.id
                }
            },
            # ... more event specs
        ]
        events = batch_create_audit_events(session, event_specs)
    """
    from postrfp.model import AuditEvent

    def build_event(spec: dict):
        # One event per spec, with every listed change attached
        event = AuditEvent.create(session, spec["event_type"], **spec["kwargs"])
        for change in spec.get("changes", []):
            change_name, old_val, new_val = change
            event.add_change(change_name, old_val, new_val)
        return event

    audit_events = [build_event(spec) for spec in event_specs]
    session.add_all(audit_events)

    if related_objects:
        session.add_all(related_objects)

    return audit_events

827 

828 

def batch_log_answer_events(
    session: "Session",
    answer_changes: list[tuple],  # (answer, old_value, new_value, is_new)
    project: Project,
    user: User,
    question: QuestionInstance,
):  # pragma: no cover
    """
    Log an ANSWER_CREATED or ANSWER_UPDATED audit event for each answer change.

    Each item of ``answer_changes`` is ``(answer, old_value, new_value, is_new)``.
    The event specs are handed to ``batch_create_audit_events``, whose list of
    created events is returned.
    """
    event_specs = [
        {
            "event_type": "ANSWER_CREATED" if is_new else "ANSWER_UPDATED",
            "changes": [("Answer", old_value, new_value)],
            "kwargs": {
                "project": project,
                "user_id": user.id,
                "org_id": user.organisation.id,
                "object_id": answer.id,
                "question_id": question.id,
            },
        }
        for answer, old_value, new_value, is_new in answer_changes
    ]
    return batch_create_audit_events(session, event_specs)

860 

861 

def log_score_events_batch(
    session: "Session",
    scores_to_update: list[tuple],  # (score, initial_score_value)
    scores_to_create: list,  # [score, ...]
    project: Project,
    user: User,
    add_comments: bool = True,
    comment_text: str = "Autoscore Calculated for Multiple Choice Question",
):
    """
    Log audit events for many scores in one go.

    A private SCORE_UPDATED event is created for each ``(score, initial_value)``
    in ``scores_to_update`` and a private SCORE_CREATED event for each score in
    ``scores_to_create``. Unless ``add_comments`` is false, a ScoreComment with
    ``comment_text`` is then added for every score, followed by a private
    SCORE_COMMENT_ADDED event per comment. The session is flushed before the
    comments are created and again after they are added.
    """

    def make_spec(event_type, change, score, object_id):
        # Spec dict in the shape expected by batch_create_audit_events
        return {
            "event_type": event_type,
            "changes": [change],
            "kwargs": {
                "project": project,
                "issue_id": score.issue_id,
                "user_id": user.id,
                "org_id": user.organisation.id,
                "object_id": object_id,
                "private": True,
                "question_id": score.question_id,
            },
        }

    score_specs = [
        make_spec("SCORE_UPDATED", ("Score", initial_value, score.score), score, score.id)
        for score, initial_value in scores_to_update
    ]
    score_specs.extend(
        make_spec("SCORE_CREATED", ("Score", None, score.score), score, score.id)
        for score in scores_to_create
    )
    batch_create_audit_events(session, score_specs)

    if not add_comments:
        return

    session.flush()  # Ensure scores and events have IDs before creating comments

    commented_scores = [score for score, _ in scores_to_update] + scores_to_create
    score_comments = [
        ScoreComment(
            score=score,
            user=user,
            comment_time=datetime.now(),
            comment_text=comment_text,
        )
        for score in commented_scores
    ]
    session.add_all(score_comments)
    session.flush()  # Get comment IDs

    comment_specs = [
        make_spec(
            "SCORE_COMMENT_ADDED",
            ("Comment", "", comment.comment_text),
            comment.score,
            comment.id,
        )
        for comment in score_comments
    ]
    batch_create_audit_events(session, comment_specs)