Coverage for postrfp/model/questionnaire/nodes.py: 95%
265 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import logging
2from decimal import Decimal
3from enum import IntEnum
4from typing import Optional, TYPE_CHECKING, Any, Union, Protocol
6from sqlalchemy import (
7 Integer,
8 ForeignKey,
9 Index,
10 text,
11 UniqueConstraint,
12 ForeignKeyConstraint,
13)
14from sqlalchemy.orm import (
15 Mapped,
16 DynamicMapped,
17 mapped_column,
18 relationship,
19 validates,
20 object_session,
21 Session,
22 Query,
23)
24from sqlalchemy.ext.orderinglist import ordering_list
25from sqlalchemy.types import VARCHAR, INTEGER, TEXT
27from postrfp.model.meta import Base, Visitor
28from postrfp.model.questionnaire.answering import Answer
29from postrfp.model.questionnaire.b36 import (
30 to_b36,
31 from_b36,
32)
33from postrfp.model.questionnaire.weightings import TotalWeighting
35from postrfp.model.questionnaire.qelements import MultipleChoice, QElement, el_keys
37from ..exc import (
38 ValidationFailure,
39 WeightingsNotLoadedException,
40)
42if TYPE_CHECKING:
43 from postrfp.model.composite import QuestionMeta
44 from postrfp.model.project import Project
45 from postrfp.model.acl import SectionPermission
46 from postrfp.model.tags import Tag
47 from sqlalchemy import Subquery
"""
Note - the default SQL JOIN / load strategy is defined by the arguments
passed to 'relationship'.

Lots of joins are involved when loading questionnaires. The current strategy is:

Section -> questions (QuestionInstance): default lazy load into an ordered list
Section -> questions_query (QuestionInstance): 'dynamic' - doesn't join by
    default, returns a query

QuestionInstance -> QuestionDefinition: 'joined' - eager loading / inner join
QuestionDefinition -> [QElement] : 'joined' - eager loading (outer join, as
    innerjoin is not set on that relationship)

For loading large collections (e.g. full questionnaire), these loading
strategies are overridden, e.g. loading question elements in a subquery load
"""
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class _ModelDumpable(Protocol):
    """Structural type for any object that exposes ``model_dump()``.

    ``QuestionDefinition.build`` accepts such an object in place of a plain
    dict and calls ``model_dump()`` on it to obtain one.
    """

    def model_dump(self) -> dict[str, Any]:
        """Return the object's contents as a plain dictionary."""
        ...
class ImportType(IntEnum):
    """Integer codes identifying the two import modes: SHARE and COPY."""

    SHARE = 0
    COPY = 10
class NodeMixin:
    """Position and numbering columns shared by the classes that mix this in.

    ``b36_number`` is the stored form of a node's number; the ``number``
    property converts to and from it with ``from_b36`` / ``to_b36``.
    """

    position: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
    # NOTE(review): annotated ``Mapped[str]`` although the column is declared
    # nullable - confirm whether ``Mapped[Optional[str]]`` was intended.
    b36_number: Mapped[str] = mapped_column(VARCHAR(length=30), nullable=True)

    @property
    def number(self) -> str | None:
        """The node number, decoded from ``b36_number`` via ``from_b36``."""
        stored = self.b36_number
        return from_b36(stored)

    @number.setter
    def number(self, value: str) -> None:
        """Encode ``value`` with ``to_b36`` and store it in ``b36_number``."""
        encoded = to_b36(value)
        self.b36_number = encoded
class Section(Base, NodeMixin):
    """A node in a project's section tree (table ``sections``).

    A section belongs to one project, may have a parent section, and holds
    ordered subsections and ordered questions.  Several view-only
    relationships (``descendants``, ``immediate_children``, ``ancestors``)
    are derived from prefix comparisons on ``b36_number`` rather than from
    ``parent_id``.
    """

    __tablename__ = "sections"
    __table_args__ = (UniqueConstraint("id", "project_id", name="uk_section_project"),)

    id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
    title: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False)

    description: Mapped[Optional[str]] = mapped_column(TEXT())

    project_id: Mapped[int] = mapped_column(
        INTEGER, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False
    )
    parent_id: Mapped[Optional[int]] = mapped_column(
        INTEGER, ForeignKey("sections.id", ondelete="CASCADE")
    )

    # Direct children, kept in ``position`` order (1-based) by ordering_list.
    subsections: Mapped[list["Section"]] = relationship(
        "Section",
        order_by="Section.position",
        collection_class=ordering_list("position", count_from=1),
    )

    project: Mapped["Project"] = relationship(
        "Project",
        primaryjoin="Section.project_id==Project.id",
        back_populates="sections",
    )

    # Questions of this section, kept in ``position`` order (1-based).
    questions: Mapped[list["QuestionInstance"]] = relationship(
        "QuestionInstance",
        order_by="QuestionInstance.position",
        collection_class=ordering_list("position", count_from=1),
        back_populates="section",
        cascade="all, delete-orphan",
        passive_deletes=True,
        overlaps="section",
    )

    parent: Mapped[Optional["Section"]] = relationship(
        "Section",
        remote_side=[id],
        back_populates="subsections",
        cascade_backrefs=False,
    )

    # Read-only query over the same questions; returns a query object
    # instead of loading the collection.
    questions_query: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance",
        lazy="dynamic",
        order_by="QuestionInstance.position",
        viewonly=True,
    )

    perms: DynamicMapped["SectionPermission"] = relationship(
        "SectionPermission", lazy="dynamic"
    )

    # Every other section of the same project whose b36_number starts with
    # this section's b36_number.
    descendants: DynamicMapped["Section"] = relationship(
        "Section",
        lazy="dynamic",
        viewonly=True,
        order_by="Section.b36_number",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)).startswith(Section.b36_number), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id)",
    )

    # As ``descendants``, restricted to rows whose b36_number is exactly two
    # characters longer than this section's.
    immediate_children: DynamicMapped["Section"] = relationship(
        "Section",
        lazy="dynamic",
        viewonly=True,
        order_by="Section.position",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)).startswith(Section.b36_number), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id, "
        "func.length(remote(foreign(Section.b36_number))) == func.length(Section.b36_number) + 2"
        ")",
    )

    # Every other section of the same project whose b36_number is a leading
    # prefix of this section's b36_number.
    ancestors: Mapped[list["Section"]] = relationship(
        "Section",
        viewonly=True,
        order_by="Section.b36_number",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)) == func.LEFT(Section.b36_number, func.LENGTH(remote(foreign(Section.b36_number)))), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id)",
    )

    def __repr__(self) -> str:
        return f"<Sec id:{self.id} Num:{self.number} Parent: {self.parent_id} - {self.title}>"

    def as_dict(self) -> dict[str, Any]:
        """Return this section's own fields as a dict.

        ``subsections`` and ``questions`` are returned as empty lists; they
        are not populated here.
        """
        return {
            "id": self.id,
            "type": "section",
            "title": self.title,
            "description": self.description,
            "number": self.number,
            "number36": self.b36_number,
            "parent_id": self.parent_id,
            "subsections": [],
            "questions": [],
        }

    def renumber(self) -> None:
        """Flush the owning session, then run ``renumber_tree`` from this section."""
        # Imported here rather than at module level.
        from postrfp.model.questionnaire.renumber import renumber_tree

        session = object_session(self)  # type: ignore
        session.flush()  # type: ignore
        renumber_tree(session, self)  # type: ignore

    def accept(self, visitor: Visitor) -> None:
        """
        Call hello_section, visit_question(s), goodbye_section
        on the passed visitor and recurse to questions and subsections
        """
        visitor.hello_section(self)

        # Only load questions when the visitor's own class defines
        # visit_question (an implementation inherited from a parent class
        # does not count).
        # NOTE(review): an intermediate subclass that defines visit_question
        # is therefore also ignored for its own subclasses - confirm intended.
        if "visit_question" in visitor.__class__.__dict__:
            for q in self.questions:
                visitor.visit_question(q)

        for subsec in self.subsections:
            subsec.accept(visitor)

        visitor.goodbye_section(self)

    @property
    def is_top_level(self) -> bool:
        """True when this section has no parent."""
        return self.parent_id is None

    @property
    def absolute_weight(self) -> Decimal:
        """Absolute weight of this section.

        A top-level section always yields ``Decimal(1)``.  Otherwise the
        cached ``_absolute_weight`` is returned if present; failing that it
        is computed as ``normalised_weight * parent.absolute_weight`` and
        cached.

        Raises:
            WeightingsNotLoadedException: if ``normalised_weight`` has not
                been set on this section.
        """
        if self.is_top_level:
            return Decimal(1)
        try:
            return self._absolute_weight  # type: ignore
        except AttributeError:
            try:
                nm = self.normalised_weight  # type: ignore
            except AttributeError:
                m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
                raise WeightingsNotLoadedException(m)
            self._absolute_weight = nm * self.parent.absolute_weight  # type: ignore
            return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight: Decimal) -> None:
        self._absolute_weight = abs_weight

    def set_question_positions(self) -> None:
        """
        Set the 'position' attribute for questions in this section indexing from one.
        Note: This is typically not needed when using OrderingList which maintains positions automatically.
        """
        for pos, question in enumerate(self.questions, 1):
            question.position = pos

    def set_subsection_positions(self) -> None:
        """
        Set the 'position' attribute for subsections of this section indexing from one.
        Note: This is typically not needed when using OrderingList which maintains positions automatically.
        """
        for pos, sec in enumerate(self.subsections, 1):
            sec.position = pos

    @validates("questions", "subsections", include_backrefs=False)
    def assign_project_id(
        self, attr_name: str, instance: Union["Section", "QuestionInstance"]
    ) -> Union["Section", "QuestionInstance"]:
        """Give a newly added question or subsection this section's project_id.

        An ``instance`` that already has a ``project_id`` is left unchanged.
        """
        if instance.project_id is None:
            instance.project_id = self.project_id

        return instance

    def all_descendants_positioned(self) -> bool:
        """Return True when no descendant section lacks a valid position.

        A descendant counts as unpositioned when its ``position`` is NULL or
        less than 1 (positions are numbered from one).

        The previous implementation filtered on ``"Section".position > 0``,
        i.e. an attribute of a string literal, and raised AttributeError on
        every call.
        NOTE(review): "unpositioned" is taken from the method name and the
        1-based ordering lists above - confirm against callers.
        """
        unpositioned = self.descendants.filter(
            Section.position.is_(None) | (Section.position < 1)
        )
        return unpositioned.count() == 0

    def get_immediate_children_subquery(self, session: Session) -> "Subquery":
        """Get immediate children as a subquery for complex queries.

        ``session`` is accepted but not used; the subquery is built from the
        ``immediate_children`` dynamic relationship.
        """
        return self.immediate_children.subquery()

    def get_all_descendant_questions(
        self, session: Session
    ) -> Query["QuestionInstance"]:
        """Get all questions in this section and all descendant sections.

        Matches questions whose section is in this project and has a
        b36_number starting with this section's b36_number.
        """
        return (
            session.query(QuestionInstance)
            .join(Section, QuestionInstance.section_id == Section.id)
            .filter(
                Section.b36_number.startswith(self.b36_number),
                Section.project_id == self.project_id,
            )
        )
class QuestionDefinition(Base):
    """The content of a question: a title plus a grid of QElements.

    Stored in table ``questions``.  Each element carries ``row`` and ``col``
    values which together describe the question's table layout.
    """

    __tablename__ = "questions"

    title: Mapped[str] = mapped_column(TEXT(), nullable=False)

    parent_id: Mapped[Optional[int]] = mapped_column(Integer, index=True)
    refcount: Mapped[int] = mapped_column(
        "referenceCount", Integer, nullable=False, server_default=text("'1'")
    )
    # Eagerly joined and ordered by (row, col).
    elements: Mapped[list["QElement"]] = relationship(
        "QElement",
        lazy="joined",
        order_by="QElement.row,QElement.col",
        cascade="all, delete-orphan",
        passive_deletes=True,
        back_populates="question_def",
    )

    # Read-only query over the same elements, used for single lookups.
    _elements: DynamicMapped["QElement"] = relationship(
        "QElement", lazy="dynamic", viewonly=True
    )

    instances: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance", lazy="dynamic", back_populates="question_def"
    )

    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_def",
        cascade="all, delete",
        passive_deletes=True,
    )

    def __repr__(self) -> str:
        return f'<QDef Id: {self.id}, Title: "{self.title}">'

    def __eq__(self, o: object) -> bool:
        """Two definitions are equal when titles and sorted elements match.

        Elements are sorted by the attributes named in ``el_keys`` before
        the two lists are compared.
        """
        if not isinstance(o, QuestionDefinition):
            return False

        def sort_key(e: Any) -> list[Any]:
            return [getattr(e, k) for k in el_keys]

        src_elements = sorted(self.elements, key=sort_key)
        des_elements = sorted(o.elements, key=sort_key)
        return self.title == o.title and src_elements == des_elements

    def __hash__(self) -> int:
        # __eq__ is overridden, so __hash__ must be restated to keep the
        # inherited hash.
        return super().__hash__()

    @property
    def is_shared(self) -> bool:
        """True when ``refcount`` is set and greater than one."""
        if self.refcount is None:
            return False
        return self.refcount > 1

    def get_element(self, element_id: int) -> QElement:
        """Return the element of this question with the given id.

        Uses ``Query.one()``, which raises unless exactly one row matches.
        """
        return self._elements.filter(QElement.id == element_id).one()

    def as_dict(
        self, answer_lookup: dict[int, str] | None = None, vendor_view: bool = False
    ) -> list[Any]:
        """
        Returns a list of question element lists, each inner list corresponding
        to a row in a question table

        Groups elements into rows - returning a list of lists

        When ``answer_lookup`` is given, each answerable element's dict gets
        an ``"answer"`` key looked up by element id (None if absent).
        ``vendor_view`` is forwarded only to elements that contain choices.
        """
        row_list: list[list[dict[str, Any]]] = []
        current_row = -1
        for el in self.elements:
            # Elements arrive ordered by (row, col); a higher row number
            # starts a new inner list.
            if el.row > current_row:
                row_list.append([])
                current_row = el.row

            if el.contains_choices:
                el_dict = el.as_dict(vendor_view=vendor_view)
            else:
                el_dict = el.as_dict()

            if el.is_answerable and answer_lookup is not None:
                el_dict["answer"] = answer_lookup.get(el.id, None)

            row_list[-1].append(el_dict)

        return row_list

    @property
    def answerable_elements(self) -> list[QElement]:
        """The elements whose ``is_answerable`` flag is truthy."""
        return [e for e in self.elements if e.is_answerable]

    def add_element(self, el_name: str, **kwargs: Any) -> QElement:
        """Build a QElement via ``QElement.build`` and append it to ``elements``."""
        el = QElement.build(el_name, **kwargs)
        self.elements.append(el)
        return el

    def is_tabular(self) -> bool:
        """True when ``column_count`` is anything other than one.

        NOTE(review): a question with no elements has column_count 0 and is
        therefore reported as tabular - confirm this is intended.
        """
        return self.column_count != 1

    @property
    def column_count(self) -> int:
        """Highest ``col`` value among the elements, or 0 when there are none."""
        if not self.elements:
            return 0
        return max(el.col for el in self.elements)

    @property
    def row_count(self) -> int:
        """Highest ``row`` value among the elements, or 0 when there are none."""
        if not self.elements:
            return 0
        return max(el.row for el in self.elements)

    @property
    def grid_area(self) -> int:
        """Number of available grid cells available according to row and col values of Elements"""
        return self.row_count * self.column_count

    @property
    def occupied_area(self) -> int:
        """
        The number of grid cells occupied by cells according to their rowspan and colspan values
        If rowspans or colspans are set correctly this value will be equal to self.grid_area
        """
        return sum(el.cell_area for el in self.elements)

    @property
    def is_table_valid(self) -> bool:
        """True when the occupied cell area equals the grid area."""
        return self.grid_area == self.occupied_area

    @classmethod
    def build(
        cls, qdict: Union[dict[str, Any], "_ModelDumpable"], strip_ids: bool = False
    ) -> "QuestionDefinition":
        """
        Create a QuestionDefinition, complete with QElements, from a
        a dict datastructure as provided by serial.QuestionDef

        ``qdict`` may be a dict or any object with ``model_dump()``.  Its
        ``"elements"`` entry is a list of rows, each a list of element dicts;
        ``row`` and ``col`` are assigned from the 1-based position of each
        element in that structure.  With ``strip_ids`` every element is
        built with ``id=None``.

        The element dicts passed in are not modified.
        """
        if not isinstance(qdict, dict):
            qdict = qdict.model_dump()

        qdef = cls(title=qdict["title"])
        for row_idx, row in enumerate(qdict["elements"], start=1):
            for col_idx, el in enumerate(row, start=1):
                # Copy before adding row/col/id so the caller's data is left
                # untouched (previously these keys were written into it).
                el_kwargs = {**el, "row": row_idx, "col": col_idx}
                if strip_ids:
                    el_kwargs["id"] = None
                qdef.add_element(el_kwargs["el_type"], **el_kwargs)
        return qdef
# Join condition used as ``primaryjoin`` for QuestionInstance.total_weightings.
qi_weight_join = (
    "foreign(TotalWeighting.question_instance_id)"
    "==QuestionInstance.id"
)
448class SaveAnswersResult:
449 def __init__(
450 self,
451 is_new: bool,
452 change_list: list[tuple[str | None, str]],
453 unanswered_mandatory: bool,
454 ) -> None:
455 self.is_new = is_new
456 self.change_list = change_list
457 self.unanswered_mandatory = unanswered_mandatory
class QuestionInstance(Base, NodeMixin):
    """Placement of a QuestionDefinition in a section of a project.

    Stored in table ``question_instances``.  A composite foreign key ties
    ``(section_id, project_id)`` to ``sections``, and a unique index on
    ``(project_id, question_id)`` allows a definition at most once per
    project.
    """

    __tablename__ = "question_instances"

    __table_args__ = (  # type: ignore
        ForeignKeyConstraint(
            ["section_id", "project_id"],
            ["sections.id", "sections.project_id"],
            name="fk_qi_section_project",
            ondelete="CASCADE",
        ),
        Index(
            "unique_question_project",
            "project_id",
            "question_id",
            unique=True,
        ),
    )

    public_attrs = "id,title,position,section_id,number,url".split(",")

    # section_id carries no ForeignKey of its own: it is covered by the
    # composite constraint fk_qi_section_project above.
    section_id: Mapped[int] = mapped_column(
        "section_id",
        INTEGER,
        nullable=False,
    )
    question_def_id: Mapped[int] = mapped_column(
        "question_id",
        INTEGER,
        ForeignKey(QuestionDefinition.id),  # Keep this FK to questions table
        nullable=False,
    )
    # project_id still needs its own FK definition to the projects table
    project_id: Mapped[int] = mapped_column(
        "project_id",
        INTEGER,
        ForeignKey("projects.id", ondelete="CASCADE"),  # Keep this FK to projects table
        nullable=False,
    )

    # Always loaded with the instance through an inner join.
    question_def: Mapped["QuestionDefinition"] = relationship(
        QuestionDefinition, lazy="joined", innerjoin=True, back_populates="instances"
    )
    project: Mapped["Project"] = relationship(
        "Project", back_populates="questions", overlaps="questions"
    )
    section: Mapped["Section"] = relationship(
        "Section", back_populates="questions", overlaps="project"
    )
    # Read-only second relationship to the owning Section.
    parent: Mapped["Section"] = relationship("Section", viewonly=True)
    _answers: DynamicMapped["Answer"] = relationship(
        "Answer",
        lazy="dynamic",
        cascade="all, delete",
        passive_deletes=True,
        back_populates="question_instance",
    )

    total_weightings: Mapped["TotalWeighting"] = relationship(
        "TotalWeighting", primaryjoin=qi_weight_join, backref="question", cascade="all"
    )

    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_instance",
        secondary="questions",
        viewonly=True,
    )
    tags: Mapped[list["Tag"]] = relationship(
        "Tag",
        secondary="tags_qinstances",
        back_populates="question_instances",
        passive_deletes=True,
    )

    def __repr__(self) -> str:
        return f"<Question - InstanceId: {self.id}, DefId: {self.question_def_id}>"

    def __eq__(self, o: object) -> bool:
        """Instances are equal when their question definitions compare equal."""
        if isinstance(o, QuestionInstance):
            return self.question_def == o.question_def
        return False

    def __hash__(self) -> int:
        # __eq__ is overridden, so __hash__ must be restated to keep the
        # inherited hash.
        return super().__hash__()

    def answers_for_issue(self, issue_id: Any) -> Any:
        """Return a query of this question's answers for the given issue id."""
        return self._answers.filter(Answer.issue_id == issue_id)

    def single_vendor_dict(self, issue: Any) -> dict[str, Any]:
        """Return this question, with one issue's answers, as a dict.

        The elements are rendered in vendor view, each answerable element
        carrying the answer stored for ``issue``.
        """
        lookup = {a.element_id: a.answer for a in self.answers_for_issue(issue.id)}
        return {
            "number": self.number,
            "id": self.id,
            "title": self.title,
            "respondent": issue.respondent.as_dict(),
            "elements": self.question_def.as_dict(
                vendor_view=True, answer_lookup=lookup
            ),
        }

    @property
    def title(self) -> str:
        """Title of the underlying question definition."""
        return self.question_def.title

    @property
    def elements(self) -> Any:
        """Elements of the underlying question definition."""
        return self.question_def.elements

    def as_dict(
        self, answer_lookup: dict[int, str] | None = None, vendor_view: bool = False
    ) -> dict[str, Any]:
        """Return this question as a dict.

        ``answer_lookup`` and ``vendor_view`` are forwarded to
        ``QuestionDefinition.as_dict`` to build the ``"elements"`` entry.
        """
        return {
            "id": self.id,
            "title": self.title,
            "number36": self.b36_number,
            "number": self.number,
            "elements": self.question_def.as_dict(
                answer_lookup=answer_lookup, vendor_view=vendor_view
            ),
            "type": "question",
        }

    @property
    def is_autoscored(self) -> bool:
        """True when any MultipleChoice element has a choice with a truthy ``autoscore``."""
        return any(
            choice.get("autoscore", None)
            for element in self.elements
            if isinstance(element, MultipleChoice) and element.choices  # type: ignore
            for choice in element.choices  # type: ignore
        )

    @property
    def absolute_weight(self) -> Decimal:
        """Absolute weight of this question.

        Computed once as ``normalised_weight * section.absolute_weight`` and
        cached in ``_absolute_weight``.

        Raises:
            WeightingsNotLoadedException: if computing the value raises
                AttributeError (e.g. ``normalised_weight`` was never set).
        """
        if not hasattr(self, "_absolute_weight"):
            try:
                self._absolute_weight = (
                    self.normalised_weight * self.section.absolute_weight  # type: ignore
                )
            except AttributeError:
                m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
                raise WeightingsNotLoadedException(m)
        return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight: Decimal) -> None:
        self._absolute_weight = abs_weight

    def validate_and_save_answers(
        self, answer_doc: dict[int, str], issue: Any
    ) -> SaveAnswersResult:
        """Validate the submitted answers and apply them for ``issue``.

        For each answerable element with a non-None entry in ``answer_doc``
        the value is validated by the element, then either written over the
        existing Answer (when it differs) or added as a new Answer.

        Args:
            answer_doc: mapping of element id to answer text.  It is read
                only; the caller's dict is not modified.
            issue: object providing the ``id`` of the issue being answered.

        Returns:
            SaveAnswersResult with ``is_new`` (no answers existed for the
            issue beforehand), ``change_list`` of ``(old, new)`` pairs
            (``old`` is None for new answers) and ``unanswered_mandatory``
            (a mandatory element was neither submitted nor already answered).

        Raises:
            ValidationFailure: if ``answer_doc`` holds element ids that match
                no answerable element of this question.  This check runs
                after the matched answers have been applied.
        """
        changes: list[tuple[str | None, str]] = []
        unanswered_mandatory = False
        # Work on a copy: entries are popped as they are matched, and the
        # caller's dict must not be emptied as a side effect.
        pending = dict(answer_doc)
        current_answers = {a.element_id: a for a in self.answers_for_issue(issue.id)}

        for element in self.question_def.answerable_elements:
            el_id = element.id
            if el_id not in pending:
                if element.mandatory and el_id not in current_answers:
                    unanswered_mandatory = True
                continue

            new_answer = pending.pop(el_id)
            if new_answer is None:
                # An explicit None is consumed but neither validated nor saved.
                continue
            element.validate(new_answer)

            if el_id in current_answers:
                current_answer = current_answers[el_id]
                old_answer = current_answer.answer
                if old_answer != new_answer:
                    current_answer.answer = new_answer
                    changes.append((old_answer, new_answer))
            else:
                a = Answer(element_id=el_id, answer=new_answer, issue_id=issue.id)
                self._answers.append(a)
                changes.append((None, new_answer))

        # Anything left over referred to an element id this question lacks.
        if pending:
            tmpl = "Unable to match all answers with element IDs: {}"
            ids = ", ".join(str(k) for k in pending)
            raise ValidationFailure(tmpl.format(ids))

        is_new = not current_answers

        return SaveAnswersResult(is_new, changes, unanswered_mandatory)