Coverage for postrfp/shared/update.py: 96%
310 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2Functions that perform SQL Update queries
3"""
5from copy import deepcopy
6from decimal import Decimal
7from datetime import datetime
8from typing import TYPE_CHECKING
10if TYPE_CHECKING:
11 from sqlalchemy.orm import Session
13from sqlalchemy.sql.elements import literal
14from sqlalchemy.sql.expression import insert, select
15from sqlalchemy import delete
17from sqlalchemy import func
18from sqlalchemy.orm import (
19 object_session,
20 noload,
21)
22from sqlalchemy.orm.exc import NoResultFound
24from postrfp.model import (
25 Answer,
26 QuestionResponseState,
27 ResponseStatus,
28 Participant,
29 ProjectPermission,
30 SectionPermission,
31 User,
32 Issue,
33 Project,
34 QElement,
35 AuditEvent,
36 ScoreComment,
37 QuestionInstance,
38 Section,
39 Score,
40 QuestionDefinition,
41 ImportType,
42 Weighting,
43 WeightingSet,
44)
45from postrfp.model.audit import evt_types, Status as EventStatus
47from postrfp.model.exc import CosmeticQuestionEditViolation
48from postrfp.shared.exceptions import AuthorizationFailure
49from postrfp.shared import serial
50from postrfp.model.questionnaire.b36 import from_b36
51from .fetch import generate_autoscores
54def grant_project_permission(session: "Session", project: Project, user: User):
55 """
56 Grants a (restricted) User permissions for the given
57 project. Returns the newly created ProjectPermission instance
58 """
59 if not user.is_restricted:
60 raise ValueError(
61 "Assigning ProjectPermission to a " + "non-restricted user has no effect"
62 )
63 try:
64 participant = (
65 session.query(Participant)
66 .filter_by(project_id=project.id, organisation=user.organisation)
67 .one()
68 )
69 pp = ProjectPermission()
70 pp.participant = participant
71 pp.user = user
72 session.add(pp)
73 except NoResultFound:
74 m = "User {user} is not a Participant in project {project}"
75 raise AuthorizationFailure(message=m)
76 return pp
79def grant_section_permissions(
80 session: "Session", project: Project, user: User, section_id_list: list[int]
81):
82 """
83 Grants the given user access to sections in the given project.
84 A ProjectPermission for this user/project is created if it doesn't exist
85 """
86 try:
87 project_permission = project.permissions.filter_by(user_id=user.id).one()
88 except NoResultFound:
89 project_permission = grant_project_permission(session, project, user)
91 for sec_id in section_id_list:
92 sp = SectionPermission(
93 section_id=sec_id, user=user, project_permission=project_permission
94 )
95 session.add(sp)
98def _update_or_create_weighting(
99 session: "Session",
100 entity_id: int,
101 weight_value: Decimal,
102 lookup: dict,
103 valid_id_set: set,
104 save_func,
105 project_id: int,
106 entity_type: str,
107 weighting_kwargs: dict,
108):
109 """
110 Helper function to update or create a weighting with sparse storage logic.
112 Args:
113 session: Database session
114 entity_id: ID of the question or section
115 weight_value: The weight value to set
116 lookup: Dict mapping entity IDs to existing Weighting objects
117 valid_id_set: Set of valid entity IDs for this project
118 save_func: Function to save new Weighting objects
119 project_id: ID of the project
120 entity_type: Type of entity ("Question" or "Section") for error messages
121 weighting_kwargs: Dict of kwargs to pass to Weighting constructor
122 """
123 if entity_id in lookup:
124 # Existing weight record found
125 if weight_value == 1:
126 # Delete record for sparse storage (weight = 1 is default)
127 session.delete(lookup[entity_id])
128 else:
129 # Update to non-default weight
130 lookup[entity_id].value = weight_value
131 elif entity_id in valid_id_set:
132 # No existing record, only create if non-default weight
133 if weight_value != 1:
134 save_func(Weighting(**weighting_kwargs, value=weight_value))
135 # If weight_value == 1, do nothing (sparse: no record needed for default)
136 else:
137 raise ValueError(
138 f"{entity_type} ID {entity_id} does not belong to project {project_id}"
139 )
142def save_weightset_weightings(
143 session: "Session", weightset: WeightingSet, weights_doc: serial.WeightingsDoc
144):
145 """
146 Save weightings to a weighting set using sparse storage approach.
148 - Weight records are only stored for non-default values (anything != 1)
149 - Setting a weight to 1 removes any existing weight record (sparse deletion)
150 - Missing weight records automatically default to 1 via fallback logic
152 Behavior:
153 - Updates existing weight records when new value != 1
154 - Deletes existing weight records when new value == 1 (sparse approach)
155 - Creates new weight records only when new value != 1
156 - Ignores requests to set weight = 1 on non-existing records (already sparse)
157 """
159 q_lookup = {}
160 sec_lookup = {}
162 for weighting in weightset.weightings:
163 if weighting.section_id:
164 sec_lookup[weighting.section_id] = weighting
165 else:
166 q_lookup[weighting.question_instance_id] = weighting
168 save = weightset.weightings.append # reference to bound method
170 q = session.query(QuestionInstance.id).filter(
171 QuestionInstance.project_id == weightset.project_id
172 )
173 qid_set = {qi.id for qi in q}
174 project_id = weightset.project_id
176 for question_weight in weights_doc.questions:
177 question_id = question_weight.question_instance_id
178 qweight = Decimal(str(question_weight.weight))
180 _update_or_create_weighting(
181 session=session,
182 entity_id=question_id,
183 weight_value=qweight,
184 lookup=q_lookup,
185 valid_id_set=qid_set,
186 save_func=save,
187 project_id=project_id,
188 entity_type="Question",
189 weighting_kwargs={"question_instance_id": question_id},
190 )
192 sq = session.query(Section).filter_by(project_id=project_id)
193 sec_id_set = {s.id for s in sq}
195 for sw in weights_doc.sections:
196 section_id = sw.section_id
197 sec_weight = Decimal(str(sw.weight))
199 _update_or_create_weighting(
200 session=session,
201 entity_id=section_id,
202 weight_value=sec_weight,
203 lookup=sec_lookup,
204 valid_id_set=sec_id_set,
205 save_func=save,
206 project_id=project_id,
207 entity_type="Section",
208 weighting_kwargs={"section_id": section_id},
209 )
212def save_default_weightings(
213 session, project: Project, weights_doc: serial.WeightingsDoc
214):
215 """
216 Save weightings to the project's default weighting set.
217 The method explicitly creates one if needed.
218 """
219 # Get the default weighting set ID (creates one if it doesn't exist)
220 weighting_set_id = project.get_or_create_default_weighting_set_id()
222 # Get the weighting set object for further operations
223 default_ws = session.get(WeightingSet, weighting_set_id)
225 # Initialize with default values of 1 if this is a new weighting set
226 if default_ws.weightings.count() == 0:
227 set_initial_weightings(default_ws, Decimal(1))
228 session.flush()
230 # Now delegate to the regular weightset saving function
231 save_weightset_weightings(session, default_ws, weights_doc)
234def set_initial_weightings(weighting_set: WeightingSet, initial_value: "Decimal"):
235 """
236 Create Weighting records for each QuestionInstance and Section for weighting_set.project_id
237 and weighting_set.weighting_set_id
238 """
239 session = object_session(weighting_set)
240 assert session is not None
242 qs = select(
243 literal(weighting_set.id), QuestionInstance.id, literal(initial_value)
244 ).where(QuestionInstance.project_id == weighting_set.project_id)
245 qi = insert(Weighting).from_select(
246 ["weighting_set_id", "question_instance_id", "value"], qs
247 )
249 session.execute(qi)
251 ss = select(literal(weighting_set.id), Section.id, literal(initial_value)).where(
252 Section.project_id == weighting_set.project_id
253 )
254 si = insert(Weighting).from_select(["weighting_set_id", "section_id", "value"], ss)
256 session.execute(si)
259def copy_weightings(
260 source_weighting_set: WeightingSet, destination_weighting_set: WeightingSet
261):
262 """
263 Copy Weighting values from source weighting set to destination
264 """
265 session = object_session(destination_weighting_set)
266 s = select(
267 literal(destination_weighting_set.id),
268 Weighting.section_id,
269 Weighting.question_instance_id,
270 Weighting.value,
271 ).where(Weighting.weighting_set_id == source_weighting_set.id)
273 i = insert(Weighting).from_select(
274 ["weighting_set_id", "section_id", "question_instance_id", "value"], s
275 )
276 assert session is not None
277 session.execute(i)
280def reset_scores(session: "Session", project: Project, user: User):
281 """
282 Deletes all scores & score comments for the given project.
283 Recreates Autoscores for multiple choice questions
284 """
285 stmt = delete(ScoreComment).where(
286 ScoreComment.score_id.in_(
287 select(Score.id).join(Issue).where(Issue.project_id == project.id)
288 )
289 )
290 session.execute(stmt)
292 issue_map = {}
294 for issue in project.scoreable_issues:
295 del_count = issue.scores.delete()
296 issue_map[issue.id] = dict(respondent_id=issue.respondent_id, deleted=del_count)
297 new_counts: dict[int, int] = {}
299 for ascore in generate_autoscores(project, session, user).values():
300 issue_id = ascore.issue_id
301 score = Score(
302 question_instance_id=ascore.question_id,
303 issue_id=issue_id,
304 score=ascore.score,
305 )
306 if issue_id in new_counts: # pragma: no cover
307 new_counts[issue_id] = new_counts[issue_id] + 1
308 else:
309 new_counts[issue_id] = 1
310 session.add(score)
312 for issue_id, new_count in new_counts.items():
313 issue_map[issue_id]["added"] = new_count
314 return issue_map
317def save_answers(
318 session: "Session",
319 user: User,
320 question: QuestionInstance,
321 answer_lookup: dict,
322 issue: Issue,
323 imported: bool = False,
324 set_done: bool = False,
325):
326 res = question.validate_and_save_answers(answer_lookup, issue)
327 if res.change_list is None or len(res.change_list) == 0:
328 return False
330 response_state: QuestionResponseState = issue.response_state_for_q(question.id)
331 response_state.date_updated = datetime.now()
332 response_state.updated_by = user.id
334 if res.unanswered_mandatory:
335 response_state.status = ResponseStatus.NOT_ANSWERED
336 else:
337 response_state.status = ResponseStatus.ANSWERED
339 evt_type = evt_types.ANSWER_CREATED if res.is_new else evt_types.ANSWER_UPDATED
340 if imported:
341 evt_type = evt_types.ANSWER_IMPORTED
343 evt = AuditEvent.create(
344 session,
345 evt_type,
346 object_id=response_state.id,
347 user=user,
348 project_id=issue.project_id,
349 issue=issue,
350 question_id=question.id,
351 )
352 if set_done:
353 evt.status = EventStatus.done
354 if imported:
355 evt.add_change("Import Source", issue.project.title, "")
356 for old_value, new_value in res.change_list:
357 evt.add_change("Answer", old_value, new_value)
358 session.add(evt)
360 return True
363def import_section(
364 session: "Session", src_sec: Section, des_sec: Section, type: ImportType
365) -> tuple[list[Section], list[QuestionInstance]]:
366 """Import Section from another Project"""
367 imp_secs: list[Section] = []
368 imp_qis: list[QuestionInstance] = []
369 new_sec = Section(
370 title=src_sec.title,
371 description=src_sec.description,
372 project_id=des_sec.project_id,
373 )
375 des_sec.subsections.append(new_sec)
376 imp_secs.append(new_sec)
377 for qi in src_sec.questions:
378 new_qi = import_q_instance(qi, new_sec, type)
379 imp_qis.append(new_qi)
381 for sec in src_sec.subsections:
382 secs, ques = import_section(session, sec, new_sec, type)
383 imp_secs += secs
384 imp_qis += ques
386 return imp_secs, imp_qis
389def import_q_instance(
390 src_qi: QuestionInstance, des_sec: Section, type: ImportType
391) -> QuestionInstance:
392 """Import question instances from another Project"""
393 src_qdef = src_qi.question_def
394 des_qdef = src_qi.question_def
395 des_sec_quesions = des_sec.questions
397 if type == ImportType.COPY:
398 des_qdef = QuestionDefinition(
399 title=src_qdef.title,
400 refcount=1,
401 parent_id=src_qdef.id,
402 )
404 for src_el in src_qdef.elements:
405 des_element = QElement(
406 row=src_el.row,
407 col=src_el.col,
408 colspan=src_el.colspan,
409 rowspan=src_el.rowspan,
410 el_type=src_el.el_type,
411 label=src_el.label,
412 mandatory=src_el.mandatory,
413 width=src_el.width,
414 height=src_el.height,
415 multitopic=src_el.multitopic,
416 regexp=src_el.regexp,
417 choices=src_el.choices,
418 )
419 des_qdef.elements.append(des_element)
420 elif type == ImportType.SHARE:
421 des_qdef.refcount += 1
423 des_qi: QuestionInstance = QuestionInstance(
424 project_id=des_sec.project_id,
425 section_id=des_sec.id,
426 question_def=des_qdef,
427 )
428 des_sec_quesions.append(des_qi)
429 return des_qi
432def copy_q_definition(original_qdef: QuestionDefinition, session: "Session"):
433 """Make a copy of the QuestionDefinition and all associated question elements"""
435 new_qdef = QuestionDefinition()
436 new_qdef.title = original_qdef.title
437 new_qdef.refcount = 1
438 new_qdef.parent_id = original_qdef.id
440 for el in original_qdef.elements:
441 new_el = QElement(
442 row=el.row,
443 col=el.col,
444 colspan=el.colspan,
445 rowspan=el.rowspan,
446 el_type=el.el_type,
447 label=el.label,
448 mandatory=el.mandatory,
449 width=el.width,
450 height=el.height,
451 multitopic=el.multitopic,
452 regexp=el.regexp,
453 choices=el.choices,
454 )
455 new_qdef.elements.append(new_el)
457 return new_qdef
460def delete_qinstances_update_def_refcounts(
461 session: "Session", project_id, section_id: int | None = None
462):
463 """
464 Delete question instances, orphan question definitions and update refcount
465 on remaining question definitions for the given project, optionally filtering
466 by section_id
467 """
469 def qf(q):
470 q = q.filter(QuestionInstance.project_id == project_id)
471 if section_id is not None:
472 q = q.filter(QuestionInstance.section_id == section_id)
473 return q
475 qiq = qf(session.query(QuestionInstance.question_def_id.label("id")))
477 qi_set = {r.id for r in qiq}
478 # Can't delete question defs if question instances remain, so delete
479 # instances first, having saved the relevant IDs
480 qf(session.query(QuestionInstance)).delete(synchronize_session=False)
482 # Delete question defs in this project with a refcount of one - not shared
483 session.query(QuestionDefinition).filter(QuestionDefinition.id.in_(qi_set)).filter(
484 QuestionDefinition.refcount == 1
485 ).delete(synchronize_session=False)
487 # Decrement refcount for all remaining question definitions
488 session.query(QuestionDefinition).filter(QuestionDefinition.id.in_(qi_set)).filter(
489 QuestionDefinition.refcount > 1
490 ).update({"refcount": QuestionDefinition.refcount - 1}, synchronize_session=False)
493def log_score_event(
494 session: "Session",
495 score: Score,
496 initial_score_value,
497 is_new: bool,
498 project: "Project",
499 user: User,
500 autoscore=False,
501):
502 event_class = "SCORE_CREATED" if is_new else "SCORE_UPDATED"
504 evt = AuditEvent.create(
505 session,
506 event_class,
507 project=project,
508 issue_id=score.issue_id,
509 user_id=user.id,
510 org_id=user.organisation.id,
511 object_id=score.id,
512 private=True,
513 question_id=score.question_id,
514 )
515 evt.add_change("Score", initial_score_value, score.score)
517 session.add(evt)
519 if autoscore:
520 msg = "Autoscore Calculated for Multiple Choice Question"
521 comment = ScoreComment(
522 score=score,
523 user=user,
524 comment_time=datetime.now(),
525 comment_text=msg,
526 )
527 session.add(comment)
528 session.flush()
530 kw = dict(
531 project=project,
532 issue_id=score.issue_id,
533 user_id=user.id,
534 org_id=user.organisation.id,
535 object_id=comment.id,
536 private=True,
537 question_id=score.question_id,
538 )
539 cmnt_evt = AuditEvent.create(session, "SCORE_COMMENT_ADDED", **kw)
540 cmnt_evt.add_change("Comment", "", msg)
541 session.add(cmnt_evt)
544def label_text(
545 project: Project, search_term: str, replace_term: str = "", dry_run=True
546):
547 q = (
548 project.qelements.filter(QElement.el_type.in_(("LB", "CB")))
549 .filter(QElement.label.collate("utf8mb4_bin").like(f"%{search_term}%"))
550 .add_columns(QuestionInstance.b36_number)
551 )
553 for label, qnum in q:
554 old_label = label.label
555 new_label = label.label.replace(search_term, replace_term)
556 if not dry_run:
557 label.label = new_label
558 yield dict(
559 change_type="label",
560 question_number=from_b36(qnum),
561 new=new_label,
562 old=old_label,
563 )
566def question_titles(
567 project: Project, search_term: str, replace_term: str = "", dry_run=True
568):
569 session = object_session(project)
570 assert session is not None
572 q = (
573 session.query(QuestionInstance)
574 .join(QuestionDefinition)
575 .options(noload(QuestionInstance.question_def, QuestionDefinition.elements))
576 .filter(QuestionInstance.project_id == project.id)
577 .filter(
578 QuestionDefinition.title.collate("utf8mb4_bin").like(f"%{search_term}%")
579 )
580 )
582 for qi in q:
583 qdef = qi.question_def
584 old_title = qdef.title
585 new_title = qdef.title.replace(search_term, replace_term)
586 if not dry_run:
587 qdef.title = new_title
588 yield dict(
589 change_type="title",
590 question_number=qi.number,
591 new=new_title,
592 old=old_title,
593 )
596def choices_text(
597 project: Project, search_term: str, replace_term: str = "", dry_run=True
598):
599 q = (
600 project.qelements.filter(QElement.el_type.in_(("CR", "CC")))
601 .filter(func.json_search(QElement.choices, "one", search_term) != None) # noqa: E711
602 .add_columns(QuestionInstance.b36_number)
603 )
605 for el, qnum in q:
606 new_choices = deepcopy(el.choices)
607 old_labels = [c["label"] for c in el.choices]
608 for choice in new_choices:
609 choice["label"] = choice["label"].replace(search_term, replace_term)
610 new_labels = [c["label"] for c in new_choices]
612 if not dry_run:
613 el.choices = new_choices
615 yield dict(
616 change_type="choice",
617 question_number=from_b36(qnum),
618 old=old_labels,
619 new=new_labels,
620 )
623def pretty_choices(choices: list[dict]) -> str:
624 if not isinstance(choices, list):
625 return str(choices)
626 txt = ""
627 for c in choices:
628 auto = (f" <{c['autoscore']}>") if c.get("autoscore", False) else ""
629 txt += f" - {c['label']}{auto}\n"
630 return txt
633def update_create_qdef(
634 qdef: QuestionDefinition, evt: AuditEvent, el_map: dict, el_dict: dict
635):
636 """
637 Update question elements where ID values are provided; add new elements if no id provided
639 Updated ids are removed from el_map - thus any removing elements are to be deleted
640 """
641 mutable_attrs = {
642 "colspan",
643 "rowspan",
644 "label",
645 "mandatory",
646 "regexp",
647 "height",
648 "width",
649 "row",
650 "col",
651 "choices",
652 }
653 if el_dict.get("id", None) is not None:
654 # Update existing element
655 old_el = el_map.pop(el_dict["id"])
656 for attr in mutable_attrs & el_dict.keys():
657 new_val = el_dict[attr]
658 old_val = getattr(old_el, attr)
659 if new_val != old_val:
660 setattr(old_el, attr, new_val)
661 evt_name = f"{old_el.__class__.__name__} #{old_el.id}, {attr.title()}"
662 if attr == "choices" and el_dict["el_type"] in ("CR", "CC"):
663 old_val = pretty_choices(old_val)
664 new_val = pretty_choices(new_val)
665 evt.add_change(evt_name, old_val, new_val)
666 else:
667 # A new element
668 el_type = el_dict.pop("el_type")
669 new_el = qdef.add_element(el_type, **el_dict)
670 evt_name = f"{new_el.__class__.__name__} Added"
671 evt.add_change(evt_name, None, new_el.summary)
674def check_for_saved_answers(session: "Session", qdef: QuestionDefinition, el_map: dict):
675 """
676 If there are elements to delete with associated answers raise
677 @raises CosmeticQuestionEditViolation
678 """
679 if (not qdef.is_shared) or (len(el_map) == 0):
680 return
681 answer_count = (
682 session.query(Answer).filter(Answer.element_id.in_(el_map.keys())).count()
683 )
684 if answer_count > 0:
685 del_ids = ", ".join(str(el_id) for el_id in el_map.keys())
686 m = f"Cannot delete question elements that have associated answers. Element IDs: {del_ids}"
687 raise CosmeticQuestionEditViolation(m)
690def delete_project_section(
691 session: "Session", user: User, project: Project, section: Section
692):
693 """
694 Delete the given Section and all questions and subsections contained within that
695 section.
696 """
698 for descendant_section in section.descendants:
699 delete_qinstances_update_def_refcounts(
700 session, project.id, section_id=descendant_section.id
701 )
702 session.delete(descendant_section)
704 delete_qinstances_update_def_refcounts(session, project.id, section_id=section.id)
705 session.delete(section)
707 evt = AuditEvent.create(
708 session,
709 evt_types.SECTION_DELETED,
710 project=project,
711 user_id=user.id,
712 org_id=user.organisation.id,
713 object_id=section.id,
714 private=True,
715 )
716 session.add(evt)
719def delete_project_section_question(
720 session: "Session",
721 user: User,
722 project: Project,
723 section: Section,
724 qi: QuestionInstance,
725):
726 """
727 Delete the Question with the given ID
729 The return value is an array of remaining instances of the same question that may exist
730 in other projects
732 """
734 evt = AuditEvent.create(
735 session,
736 "QUESTION_DELETED",
737 project=project,
738 user_id=user.id,
739 org_id=user.organisation.id,
740 object_id=qi.id,
741 private=True,
742 question_id=qi.id,
743 )
744 evt.add_change("Title", qi.title, None)
745 evt.add_change("Number", qi.number, None)
746 session.add(evt)
748 instances_remaining = []
749 with session.no_autoflush:
750 qdef = qi.question_def
751 session.delete(qi)
753 qdef.refcount -= 1
754 if qdef.refcount == 0:
755 session.delete(qdef)
756 else:
757 instances_remaining = [
758 dict(project_id=project.id, number=qi.number, id=qi.id)
759 for qi in qdef.instances
760 ]
761 section.renumber()
762 return instances_remaining
765def batch_create_audit_events(
766 session: "Session",
767 event_specs: list[dict],
768 related_objects: list | None = None,
769):
770 """
771 Generic function to batch create audit events and related objects.
773 This function provides a standardised way to create multiple audit events
774 efficiently, reducing database roundtrips and improving performance.
776 Args:
777 session: Database session
778 event_specs: List of dictionaries containing event specifications:
779 {
780 'event_type': str, # e.g., 'SCORE_CREATED', 'ANSWER_UPDATED'
781 'changes': list[tuple[str, old_value, new_value]], # Optional
782 'kwargs': dict # Additional kwargs for AuditEvent.create
783 }
784 related_objects: Optional list of related objects to add (e.g., comments)
786 Returns:
787 list of created AuditEvent objects
789 Example usage:
790 # Batch create multiple answer events
791 event_specs = [
792 {
793 'event_type': 'ANSWER_UPDATED',
794 'changes': [('Answer', 'old_value', 'new_value')],
795 'kwargs': {
796 'project': project,
797 'user_id': user.id,
798 'object_id': answer.id,
799 'question_id': question.id
800 }
801 },
802 # ... more event specs
803 ]
804 events = batch_create_audit_events(session, event_specs)
805 """
806 from postrfp.model import AuditEvent
808 # Batch create all audit events
809 audit_events = []
810 for spec in event_specs:
811 evt = AuditEvent.create(session, spec["event_type"], **spec["kwargs"])
813 # Add all changes for this event
814 for change_name, old_val, new_val in spec.get("changes", []):
815 evt.add_change(change_name, old_val, new_val)
817 audit_events.append(evt)
819 # Batch add all events
820 session.add_all(audit_events)
822 # Add any related objects
823 if related_objects:
824 session.add_all(related_objects)
826 return audit_events
829def batch_log_answer_events(
830 session: "Session",
831 answer_changes: list[tuple], # (answer, old_value, new_value, is_new)
832 project: Project,
833 user: User,
834 question: QuestionInstance,
835): # pragma: no cover
836 """
837 Batch log answer update/create events using the generic batch function.
839 Example of how the generic function can be used for different event types.
840 """
841 event_specs = []
843 for answer, old_value, new_value, is_new in answer_changes:
844 event_type = "ANSWER_CREATED" if is_new else "ANSWER_UPDATED"
845 event_specs.append(
846 {
847 "event_type": event_type,
848 "changes": [("Answer", old_value, new_value)],
849 "kwargs": {
850 "project": project,
851 "user_id": user.id,
852 "org_id": user.organisation.id,
853 "object_id": answer.id,
854 "question_id": question.id,
855 },
856 }
857 )
859 return batch_create_audit_events(session, event_specs)
862def log_score_events_batch(
863 session: "Session",
864 scores_to_update: list[tuple], # (score, initial_score_value)
865 scores_to_create: list, # [score, ...]
866 project: Project,
867 user: User,
868 add_comments: bool = True,
869 comment_text: str = "Autoscore Calculated for Multiple Choice Question",
870):
871 """
872 Batch log score events for multiple scores to reduce database overhead.
873 Optionally add autoscore comments and corresponding audit events.
874 """
875 # Build main score event specs
876 main_event_specs = []
878 for score, initial_score_value in scores_to_update:
879 main_event_specs.append(
880 {
881 "event_type": "SCORE_UPDATED",
882 "changes": [("Score", initial_score_value, score.score)],
883 "kwargs": {
884 "project": project,
885 "issue_id": score.issue_id,
886 "user_id": user.id,
887 "org_id": user.organisation.id,
888 "object_id": score.id,
889 "private": True,
890 "question_id": score.question_id,
891 },
892 }
893 )
895 for score in scores_to_create:
896 main_event_specs.append(
897 {
898 "event_type": "SCORE_CREATED",
899 "changes": [("Score", None, score.score)],
900 "kwargs": {
901 "project": project,
902 "issue_id": score.issue_id,
903 "user_id": user.id,
904 "org_id": user.organisation.id,
905 "object_id": score.id,
906 "private": True,
907 "question_id": score.question_id,
908 },
909 }
910 )
912 # Create the main audit events
913 batch_create_audit_events(session, main_event_specs)
915 # Optionally add autoscore comments and their audit events
916 if not add_comments:
917 return
919 session.flush() # Ensure scores and events have IDs before creating comments
921 # Create ScoreComment rows
922 score_comments = []
923 all_scores = [score for score, _ in scores_to_update] + scores_to_create
924 for score in all_scores:
925 comment = ScoreComment(
926 score=score,
927 user=user,
928 comment_time=datetime.now(),
929 comment_text=comment_text,
930 )
931 score_comments.append(comment)
933 session.add_all(score_comments)
934 session.flush() # Get comment IDs
936 # Create comment audit events for each comment
937 comment_event_specs = []
938 for comment in score_comments:
939 comment_event_specs.append(
940 {
941 "event_type": "SCORE_COMMENT_ADDED",
942 "changes": [("Comment", "", comment.comment_text)],
943 "kwargs": {
944 "project": project,
945 "issue_id": comment.score.issue_id,
946 "user_id": user.id,
947 "org_id": user.organisation.id,
948 "object_id": comment.id,
949 "private": True,
950 "question_id": comment.score.question_id,
951 },
952 }
953 )
955 batch_create_audit_events(session, comment_event_specs)