Coverage for postrfp/model/questionnaire/nodes.py: 95%

265 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import logging 

2from decimal import Decimal 

3from enum import IntEnum 

4from typing import Optional, TYPE_CHECKING, Any, Union, Protocol 

5 

6from sqlalchemy import ( 

7 Integer, 

8 ForeignKey, 

9 Index, 

10 text, 

11 UniqueConstraint, 

12 ForeignKeyConstraint, 

13) 

14from sqlalchemy.orm import ( 

15 Mapped, 

16 DynamicMapped, 

17 mapped_column, 

18 relationship, 

19 validates, 

20 object_session, 

21 Session, 

22 Query, 

23) 

24from sqlalchemy.ext.orderinglist import ordering_list 

25from sqlalchemy.types import VARCHAR, INTEGER, TEXT 

26 

27from postrfp.model.meta import Base, Visitor 

28from postrfp.model.questionnaire.answering import Answer 

29from postrfp.model.questionnaire.b36 import ( 

30 to_b36, 

31 from_b36, 

32) 

33from postrfp.model.questionnaire.weightings import TotalWeighting 

34 

35from postrfp.model.questionnaire.qelements import MultipleChoice, QElement, el_keys 

36 

37from ..exc import ( 

38 ValidationFailure, 

39 WeightingsNotLoadedException, 

40) 

41 

42if TYPE_CHECKING: 

43 from postrfp.model.composite import QuestionMeta 

44 from postrfp.model.project import Project 

45 from postrfp.model.acl import SectionPermission 

46 from postrfp.model.tags import Tag 

47 from sqlalchemy import Subquery 

48 

49""" 

Note - the default SQL JOIN / load strategy for each relationship is defined
by the arguments passed to 'relationship'.

Lots of joins are involved when loading questionnaires. The current strategy is:

Section -> questions_query (QuestionInstance): 'dynamic' - doesn't join by default,
                                               returns a query

QuestionInstance -> QuestionDefinition: 'joined' - eager loading / inner join
QuestionDefinition -> [QElement]      : 'joined' - eager loading by join

For loading large collections (e.g. a full questionnaire), these loading
strategies are overridden, e.g. by loading question elements with a subquery load.

63""" 

64 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

66 

67 

class _ModelDumpable(Protocol):
    """Structural type for any object exposing ``model_dump() -> dict``."""

    def model_dump(self) -> dict[str, Any]:
        """Return the object's content as a plain dictionary."""
        ...

72 

73 

class ImportType(IntEnum):
    """Integer codes distinguishing a shared import from a copied one."""

    SHARE = 0
    COPY = 10

77 

78 

class NodeMixin:
    """
    Columns and accessors shared by questionnaire tree nodes.

    ``position`` is the node's ordering position (defaults to 1).
    ``b36_number`` holds the node number in the encoded form produced by
    ``to_b36``; the ``number`` property converts to and from that form.
    """

    position: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
    b36_number: Mapped[str] = mapped_column(VARCHAR(length=30), nullable=True)

    @property
    def number(self) -> str | None:
        """The value of ``b36_number`` decoded by ``from_b36``."""
        decoded = from_b36(self.b36_number)
        return decoded

    @number.setter
    def number(self, value: str) -> None:
        """Encode ``value`` with ``to_b36`` and store it in ``b36_number``."""
        encoded = to_b36(value)
        self.b36_number = encoded

90 

91 

class Section(Base, NodeMixin):
    """
    A node in a project's questionnaire tree.

    A section owns an ordered list of questions (``QuestionInstance``) and an
    ordered list of child sections. Its place in the tree is also encoded in
    ``b36_number`` (see ``NodeMixin``); the ``descendants``,
    ``immediate_children`` and ``ancestors`` relationships are view-only and
    work by prefix matching on that column within the same project.
    """

    __tablename__ = "sections"
    # (id, project_id) must be unique so that question_instances can reference
    # it with a composite foreign key.
    __table_args__ = (UniqueConstraint("id", "project_id", name="uk_section_project"),)

    id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
    title: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False)

    description: Mapped[Optional[str]] = mapped_column(TEXT())

    project_id: Mapped[int] = mapped_column(
        INTEGER, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False
    )
    parent_id: Mapped[Optional[int]] = mapped_column(
        INTEGER, ForeignKey("sections.id", ondelete="CASCADE")
    )

    # Direct children, kept in 'position' order (1-based) by OrderingList.
    subsections: Mapped[list["Section"]] = relationship(
        "Section",
        order_by="Section.position",
        collection_class=ordering_list("position", count_from=1),
    )

    project: Mapped["Project"] = relationship(
        "Project",
        primaryjoin="Section.project_id==Project.id",
        back_populates="sections",
    )

    # Questions of this section, kept in 'position' order (1-based).
    questions: Mapped[list["QuestionInstance"]] = relationship(
        "QuestionInstance",
        order_by="QuestionInstance.position",
        collection_class=ordering_list("position", count_from=1),
        back_populates="section",
        cascade="all, delete-orphan",
        passive_deletes=True,
        overlaps="section",
    )

    parent: Mapped[Optional["Section"]] = relationship(
        "Section",
        remote_side=[id],
        back_populates="subsections",
        cascade_backrefs=False,
    )

    # Same rows as 'questions' but exposed as a query (read-only).
    questions_query: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance",
        lazy="dynamic",
        order_by="QuestionInstance.position",
        viewonly=True,
    )

    perms: DynamicMapped["SectionPermission"] = relationship(
        "SectionPermission", lazy="dynamic"
    )

    # Every other section of the same project whose b36_number starts with
    # this section's b36_number.
    descendants: DynamicMapped["Section"] = relationship(
        "Section",
        lazy="dynamic",
        viewonly=True,
        order_by="Section.b36_number",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)).startswith(Section.b36_number), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id)",
    )

    # As 'descendants', restricted to numbers exactly two characters longer
    # than this section's b36_number.
    immediate_children: DynamicMapped["Section"] = relationship(
        "Section",
        lazy="dynamic",
        viewonly=True,
        order_by="Section.position",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)).startswith(Section.b36_number), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id, "
        "func.length(remote(foreign(Section.b36_number))) == func.length(Section.b36_number) + 2"
        ")",
    )

    # Every other section of the same project whose b36_number is a prefix of
    # this section's b36_number.
    ancestors: Mapped[list["Section"]] = relationship(
        "Section",
        viewonly=True,
        order_by="Section.b36_number",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)) == func.LEFT(Section.b36_number, func.LENGTH(remote(foreign(Section.b36_number)))), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id)",
    )

    def __repr__(self) -> str:
        return f"<Sec id:{self.id} Num:{self.number} Parent: {self.parent_id} - {self.title}>"

    def as_dict(self) -> dict[str, Any]:
        """
        Return this section's own fields as a dictionary.

        The "subsections" and "questions" entries are always empty lists;
        this method does not populate them.
        """
        return {
            "id": self.id,
            "type": "section",
            "title": self.title,
            "description": self.description,
            "number": self.number,
            "number36": self.b36_number,
            "parent_id": self.parent_id,
            "subsections": [],
            "questions": [],
        }

    def renumber(self) -> None:
        """Flush the owning session, then run ``renumber_tree`` on this section."""
        # Imported here rather than at module level.
        from postrfp.model.questionnaire.renumber import renumber_tree

        session = object_session(self)  # type: ignore
        session.flush()  # type: ignore
        renumber_tree(session, self)  # type: ignore

    def accept(self, visitor: Visitor) -> None:
        """
        Call hello_section, visit_question(s), goodbye_section
        on the passed visitor and recurse to questions and subsections
        """
        visitor.hello_section(self)

        # check if method is implemented in visitor's own class, not in parent
        # i.e. only load questions if the visitor has its own implementation
        # of this method
        if "visit_question" in visitor.__class__.__dict__:
            for q in self.questions:
                visitor.visit_question(q)

        for subsec in self.subsections:
            subsec.accept(visitor)

        visitor.goodbye_section(self)

    @property
    def is_top_level(self) -> bool:
        """True when this section has no parent section."""
        return self.parent_id is None

    @property
    def absolute_weight(self) -> Decimal:
        """
        Weight of this section relative to the whole tree.

        A top-level section always has weight 1. Otherwise the value is
        ``normalised_weight`` multiplied by the parent's absolute weight,
        cached on the instance after the first computation.

        Raises:
            WeightingsNotLoadedException: if ``normalised_weight`` has not
                been set on this instance.
        """
        if self.is_top_level:
            return Decimal(1)
        try:
            return self._absolute_weight  # type: ignore
        except AttributeError:
            try:
                nm = self.normalised_weight  # type: ignore
            except AttributeError:
                m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
                raise WeightingsNotLoadedException(m)
            self._absolute_weight = nm * self.parent.absolute_weight  # type: ignore
            return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight: Decimal) -> None:
        self._absolute_weight = abs_weight

    def set_question_positions(self) -> None:
        """
        Set the 'position' attribute for questions in this section indexing from one.
        Note: This is typically not needed when using OrderingList which maintains positions automatically.
        """
        for pos, question in enumerate(self.questions, 1):
            question.position = pos

    def set_subsection_positions(self) -> None:
        """
        Set the 'position' attribute for subsections of this section indexing from one.
        Note: This is typically not needed when using OrderingList which maintains positions automatically.
        """
        for pos, sec in enumerate(self.subsections, 1):
            sec.position = pos

    @validates("questions", "subsections", include_backrefs=False)
    def assign_project_id(
        self, attr_name: str, instance: Union["Section", "QuestionInstance"]
    ) -> Union["Section", "QuestionInstance"]:
        """
        Validator for items added to ``questions`` or ``subsections``.

        An item that has no ``project_id`` yet is given this section's
        ``project_id``; the item is returned unchanged otherwise.
        """
        if instance.project_id is None:
            instance.project_id = self.project_id

        return instance

    def all_descendants_positioned(self) -> bool:
        """
        Return True when no descendant section has a position greater than zero.

        NOTE(review): the method name suggests the opposite test (no
        descendant *without* a positive position) - confirm the intended
        predicate with callers before relying on the result.
        """
        # The column must be referenced through the mapped class; the previous
        # '"Section".position' was an attribute lookup on a plain string and
        # raised AttributeError on every call.
        return self.descendants.filter(Section.position > 0).count() == 0

    def get_immediate_children_subquery(self, session: Session) -> "Subquery":
        """
        Get immediate children as a subquery for complex queries.

        ``session`` is not used; it is kept in the signature for existing callers.
        """
        return self.immediate_children.subquery()

    def get_all_descendant_questions(
        self, session: Session
    ) -> Query["QuestionInstance"]:
        """
        Get all questions in this section and all descendant sections.

        Returns a query over QuestionInstance joined to its Section, limited
        to sections of this project whose b36_number starts with this
        section's b36_number.
        """
        return (
            session.query(QuestionInstance)
            .join(Section, QuestionInstance.section_id == Section.id)
            .filter(
                Section.b36_number.startswith(self.b36_number),
                Section.project_id == self.project_id,
            )
        )

291 

292 

class QuestionDefinition(Base):
    """
    The content of a question: a title plus a grid of QElements.

    A definition can be referenced by several QuestionInstances (see
    ``instances``); ``refcount`` above 1 marks it as shared.
    """

    __tablename__ = "questions"

    title: Mapped[str] = mapped_column(TEXT(), nullable=False)

    parent_id: Mapped[Optional[int]] = mapped_column(Integer, index=True)
    # Stored in the "referenceCount" column; the database default is 1.
    refcount: Mapped[int] = mapped_column(
        "referenceCount", Integer, nullable=False, server_default=text("'1'")
    )
    # Eagerly joined, ordered by row then column.
    elements: Mapped[list["QElement"]] = relationship(
        "QElement",
        lazy="joined",
        order_by="QElement.row,QElement.col",
        cascade="all, delete-orphan",
        passive_deletes=True,
        back_populates="question_def",
    )

    # Read-only query over the same elements, used for filtered lookups.
    _elements: DynamicMapped["QElement"] = relationship(
        "QElement", lazy="dynamic", viewonly=True
    )

    instances: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance", lazy="dynamic", back_populates="question_def"
    )

    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_def",
        cascade="all, delete",
        passive_deletes=True,
    )

    def __repr__(self) -> str:
        return f'<QDef Id: {self.id}, Title: "{self.title}">'

    def __eq__(self, o: object) -> bool:
        """
        Two definitions are equal when their titles match and their elements,
        sorted by the attributes named in ``el_keys``, compare equal.
        """
        if isinstance(o, QuestionDefinition):
            src_elements = sorted(
                self.elements, key=lambda e: [getattr(e, k) for k in el_keys]
            )
            des_elements = sorted(
                o.elements, key=lambda e: [getattr(e, k) for k in el_keys]
            )
            return self.title == o.title and src_elements == des_elements
        return False

    def __hash__(self) -> int:
        # Defining __eq__ would otherwise leave the class unhashable, so the
        # inherited hash is restored explicitly.
        return super().__hash__()

    @property
    def is_shared(self) -> bool:
        """True when ``refcount`` is set and greater than 1."""
        if self.refcount is None:
            return False
        return self.refcount > 1

    def get_element(self, element_id: int) -> QElement:
        """
        Return the element of this definition with the given id.

        Uses ``Query.one()``, so it raises unless exactly one row matches.
        """
        return self._elements.filter(QElement.id == element_id).one()

    def as_dict(
        self, answer_lookup: dict[int, str] | None = None, vendor_view: bool = False
    ) -> list[Any]:
        """
        Returns a list of question element lists, each inner list corresponding
        to a row in a question table

        Groups elements into rows - returning a list of lists

        ``vendor_view`` is forwarded only to elements that contain choices.
        When ``answer_lookup`` is given, each answerable element's dict gets an
        "answer" entry looked up by element id (None when absent).
        """
        row_list: list[list[dict[str, Any]]] = []
        current_row = -1
        # Relies on self.elements being ordered by row: a new inner list is
        # started whenever the row number increases.
        for el in self.elements:
            if el.row > current_row:
                row_list.append([])
                current_row = el.row

            if el.contains_choices:
                el_dict = el.as_dict(vendor_view=vendor_view)
            else:
                el_dict = el.as_dict()

            if el.is_answerable and answer_lookup is not None:
                el_dict["answer"] = answer_lookup.get(el.id, None)

            row_list[-1].append(el_dict)

        return row_list

    @property
    def answerable_elements(self) -> list[QElement]:
        """The elements whose ``is_answerable`` flag is truthy, in element order."""
        return [e for e in self.elements if e.is_answerable]

    def add_element(self, el_name: str, **kwargs: Any) -> QElement:
        """Build a QElement via ``QElement.build``, append it and return it."""
        el = QElement.build(el_name, **kwargs)
        self.elements.append(el)
        return el

    def is_tabular(self) -> bool:
        """
        True when the column count is anything other than 1.

        Note that a definition without elements has a column count of 0 and
        is therefore reported as tabular.
        """
        return self.column_count != 1

    @property
    def column_count(self) -> int:
        """Highest ``col`` value among the elements, or 0 without elements."""
        return max((el.col for el in self.elements), default=0)

    @property
    def row_count(self) -> int:
        """Highest ``row`` value among the elements, or 0 without elements."""
        return max((el.row for el in self.elements), default=0)

    @property
    def grid_area(self) -> int:
        """Number of available grid cells available according to row and col values of Elements"""
        return self.row_count * self.column_count

    @property
    def occupied_area(self) -> int:
        """
        The number of grid cells occupied by cells according to their rowspan and colspan values
        If rowspans or colspans are set correctly this value will be equal to self.grid_area
        """
        return sum(el.cell_area for el in self.elements)

    @property
    def is_table_valid(self) -> bool:
        """True when ``occupied_area`` equals ``grid_area``."""
        return self.grid_area == self.occupied_area

    @classmethod
    def build(
        cls, qdict: Union[dict[str, Any], "_ModelDumpable"], strip_ids: bool = False
    ) -> "QuestionDefinition":
        """
        Create a QuestionDefinition, complete with QElements, from a
        a dict datastructure as provided by serial.QuestionDef

        Args:
            qdict: a dict with "title" and "elements" (a list of rows, each a
                list of element dicts carrying at least "el_type"), or an
                object whose ``model_dump()`` returns such a dict.
            strip_ids: when True every element is built with ``id=None``.

        Row and column numbers are assigned from the position of each element
        in ``qdict["elements"]``, counting from 1. The element dicts passed in
        are not modified.
        """
        if not isinstance(qdict, dict):
            qdict = qdict.model_dump()

        qdef = cls(title=qdict["title"])
        for row_idx, row in enumerate(qdict["elements"], start=1):
            for col_idx, el in enumerate(row, start=1):
                # Work on a shallow copy: writing row/col (and id=None) into
                # the caller's dict would silently alter, and with strip_ids
                # destroy, the caller's data.
                el_args = dict(el)
                el_args["row"] = row_idx
                el_args["col"] = col_idx
                if strip_ids:
                    el_args["id"] = None
                qdef.add_element(el_args["el_type"], **el_args)
        return qdef

443 

444 

# Join condition, in relationship-expression string form, linking
# TotalWeighting rows to the QuestionInstance they refer to.
qi_weight_join = "foreign(TotalWeighting.question_instance_id)==QuestionInstance.id"

446 

447 

448class SaveAnswersResult: 

449 def __init__( 

450 self, 

451 is_new: bool, 

452 change_list: list[tuple[str | None, str]], 

453 unanswered_mandatory: bool, 

454 ) -> None: 

455 self.is_new = is_new 

456 self.change_list = change_list 

457 self.unanswered_mandatory = unanswered_mandatory 

458 

459 

class QuestionInstance(Base, NodeMixin):
    """
    The placement of a QuestionDefinition inside a Section of a project.

    Title and elements are read from the linked ``question_def``. Position
    and numbering come from ``NodeMixin``.
    """

    __tablename__ = "question_instances"

    __table_args__ = (  # type: ignore
        # (section_id, project_id) must match an existing section row.
        ForeignKeyConstraint(
            ["section_id", "project_id"],
            ["sections.id", "sections.project_id"],
            name="fk_qi_section_project",
            ondelete="CASCADE",
        ),
        # A question definition may appear only once per project.
        Index(
            "unique_question_project",
            "project_id",
            "question_id",
            unique=True,
        ),
    )

    public_attrs = "id,title,position,section_id,number,url".split(",")

    # section_id no longer needs the ForeignKey definition here, as it's part of the composite FK
    section_id: Mapped[int] = mapped_column(
        "section_id",
        INTEGER,
        nullable=False,
    )
    question_def_id: Mapped[int] = mapped_column(
        "question_id",
        INTEGER,
        ForeignKey(QuestionDefinition.id),  # Keep this FK to questions table
        nullable=False,
    )
    # project_id still needs its own FK definition to the projects table
    project_id: Mapped[int] = mapped_column(
        "project_id",
        INTEGER,
        ForeignKey("projects.id", ondelete="CASCADE"),  # Keep this FK to projects table
        nullable=False,
    )

    question_def: Mapped["QuestionDefinition"] = relationship(
        QuestionDefinition, lazy="joined", innerjoin=True, back_populates="instances"
    )
    project: Mapped["Project"] = relationship(
        "Project", back_populates="questions", overlaps="questions"
    )
    section: Mapped["Section"] = relationship(
        "Section", back_populates="questions", overlaps="project"
    )
    # Read-only relationship to Section.
    parent: Mapped["Section"] = relationship("Section", viewonly=True)
    _answers: DynamicMapped["Answer"] = relationship(
        "Answer",
        lazy="dynamic",
        cascade="all, delete",
        passive_deletes=True,
        back_populates="question_instance",
    )

    total_weightings: Mapped["TotalWeighting"] = relationship(
        "TotalWeighting", primaryjoin=qi_weight_join, backref="question", cascade="all"
    )

    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_instance",
        secondary="questions",
        viewonly=True,
    )
    tags: Mapped[list["Tag"]] = relationship(
        "Tag",
        secondary="tags_qinstances",
        back_populates="question_instances",
        passive_deletes=True,
    )

    def __repr__(self) -> str:
        return f"<Question - InstanceId: {self.id}, DefId: {self.question_def_id}>"

    def __eq__(self, o: object) -> bool:
        """Instances are equal when their question definitions compare equal."""
        if isinstance(o, QuestionInstance):
            return self.question_def == o.question_def
        return False

    def __hash__(self) -> int:
        # Defining __eq__ would otherwise leave the class unhashable, so the
        # inherited hash is restored explicitly.
        return super().__hash__()

    def answers_for_issue(self, issue_id: Any) -> Any:
        """Return a query over this question's answers for the given issue id."""
        return self._answers.filter(Answer.issue_id == issue_id)

    def single_vendor_dict(self, issue: Any) -> dict[str, Any]:
        """
        Return this question as a dict including the answers of one issue.

        The elements are rendered with ``vendor_view=True`` and carry the
        answers stored for ``issue.id``; ``issue.respondent.as_dict()`` is
        included under "respondent".
        """
        lookup = {a.element_id: a.answer for a in self.answers_for_issue(issue.id)}
        return {
            "number": self.number,
            "id": self.id,
            "title": self.title,
            "respondent": issue.respondent.as_dict(),
            "elements": self.question_def.as_dict(
                vendor_view=True, answer_lookup=lookup
            ),
        }

    @property
    def title(self) -> str:
        """Title of the linked question definition."""
        return self.question_def.title

    @property
    def elements(self) -> Any:
        """Elements of the linked question definition."""
        return self.question_def.elements

    def as_dict(
        self, answer_lookup: dict[int, str] | None = None, vendor_view: bool = False
    ) -> dict[str, Any]:
        """
        Return this question as a dict.

        ``answer_lookup`` and ``vendor_view`` are passed through to
        ``QuestionDefinition.as_dict`` to render the "elements" entry.
        """
        return {
            "id": self.id,
            "title": self.title,
            "number36": self.b36_number,
            "number": self.number,
            "elements": self.question_def.as_dict(
                answer_lookup=answer_lookup, vendor_view=vendor_view
            ),
            "type": "question",
        }

    @property
    def is_autoscored(self) -> bool:
        """True when any choice of any MultipleChoice element has a truthy "autoscore"."""
        return any(
            choice.get("autoscore", None)
            for element in self.elements
            if isinstance(element, MultipleChoice) and element.choices  # type: ignore
            for choice in element.choices  # type: ignore
        )

    @property
    def absolute_weight(self) -> Decimal:
        """
        Weight of this question relative to the whole tree.

        Computed on first access as ``normalised_weight`` multiplied by the
        section's absolute weight, then cached on the instance.

        Raises:
            WeightingsNotLoadedException: if computing the value fails with an
                AttributeError, e.g. ``normalised_weight`` has not been set.
        """
        if not hasattr(self, "_absolute_weight"):
            try:
                self._absolute_weight = (
                    self.normalised_weight * self.section.absolute_weight  # type: ignore
                )
            except AttributeError:
                m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
                raise WeightingsNotLoadedException(m)
        return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight: Decimal) -> None:
        self._absolute_weight = abs_weight

    def validate_and_save_answers(
        self, answer_doc: dict[int, str], issue: Any
    ) -> SaveAnswersResult:
        """
        Validate the submitted answers and store them against ``issue``.

        Args:
            answer_doc: submitted answers keyed by element id. A value of None
                is skipped without validation or saving. The dict is not
                modified.
            issue: the issue being answered; only ``issue.id`` is read.

        Returns:
            A SaveAnswersResult where ``is_new`` is True when no answers
            existed for this question and issue before the call,
            ``change_list`` holds an (old, new) pair for every created or
            altered answer (old is None for a new answer), and
            ``unanswered_mandatory`` is True when a mandatory element is
            missing from ``answer_doc`` and has no stored answer.

        Raises:
            ValidationFailure: if ``answer_doc`` contains element ids that do
                not belong to an answerable element of this question.
            Anything raised by ``element.validate`` propagates unchanged.
        """
        # Consume a private copy: popping from the caller's dict would leave
        # it emptied, or half-consumed if validation raises part-way through.
        pending = dict(answer_doc)
        changes: list[tuple[str | None, str]] = []
        unanswered_mandatory = False
        current_answers = {a.element_id: a for a in self.answers_for_issue(issue.id)}
        for element in self.question_def.answerable_elements:
            el_id = element.id
            if el_id in pending:
                new_answer = pending.pop(el_id)
                if new_answer is None:
                    continue
                element.validate(new_answer)
                if el_id in current_answers:
                    current_answer = current_answers[el_id]
                    old_answer = current_answer.answer
                    if old_answer != new_answer:
                        current_answer.answer = new_answer
                        changes.append((old_answer, new_answer))
                else:
                    a = Answer(element_id=el_id, answer=new_answer, issue_id=issue.id)
                    self._answers.append(a)
                    changes.append((None, new_answer))
            else:
                if element.mandatory and el_id not in current_answers:
                    unanswered_mandatory = True

        # Whatever is left was not matched to an answerable element.
        if len(pending) > 0:
            tmpl = "Unable to match all answers with element IDs: {}"
            ids = ", ".join(str(k) for k in pending.keys())
            raise ValidationFailure(tmpl.format(ids))

        is_new = len(current_answers) == 0

        return SaveAnswersResult(is_new, changes, unanswered_mandatory)

642 is_new = len(current_answers) == 0 

643 

644 return SaveAnswersResult(is_new, changes, unanswered_mandatory)