Coverage for postrfp / model / questionnaire / nodes.py: 97%

259 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1import logging 

2from decimal import Decimal 

3from enum import IntEnum 

4from typing import Optional, TYPE_CHECKING, Any, Union, Protocol 

5 

6from sqlalchemy import ( 

7 Integer, 

8 ForeignKey, 

9 Index, 

10 text, 

11 UniqueConstraint, 

12 ForeignKeyConstraint, 

13) 

14from sqlalchemy.orm import ( 

15 Mapped, 

16 DynamicMapped, 

17 mapped_column, 

18 relationship, 

19 validates, 

20 object_session, 

21) 

22from sqlalchemy.ext.orderinglist import ordering_list 

23from sqlalchemy.types import VARCHAR, INTEGER, TEXT 

24 

25from postrfp.model.meta import Base, Visitor 

26from postrfp.model.questionnaire.answering import Answer 

27from postrfp.model.questionnaire.b36 import ( 

28 to_b36, 

29 from_b36, 

30) 

31from postrfp.model.questionnaire.weightings import TotalWeighting 

32 

33from postrfp.model.questionnaire.qelements import MultipleChoice, QElement, el_keys 

34 

35from ..exc import ( 

36 ValidationFailure, 

37 WeightingsNotLoadedException, 

38) 

39 

40if TYPE_CHECKING: 

41 from postrfp.model.composite import QuestionMeta 

42 from postrfp.model.project import Project 

43 from postrfp.model.acl import SectionPermission 

44 from postrfp.model.tags import Tag 

45 

46""" 

47Note - default SQL JOIN / load strategy is defined by arguments 

48to 'relationship' 

49 

50Lots of joins involved when loading questionnaires. Current strategy is: 

51 

52Section -> questions_query (QuestionInstance): 'dynamic' - doesn't join by default, 

53 returns a query (Section.questions itself is a regular ordered list relationship) 

54 

55QuestionInstance -> QuestionDefinition: 'joined' - eager loading / inner join 

56QuestionDefinition -> [QElement] : 'joined' - eager loading by inner join 

57 

58For loading large collections (e.g. full questionnaire), these loading 

59strategies are overridden, e.g. loading question elements in a subquery load 

60""" 

61 

# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)

63 

64 

class _ModelDumpable(Protocol):
    """
    Structural type for any object exposing a ``model_dump()`` method
    that returns a plain ``dict`` keyed by strings.
    """

    def model_dump(self) -> dict[str, Any]:
        """Return the object's content as a dictionary."""
        ...

69 

70 

class ImportType(IntEnum):
    """Integer codes for the two import modes defined here: SHARE and COPY."""

    SHARE = 0
    COPY = 10

74 

75 

class NodeMixin:
    """
    Columns and helpers shared by the questionnaire node models in this
    module (``Section`` and ``QuestionInstance``).
    """

    # Ordinal of this node among its siblings; defaults to 1.
    position: Mapped[int] = mapped_column(Integer, default=1, nullable=False)
    # Encoded form of the node number as produced by ``to_b36``. The column
    # is nullable, so the annotation is Optional to match what is stored.
    b36_number: Mapped[Optional[str]] = mapped_column(
        VARCHAR(length=30), nullable=True
    )

    @property
    def number(self) -> str | None:
        """The node number, decoded from ``b36_number`` by ``from_b36``."""
        return from_b36(self.b36_number)

    @number.setter
    def number(self, value: str) -> None:
        """Store ``value`` in ``b36_number`` after encoding it with ``to_b36``."""
        self.b36_number = to_b36(value)

87 

88 

class Section(Base, NodeMixin):
    """
    A node in a project's questionnaire tree. A section can hold
    subsections and questions (``QuestionInstance`` rows) and is stored in
    the ``sections`` table.
    """

    __tablename__ = "sections"
    __table_args__ = (
        # (id, project_id) must be unique so that it can be the target of
        # composite foreign keys from other tables.
        UniqueConstraint("id", "project_id", name="uk_section_project"),
        Index("idx_sections_project_b36", "project_id", "b36_number"),
    )

    id: Mapped[int] = mapped_column(INTEGER, primary_key=True)
    title: Mapped[str] = mapped_column(VARCHAR(length=255), nullable=False)

    description: Mapped[Optional[str]] = mapped_column(TEXT())

    project_id: Mapped[int] = mapped_column(
        INTEGER, ForeignKey("projects.id", ondelete="CASCADE"), nullable=False
    )
    parent_id: Mapped[Optional[int]] = mapped_column(
        INTEGER, ForeignKey("sections.id", ondelete="CASCADE")
    )

    # Direct child sections; ``ordering_list`` keeps ``position`` 1-based
    # and in step with the list order.
    subsections: Mapped[list["Section"]] = relationship(
        "Section",
        order_by="Section.position",
        collection_class=ordering_list("position", count_from=1),
    )

    project: Mapped["Project"] = relationship(
        "Project",
        primaryjoin="Section.project_id==Project.id",
        back_populates="sections",
    )

    # Questions placed directly in this section, ordered by position.
    questions: Mapped[list["QuestionInstance"]] = relationship(
        "QuestionInstance",
        order_by="QuestionInstance.position",
        collection_class=ordering_list("position", count_from=1),
        back_populates="section",
        cascade="all, delete-orphan",
        passive_deletes=True,
        overlaps="section",
    )

    parent: Mapped[Optional["Section"]] = relationship(
        "Section",
        remote_side=[id],
        back_populates="subsections",
        cascade_backrefs=False,
    )

    # Read-only, query-returning view of the same questions.
    questions_query: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance",
        lazy="dynamic",
        order_by="QuestionInstance.position",
        viewonly=True,
    )

    perms: DynamicMapped["SectionPermission"] = relationship(
        "SectionPermission", lazy="dynamic"
    )

    # Every other section of the same project whose b36_number starts with
    # this section's b36_number.
    descendants: DynamicMapped["Section"] = relationship(
        "Section",
        lazy="dynamic",
        viewonly=True,
        order_by="Section.b36_number",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)).startswith(Section.b36_number), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id)",
    )

    # As ``descendants``, restricted to rows whose b36_number is exactly two
    # characters longer than this section's.
    immediate_children: DynamicMapped["Section"] = relationship(
        "Section",
        lazy="dynamic",
        viewonly=True,
        order_by="Section.position",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)).startswith(Section.b36_number), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id, "
        "func.length(remote(foreign(Section.b36_number))) == func.length(Section.b36_number) + 2"
        ")",
    )

    # Every other section of the same project whose b36_number is a leading
    # prefix of this section's b36_number.
    ancestors: Mapped[list["Section"]] = relationship(
        "Section",
        viewonly=True,
        order_by="Section.b36_number",
        primaryjoin="and_("
        "remote(foreign(Section.b36_number)) == func.LEFT(Section.b36_number, func.LENGTH(remote(foreign(Section.b36_number)))), "
        "remote(foreign(Section.project_id)) == Section.project_id, "
        "remote(foreign(Section.id)) != Section.id)",
    )

    def __repr__(self) -> str:
        return f"<Sec id:{self.id} Num:{self.number} Parent: {self.parent_id} - {self.title}>"

    def as_dict(self) -> dict[str, Any]:
        """
        Serialise this section's own fields. The 'subsections' and
        'questions' entries are always returned as empty lists here.
        """
        payload: dict[str, Any] = dict(
            id=self.id,
            type="section",
            title=self.title,
            description=self.description,
            number=self.number,
            number36=self.b36_number,
            parent_id=self.parent_id,
            subsections=[],
            questions=[],
        )
        return payload

    def renumber(self) -> None:
        """
        Flush the session this section belongs to, then hand the section
        to ``renumber_tree``.
        """
        # Imported here rather than at module level.
        from postrfp.model.questionnaire.renumber import renumber_tree

        sess = object_session(self)  # type: ignore
        sess.flush()  # type: ignore
        renumber_tree(sess, self)  # type: ignore

    def accept(self, visitor: Visitor) -> None:
        """
        Walk this section with the given visitor: hello_section first,
        then visit_question for each question, then each subsection
        recursively, and goodbye_section last.
        """
        visitor.hello_section(self)

        # Questions are only iterated (and therefore loaded) when the
        # visitor's own class defines visit_question; an implementation
        # that is merely inherited from a parent class does not count.
        overrides_visit_question = "visit_question" in vars(visitor.__class__)
        if overrides_visit_question:
            for question in self.questions:
                visitor.visit_question(question)

        for child in self.subsections:
            child.accept(visitor)

        visitor.goodbye_section(self)

    @property
    def is_top_level(self) -> bool:
        """True when the section has no parent_id."""
        return self.parent_id is None

    @property
    def absolute_weight(self) -> Decimal:
        """
        Weight of this section within the whole tree: 1 for a top level
        section, otherwise normalised_weight multiplied by the parent's
        absolute weight. The computed value is cached on the instance.

        Raises WeightingsNotLoadedException if normalised_weight has not
        been set on this section.
        """
        if self.is_top_level:
            return Decimal(1)

        try:
            return self._absolute_weight  # type: ignore
        except AttributeError:
            # Nothing cached yet - compute it below.
            pass

        try:
            normalised = self.normalised_weight  # type: ignore
        except AttributeError:
            m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
            raise WeightingsNotLoadedException(m)

        self._absolute_weight = normalised * self.parent.absolute_weight  # type: ignore
        return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight: Decimal) -> None:
        self._absolute_weight = abs_weight

    def set_question_positions(self) -> None:
        """
        Number the questions of this section 1, 2, 3... via their 'position' attribute.
        Note: usually unnecessary, since OrderingList keeps positions up to date by itself.
        """
        for index, qi in enumerate(self.questions, start=1):
            qi.position = index

    def set_subsection_positions(self) -> None:
        """
        Number the subsections of this section 1, 2, 3... via their 'position' attribute.
        Note: usually unnecessary, since OrderingList keeps positions up to date by itself.
        """
        for index, child in enumerate(self.subsections, start=1):
            child.position = index

    @validates("questions", "subsections", include_backrefs=False)
    def assign_project_id(
        self, attr_name: str, instance: Union["Section", "QuestionInstance"]
    ) -> Union["Section", "QuestionInstance"]:
        """
        Validator for items added to ``questions`` or ``subsections``:
        an item without a project_id inherits this section's project_id.
        """
        needs_project = instance.project_id is None
        if needs_project:
            instance.project_id = self.project_id

        return instance

271 

272 

class QuestionDefinition(Base):
    """
    The content of a question: a title plus a grid of ``QElement`` rows,
    stored in the ``questions`` table. One definition can be referenced
    by several ``QuestionInstance`` rows (see ``instances``).
    """

    __tablename__ = "questions"

    title: Mapped[str] = mapped_column(TEXT(), nullable=False)

    parent_id: Mapped[Optional[int]] = mapped_column(Integer, index=True)
    # Stored in the "referenceCount" column; the server default is 1.
    refcount: Mapped[int] = mapped_column(
        "referenceCount", Integer, nullable=False, server_default=text("'1'")
    )
    # Eagerly joined, ordered by row then column.
    elements: Mapped[list["QElement"]] = relationship(
        "QElement",
        lazy="joined",
        order_by="QElement.row,QElement.col",
        cascade="all, delete-orphan",
        passive_deletes=True,
        back_populates="question_def",
    )

    # Read-only, query-returning view of the elements (used by get_element).
    _elements: DynamicMapped["QElement"] = relationship(
        "QElement", lazy="dynamic", viewonly=True
    )

    instances: DynamicMapped["QuestionInstance"] = relationship(
        "QuestionInstance", lazy="dynamic", back_populates="question_def"
    )

    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_def",
        cascade="all, delete",
        passive_deletes=True,
    )

    def __repr__(self) -> str:
        return f'<QDef Id: {self.id}, Title: "{self.title}">'

    def __eq__(self, o: object) -> bool:
        """
        Two definitions are equal when their titles match and their
        elements, sorted by the attributes named in ``el_keys``, compare
        equal. Any non-QuestionDefinition object is unequal.
        """
        if isinstance(o, QuestionDefinition):
            src_elements = sorted(
                self.elements, key=lambda e: [getattr(e, k) for k in el_keys]
            )
            des_elements = sorted(
                o.elements, key=lambda e: [getattr(e, k) for k in el_keys]
            )
            return self.title == o.title and src_elements == des_elements
        return False

    def __hash__(self) -> int:
        # __eq__ is overridden above, so __hash__ has to be restated
        # explicitly to keep instances hashable.
        return super().__hash__()

    @property
    def is_shared(self) -> bool:
        """True when refcount is greater than one; False when it is unset."""
        if self.refcount is None:
            return False
        return self.refcount > 1

    def get_element(self, element_id: int) -> QElement:
        """
        Fetch the element of this definition with the given id through the
        dynamic ``_elements`` query. ``.one()`` is used, so anything other
        than exactly one matching row raises.
        """
        return self._elements.filter(QElement.id == element_id).one()

    def as_dict(
        self, answer_lookup: dict[int, str] | None = None, vendor_view: bool = False
    ) -> list[Any]:
        """
        Returns a list of question element lists, each inner list corresponding
        to a row in a question table

        Groups elements into rows - returning a list of lists

        :param answer_lookup: optional mapping of element id to answer; when
            given, each answerable element dict gets an "answer" entry
            (None if the element id is not in the mapping)
        :param vendor_view: passed on to ``as_dict`` of elements that
            contain choices
        """
        row_list: list[list[dict[str, Any]]] = []
        current_row = -1
        for el in self.elements:
            # A new row starts whenever an element's row number exceeds the
            # highest one seen so far; this relies on the iteration order
            # of self.elements.
            if el.row > current_row:
                row_list.append([])
                current_row = el.row

            if el.contains_choices:
                el_dict = el.as_dict(vendor_view=vendor_view)
            else:
                el_dict = el.as_dict()

            if el.is_answerable and answer_lookup is not None:
                el_dict["answer"] = answer_lookup.get(el.id, None)

            row_list[-1].append(el_dict)

        return row_list

    @property
    def answerable_elements(self) -> list[QElement]:
        """The elements whose ``is_answerable`` flag is truthy."""
        return [e for e in self.elements if e.is_answerable]

    def add_element(self, el_name: str, **kwargs: Any) -> QElement:
        """
        Create an element with ``QElement.build(el_name, **kwargs)``,
        append it to this definition and return it.
        """
        el = QElement.build(el_name, **kwargs)
        self.elements.append(el)
        return el

    def is_tabular(self) -> bool:
        """
        True unless column_count is exactly 1. Note that a definition
        without elements has column_count 0 and so also returns True.
        """
        return self.column_count != 1

    @property
    def column_count(self) -> int:
        """Highest ``col`` value among the elements, or 0 without elements."""
        if not self.elements:
            return 0
        return max(el.col for el in self.elements)

    @property
    def row_count(self) -> int:
        """Highest ``row`` value among the elements, or 0 without elements."""
        if not self.elements:
            return 0
        return max(el.row for el in self.elements)

    @property
    def grid_area(self) -> int:
        """Number of available grid cells available according to row and col values of Elements"""
        return self.row_count * self.column_count

    @property
    def occupied_area(self) -> int:
        """
        The number of grid cells occupied by cells according to their rowspan and colspan values
        If rowspans or colspans are set correctly this value will be equal to self.grid_area
        """
        return sum(el.cell_area for el in self.elements)

    @property
    def is_table_valid(self) -> bool:
        """True when the occupied area exactly fills the grid area."""
        return self.grid_area == self.occupied_area

    @classmethod
    def build(
        cls, qdict: Union[dict[str, Any], "_ModelDumpable"], strip_ids: bool = False
    ) -> "QuestionDefinition":
        """
        Create a QuestionDefinition, complete with QElements, from a
        a dict datastructure as provided by serial.QuestionDef

        :param qdict: either a dict with "title" and "elements" keys, or an
            object whose ``model_dump()`` returns such a dict. "elements" is
            a list of rows, each row a list of element dicts that carry at
            least an "el_type" key.
        :param strip_ids: when True every element is built with ``id=None``
        :return: the new, unsaved QuestionDefinition

        The input is left untouched: row/col/id values are written to a
        copy of each element dict, not to the caller's data.
        """
        if not isinstance(qdict, dict):
            qdict = qdict.model_dump()

        qdef = cls(title=qdict["title"])
        for row_idx, row in enumerate(qdict["elements"], start=1):
            for col_idx, el in enumerate(row, start=1):
                # Shallow copy so that positions and stripped ids do not
                # leak back into the caller's structure.
                el_kwargs = dict(el)
                el_kwargs["row"] = row_idx
                el_kwargs["col"] = col_idx
                if strip_ids:
                    el_kwargs["id"] = None
                qdef.add_element(el_kwargs["el_type"], **el_kwargs)
        return qdef

423 

424 

# Join condition, in SQLAlchemy's string expression form, linking
# TotalWeighting.question_instance_id (marked as the foreign side) to
# QuestionInstance.id.
qi_weight_join = "foreign(TotalWeighting.question_instance_id)==QuestionInstance.id"

426 

427 

428class SaveAnswersResult: 

429 def __init__( 

430 self, 

431 is_new: bool, 

432 change_list: list[tuple[str | None, str]], 

433 unanswered_mandatory: bool, 

434 ) -> None: 

435 self.is_new = is_new 

436 self.change_list = change_list 

437 self.unanswered_mandatory = unanswered_mandatory 

438 

439 

class QuestionInstance(Base, NodeMixin):
    """
    The placement of a ``QuestionDefinition`` inside a section of a
    project, stored in the ``question_instances`` table. Title and
    elements are read through from the definition.
    """

    __tablename__ = "question_instances"

    __table_args__ = (  # type: ignore
        # Composite FK: the (section_id, project_id) pair must match a
        # (id, project_id) pair in the sections table.
        ForeignKeyConstraint(
            ["section_id", "project_id"],
            ["sections.id", "sections.project_id"],
            name="fk_qi_section_project",
            ondelete="CASCADE",
        ),
        # A question definition may appear at most once per project.
        Index(
            "unique_question_project",
            "project_id",
            "question_id",
            unique=True,
        ),
    )

    public_attrs = "id,title,position,section_id,number,url".split(",")

    # section_id carries no ForeignKey of its own: it is covered by the
    # composite foreign key declared in __table_args__.
    section_id: Mapped[int] = mapped_column(
        "section_id",
        INTEGER,
        nullable=False,
    )
    question_def_id: Mapped[int] = mapped_column(
        "question_id",
        INTEGER,
        ForeignKey(QuestionDefinition.id),  # Keep this FK to questions table
        nullable=False,
    )
    # project_id still needs its own FK definition to the projects table
    project_id: Mapped[int] = mapped_column(
        "project_id",
        INTEGER,
        ForeignKey("projects.id", ondelete="CASCADE"),  # Keep this FK to projects table
        nullable=False,
    )

    # Always loaded together with the instance via an inner join.
    question_def: Mapped["QuestionDefinition"] = relationship(
        QuestionDefinition, lazy="joined", innerjoin=True, back_populates="instances"
    )
    project: Mapped["Project"] = relationship(
        "Project", back_populates="questions", overlaps="questions"
    )
    section: Mapped["Section"] = relationship(
        "Section", back_populates="questions", overlaps="project"
    )
    # Read-only relationship to the same Section class as ``section``.
    parent: Mapped["Section"] = relationship("Section", viewonly=True)
    _answers: DynamicMapped["Answer"] = relationship(
        "Answer",
        lazy="dynamic",
        cascade="all, delete",
        passive_deletes=True,
        back_populates="question_instance",
    )

    total_weightings: Mapped["TotalWeighting"] = relationship(
        "TotalWeighting", primaryjoin=qi_weight_join, backref="question", cascade="all"
    )

    meta: Mapped[Optional["QuestionMeta"]] = relationship(
        "QuestionMeta",
        uselist=False,
        back_populates="question_instance",
        secondary="questions",
        viewonly=True,
    )
    tags: Mapped[list["Tag"]] = relationship(
        "Tag",
        secondary="tags_qinstances",
        back_populates="question_instances",
        passive_deletes=True,
    )

    def __repr__(self) -> str:
        return f"<Question - InstanceId: {self.id}, DefId: {self.question_def_id}>"

    def __eq__(self, o: object) -> bool:
        """
        Instances are equal when their question definitions compare equal;
        any non-QuestionInstance object is unequal.
        """
        if isinstance(o, QuestionInstance):
            return self.question_def == o.question_def
        return False

    def __hash__(self) -> int:
        # __eq__ is overridden above, so __hash__ has to be restated
        # explicitly to keep instances hashable.
        return super().__hash__()

    def answers_for_issue(self, issue_id: Any) -> Any:
        """Query for the answers to this question that belong to ``issue_id``."""
        return self._answers.filter(Answer.issue_id == issue_id)

    def single_vendor_dict(self, issue: Any) -> dict[str, Any]:
        """
        Serialise this question, in vendor view, together with the answers
        and the respondent of the given issue.
        """
        lookup = {a.element_id: a.answer for a in self.answers_for_issue(issue.id)}
        return {
            "number": self.number,
            "id": self.id,
            "title": self.title,
            "respondent": issue.respondent.as_dict(),
            "elements": self.question_def.as_dict(
                vendor_view=True, answer_lookup=lookup
            ),
        }

    @property
    def title(self) -> str:
        """Title of the underlying question definition."""
        return self.question_def.title

    @property
    def elements(self) -> Any:
        """Elements of the underlying question definition."""
        return self.question_def.elements

    def as_dict(
        self, answer_lookup: dict[int, str] | None = None, vendor_view: bool = False
    ) -> dict[str, Any]:
        """
        Serialise this question; both arguments are passed on unchanged to
        ``QuestionDefinition.as_dict`` to produce the "elements" entry.
        """
        return {
            "id": self.id,
            "title": self.title,
            "number36": self.b36_number,
            "number": self.number,
            "elements": self.question_def.as_dict(
                answer_lookup=answer_lookup, vendor_view=vendor_view
            ),
            "type": "question",
        }

    @property
    def is_autoscored(self) -> bool:
        """
        True when any choice of any MultipleChoice element has a truthy
        "autoscore" entry.
        """
        for element in self.elements:
            if isinstance(element, MultipleChoice) and element.choices:  # type: ignore
                for choice in element.choices:  # type: ignore
                    if choice.get("autoscore", None):
                        return True
        return False

    @property
    def absolute_weight(self) -> Decimal:
        """
        Weight of this question within the whole tree: normalised_weight
        multiplied by the section's absolute weight. The computed value
        is cached on the instance.

        Raises WeightingsNotLoadedException if an AttributeError occurs
        while computing the value (e.g. normalised_weight was never set).
        """
        if not hasattr(self, "_absolute_weight"):
            try:
                self._absolute_weight = (
                    self.normalised_weight * self.section.absolute_weight  # type: ignore
                )
            except AttributeError as exc:
                m = "normalised weights not loaded - run HierarchicalWeightingVisitor"
                # Chain the original error so the missing attribute stays
                # visible in the traceback.
                raise WeightingsNotLoadedException(m) from exc
        return self._absolute_weight

    @absolute_weight.setter
    def absolute_weight(self, abs_weight: Decimal) -> None:
        self._absolute_weight = abs_weight

    def validate_and_save_answers(
        self, answer_doc: dict[int, str], issue: Any
    ) -> SaveAnswersResult:
        """
        Validate the submitted answers against this question's answerable
        elements and create or update the Answer rows for the given issue.

        :param answer_doc: mapping of element id to submitted answer. An
            entry whose value is None is accepted but ignored. The mapping
            itself is not modified.
        :param issue: the issue being answered; only ``issue.id`` is read
        :return: SaveAnswersResult with ``is_new`` (no answers existed for
            the issue before this call), the list of (old, new) changes and
            whether a mandatory element is left without any answer
        :raises ValidationFailure: if answer_doc holds element ids that are
            not answerable elements of this question. Whatever
            ``element.validate`` raises is propagated as well.
        """
        changes: list[tuple[str | None, str]] = []
        unanswered_mandatory = False
        # Entries are consumed from a private copy, so the caller's dict is
        # not emptied as a side effect; whatever remains afterwards could
        # not be matched to an element.
        pending = dict(answer_doc)
        current_answers = {a.element_id: a for a in self.answers_for_issue(issue.id)}
        for element in self.question_def.answerable_elements:
            el_id = element.id
            if el_id not in pending:
                if element.mandatory and el_id not in current_answers:
                    unanswered_mandatory = True
                continue

            new_answer = pending.pop(el_id)
            if new_answer is None:
                # NOTE(review): an explicit None is skipped without the
                # mandatory check above - confirm this is intended.
                continue
            element.validate(new_answer)
            if el_id in current_answers:
                current_answer = current_answers[el_id]
                old_answer = current_answer.answer
                if old_answer != new_answer:
                    current_answer.answer = new_answer
                    changes.append((old_answer, new_answer))
            else:
                a = Answer(element_id=el_id, answer=new_answer, issue_id=issue.id)
                self._answers.append(a)
                changes.append((None, new_answer))

        if pending:
            tmpl = "Unable to match all answers with element IDs: {}"
            ids = ", ".join(str(k) for k in pending)
            raise ValidationFailure(tmpl.format(ids))

        is_new = not current_answers

        return SaveAnswersResult(is_new, changes, unanswered_mandatory)