Coverage for postrfp / shared / update.py: 96%

309 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1""" 

2Functions that perform SQL Update queries 

3""" 

4 

5from copy import deepcopy 

6from decimal import Decimal 

7from typing import TYPE_CHECKING 

8 

9if TYPE_CHECKING: 

10 from sqlalchemy.orm import Session 

11 

12from sqlalchemy.sql.elements import literal 

13from sqlalchemy.sql.expression import insert, select 

14from sqlalchemy import delete 

15 

16from sqlalchemy import func 

17from sqlalchemy.orm import ( 

18 object_session, 

19 noload, 

20) 

21from sqlalchemy.orm.exc import NoResultFound 

22 

23from postrfp.model import ( 

24 Answer, 

25 QuestionResponseState, 

26 ResponseStatus, 

27 Participant, 

28 ProjectPermission, 

29 SectionPermission, 

30 User, 

31 Issue, 

32 Project, 

33 QElement, 

34 AuditEvent, 

35 ScoreComment, 

36 QuestionInstance, 

37 Section, 

38 Score, 

39 QuestionDefinition, 

40 ImportType, 

41 Weighting, 

42 WeightingSet, 

43) 

44from postrfp.model.audit import evt_types, Status as EventStatus 

45 

46from postrfp.model.exc import CosmeticQuestionEditViolation 

47from postrfp.shared.exceptions import AuthorizationFailure 

48from postrfp.shared import serial, utils 

49from postrfp.model.questionnaire.b36 import from_b36 

50from .fetch import generate_autoscores 

51 

52 

def grant_project_permission(session: "Session", project: Project, user: User):
    """
    Grants a (restricted) User permissions for the given project.

    The Participant linking the user's organisation to the project is looked
    up; a new ProjectPermission is attached to that participant and to the
    user, then added to the session.

    Returns the newly created ProjectPermission instance.

    Raises:
        ValueError: if the user is not restricted.
        AuthorizationFailure: if no Participant row exists for the user's
            organisation in this project.
    """
    if not user.is_restricted:
        raise ValueError(
            "Assigning ProjectPermission to a non-restricted user has no effect"
        )

    # Only the lookup can raise NoResultFound, so only the lookup is guarded.
    try:
        participant = (
            session.query(Participant)
            .filter_by(project_id=project.id, organisation=user.organisation)
            .one()
        )
    except NoResultFound as exc:
        # The message used to be a plain string, so the "{user}" and
        # "{project}" placeholders were emitted literally.
        m = f"User {user.id} is not a Participant in project {project.id}"
        raise AuthorizationFailure(message=m) from exc

    pp = ProjectPermission()
    pp.participant = participant
    pp.user = user
    session.add(pp)
    return pp

76 

77 

def grant_section_permissions(
    session: "Session", project: Project, user: User, section_id_list: list[int]
):
    """
    Give the user access to each section id in section_id_list.

    The user's existing ProjectPermission for the project is reused; when
    there is none, one is created through grant_project_permission.
    """
    existing = project.permissions.filter_by(user_id=user.id)
    try:
        permission = existing.one()
    except NoResultFound:
        permission = grant_project_permission(session, project, user)

    # One SectionPermission per requested section, added as it is built.
    session.add_all(
        SectionPermission(
            section_id=section_id, user=user, project_permission=permission
        )
        for section_id in section_id_list
    )

95 

96 

97def _update_or_create_weighting( 

98 session: "Session", 

99 entity_id: int, 

100 weight_value: Decimal, 

101 lookup: dict, 

102 valid_id_set: set, 

103 save_func, 

104 project_id: int, 

105 entity_type: str, 

106 weighting_kwargs: dict, 

107): 

108 """ 

109 Helper function to update or create a weighting with sparse storage logic. 

110 

111 Args: 

112 session: Database session 

113 entity_id: ID of the question or section 

114 weight_value: The weight value to set 

115 lookup: Dict mapping entity IDs to existing Weighting objects 

116 valid_id_set: Set of valid entity IDs for this project 

117 save_func: Function to save new Weighting objects 

118 project_id: ID of the project 

119 entity_type: Type of entity ("Question" or "Section") for error messages 

120 weighting_kwargs: Dict of kwargs to pass to Weighting constructor 

121 """ 

122 if entity_id in lookup: 

123 # Existing weight record found 

124 if weight_value == 1: 

125 # Delete record for sparse storage (weight = 1 is default) 

126 session.delete(lookup[entity_id]) 

127 else: 

128 # Update to non-default weight 

129 lookup[entity_id].value = weight_value 

130 elif entity_id in valid_id_set: 

131 # No existing record, only create if non-default weight 

132 if weight_value != 1: 

133 save_func(Weighting(**weighting_kwargs, value=weight_value)) 

134 # If weight_value == 1, do nothing (sparse: no record needed for default) 

135 else: 

136 raise ValueError( 

137 f"{entity_type} ID {entity_id} does not belong to project {project_id}" 

138 ) 

139 

140 

def save_weightset_weightings(
    session: "Session", weightset: WeightingSet, weights_doc: serial.WeightingsDoc
):
    """
    Save weightings to a weighting set using sparse storage approach.

    - Weight records are only stored for non-default values (anything != 1)
    - Setting a weight to 1 removes any existing weight record (sparse deletion)
    - Missing weight records are expected to default to 1 elsewhere
      (NOTE(review): the fallback itself is not part of this function)

    Behavior:
    - Updates existing weight records when new value != 1
    - Deletes existing weight records when new value == 1 (sparse approach)
    - Creates new weight records only when new value != 1
    - Ignores requests to set weight = 1 on non-existing records (already sparse)

    Raises:
        ValueError: if weights_doc names a question or section id that does
            not belong to weightset.project_id
    """
    # Index the existing records: a record with a section_id is a section
    # weight, anything else is keyed by its question instance id.
    q_lookup = {}
    sec_lookup = {}
    for weighting in weightset.weightings:
        if weighting.section_id:
            sec_lookup[weighting.section_id] = weighting
        else:
            q_lookup[weighting.question_instance_id] = weighting

    save = weightset.weightings.append  # reference to bound method
    project_id = weightset.project_id

    q = session.query(QuestionInstance.id).filter(
        QuestionInstance.project_id == project_id
    )
    qid_set = {row.id for row in q}

    for question_weight in weights_doc.questions:
        question_id = question_weight.question_instance_id
        _update_or_create_weighting(
            session=session,
            entity_id=question_id,
            # str() first so a float weight becomes the Decimal of its repr
            weight_value=Decimal(str(question_weight.weight)),
            lookup=q_lookup,
            valid_id_set=qid_set,
            save_func=save,
            project_id=project_id,
            entity_type="Question",
            weighting_kwargs={"question_instance_id": question_id},
        )

    # Only the ids are needed, so select the id column rather than loading
    # whole Section entities (mirrors the question query above).
    sq = session.query(Section.id).filter(Section.project_id == project_id)
    sec_id_set = {row.id for row in sq}

    for sw in weights_doc.sections:
        section_id = sw.section_id
        _update_or_create_weighting(
            session=session,
            entity_id=section_id,
            weight_value=Decimal(str(sw.weight)),
            lookup=sec_lookup,
            valid_id_set=sec_id_set,
            save_func=save,
            project_id=project_id,
            entity_type="Section",
            weighting_kwargs={"section_id": section_id},
        )

209 

210 

def save_default_weightings(
    session, project: Project, weights_doc: serial.WeightingsDoc
):
    """
    Apply weights_doc to the project's default weighting set.

    The set's id is obtained from
    project.get_or_create_default_weighting_set_id(). If the set has no
    Weighting rows yet, it is first populated with a weight of 1 for the
    project's questions and sections and the session is flushed.
    """
    default_set = session.get(
        WeightingSet, project.get_or_create_default_weighting_set_id()
    )

    existing_rows = default_set.weightings.count()
    if existing_rows == 0:
        # Brand new (or emptied) set: seed it before applying the document
        set_initial_weightings(default_set, Decimal(1))
        session.flush()

    # The actual update logic is shared with non-default weighting sets
    save_weightset_weightings(session, default_set, weights_doc)

231 

232 

def set_initial_weightings(weighting_set: WeightingSet, initial_value: "Decimal"):
    """
    Insert a Weighting row, valued initial_value, for every QuestionInstance
    and every Section whose project_id equals weighting_set.project_id.

    All rows are attached to weighting_set.id. The work is done with two
    INSERT ... FROM SELECT statements executed on the session that owns
    weighting_set.
    """
    session = object_session(weighting_set)
    assert session is not None

    def _seed_statement(model, target_column):
        # SELECT <set id>, <model>.id, <initial value> for this project's rows
        rows = select(
            literal(weighting_set.id), model.id, literal(initial_value)
        ).where(model.project_id == weighting_set.project_id)
        return insert(Weighting).from_select(
            ["weighting_set_id", target_column, "value"], rows
        )

    session.execute(_seed_statement(QuestionInstance, "question_instance_id"))
    session.execute(_seed_statement(Section, "section_id"))

256 

257 

def copy_weightings(
    source_weighting_set: WeightingSet, destination_weighting_set: WeightingSet
):
    """
    Duplicate every Weighting row of the source set into the destination set.

    Implemented as one INSERT ... FROM SELECT run on the session that owns
    destination_weighting_set.
    """
    session = object_session(destination_weighting_set)

    target_columns = [
        "weighting_set_id",
        "section_id",
        "question_instance_id",
        "value",
    ]
    source_rows = select(
        literal(destination_weighting_set.id),
        Weighting.section_id,
        Weighting.question_instance_id,
        Weighting.value,
    ).where(Weighting.weighting_set_id == source_weighting_set.id)
    copy_stmt = insert(Weighting).from_select(target_columns, source_rows)

    assert session is not None
    session.execute(copy_stmt)

277 

278 

def reset_scores(session: "Session", project: Project, user: User):
    """
    Delete all scores and score comments of the project, then recreate
    Score rows from generate_autoscores.

    Returns a dict keyed by issue id. Each entry holds "respondent_id" and
    "deleted" (the result of the issue's bulk score delete); issues that
    received autoscores also get "added", the number of scores created.
    """
    project_score_ids = (
        select(Score.id).join(Issue).where(Issue.project_id == project.id)
    )
    # Comments go first, while the scores they point at still exist
    session.execute(
        delete(ScoreComment).where(ScoreComment.score_id.in_(project_score_ids))
    )

    issue_map = {}
    for issue in project.scoreable_issues:
        removed = issue.scores.delete()
        issue_map[issue.id] = {
            "respondent_id": issue.respondent_id,
            "deleted": removed,
        }

    added: dict[int, int] = {}
    for autoscore in generate_autoscores(project, session, user).values():
        target_issue = autoscore.issue_id
        new_score = Score(
            question_instance_id=autoscore.question_id,
            issue_id=target_issue,
            score=autoscore.score,
        )
        added[target_issue] = added.get(target_issue, 0) + 1
        session.add(new_score)

    for target_issue, count in added.items():
        issue_map[target_issue]["added"] = count
    return issue_map

314 

315 

def save_answers(
    session: "Session",
    user: User,
    question: QuestionInstance,
    answer_lookup: dict,
    issue: Issue,
    imported: bool = False,
    set_done: bool = False,
):
    """
    Validate and save the answers for one question of an issue, then update
    the question's response state and record an audit event.

    Returns False, recording nothing, when validation reports no changes;
    returns True otherwise.

    The audit event type is ANSWER_IMPORTED when imported is set, otherwise
    ANSWER_CREATED or ANSWER_UPDATED depending on the validation result.
    With set_done the event status is set to done.
    """
    res = question.validate_and_save_answers(answer_lookup, issue)
    if not res.change_list:
        return False

    response_state: QuestionResponseState = issue.response_state_for_q(question.id)
    response_state.date_updated = utils.utcnow()
    response_state.updated_by = user.id
    response_state.status = (
        ResponseStatus.NOT_ANSWERED
        if res.unanswered_mandatory
        else ResponseStatus.ANSWERED
    )

    if imported:
        evt_type = evt_types.ANSWER_IMPORTED
    elif res.is_new:
        evt_type = evt_types.ANSWER_CREATED
    else:
        evt_type = evt_types.ANSWER_UPDATED

    evt = AuditEvent.create(
        session,
        evt_type,
        object_id=response_state.id,
        user=user,
        project_id=issue.project_id,
        issue=issue,
        question_id=question.id,
    )
    if set_done:
        evt.status = EventStatus.done
    if imported:
        evt.add_change("Import Source", issue.project.title, "")
    for before, after in res.change_list:
        evt.add_change("Answer", before, after)
    session.add(evt)

    return True

360 

361 

def import_section(
    session: "Session", src_sec: Section, des_sec: Section, type: ImportType
) -> tuple[list[Section], list[QuestionInstance]]:
    """
    Import Section from another Project

    A new Section with the source's title and description is appended to
    des_sec.subsections; the source's questions are imported into it and its
    subsections are imported recursively.

    Returns (sections created, question instances created), the new section
    first, followed by the results for each subsection in turn.
    """
    copied = Section(
        title=src_sec.title,
        description=src_sec.description,
        project_id=des_sec.project_id,
    )
    des_sec.subsections.append(copied)

    imp_secs: list[Section] = [copied]
    imp_qis: list[QuestionInstance] = [
        import_q_instance(src_qi, copied, type) for src_qi in src_sec.questions
    ]

    for child in src_sec.subsections:
        child_secs, child_qis = import_section(session, child, copied, type)
        imp_secs.extend(child_secs)
        imp_qis.extend(child_qis)

    return imp_secs, imp_qis

386 

387 

def import_q_instance(
    src_qi: QuestionInstance, des_sec: Section, type: ImportType
) -> QuestionInstance:
    """
    Import question instances from another Project

    ImportType.COPY builds a new QuestionDefinition (refcount 1, parent_id set
    to the source definition's id) with a copy of each element.
    ImportType.SHARE reuses the source definition and increments its refcount.
    Any other type reuses the source definition unchanged.

    The new QuestionInstance is appended to des_sec.questions and returned.
    """
    copied_attrs = (
        "row",
        "col",
        "colspan",
        "rowspan",
        "el_type",
        "label",
        "mandatory",
        "width",
        "height",
        "multitopic",
        "regexp",
        "choices",
    )
    src_qdef = src_qi.question_def
    target_questions = des_sec.questions

    if type == ImportType.COPY:
        des_qdef = QuestionDefinition(
            title=src_qdef.title,
            refcount=1,
            parent_id=src_qdef.id,
        )
        for src_el in src_qdef.elements:
            field_values = {name: getattr(src_el, name) for name in copied_attrs}
            des_qdef.elements.append(QElement(**field_values))
    else:
        des_qdef = src_qdef
        if type == ImportType.SHARE:
            # One more instance now points at the shared definition
            des_qdef.refcount += 1

    des_qi: QuestionInstance = QuestionInstance(
        project_id=des_sec.project_id,
        section_id=des_sec.id,
        question_def=des_qdef,
    )
    target_questions.append(des_qi)
    return des_qi

429 

430 

def copy_q_definition(original_qdef: QuestionDefinition, session: "Session"):
    """
    Make a copy of the QuestionDefinition and all associated question elements

    The copy gets the original's title, a refcount of 1 and parent_id set to
    the original's id. The session argument is not used here.
    """
    element_fields = (
        "row",
        "col",
        "colspan",
        "rowspan",
        "el_type",
        "label",
        "mandatory",
        "width",
        "height",
        "multitopic",
        "regexp",
        "choices",
    )

    duplicate = QuestionDefinition()
    duplicate.title = original_qdef.title
    duplicate.refcount = 1
    duplicate.parent_id = original_qdef.id

    for source_el in original_qdef.elements:
        values = {field: getattr(source_el, field) for field in element_fields}
        duplicate.elements.append(QElement(**values))

    return duplicate

457 

458 

def delete_qinstances_update_def_refcounts(
    session: "Session", project_id, section_id: int | None = None
):
    """
    Delete the question instances of a project (optionally only those in
    section_id), delete the question definitions they referenced whose
    refcount is 1, and decrement the refcount of the referenced definitions
    whose refcount is greater than 1.
    """
    criteria = [QuestionInstance.project_id == project_id]
    if section_id is not None:
        criteria.append(QuestionInstance.section_id == section_id)

    # Remember which definitions are referenced before the instances go
    referenced = session.query(QuestionInstance.question_def_id.label("id")).filter(
        *criteria
    )
    def_ids = {row.id for row in referenced}

    # Can't delete question defs if question instances remain, so the
    # instances are removed first
    session.query(QuestionInstance).filter(*criteria).delete(
        synchronize_session=False
    )

    affected_defs = session.query(QuestionDefinition).filter(
        QuestionDefinition.id.in_(def_ids)
    )

    # refcount of one - not shared, so the definition goes too
    affected_defs.filter(QuestionDefinition.refcount == 1).delete(
        synchronize_session=False
    )

    # Shared definitions stay, with one reference fewer
    affected_defs.filter(QuestionDefinition.refcount > 1).update(
        {"refcount": QuestionDefinition.refcount - 1}, synchronize_session=False
    )

490 

491 

def log_score_event(
    session: "Session",
    score: Score,
    initial_score_value,
    is_new: bool,
    project: "Project",
    user: User,
    autoscore=False,
):
    """
    Record a private SCORE_CREATED / SCORE_UPDATED audit event for a score.

    With autoscore set, a ScoreComment explaining the autoscore is also
    created and flushed, and a SCORE_COMMENT_ADDED audit event is recorded
    for it.
    """
    score_evt = AuditEvent.create(
        session,
        "SCORE_CREATED" if is_new else "SCORE_UPDATED",
        project=project,
        issue_id=score.issue_id,
        user_id=user.id,
        org_id=user.organisation.id,
        object_id=score.id,
        private=True,
        question_id=score.question_id,
    )
    score_evt.add_change("Score", initial_score_value, score.score)
    session.add(score_evt)

    if not autoscore:
        return

    msg = "Autoscore Calculated for Multiple Choice Question"
    comment = ScoreComment(
        score=score,
        user=user,
        comment_time=utils.utcnow(),
        comment_text=msg,
    )
    session.add(comment)
    # Flush so comment.id is populated before it is used as object_id
    session.flush()

    cmnt_evt = AuditEvent.create(
        session,
        "SCORE_COMMENT_ADDED",
        project=project,
        issue_id=score.issue_id,
        user_id=user.id,
        org_id=user.organisation.id,
        object_id=comment.id,
        private=True,
        question_id=score.question_id,
    )
    cmnt_evt.add_change("Comment", "", msg)
    session.add(cmnt_evt)

541 

542 

def label_text(
    project: Project, search_term: str, replace_term: str = "", dry_run=True
):
    """
    Search and replace text in the labels of the project's "LB" and "CB"
    question elements.

    This is a generator. For each element whose label contains search_term it
    yields a dict with change_type "label", the question_number (decoded from
    the instance's b36_number), and the old and new label text. The new label
    is only assigned to the element when dry_run is False.
    """
    q = (
        project.qelements.filter(QElement.el_type.in_(("LB", "CB")))
        # autoescape makes "%" and "_" in search_term match literally, in
        # line with the literal str.replace() below; previously they were
        # interpreted as LIKE wildcards.
        .filter(
            QElement.label.collate("utf8mb4_bin").contains(
                search_term, autoescape=True
            )
        )
        .add_columns(QuestionInstance.b36_number)
    )

    for label, qnum in q:
        old_label = label.label
        new_label = old_label.replace(search_term, replace_term)
        if not dry_run:
            label.label = new_label
        yield dict(
            change_type="label",
            question_number=from_b36(qnum),
            new=new_label,
            old=old_label,
        )

563 

564 

def question_titles(
    project: Project, search_term: str, replace_term: str = "", dry_run=True
):
    """
    Search and replace text in the question titles of a project.

    This is a generator. For each question instance of the project whose
    definition title contains search_term it yields a dict with change_type
    "title", the instance's number, and the old and new title. The new title
    is only assigned to the question definition when dry_run is False.
    """
    session = object_session(project)
    assert session is not None

    q = (
        session.query(QuestionInstance)
        .join(QuestionDefinition)
        .options(noload(QuestionInstance.question_def, QuestionDefinition.elements))
        .filter(QuestionInstance.project_id == project.id)
        # autoescape makes "%" and "_" in search_term match literally, in
        # line with the literal str.replace() below; previously they were
        # interpreted as LIKE wildcards.
        .filter(
            QuestionDefinition.title.collate("utf8mb4_bin").contains(
                search_term, autoescape=True
            )
        )
    )

    for qi in q:
        qdef = qi.question_def
        old_title = qdef.title
        new_title = old_title.replace(search_term, replace_term)
        if not dry_run:
            qdef.title = new_title
        yield dict(
            change_type="title",
            question_number=qi.number,
            new=new_title,
            old=old_title,
        )

593 

594 

def choices_text(
    project: Project, search_term: str, replace_term: str = "", dry_run=True
):
    """
    Search and replace text in the choice labels of the project's "CR" and
    "CC" question elements.

    This is a generator. Elements are selected where json_search finds
    search_term in the element's choices. For each one it yields a dict with
    change_type "choice", the question_number (decoded from the instance's
    b36_number) and the lists of old and new choice labels. The element's
    choices are only replaced when dry_run is False.
    """
    has_match = func.json_search(QElement.choices, "one", search_term) != None  # noqa: E711
    matches = (
        project.qelements.filter(QElement.el_type.in_(("CR", "CC")))
        .filter(has_match)
        .add_columns(QuestionInstance.b36_number)
    )

    for el, qnum in matches:
        # Work on a deep copy so the stored choices are untouched in a dry run
        updated = deepcopy(el.choices)
        before = [choice["label"] for choice in el.choices]
        for choice in updated:
            choice["label"] = choice["label"].replace(search_term, replace_term)
        after = [choice["label"] for choice in updated]

        if not dry_run:
            el.choices = updated

        yield dict(
            change_type="choice",
            question_number=from_b36(qnum),
            old=before,
            new=after,
        )

620 

621 

def pretty_choices(choices: list[dict]) -> str:
    """
    Render a list of choice dicts as text, one " - <label>" line per choice.

    A choice with a truthy "autoscore" value gets " <autoscore>" appended to
    its line. Anything that is not a list is returned as str(choices).
    """
    if not isinstance(choices, list):
        return str(choices)

    rendered = []
    for choice in choices:
        if choice.get("autoscore", False):
            suffix = f" <{choice['autoscore']}>"
        else:
            suffix = ""
        rendered.append(f" - {choice['label']}{suffix}\n")
    return "".join(rendered)

630 

631 

def update_create_qdef(
    qdef: QuestionDefinition, evt: AuditEvent, el_map: dict, el_dict: dict
):
    """
    Update a question element when el_dict carries an id; add a new element
    to qdef when it does not. Every change is recorded on evt.

    Updated ids are removed from el_map - thus any elements remaining in
    el_map afterwards are the ones to be deleted.

    Note that for a new element "el_type" is popped from el_dict, so the
    caller's dict is modified.
    """
    mutable_attrs = {
        "colspan",
        "rowspan",
        "label",
        "mandatory",
        "regexp",
        "height",
        "width",
        "row",
        "col",
        "choices",
    }
    element_id = el_dict.get("id", None)

    if element_id is None:
        # A new element
        el_type = el_dict.pop("el_type")
        new_el = qdef.add_element(el_type, **el_dict)
        evt.add_change(f"{new_el.__class__.__name__} Added", None, new_el.summary)
        return

    # Update existing element; only attributes present in el_dict are compared
    old_el = el_map.pop(element_id)
    for attr in mutable_attrs & el_dict.keys():
        new_val = el_dict[attr]
        old_val = getattr(old_el, attr)
        if new_val != old_val:
            setattr(old_el, attr, new_val)
            evt_name = f"{old_el.__class__.__name__} #{old_el.id}, {attr.title()}"
            if attr == "choices" and el_dict["el_type"] in ("CR", "CC"):
                # Log choices in their readable text form
                old_val = pretty_choices(old_val)
                new_val = pretty_choices(new_val)
            evt.add_change(evt_name, old_val, new_val)

671 

672 

def check_for_saved_answers(session: "Session", qdef: QuestionDefinition, el_map: dict):
    """
    Refuse to delete question elements that already have answers.

    Nothing is checked when qdef is not shared or el_map is empty. Otherwise
    the Answer rows whose element_id is a key of el_map are counted.

    @raises CosmeticQuestionEditViolation if any such answer exists
    """
    needs_check = qdef.is_shared and len(el_map) != 0
    if not needs_check:
        return

    doomed_ids = el_map.keys()
    answer_count = (
        session.query(Answer).filter(Answer.element_id.in_(doomed_ids)).count()
    )
    if answer_count <= 0:
        return

    del_ids = ", ".join(str(el_id) for el_id in doomed_ids)
    m = f"Cannot delete question elements that have associated answers. Element IDs: {del_ids}"
    raise CosmeticQuestionEditViolation(m)

687 

688 

def delete_project_section(
    session: "Session", user: User, project: Project, section: Section
):
    """
    Delete the given Section together with its descendant sections and the
    question instances they contain, then record a private SECTION_DELETED
    audit event for the section.
    """

    def _purge(target):
        # Questions first (keeps definition refcounts right), then the section
        delete_qinstances_update_def_refcounts(
            session, project.id, section_id=target.id
        )
        session.delete(target)

    for descendant_section in section.descendants:
        _purge(descendant_section)
    _purge(section)

    session.add(
        AuditEvent.create(
            session,
            evt_types.SECTION_DELETED,
            project=project,
            user_id=user.id,
            org_id=user.organisation.id,
            object_id=section.id,
            private=True,
        )
    )

716 

717 

def delete_project_section_question(
    session: "Session",
    user: User,
    project: Project,
    section: Section,
    qi: QuestionInstance,
):
    """
    Delete the given question instance and record a private QUESTION_DELETED
    audit event for it.

    The question definition's refcount is decremented; the definition itself
    is deleted when the count reaches zero. The section is renumbered
    afterwards.

    The return value is a list of dicts (project_id, number, id) describing
    the remaining instances of the same question definition, which may exist
    in other projects. It is empty when the definition was deleted.
    """

    evt = AuditEvent.create(
        session,
        "QUESTION_DELETED",
        project=project,
        user_id=user.id,
        org_id=user.organisation.id,
        object_id=qi.id,
        private=True,
        question_id=qi.id,
    )
    evt.add_change("Title", qi.title, None)
    evt.add_change("Number", qi.number, None)
    session.add(evt)

    instances_remaining = []
    with session.no_autoflush:
        qdef = qi.question_def
        session.delete(qi)

        qdef.refcount -= 1
        if qdef.refcount == 0:
            session.delete(qdef)
        else:
            # Report each surviving instance with its OWN project id (this
            # used to report the current project's id for every instance),
            # and leave out the instance that is being deleted: nothing has
            # been flushed yet, so it may still appear in qdef.instances.
            instances_remaining = [
                dict(project_id=other.project_id, number=other.number, id=other.id)
                for other in qdef.instances
                if other is not qi
            ]
    section.renumber()
    return instances_remaining

762 

763 

764def batch_create_audit_events( 

765 session: "Session", 

766 event_specs: list[dict], 

767 related_objects: list | None = None, 

768): 

769 """ 

770 Generic function to batch create audit events and related objects. 

771 

772 This function provides a standardised way to create multiple audit events 

773 efficiently, reducing database roundtrips and improving performance. 

774 

775 Args: 

776 session: Database session 

777 event_specs: List of dictionaries containing event specifications: 

778 { 

779 'event_type': str, # e.g., 'SCORE_CREATED', 'ANSWER_UPDATED' 

780 'changes': list[tuple[str, old_value, new_value]], # Optional 

781 'kwargs': dict # Additional kwargs for AuditEvent.create 

782 } 

783 related_objects: Optional list of related objects to add (e.g., comments) 

784 

785 Returns: 

786 list of created AuditEvent objects 

787 

788 Example usage: 

789 # Batch create multiple answer events 

790 event_specs = [ 

791 { 

792 'event_type': 'ANSWER_UPDATED', 

793 'changes': [('Answer', 'old_value', 'new_value')], 

794 'kwargs': { 

795 'project': project, 

796 'user_id': user.id, 

797 'object_id': answer.id, 

798 'question_id': question.id 

799 } 

800 }, 

801 # ... more event specs 

802 ] 

803 events = batch_create_audit_events(session, event_specs) 

804 """ 

805 from postrfp.model import AuditEvent 

806 

807 # Batch create all audit events 

808 audit_events = [] 

809 for spec in event_specs: 

810 evt = AuditEvent.create(session, spec["event_type"], **spec["kwargs"]) 

811 

812 # Add all changes for this event 

813 for change_name, old_val, new_val in spec.get("changes", []): 

814 evt.add_change(change_name, old_val, new_val) 

815 

816 audit_events.append(evt) 

817 

818 # Batch add all events 

819 session.add_all(audit_events) 

820 

821 # Add any related objects 

822 if related_objects: 

823 session.add_all(related_objects) 

824 

825 return audit_events 

826 

827 

def batch_log_answer_events(
    session: "Session",
    answer_changes: list[tuple],  # (answer, old_value, new_value, is_new)
    project: Project,
    user: User,
    question: QuestionInstance,
):  # pragma: no cover
    """
    Batch log answer update/create events using the generic batch function.

    Each (answer, old_value, new_value, is_new) tuple becomes one
    ANSWER_CREATED or ANSWER_UPDATED event spec with a single "Answer"
    change. Returns whatever batch_create_audit_events returns.
    """
    # Identical for every event of this call
    actor_id = user.id
    actor_org_id = user.organisation.id if answer_changes else None
    question_id = question.id if answer_changes else None

    event_specs = [
        {
            "event_type": "ANSWER_CREATED" if is_new else "ANSWER_UPDATED",
            "changes": [("Answer", old_value, new_value)],
            "kwargs": {
                "project": project,
                "user_id": actor_id,
                "org_id": actor_org_id,
                "object_id": answer.id,
                "question_id": question_id,
            },
        }
        for answer, old_value, new_value, is_new in answer_changes
    ]

    return batch_create_audit_events(session, event_specs)

859 

860 

861def log_score_events_batch( 

862 session: "Session", 

863 scores_to_update: list[tuple], # (score, initial_score_value) 

864 scores_to_create: list, # [score, ...] 

865 project: Project, 

866 user: User, 

867 add_comments: bool = True, 

868 comment_text: str = "Autoscore Calculated for Multiple Choice Question", 

869): 

870 """ 

871 Batch log score events for multiple scores to reduce database overhead. 

872 Optionally add autoscore comments and corresponding audit events. 

873 """ 

874 # Build main score event specs 

875 main_event_specs = [] 

876 

877 for score, initial_score_value in scores_to_update: 

878 main_event_specs.append( 

879 { 

880 "event_type": "SCORE_UPDATED", 

881 "changes": [("Score", initial_score_value, score.score)], 

882 "kwargs": { 

883 "project": project, 

884 "issue_id": score.issue_id, 

885 "user_id": user.id, 

886 "org_id": user.organisation.id, 

887 "object_id": score.id, 

888 "private": True, 

889 "question_id": score.question_id, 

890 }, 

891 } 

892 ) 

893 

894 for score in scores_to_create: 

895 main_event_specs.append( 

896 { 

897 "event_type": "SCORE_CREATED", 

898 "changes": [("Score", None, score.score)], 

899 "kwargs": { 

900 "project": project, 

901 "issue_id": score.issue_id, 

902 "user_id": user.id, 

903 "org_id": user.organisation.id, 

904 "object_id": score.id, 

905 "private": True, 

906 "question_id": score.question_id, 

907 }, 

908 } 

909 ) 

910 

911 # Create the main audit events 

912 batch_create_audit_events(session, main_event_specs) 

913 

914 # Optionally add autoscore comments and their audit events 

915 if not add_comments: 

916 return 

917 

918 session.flush() # Ensure scores and events have IDs before creating comments 

919 

920 # Create ScoreComment rows 

921 score_comments = [] 

922 all_scores = [score for score, _ in scores_to_update] + scores_to_create 

923 for score in all_scores: 

924 comment = ScoreComment( 

925 score=score, 

926 user=user, 

927 comment_time=utils.utcnow(), 

928 comment_text=comment_text, 

929 ) 

930 score_comments.append(comment) 

931 

932 session.add_all(score_comments) 

933 session.flush() # Get comment IDs 

934 

935 # Create comment audit events for each comment 

936 comment_event_specs = [] 

937 for comment in score_comments: 

938 comment_event_specs.append( 

939 { 

940 "event_type": "SCORE_COMMENT_ADDED", 

941 "changes": [("Comment", "", comment.comment_text)], 

942 "kwargs": { 

943 "project": project, 

944 "issue_id": comment.score.issue_id, 

945 "user_id": user.id, 

946 "org_id": user.organisation.id, 

947 "object_id": comment.id, 

948 "private": True, 

949 "question_id": comment.score.question_id, 

950 }, 

951 } 

952 ) 

953 

954 batch_create_audit_events(session, comment_event_specs)