Coverage for postrfp / model / questionnaire / nodes.py: 97%
259 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1import logging
2from decimal import Decimal
3from enum import IntEnum
4from typing import Optional, TYPE_CHECKING, Any, Union, Protocol
6from sqlalchemy import (
7 Integer,
8 ForeignKey,
9 Index,
10 text,
11 UniqueConstraint,
12 ForeignKeyConstraint,
13)
14from sqlalchemy.orm import (
15 Mapped,
16 DynamicMapped,
17 mapped_column,
18 relationship,
19 validates,
20 object_session,
21)
22from sqlalchemy.ext.orderinglist import ordering_list
23from sqlalchemy.types import VARCHAR, INTEGER, TEXT
25from postrfp.model.meta import Base, Visitor
26from postrfp.model.questionnaire.answering import Answer
27from postrfp.model.questionnaire.b36 import (
28 to_b36,
29 from_b36,
30)
31from postrfp.model.questionnaire.weightings import TotalWeighting
33from postrfp.model.questionnaire.qelements import MultipleChoice, QElement, el_keys
35from ..exc import (
36 ValidationFailure,
37 WeightingsNotLoadedException,
38)
40if TYPE_CHECKING:
41 from postrfp.model.composite import QuestionMeta
42 from postrfp.model.project import Project
43 from postrfp.model.acl import SectionPermission
44 from postrfp.model.tags import Tag
"""
Note - the default SQL JOIN / load strategy is defined by the arguments
passed to 'relationship'.

Lots of joins are involved when loading questionnaires. Current strategy is:

Section -> questions_query (QuestionInstance): 'dynamic' - doesn't join by
    default, returns a query
QuestionInstance -> QuestionDefinition: 'joined' - eager loading / inner join
QuestionDefinition -> [QElement] : 'joined' - eager loading by join
    (no 'innerjoin' flag is set on this relationship)

For loading large collections (e.g. a full questionnaire), these loading
strategies are overridden, e.g. loading question elements in a subquery load
"""
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
65class _ModelDumpable(Protocol):
66 """Protocol for objects that have a model_dump method"""
68 def model_dump(self) -> dict[str, Any]: ...
class ImportType(IntEnum):
    """Integer codes for the two supported import modes."""

    # NOTE(review): the names suggest "reference the existing item" versus
    # "create an independent copy" - confirm against the importing code.
    SHARE = 0
    COPY = 10
class NodeMixin:
    """
    Columns and accessors shared by the positioned, numbered model classes
    in this module (``Section`` and ``QuestionInstance``).

    ``position`` is the integer ordering column (defaults to 1).
    ``b36_number`` stores the encoded form of the node number; ``number``
    exposes it through ``from_b36`` / ``to_b36``.
    """

    position: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
    # The column is declared nullable, so the annotation has to admit None;
    # this matches the ``str | None`` return type of the ``number`` getter.
    b36_number: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=30), nullable=True
    )

    @property
    def number(self) -> str | None:
        """The node number obtained by passing ``b36_number`` to ``from_b36``."""
        return from_b36(self.b36_number)

    @number.setter
    def number(self, value: str) -> None:
        """Store ``value`` in ``b36_number`` after encoding it with ``to_b36``."""
        self.b36_number = to_b36(value)
class Section(Base, NodeMixin):
    """
    A section of a project's questionnaire, stored in the ``sections`` table.

    Sections form a tree through ``parent_id`` and hold an ordered list of
    ``QuestionInstance`` objects. Position within the parent is kept in
    ``position``; the encoded number in ``b36_number`` is used by the
    view-only ``descendants``, ``immediate_children`` and ``ancestors``
    relationships, which match on number prefixes within the same project.
    """

    __tablename__ = "sections"
    __table_args__ = (
        # (id, project_id) must be unique so that QuestionInstance can point
        # at it with a composite foreign key.
        UniqueConstraint("id", "project_id", name="uk_section_project"),
        Index("idx_sections_project_b36", "project_id", "b36_number"),
    )

    id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
    title: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False)

    description: Mapped[Optional[str]] = mapped_column(TEXT())

    project_id: Mapped[int] = mapped_column(
        INTEGER, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False
    )
    parent_id: Mapped[Optional[int]] = mapped_column(
        INTEGER, ForeignKey("sections.id", ondelete="CASCADE")
    )

    # Direct child sections, kept in 'position' order (numbered from 1)
    subsections: Mapped[list["Section"]] = relationship(
        "Section",
        order_by="Section.position",
        collection_class=ordering_list("position", count_from=1),
    )

    project: Mapped["Project"] = relationship(
        "Project",
        primaryjoin="Section.project_id==Project.id",
        back_populates="sections",
    )

    # Questions directly inside this section, kept in 'position' order
    questions: Mapped[list["QuestionInstance"]] = relationship(
        "QuestionInstance",
        order_by="QuestionInstance.position",
        collection_class=ordering_list("position", count_from=1),
        back_populates="section",
        cascade="all, delete-orphan",
        passive_deletes=True,
        overlaps="section",
    )

    parent: Mapped[Optional["Section"]] = relationship(
        "Section",
        remote_side=[id],
        back_populates="subsections",
        cascade_backrefs=False,
    )

    # Read-only query over the same rows as 'questions'
    questions_query: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance",
        lazy="dynamic",
        order_by="QuestionInstance.position",
        viewonly=True,
    )

    perms: DynamicMapped["SectionPermission"] = relationship(
        "SectionPermission", lazy="dynamic"
    )

    # Every other section of the same project whose b36_number starts with
    # this section's b36_number
    descendants: DynamicMapped["Section"] = relationship(
        "Section",
        lazy="dynamic",
        viewonly=True,
        order_by="Section.b36_number",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)).startswith(Section.b36_number), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id)",
    )

    # As 'descendants', restricted to numbers exactly two characters longer
    # than this section's b36_number
    immediate_children: DynamicMapped["Section"] = relationship(
        "Section",
        lazy="dynamic",
        viewonly=True,
        order_by="Section.position",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)).startswith(Section.b36_number), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id, "
        "func.length(remote(foreign(Section.b36_number))) == func.length(Section.b36_number) + 2"
        ")",
    )

    # Every other section of the same project whose b36_number is a leading
    # substring of this section's b36_number
    ancestors: Mapped[list["Section"]] = relationship(
        "Section",
        viewonly=True,
        order_by="Section.b36_number",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)) == func.LEFT(Section.b36_number, func.LENGTH(remote(foreign(Section.b36_number)))), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id)",
    )

    def __repr__(self) -> str:
        return f"<Sec id:{self.id} Num:{self.number} Parent: {self.parent_id} - {self.title}>"

    def as_dict(self) -> dict[str, Any]:
        """
        Return this section's own fields as a dict.

        The 'subsections' and 'questions' entries are always empty lists;
        this method does not populate them.
        """
        return {
            "id": self.id,
            "type": "section",
            "title": self.title,
            "description": self.description,
            "number": self.number,
            "number36": self.b36_number,
            "parent_id": self.parent_id,
            "subsections": [],
            "questions": [],
        }

    def renumber(self) -> None:
        """
        Flush the session this section belongs to, then call
        ``renumber_tree`` with that session and this section.
        """
        # Imported here rather than at module level
        from postrfp.model.questionnaire.renumber import renumber_tree

        session = object_session(self)  # type: ignore
        session.flush()  # type: ignore
        renumber_tree(session, self)  # type: ignore

    def accept(self, visitor: Visitor) -> None:
        """
        Call hello_section, visit_question(s), goodbye_section
        on the passed visitor and recurse to questions and subsections

        Questions are only loaded and visited when the visitor's class
        provides a ``visit_question`` that differs from the one defined on
        ``Visitor`` itself.
        """
        visitor.hello_section(self)

        # Compare the implementation resolved through the visitor's whole
        # class hierarchy with the base one. Looking only in the concrete
        # class's __dict__ would miss an override inherited from an
        # intermediate subclass and silently skip the questions.
        base_impl = getattr(Visitor, "visit_question", None)
        visitor_impl = getattr(type(visitor), "visit_question", None)
        if visitor_impl is not None and visitor_impl is not base_impl:
            for q in self.questions:
                visitor.visit_question(q)

        for subsec in self.subsections:
            subsec.accept(visitor)

        visitor.goodbye_section(self)

    @property
    def is_top_level(self) -> bool:
        """True when this section has no parent section."""
        return self.parent_id is None

    @property
    def absolute_weight(self) -> Decimal:
        """
        Weight of this section relative to the whole tree.

        Top-level sections always report ``Decimal(1)``. Otherwise the value
        is ``normalised_weight`` multiplied by the parent's absolute weight,
        cached on the instance after the first computation.

        Raises:
            WeightingsNotLoadedException: if no value has been cached and
                ``normalised_weight`` has not been set on this instance.
        """
        if self.is_top_level:
            return Decimal(1)
        try:
            return self._absolute_weight  # type: ignore
        except AttributeError:
            try:
                nm = self.normalised_weight  # type: ignore
            except AttributeError:
                m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
                raise WeightingsNotLoadedException(m)
            self._absolute_weight = nm * self.parent.absolute_weight  # type: ignore
            return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight: Decimal) -> None:
        self._absolute_weight = abs_weight

    def set_question_positions(self) -> None:
        """
        Set the 'position' attribute for questions in this section indexing from one.
        Note: This is typically not needed when using OrderingList which maintains positions automatically.
        """
        for pos, question in enumerate(self.questions, 1):
            question.position = pos

    def set_subsection_positions(self) -> None:
        """
        Set the 'position' attribute for subsections of this section indexing from one.
        Note: This is typically not needed when using OrderingList which maintains positions automatically.
        """
        for pos, sec in enumerate(self.subsections, 1):
            sec.position = pos

    @validates("questions", "subsections", include_backrefs=False)
    def assign_project_id(
        self, attr_name: str, instance: Union["Section", "QuestionInstance"]
    ) -> Union["Section", "QuestionInstance"]:
        """
        Validator for items added to ``questions`` or ``subsections``.

        Copies this section's ``project_id`` onto the item when the item
        does not have one yet, then returns the item unchanged otherwise.
        """
        if instance.project_id is None:
            instance.project_id = self.project_id

        return instance
class QuestionDefinition(Base):
    """
    The content of a question: a title plus a grid of ``QElement`` objects,
    stored in the ``questions`` table.

    A definition is referenced by ``QuestionInstance`` rows through the
    ``instances`` relationship.
    """

    __tablename__ = "questions"

    title: Mapped[str] = mapped_column(TEXT(), nullable=False)

    parent_id: Mapped[Optional[int]] = mapped_column(Integer, index=True)
    # Stored in the database column "referenceCount"
    refcount: Mapped[int] = mapped_column(
        "referenceCount", Integer, nullable=False, server_default=text("'1'")
    )
    # Eagerly loaded elements, ordered by row then column
    elements: Mapped[list["QElement"]] = relationship(
        "QElement",
        lazy="joined",
        order_by="QElement.row,QElement.col",
        cascade="all, delete-orphan",
        passive_deletes=True,
        back_populates="question_def",
    )

    # Read-only query over the same elements; used by get_element()
    _elements: DynamicMapped["QElement"] = relationship(
        "QElement", lazy="dynamic", viewonly=True
    )

    instances: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance", lazy="dynamic", back_populates="question_def"
    )

    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_def",
        cascade="all, delete",
        passive_deletes=True,
    )

    def __repr__(self) -> str:
        return f'<QDef Id: {self.id}, Title: "{self.title}">'

    def __eq__(self, o: object) -> bool:
        """
        Two definitions are equal when their titles match and their
        elements, each sorted by the attributes named in ``el_keys``,
        compare equal. Anything that is not a QuestionDefinition is unequal.
        """
        if isinstance(o, QuestionDefinition):
            src_elements = sorted(
                self.elements, key=lambda e: [getattr(e, k) for k in el_keys]
            )
            des_elements = sorted(
                o.elements, key=lambda e: [getattr(e, k) for k in el_keys]
            )
            return self.title == o.title and src_elements == des_elements
        return False

    def __hash__(self) -> int:
        # Defining __eq__ would otherwise remove hashing; keep the inherited one
        return super().__hash__()

    @property
    def is_shared(self) -> bool:
        """True when ``refcount`` is set and greater than one."""
        if self.refcount is None:
            return False
        return self.refcount > 1

    def get_element(self, element_id: int) -> QElement:
        """
        Query for the element of this definition with the given id.
        Uses ``.one()``, so exactly one matching row is required.
        """
        return self._elements.filter(QElement.id == element_id).one()

    def as_dict(
        self, answer_lookup: dict[int, str] | None = None, vendor_view: bool = False
    ) -> list[Any]:
        """
        Returns a list of question element lists, each inner list corresponding
        to a row in a question table

        Groups elements into rows - returning a list of lists

        ``vendor_view`` is forwarded only to elements that contain choices.
        When ``answer_lookup`` is given, each answerable element's dict gets
        an "answer" entry looked up by element id (None if absent).
        """
        row_list: list[list[dict[str, Any]]] = []
        current_row = -1
        for el in self.elements:
            # A larger row value than any seen so far starts a new row list
            if el.row > current_row:
                row_list.append([])
                current_row = el.row

            if el.contains_choices:
                el_dict = el.as_dict(vendor_view=vendor_view)
            else:
                el_dict = el.as_dict()

            if el.is_answerable and answer_lookup is not None:
                el_dict["answer"] = answer_lookup.get(el.id, None)

            row_list[-1].append(el_dict)

        return row_list

    @property
    def answerable_elements(self) -> list[QElement]:
        """The elements whose ``is_answerable`` flag is truthy."""
        return [e for e in self.elements if e.is_answerable]

    def add_element(self, el_name: str, **kwargs: Any) -> QElement:
        """Build an element with ``QElement.build``, append it and return it."""
        el = QElement.build(el_name, **kwargs)
        self.elements.append(el)
        return el

    def is_tabular(self) -> bool:
        """True unless ``column_count`` is exactly one."""
        return self.column_count != 1

    @property
    def column_count(self) -> int:
        """Largest ``col`` value among the elements, or 0 with no elements."""
        if not self.elements:
            return 0
        return max(el.col for el in self.elements)

    @property
    def row_count(self) -> int:
        """Largest ``row`` value among the elements, or 0 with no elements."""
        if not self.elements:
            return 0
        return max(el.row for el in self.elements)

    @property
    def grid_area(self) -> int:
        """Number of available grid cells available according to row and col values of Elements"""
        return self.row_count * self.column_count

    @property
    def occupied_area(self) -> int:
        """
        The number of grid cells occupied by cells according to their rowspan and colspan values
        If rowspans or colspans are set correctly this value will be equal to self.grid_area
        """
        return sum(el.cell_area for el in self.elements)

    @property
    def is_table_valid(self) -> bool:
        """True when ``grid_area`` equals ``occupied_area``."""
        return self.grid_area == self.occupied_area

    @classmethod
    def build(
        cls, qdict: Union[dict[str, Any], "_ModelDumpable"], strip_ids: bool = False
    ) -> "QuestionDefinition":
        """
        Create a QuestionDefinition, complete with QElements, from a
        a dict datastructure as provided by serial.QuestionDef

        ``qdict`` may also be any object with a ``model_dump()`` method, in
        which case the dumped dict is used. ``qdict["elements"]`` is read as
        a list of rows, each a list of element dicts; every element is given
        ``row`` and ``col`` values counted from one. With ``strip_ids`` the
        element's ``id`` is passed as None.

        The element dicts supplied by the caller are not modified.
        """
        if not isinstance(qdict, dict):
            qdict = qdict.model_dump()

        qdef = cls(title=qdict["title"])
        for row_idx, row in enumerate(qdict["elements"], start=1):
            for col_idx, el in enumerate(row, start=1):
                # Shallow copy: writing row/col/id into 'el' itself would
                # alter the data structure owned by the caller.
                el_kwargs = dict(el)
                el_kwargs["row"] = row_idx
                el_kwargs["col"] = col_idx
                if strip_ids:
                    el_kwargs["id"] = None
                qdef.add_element(el_kwargs["el_type"], **el_kwargs)
        return qdef
# Join condition used as the 'primaryjoin' of QuestionInstance.total_weightings;
# it marks TotalWeighting.question_instance_id as the foreign side.
qi_weight_join = "foreign(TotalWeighting.question_instance_id)==QuestionInstance.id"
428class SaveAnswersResult:
429 def __init__(
430 self,
431 is_new: bool,
432 change_list: list[tuple[str | None, str]],
433 unanswered_mandatory: bool,
434 ) -> None:
435 self.is_new = is_new
436 self.change_list = change_list
437 self.unanswered_mandatory = unanswered_mandatory
class QuestionInstance(Base, NodeMixin):
    """
    The placement of a ``QuestionDefinition`` inside a ``Section`` of a
    project, stored in the ``question_instances`` table.

    Title and elements are read through to the linked definition. A unique
    index on (project_id, question_id) allows a definition to appear at most
    once per project.
    """

    __tablename__ = "question_instances"

    __table_args__ = (  # type: ignore
        # Composite FK: the section referenced must belong to the same project
        ForeignKeyConstraint(
            ["section_id", "project_id"],
            ["sections.id", "sections.project_id"],
            name="fk_qi_section_project",
            ondelete="CASCADE",
        ),
        Index(
            "unique_question_project",
            "project_id",
            "question_id",
            unique=True,
        ),
    )

    public_attrs = "id,title,position,section_id,number,url".split(",")

    # section_id no longer needs the ForeignKey definition here, as it's part of the composite FK
    section_id: Mapped[int] = mapped_column(
        "section_id",
        INTEGER,
        # ForeignKey("sections.id", ondelete="CASCADE"), # Removed simple FK
        nullable=False,
    )
    question_def_id: Mapped[int] = mapped_column(
        "question_id",
        INTEGER,
        ForeignKey(QuestionDefinition.id),  # Keep this FK to questions table
        nullable=False,
    )
    # project_id still needs its own FK definition to the projects table
    project_id: Mapped[int] = mapped_column(
        "project_id",
        INTEGER,
        ForeignKey("projects.id", ondelete="CASCADE"),  # Keep this FK to projects table
        nullable=False,
    )

    # Always loaded together with the instance via an inner join
    question_def: Mapped["QuestionDefinition"] = relationship(
        QuestionDefinition, lazy="joined", innerjoin=True, back_populates="instances"
    )
    project: Mapped["Project"] = relationship(
        "Project", back_populates="questions", overlaps="questions"
    )
    section: Mapped["Section"] = relationship(
        "Section", back_populates="questions", overlaps="project"
    )
    # Read-only relationship to the same Section table as 'section'
    parent: Mapped["Section"] = relationship("Section", viewonly=True)
    _answers: DynamicMapped["Answer"] = relationship(
        "Answer",
        lazy="dynamic",
        cascade="all, delete",
        passive_deletes=True,
        back_populates="question_instance",
    )

    total_weightings: Mapped["TotalWeighting"] = relationship(
        "TotalWeighting", primaryjoin=qi_weight_join, backref="question", cascade="all"
    )

    # Read-only; reached through the "questions" table as secondary
    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_instance",
        secondary="questions",
        viewonly=True,
    )
    tags: Mapped[list["Tag"]] = relationship(
        "Tag",
        secondary="tags_qinstances",
        back_populates="question_instances",
        passive_deletes=True,
    )

    def __repr__(self) -> str:
        return f"<Question - InstanceId: {self.id}, DefId: {self.question_def_id}>"

    def __eq__(self, o: object) -> bool:
        """Instances are equal when their question definitions compare equal."""
        if isinstance(o, QuestionInstance):
            return self.question_def == o.question_def
        return False

    def __hash__(self) -> int:
        # Defining __eq__ would otherwise remove hashing; keep the inherited one
        return super().__hash__()

    def answers_for_issue(self, issue_id: Any) -> Any:
        """Return the answers query filtered to the given issue id."""
        return self._answers.filter(Answer.issue_id == issue_id)

    def single_vendor_dict(self, issue: Any) -> dict[str, Any]:
        """
        Return this question as a dict for one issue: number, id, title,
        the issue's respondent as a dict, and the elements rendered with
        ``vendor_view=True`` and that issue's answers filled in.
        """
        lookup = {a.element_id: a.answer for a in self.answers_for_issue(issue.id)}
        return {
            "number": self.number,
            "id": self.id,
            "title": self.title,
            "respondent": issue.respondent.as_dict(),
            "elements": self.question_def.as_dict(
                vendor_view=True, answer_lookup=lookup
            ),
        }

    @property
    def title(self) -> str:
        """Title of the linked question definition."""
        return self.question_def.title

    @property
    def elements(self) -> Any:
        """Elements of the linked question definition."""
        return self.question_def.elements

    def as_dict(
        self, answer_lookup: dict[int, str] | None = None, vendor_view: bool = False
    ) -> dict[str, Any]:
        """
        Return this question as a dict; both arguments are passed straight
        through to ``QuestionDefinition.as_dict`` for the "elements" entry.
        """
        return {
            "id": self.id,
            "title": self.title,
            "number36": self.b36_number,
            "number": self.number,
            "elements": self.question_def.as_dict(
                answer_lookup=answer_lookup, vendor_view=vendor_view
            ),
            "type": "question",
        }

    @property
    def is_autoscored(self) -> bool:
        """
        True when any ``MultipleChoice`` element has at least one choice
        with a truthy "autoscore" entry.
        """
        for element in self.elements:
            if isinstance(element, MultipleChoice) and element.choices:  # type: ignore
                for choice in element.choices:  # type: ignore
                    if choice.get("autoscore", None):
                        return True
        return False

    @property
    def absolute_weight(self) -> Decimal:
        """
        ``normalised_weight`` multiplied by the section's absolute weight,
        cached on the instance after the first computation.

        Raises:
            WeightingsNotLoadedException: if computing the value raises
                AttributeError (e.g. ``normalised_weight`` was never set).
        """
        if not hasattr(self, "_absolute_weight"):
            try:
                self._absolute_weight = (
                    self.normalised_weight * self.section.absolute_weight  # type: ignore
                )
            except AttributeError:
                m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
                raise WeightingsNotLoadedException(m)
        return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight: Decimal) -> None:
        self._absolute_weight = abs_weight

    def validate_and_save_answers(
        self, answer_doc: dict[int, str], issue: Any
    ) -> SaveAnswersResult:
        """
        Validate the submitted answers and store them for the given issue.

        Args:
            answer_doc: mapping of element id to submitted answer. A value
                of None is treated as "no answer supplied". The mapping is
                only read, never modified.
            issue: object whose ``id`` selects the existing answers and is
                stored on newly created ones.

        Returns:
            SaveAnswersResult with ``is_new`` (no answers existed for the
            issue before this call), the list of (old, new) changes, and
            ``unanswered_mandatory`` (some mandatory element has neither a
            stored nor a supplied answer).

        Raises:
            ValidationFailure: if ``answer_doc`` contains ids that do not
                belong to an answerable element of this question.
            Whatever ``element.validate`` raises for an unacceptable answer.
        """
        changes: list[tuple[str | None, str]] = []
        unanswered_mandatory = False
        current_answers = {a.element_id: a for a in self.answers_for_issue(issue.id)}
        # Ids still to be matched; tracked separately so that the caller's
        # dict is left untouched.
        unmatched_ids = set(answer_doc)

        for element in self.question_def.answerable_elements:
            el_id = element.id
            unmatched_ids.discard(el_id)
            new_answer = answer_doc.get(el_id)

            if new_answer is None:
                # Nothing supplied (key missing or explicit None): the
                # element only counts as answered if an answer is stored.
                if element.mandatory and el_id not in current_answers:
                    unanswered_mandatory = True
                continue

            element.validate(new_answer)
            current_answer = current_answers.get(el_id)
            if current_answer is None:
                a = Answer(element_id=el_id, answer=new_answer, issue_id=issue.id)
                self._answers.append(a)
                changes.append((None, new_answer))
            else:
                old_answer = current_answer.answer
                if old_answer != new_answer:
                    current_answer.answer = new_answer
                    changes.append((old_answer, new_answer))

        if unmatched_ids:
            tmpl = "Unable to match all answers with element IDs: {}"
            # Iterate answer_doc to report the ids in submission order
            ids = ", ".join(str(k) for k in answer_doc if k in unmatched_ids)
            raise ValidationFailure(tmpl.format(ids))

        is_new = len(current_answers) == 0

        return SaveAnswersResult(is_new, changes, unanswered_mandatory)