Coverage for postrfp / shared / update.py: 96%
309 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1"""
2Functions that perform SQL Update queries
3"""
5from copy import deepcopy
6from decimal import Decimal
7from typing import TYPE_CHECKING
9if TYPE_CHECKING:
10 from sqlalchemy.orm import Session
12from sqlalchemy.sql.elements import literal
13from sqlalchemy.sql.expression import insert, select
14from sqlalchemy import delete
16from sqlalchemy import func
17from sqlalchemy.orm import (
18 object_session,
19 noload,
20)
21from sqlalchemy.orm.exc import NoResultFound
23from postrfp.model import (
24 Answer,
25 QuestionResponseState,
26 ResponseStatus,
27 Participant,
28 ProjectPermission,
29 SectionPermission,
30 User,
31 Issue,
32 Project,
33 QElement,
34 AuditEvent,
35 ScoreComment,
36 QuestionInstance,
37 Section,
38 Score,
39 QuestionDefinition,
40 ImportType,
41 Weighting,
42 WeightingSet,
43)
44from postrfp.model.audit import evt_types, Status as EventStatus
46from postrfp.model.exc import CosmeticQuestionEditViolation
47from postrfp.shared.exceptions import AuthorizationFailure
48from postrfp.shared import serial, utils
49from postrfp.model.questionnaire.b36 import from_b36
50from .fetch import generate_autoscores
53def grant_project_permission(session: "Session", project: Project, user: User):
54 """
55 Grants a (restricted) User permissions for the given
56 project. Returns the newly created ProjectPermission instance
57 """
58 if not user.is_restricted:
59 raise ValueError(
60 "Assigning ProjectPermission to a " + "non-restricted user has no effect"
61 )
62 try:
63 participant = (
64 session.query(Participant)
65 .filter_by(project_id=project.id, organisation=user.organisation)
66 .one()
67 )
68 pp = ProjectPermission()
69 pp.participant = participant
70 pp.user = user
71 session.add(pp)
72 except NoResultFound:
73 m = "User {user} is not a Participant in project {project}"
74 raise AuthorizationFailure(message=m)
75 return pp
78def grant_section_permissions(
79 session: "Session", project: Project, user: User, section_id_list: list[int]
80):
81 """
82 Grants the given user access to sections in the given project.
83 A ProjectPermission for this user/project is created if it doesn't exist
84 """
85 try:
86 project_permission = project.permissions.filter_by(user_id=user.id).one()
87 except NoResultFound:
88 project_permission = grant_project_permission(session, project, user)
90 for sec_id in section_id_list:
91 sp = SectionPermission(
92 section_id=sec_id, user=user, project_permission=project_permission
93 )
94 session.add(sp)
97def _update_or_create_weighting(
98 session: "Session",
99 entity_id: int,
100 weight_value: Decimal,
101 lookup: dict,
102 valid_id_set: set,
103 save_func,
104 project_id: int,
105 entity_type: str,
106 weighting_kwargs: dict,
107):
108 """
109 Helper function to update or create a weighting with sparse storage logic.
111 Args:
112 session: Database session
113 entity_id: ID of the question or section
114 weight_value: The weight value to set
115 lookup: Dict mapping entity IDs to existing Weighting objects
116 valid_id_set: Set of valid entity IDs for this project
117 save_func: Function to save new Weighting objects
118 project_id: ID of the project
119 entity_type: Type of entity ("Question" or "Section") for error messages
120 weighting_kwargs: Dict of kwargs to pass to Weighting constructor
121 """
122 if entity_id in lookup:
123 # Existing weight record found
124 if weight_value == 1:
125 # Delete record for sparse storage (weight = 1 is default)
126 session.delete(lookup[entity_id])
127 else:
128 # Update to non-default weight
129 lookup[entity_id].value = weight_value
130 elif entity_id in valid_id_set:
131 # No existing record, only create if non-default weight
132 if weight_value != 1:
133 save_func(Weighting(**weighting_kwargs, value=weight_value))
134 # If weight_value == 1, do nothing (sparse: no record needed for default)
135 else:
136 raise ValueError(
137 f"{entity_type} ID {entity_id} does not belong to project {project_id}"
138 )
141def save_weightset_weightings(
142 session: "Session", weightset: WeightingSet, weights_doc: serial.WeightingsDoc
143):
144 """
145 Save weightings to a weighting set using sparse storage approach.
147 - Weight records are only stored for non-default values (anything != 1)
148 - Setting a weight to 1 removes any existing weight record (sparse deletion)
149 - Missing weight records automatically default to 1 via fallback logic
151 Behavior:
152 - Updates existing weight records when new value != 1
153 - Deletes existing weight records when new value == 1 (sparse approach)
154 - Creates new weight records only when new value != 1
155 - Ignores requests to set weight = 1 on non-existing records (already sparse)
156 """
158 q_lookup = {}
159 sec_lookup = {}
161 for weighting in weightset.weightings:
162 if weighting.section_id:
163 sec_lookup[weighting.section_id] = weighting
164 else:
165 q_lookup[weighting.question_instance_id] = weighting
167 save = weightset.weightings.append # reference to bound method
169 q = session.query(QuestionInstance.id).filter(
170 QuestionInstance.project_id == weightset.project_id
171 )
172 qid_set = {qi.id for qi in q}
173 project_id = weightset.project_id
175 for question_weight in weights_doc.questions:
176 question_id = question_weight.question_instance_id
177 qweight = Decimal(str(question_weight.weight))
179 _update_or_create_weighting(
180 session=session,
181 entity_id=question_id,
182 weight_value=qweight,
183 lookup=q_lookup,
184 valid_id_set=qid_set,
185 save_func=save,
186 project_id=project_id,
187 entity_type="Question",
188 weighting_kwargs={"question_instance_id": question_id},
189 )
191 sq = session.query(Section).filter_by(project_id=project_id)
192 sec_id_set = {s.id for s in sq}
194 for sw in weights_doc.sections:
195 section_id = sw.section_id
196 sec_weight = Decimal(str(sw.weight))
198 _update_or_create_weighting(
199 session=session,
200 entity_id=section_id,
201 weight_value=sec_weight,
202 lookup=sec_lookup,
203 valid_id_set=sec_id_set,
204 save_func=save,
205 project_id=project_id,
206 entity_type="Section",
207 weighting_kwargs={"section_id": section_id},
208 )
211def save_default_weightings(
212 session, project: Project, weights_doc: serial.WeightingsDoc
213):
214 """
215 Save weightings to the project's default weighting set.
216 The method explicitly creates one if needed.
217 """
218 # Get the default weighting set ID (creates one if it doesn't exist)
219 weighting_set_id = project.get_or_create_default_weighting_set_id()
221 # Get the weighting set object for further operations
222 default_ws = session.get(WeightingSet, weighting_set_id)
224 # Initialize with default values of 1 if this is a new weighting set
225 if default_ws.weightings.count() == 0:
226 set_initial_weightings(default_ws, Decimal(1))
227 session.flush()
229 # Now delegate to the regular weightset saving function
230 save_weightset_weightings(session, default_ws, weights_doc)
233def set_initial_weightings(weighting_set: WeightingSet, initial_value: "Decimal"):
234 """
235 Create Weighting records for each QuestionInstance and Section for weighting_set.project_id
236 and weighting_set.weighting_set_id
237 """
238 session = object_session(weighting_set)
239 assert session is not None
241 qs = select(
242 literal(weighting_set.id), QuestionInstance.id, literal(initial_value)
243 ).where(QuestionInstance.project_id == weighting_set.project_id)
244 qi = insert(Weighting).from_select(
245 ["weighting_set_id", "question_instance_id", "value"], qs
246 )
248 session.execute(qi)
250 ss = select(literal(weighting_set.id), Section.id, literal(initial_value)).where(
251 Section.project_id == weighting_set.project_id
252 )
253 si = insert(Weighting).from_select(["weighting_set_id", "section_id", "value"], ss)
255 session.execute(si)
258def copy_weightings(
259 source_weighting_set: WeightingSet, destination_weighting_set: WeightingSet
260):
261 """
262 Copy Weighting values from source weighting set to destination
263 """
264 session = object_session(destination_weighting_set)
265 s = select(
266 literal(destination_weighting_set.id),
267 Weighting.section_id,
268 Weighting.question_instance_id,
269 Weighting.value,
270 ).where(Weighting.weighting_set_id == source_weighting_set.id)
272 i = insert(Weighting).from_select(
273 ["weighting_set_id", "section_id", "question_instance_id", "value"], s
274 )
275 assert session is not None
276 session.execute(i)
279def reset_scores(session: "Session", project: Project, user: User):
280 """
281 Deletes all scores & score comments for the given project.
282 Recreates Autoscores for multiple choice questions
283 """
284 stmt = delete(ScoreComment).where(
285 ScoreComment.score_id.in_(
286 select(Score.id).join(Issue).where(Issue.project_id == project.id)
287 )
288 )
289 session.execute(stmt)
291 issue_map = {}
293 for issue in project.scoreable_issues:
294 del_count = issue.scores.delete()
295 issue_map[issue.id] = dict(respondent_id=issue.respondent_id, deleted=del_count)
296 new_counts: dict[int, int] = {}
298 for ascore in generate_autoscores(project, session, user).values():
299 issue_id = ascore.issue_id
300 score = Score(
301 question_instance_id=ascore.question_id,
302 issue_id=issue_id,
303 score=ascore.score,
304 )
305 if issue_id in new_counts: # pragma: no cover
306 new_counts[issue_id] = new_counts[issue_id] + 1
307 else:
308 new_counts[issue_id] = 1
309 session.add(score)
311 for issue_id, new_count in new_counts.items():
312 issue_map[issue_id]["added"] = new_count
313 return issue_map
316def save_answers(
317 session: "Session",
318 user: User,
319 question: QuestionInstance,
320 answer_lookup: dict,
321 issue: Issue,
322 imported: bool = False,
323 set_done: bool = False,
324):
325 res = question.validate_and_save_answers(answer_lookup, issue)
326 if res.change_list is None or len(res.change_list) == 0:
327 return False
329 response_state: QuestionResponseState = issue.response_state_for_q(question.id)
330 response_state.date_updated = utils.utcnow()
331 response_state.updated_by = user.id
333 if res.unanswered_mandatory:
334 response_state.status = ResponseStatus.NOT_ANSWERED
335 else:
336 response_state.status = ResponseStatus.ANSWERED
338 evt_type = evt_types.ANSWER_CREATED if res.is_new else evt_types.ANSWER_UPDATED
339 if imported:
340 evt_type = evt_types.ANSWER_IMPORTED
342 evt = AuditEvent.create(
343 session,
344 evt_type,
345 object_id=response_state.id,
346 user=user,
347 project_id=issue.project_id,
348 issue=issue,
349 question_id=question.id,
350 )
351 if set_done:
352 evt.status = EventStatus.done
353 if imported:
354 evt.add_change("Import Source", issue.project.title, "")
355 for old_value, new_value in res.change_list:
356 evt.add_change("Answer", old_value, new_value)
357 session.add(evt)
359 return True
362def import_section(
363 session: "Session", src_sec: Section, des_sec: Section, type: ImportType
364) -> tuple[list[Section], list[QuestionInstance]]:
365 """Import Section from another Project"""
366 imp_secs: list[Section] = []
367 imp_qis: list[QuestionInstance] = []
368 new_sec = Section(
369 title=src_sec.title,
370 description=src_sec.description,
371 project_id=des_sec.project_id,
372 )
374 des_sec.subsections.append(new_sec)
375 imp_secs.append(new_sec)
376 for qi in src_sec.questions:
377 new_qi = import_q_instance(qi, new_sec, type)
378 imp_qis.append(new_qi)
380 for sec in src_sec.subsections:
381 secs, ques = import_section(session, sec, new_sec, type)
382 imp_secs += secs
383 imp_qis += ques
385 return imp_secs, imp_qis
388def import_q_instance(
389 src_qi: QuestionInstance, des_sec: Section, type: ImportType
390) -> QuestionInstance:
391 """Import question instances from another Project"""
392 src_qdef = src_qi.question_def
393 des_qdef = src_qi.question_def
394 des_sec_quesions = des_sec.questions
396 if type == ImportType.COPY:
397 des_qdef = QuestionDefinition(
398 title=src_qdef.title,
399 refcount=1,
400 parent_id=src_qdef.id,
401 )
403 for src_el in src_qdef.elements:
404 des_element = QElement(
405 row=src_el.row,
406 col=src_el.col,
407 colspan=src_el.colspan,
408 rowspan=src_el.rowspan,
409 el_type=src_el.el_type,
410 label=src_el.label,
411 mandatory=src_el.mandatory,
412 width=src_el.width,
413 height=src_el.height,
414 multitopic=src_el.multitopic,
415 regexp=src_el.regexp,
416 choices=src_el.choices,
417 )
418 des_qdef.elements.append(des_element)
419 elif type == ImportType.SHARE:
420 des_qdef.refcount += 1
422 des_qi: QuestionInstance = QuestionInstance(
423 project_id=des_sec.project_id,
424 section_id=des_sec.id,
425 question_def=des_qdef,
426 )
427 des_sec_quesions.append(des_qi)
428 return des_qi
431def copy_q_definition(original_qdef: QuestionDefinition, session: "Session"):
432 """Make a copy of the QuestionDefinition and all associated question elements"""
434 new_qdef = QuestionDefinition()
435 new_qdef.title = original_qdef.title
436 new_qdef.refcount = 1
437 new_qdef.parent_id = original_qdef.id
439 for el in original_qdef.elements:
440 new_el = QElement(
441 row=el.row,
442 col=el.col,
443 colspan=el.colspan,
444 rowspan=el.rowspan,
445 el_type=el.el_type,
446 label=el.label,
447 mandatory=el.mandatory,
448 width=el.width,
449 height=el.height,
450 multitopic=el.multitopic,
451 regexp=el.regexp,
452 choices=el.choices,
453 )
454 new_qdef.elements.append(new_el)
456 return new_qdef
459def delete_qinstances_update_def_refcounts(
460 session: "Session", project_id, section_id: int | None = None
461):
462 """
463 Delete question instances, orphan question definitions and update refcount
464 on remaining question definitions for the given project, optionally filtering
465 by section_id
466 """
468 def qf(q):
469 q = q.filter(QuestionInstance.project_id == project_id)
470 if section_id is not None:
471 q = q.filter(QuestionInstance.section_id == section_id)
472 return q
474 qiq = qf(session.query(QuestionInstance.question_def_id.label("id")))
476 qi_set = {r.id for r in qiq}
477 # Can't delete question defs if question instances remain, so delete
478 # instances first, having saved the relevant IDs
479 qf(session.query(QuestionInstance)).delete(synchronize_session=False)
481 # Delete question defs in this project with a refcount of one - not shared
482 session.query(QuestionDefinition).filter(QuestionDefinition.id.in_(qi_set)).filter(
483 QuestionDefinition.refcount == 1
484 ).delete(synchronize_session=False)
486 # Decrement refcount for all remaining question definitions
487 session.query(QuestionDefinition).filter(QuestionDefinition.id.in_(qi_set)).filter(
488 QuestionDefinition.refcount > 1
489 ).update({"refcount": QuestionDefinition.refcount - 1}, synchronize_session=False)
492def log_score_event(
493 session: "Session",
494 score: Score,
495 initial_score_value,
496 is_new: bool,
497 project: "Project",
498 user: User,
499 autoscore=False,
500):
501 event_class = "SCORE_CREATED" if is_new else "SCORE_UPDATED"
503 evt = AuditEvent.create(
504 session,
505 event_class,
506 project=project,
507 issue_id=score.issue_id,
508 user_id=user.id,
509 org_id=user.organisation.id,
510 object_id=score.id,
511 private=True,
512 question_id=score.question_id,
513 )
514 evt.add_change("Score", initial_score_value, score.score)
516 session.add(evt)
518 if autoscore:
519 msg = "Autoscore Calculated for Multiple Choice Question"
520 comment = ScoreComment(
521 score=score,
522 user=user,
523 comment_time=utils.utcnow(),
524 comment_text=msg,
525 )
526 session.add(comment)
527 session.flush()
529 kw = dict(
530 project=project,
531 issue_id=score.issue_id,
532 user_id=user.id,
533 org_id=user.organisation.id,
534 object_id=comment.id,
535 private=True,
536 question_id=score.question_id,
537 )
538 cmnt_evt = AuditEvent.create(session, "SCORE_COMMENT_ADDED", **kw)
539 cmnt_evt.add_change("Comment", "", msg)
540 session.add(cmnt_evt)
543def label_text(
544 project: Project, search_term: str, replace_term: str = "", dry_run=True
545):
546 q = (
547 project.qelements.filter(QElement.el_type.in_(("LB", "CB")))
548 .filter(QElement.label.collate("utf8mb4_bin").like(f"%{search_term}%"))
549 .add_columns(QuestionInstance.b36_number)
550 )
552 for label, qnum in q:
553 old_label = label.label
554 new_label = label.label.replace(search_term, replace_term)
555 if not dry_run:
556 label.label = new_label
557 yield dict(
558 change_type="label",
559 question_number=from_b36(qnum),
560 new=new_label,
561 old=old_label,
562 )
565def question_titles(
566 project: Project, search_term: str, replace_term: str = "", dry_run=True
567):
568 session = object_session(project)
569 assert session is not None
571 q = (
572 session.query(QuestionInstance)
573 .join(QuestionDefinition)
574 .options(noload(QuestionInstance.question_def, QuestionDefinition.elements))
575 .filter(QuestionInstance.project_id == project.id)
576 .filter(
577 QuestionDefinition.title.collate("utf8mb4_bin").like(f"%{search_term}%")
578 )
579 )
581 for qi in q:
582 qdef = qi.question_def
583 old_title = qdef.title
584 new_title = qdef.title.replace(search_term, replace_term)
585 if not dry_run:
586 qdef.title = new_title
587 yield dict(
588 change_type="title",
589 question_number=qi.number,
590 new=new_title,
591 old=old_title,
592 )
595def choices_text(
596 project: Project, search_term: str, replace_term: str = "", dry_run=True
597):
598 q = (
599 project.qelements.filter(QElement.el_type.in_(("CR", "CC")))
600 .filter(func.json_search(QElement.choices, "one", search_term) != None) # noqa: E711
601 .add_columns(QuestionInstance.b36_number)
602 )
604 for el, qnum in q:
605 new_choices = deepcopy(el.choices)
606 old_labels = [c["label"] for c in el.choices]
607 for choice in new_choices:
608 choice["label"] = choice["label"].replace(search_term, replace_term)
609 new_labels = [c["label"] for c in new_choices]
611 if not dry_run:
612 el.choices = new_choices
614 yield dict(
615 change_type="choice",
616 question_number=from_b36(qnum),
617 old=old_labels,
618 new=new_labels,
619 )
622def pretty_choices(choices: list[dict]) -> str:
623 if not isinstance(choices, list):
624 return str(choices)
625 txt = ""
626 for c in choices:
627 auto = (f" <{c['autoscore']}>") if c.get("autoscore", False) else ""
628 txt += f" - {c['label']}{auto}\n"
629 return txt
632def update_create_qdef(
633 qdef: QuestionDefinition, evt: AuditEvent, el_map: dict, el_dict: dict
634):
635 """
636 Update question elements where ID values are provided; add new elements if no id provided
638 Updated ids are removed from el_map - thus any removing elements are to be deleted
639 """
640 mutable_attrs = {
641 "colspan",
642 "rowspan",
643 "label",
644 "mandatory",
645 "regexp",
646 "height",
647 "width",
648 "row",
649 "col",
650 "choices",
651 }
652 if el_dict.get("id", None) is not None:
653 # Update existing element
654 old_el = el_map.pop(el_dict["id"])
655 for attr in mutable_attrs & el_dict.keys():
656 new_val = el_dict[attr]
657 old_val = getattr(old_el, attr)
658 if new_val != old_val:
659 setattr(old_el, attr, new_val)
660 evt_name = f"{old_el.__class__.__name__} #{old_el.id}, {attr.title()}"
661 if attr == "choices" and el_dict["el_type"] in ("CR", "CC"):
662 old_val = pretty_choices(old_val)
663 new_val = pretty_choices(new_val)
664 evt.add_change(evt_name, old_val, new_val)
665 else:
666 # A new element
667 el_type = el_dict.pop("el_type")
668 new_el = qdef.add_element(el_type, **el_dict)
669 evt_name = f"{new_el.__class__.__name__} Added"
670 evt.add_change(evt_name, None, new_el.summary)
673def check_for_saved_answers(session: "Session", qdef: QuestionDefinition, el_map: dict):
674 """
675 If there are elements to delete with associated answers raise
676 @raises CosmeticQuestionEditViolation
677 """
678 if (not qdef.is_shared) or (len(el_map) == 0):
679 return
680 answer_count = (
681 session.query(Answer).filter(Answer.element_id.in_(el_map.keys())).count()
682 )
683 if answer_count > 0:
684 del_ids = ", ".join(str(el_id) for el_id in el_map.keys())
685 m = f"Cannot delete question elements that have associated answers. Element IDs: {del_ids}"
686 raise CosmeticQuestionEditViolation(m)
689def delete_project_section(
690 session: "Session", user: User, project: Project, section: Section
691):
692 """
693 Delete the given Section and all questions and subsections contained within that
694 section.
695 """
697 for descendant_section in section.descendants:
698 delete_qinstances_update_def_refcounts(
699 session, project.id, section_id=descendant_section.id
700 )
701 session.delete(descendant_section)
703 delete_qinstances_update_def_refcounts(session, project.id, section_id=section.id)
704 session.delete(section)
706 evt = AuditEvent.create(
707 session,
708 evt_types.SECTION_DELETED,
709 project=project,
710 user_id=user.id,
711 org_id=user.organisation.id,
712 object_id=section.id,
713 private=True,
714 )
715 session.add(evt)
718def delete_project_section_question(
719 session: "Session",
720 user: User,
721 project: Project,
722 section: Section,
723 qi: QuestionInstance,
724):
725 """
726 Delete the Question with the given ID
728 The return value is an array of remaining instances of the same question that may exist
729 in other projects
731 """
733 evt = AuditEvent.create(
734 session,
735 "QUESTION_DELETED",
736 project=project,
737 user_id=user.id,
738 org_id=user.organisation.id,
739 object_id=qi.id,
740 private=True,
741 question_id=qi.id,
742 )
743 evt.add_change("Title", qi.title, None)
744 evt.add_change("Number", qi.number, None)
745 session.add(evt)
747 instances_remaining = []
748 with session.no_autoflush:
749 qdef = qi.question_def
750 session.delete(qi)
752 qdef.refcount -= 1
753 if qdef.refcount == 0:
754 session.delete(qdef)
755 else:
756 instances_remaining = [
757 dict(project_id=project.id, number=qi.number, id=qi.id)
758 for qi in qdef.instances
759 ]
760 section.renumber()
761 return instances_remaining
764def batch_create_audit_events(
765 session: "Session",
766 event_specs: list[dict],
767 related_objects: list | None = None,
768):
769 """
770 Generic function to batch create audit events and related objects.
772 This function provides a standardised way to create multiple audit events
773 efficiently, reducing database roundtrips and improving performance.
775 Args:
776 session: Database session
777 event_specs: List of dictionaries containing event specifications:
778 {
779 'event_type': str, # e.g., 'SCORE_CREATED', 'ANSWER_UPDATED'
780 'changes': list[tuple[str, old_value, new_value]], # Optional
781 'kwargs': dict # Additional kwargs for AuditEvent.create
782 }
783 related_objects: Optional list of related objects to add (e.g., comments)
785 Returns:
786 list of created AuditEvent objects
788 Example usage:
789 # Batch create multiple answer events
790 event_specs = [
791 {
792 'event_type': 'ANSWER_UPDATED',
793 'changes': [('Answer', 'old_value', 'new_value')],
794 'kwargs': {
795 'project': project,
796 'user_id': user.id,
797 'object_id': answer.id,
798 'question_id': question.id
799 }
800 },
801 # ... more event specs
802 ]
803 events = batch_create_audit_events(session, event_specs)
804 """
805 from postrfp.model import AuditEvent
807 # Batch create all audit events
808 audit_events = []
809 for spec in event_specs:
810 evt = AuditEvent.create(session, spec["event_type"], **spec["kwargs"])
812 # Add all changes for this event
813 for change_name, old_val, new_val in spec.get("changes", []):
814 evt.add_change(change_name, old_val, new_val)
816 audit_events.append(evt)
818 # Batch add all events
819 session.add_all(audit_events)
821 # Add any related objects
822 if related_objects:
823 session.add_all(related_objects)
825 return audit_events
828def batch_log_answer_events(
829 session: "Session",
830 answer_changes: list[tuple], # (answer, old_value, new_value, is_new)
831 project: Project,
832 user: User,
833 question: QuestionInstance,
834): # pragma: no cover
835 """
836 Batch log answer update/create events using the generic batch function.
838 Example of how the generic function can be used for different event types.
839 """
840 event_specs = []
842 for answer, old_value, new_value, is_new in answer_changes:
843 event_type = "ANSWER_CREATED" if is_new else "ANSWER_UPDATED"
844 event_specs.append(
845 {
846 "event_type": event_type,
847 "changes": [("Answer", old_value, new_value)],
848 "kwargs": {
849 "project": project,
850 "user_id": user.id,
851 "org_id": user.organisation.id,
852 "object_id": answer.id,
853 "question_id": question.id,
854 },
855 }
856 )
858 return batch_create_audit_events(session, event_specs)
861def log_score_events_batch(
862 session: "Session",
863 scores_to_update: list[tuple], # (score, initial_score_value)
864 scores_to_create: list, # [score, ...]
865 project: Project,
866 user: User,
867 add_comments: bool = True,
868 comment_text: str = "Autoscore Calculated for Multiple Choice Question",
869):
870 """
871 Batch log score events for multiple scores to reduce database overhead.
872 Optionally add autoscore comments and corresponding audit events.
873 """
874 # Build main score event specs
875 main_event_specs = []
877 for score, initial_score_value in scores_to_update:
878 main_event_specs.append(
879 {
880 "event_type": "SCORE_UPDATED",
881 "changes": [("Score", initial_score_value, score.score)],
882 "kwargs": {
883 "project": project,
884 "issue_id": score.issue_id,
885 "user_id": user.id,
886 "org_id": user.organisation.id,
887 "object_id": score.id,
888 "private": True,
889 "question_id": score.question_id,
890 },
891 }
892 )
894 for score in scores_to_create:
895 main_event_specs.append(
896 {
897 "event_type": "SCORE_CREATED",
898 "changes": [("Score", None, score.score)],
899 "kwargs": {
900 "project": project,
901 "issue_id": score.issue_id,
902 "user_id": user.id,
903 "org_id": user.organisation.id,
904 "object_id": score.id,
905 "private": True,
906 "question_id": score.question_id,
907 },
908 }
909 )
911 # Create the main audit events
912 batch_create_audit_events(session, main_event_specs)
914 # Optionally add autoscore comments and their audit events
915 if not add_comments:
916 return
918 session.flush() # Ensure scores and events have IDs before creating comments
920 # Create ScoreComment rows
921 score_comments = []
922 all_scores = [score for score, _ in scores_to_update] + scores_to_create
923 for score in all_scores:
924 comment = ScoreComment(
925 score=score,
926 user=user,
927 comment_time=utils.utcnow(),
928 comment_text=comment_text,
929 )
930 score_comments.append(comment)
932 session.add_all(score_comments)
933 session.flush() # Get comment IDs
935 # Create comment audit events for each comment
936 comment_event_specs = []
937 for comment in score_comments:
938 comment_event_specs.append(
939 {
940 "event_type": "SCORE_COMMENT_ADDED",
941 "changes": [("Comment", "", comment.comment_text)],
942 "kwargs": {
943 "project": project,
944 "issue_id": comment.score.issue_id,
945 "user_id": user.id,
946 "org_id": user.organisation.id,
947 "object_id": comment.id,
948 "private": True,
949 "question_id": comment.score.question_id,
950 },
951 }
952 )
954 batch_create_audit_events(session, comment_event_specs)