Coverage for postrfp/buyer/api/io/qbuilder.py: 100%

159 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1from contextlib import contextmanager 

2from typing import Optional, Generator, TypedDict, Any, Literal 

3import logging 

4 

5from sqlalchemy import insert 

6from sqlalchemy.orm import Session 

7 

8from postrfp.model.questionnaire.qelements import ElementCode 

9from postrfp.model import Section, QuestionInstance, QuestionDefinition, QElement 

10 

11log = logging.getLogger(__name__) 

12 

13 

14class ChoiceSetData(TypedDict): 

15 label: str 

16 is_combo: bool 

17 colspan: int 

18 rowspan: int 

19 mandatory: bool 

20 choices: list[dict[str, Any]] 

21 

22 

23class NoParentQuestion(RuntimeError): 

24 pass 

25 

26 

27class QuestionnaireBuilder: 

28 """ 

29 Class for creating questionnaire tree structures. 

30 

31 It provides a convenient safe way to build a questionnaire which maps 

32 to iterative parsing of input documents such as xml. 

33 

34 The main benefit is that it maintains state so the importer functions 

35 using it doesn't have to. 

36 

37 Context managers are provided for next content, e.g. 

38 

39 with section_context(): 

40 # add some questions 

41 with question_context(): 

42 add some question elements. 

43 

44 """ 

45 

46 def __init__(self, session: Session, section: Section): 

47 self.session = session 

48 

49 assert section.id is not None 

50 assert section.project_id is not None 

51 

52 self.project_id = section.project_id 

53 self.ancestors: list[Section] = [section] 

54 self.current_qdef_id: Optional[int] = None 

55 self.question_position = 1 

56 # Position counter for subsections within a parent section 

57 self.section_positions: dict[int, int] = {} 

58 self.assumed_el_row = 1 

59 

60 self.pending_elements: list[dict] = [] 

61 # Optional attribute for choice set context 

62 self._current_choice_set: Optional[ChoiceSetData] = None 

63 

64 def __enter__(self) -> "QuestionnaireBuilder": 

65 return self 

66 

67 def __exit__( 

68 self, 

69 exc_type: type[BaseException] | None, 

70 _exc_val: BaseException | None, 

71 _exc_tb: Any, 

72 ) -> Literal[False]: 

73 if exc_type is None: 

74 self.finalise() 

75 

76 return False 

77 

78 @property 

79 def current_section(self) -> Section: 

80 return self.ancestors[-1] 

81 

82 @property 

83 def root_section(self) -> Section: 

84 return self.ancestors[0] 

85 

86 def finalise(self) -> None: 

87 """Renumber the section tree""" 

88 root = self.root_section 

89 # calling renumber is redundant if the @validator on Section.subsectins 

90 # and Section.questions is deriving numbers on the fly 

91 # but leaving it here for future - proofing 

92 root.renumber() 

93 self.flush_elements() 

94 

95 def start_section(self, title: str, description: Optional[str] = None) -> Section: 

96 """Create a new section under the current section""" 

97 

98 self.question_position = 0 

99 

100 parent_id = self.current_section.id 

101 new_position = self.section_positions.get(parent_id, 1) 

102 

103 sec = Section( 

104 title=title, 

105 description=description, 

106 parent_id=parent_id, 

107 project_id=self.project_id, 

108 position=new_position, 

109 b36_number="LL", # set to arbitrary value to stop auto-numbering in Section class 

110 ) 

111 self.current_section.subsections.append(sec) 

112 self.session.flush() 

113 assert sec.id is not None 

114 

115 # Increment the position counter for the *next* child of this parent 

116 self.section_positions[parent_id] = new_position + 1 

117 

118 # Add to ancestors list 

119 self.ancestors.append(sec) 

120 return sec 

121 

122 def end_section(self, description: Optional[str] = None) -> None: 

123 """End the current section context""" 

124 if description is not None: 

125 self.current_section.description = description 

126 self.ancestors.pop() 

127 self.flush_elements() 

128 

129 @contextmanager 

130 def section_context( 

131 self, title: str, description: Optional[str] = None 

132 ) -> Generator[Section, None, None]: 

133 """Context manager for creating a section""" 

134 section_id = self.start_section(title, description=description) 

135 yield section_id 

136 self.end_section() 

137 

138 def start_question(self, title: str) -> int: 

139 """Create a new question definition and instance""" 

140 assert title is not None 

141 

142 self.assumed_el_row = 1 

143 

144 qdef = QuestionDefinition(title=title) 

145 

146 qi = QuestionInstance( 

147 project_id=self.project_id, 

148 question_def=qdef, 

149 position=self.question_position, 

150 ) 

151 self.current_section.questions.append(qi) 

152 self.session.flush() 

153 assert qdef.id is not None 

154 self.current_qdef_id = qdef.id 

155 return qdef.id 

156 

157 def end_question(self) -> None: 

158 """End the current question context""" 

159 self.question_position += 1 

160 self.current_qdef_id = None 

161 self.assumed_el_row = 1 

162 

163 @contextmanager 

164 def question_context(self, title: str) -> Generator[int, None, None]: 

165 """Context manager for creating a question""" 

166 qdef_id = self.start_question(title) 

167 yield qdef_id 

168 self.end_question() 

169 

170 def flush_elements(self) -> None: 

171 """Execute batch insert for any pending elements""" 

172 if not self.pending_elements: 

173 return 

174 # Batch insert 

175 self.session.execute(insert(QElement), self.pending_elements) 

176 self.pending_elements.clear() 

177 

178 def add_element( 

179 self, 

180 el_code_enum: ElementCode, 

181 label: Optional[str] = None, 

182 row: Optional[int] = None, 

183 col: int = 1, 

184 rowspan: int = 1, 

185 colspan: int = 1, 

186 mandatory: bool = False, 

187 choices: Optional[list[dict]] = None, 

188 width: Optional[int] = None, 

189 height: Optional[int] = None, 

190 ) -> int: 

191 """ 

192 Add a new QuestionElement to the current question. 

193 

194 If row is omitted the the value is incremented. 

195 

196 Note: Elements are batched and not inserted until flush_elements() 

197 is called or the question context ends. 

198 """ 

199 if self.current_qdef_id is None: 

200 raise NoParentQuestion( 

201 "add_element can only be called with a new_question context" 

202 ) 

203 

204 if row is None: 

205 rowval = self.assumed_el_row 

206 self.assumed_el_row += 1 

207 else: 

208 rowval = row 

209 self.assumed_el_row = row + 1 

210 

211 # Prepare element values for batch insert 

212 element_values = { 

213 "question_id": self.current_qdef_id, 

214 "el_type": el_code_enum.value, # This is the polymorphic discriminator 

215 "label": label, 

216 "row": rowval, 

217 "col": col, 

218 "rowspan": rowspan, 

219 "colspan": colspan, 

220 "mandatory": mandatory, 

221 "choices": choices, 

222 "height": height, 

223 "width": width, 

224 } 

225 # Store for batch insert 

226 self.pending_elements.append(element_values) 

227 return len(self.pending_elements) 

228 

229 def _element( 

230 self, el_code_enum: ElementCode, label: str = "", **kwargs: Any 

231 ) -> int: 

232 """Base helper for adding elements with common defaults.""" 

233 kwargs.setdefault("colspan", 1) 

234 kwargs.setdefault("rowspan", 1) 

235 

236 return self.add_element(el_code_enum=el_code_enum, label=label, **kwargs) 

237 

238 def add_text_input(self, label: str = "", **kwargs: Any) -> int: 

239 kwargs.setdefault("width", 40) 

240 kwargs.setdefault("height", 1) 

241 

242 return self._element(ElementCode.text_input, label, **kwargs) 

243 

244 def add_checkbox(self, label: str = "", **kwargs: Any) -> int: 

245 return self._element(ElementCode.checkbox, label, **kwargs) 

246 

247 def add_label(self, label: str = "", **kwargs: Any) -> int: 

248 return self._element(ElementCode.label, label, **kwargs) 

249 

250 def add_attachment(self, label: str = "", **kwargs: Any) -> int: 

251 return self._element(ElementCode.answerable_attachment, label, **kwargs) 

252 

253 def add_question_attachment( 

254 self, label: str = "", filename: Optional[str] = None, **kwargs: Any 

255 ) -> int: 

256 """Convert question attachments to simple labels for backward compatibility""" 

257 # Create a descriptive label that includes the filename if provided 

258 if filename: 

259 display_label = ( 

260 f"{label}: {filename}" if label else f"Attachment: {filename}" 

261 ) 

262 else: 

263 display_label = label 

264 

265 # Just create a label element instead 

266 return self.add_label(label=display_label, **kwargs) 

267 

268 def add_choice_set( 

269 self, 

270 choices: list[dict], 

271 label: str = "", 

272 is_combo: bool = True, 

273 colspan: int = 1, 

274 rowspan: int = 1, 

275 mandatory: bool = False, 

276 ) -> int: 

277 """ 

278 Convenience method for adding a complete choice set element in one call. 

279 Expects choices to be in the final list[dict] format. 

280 """ 

281 el_code = ElementCode.combo_choices if is_combo else ElementCode.radio_choices 

282 return self.add_element( 

283 el_code_enum=el_code, 

284 label=label, 

285 colspan=colspan, 

286 rowspan=rowspan, 

287 mandatory=mandatory, 

288 choices=choices, # Expects list[dict] format 

289 ) 

290 

291 def start_choice_set( 

292 self, 

293 label: str = "", 

294 is_combo: bool = True, 

295 colspan: int = 1, 

296 rowspan: int = 1, 

297 mandatory: bool = False, 

298 ) -> "QuestionnaireBuilder": 

299 """Begin a choice set context to which choices can be added""" 

300 if self.current_qdef_id is None: 

301 raise NoParentQuestion( 

302 "start_choice_set can only be called within a question context" 

303 ) 

304 

305 # Initialize choices as an empty list 

306 self._current_choice_set = ChoiceSetData( 

307 label=label, 

308 is_combo=is_combo, 

309 colspan=colspan, 

310 rowspan=rowspan, 

311 mandatory=mandatory, 

312 choices=[], # Initialize as a list 

313 ) 

314 return self 

315 

316 def add_choice( 

317 self, value: str, label: Optional[str] = None 

318 ) -> "QuestionnaireBuilder": 

319 """ 

320 Add a choice to the current choice set. 

321 Appends a dictionary to the internal list. 

322 The 'value' from XML is used as the label if no explicit label is given. 

323 'autoscore' is added as None, as it's part of the expected structure 

324 in QElement but not provided by the basic XML import. 

325 """ 

326 if self.current_qdef_id is None: 

327 raise NoParentQuestion( 

328 "add_choice can only be called within a question context" 

329 ) 

330 

331 if self._current_choice_set is None: 

332 raise RuntimeError( 

333 "No active choice set context - call start_choice_set first" 

334 ) 

335 

336 choice_label = label or value # Use value as label if label is None 

337 # Append the choice dictionary in the desired format 

338 self._current_choice_set["choices"].append( 

339 {"label": choice_label, "autoscore": None} 

340 ) 

341 return self 

342 

343 def end_choice_set(self) -> "QuestionnaireBuilder": 

344 """Finalize and add the choice set element to the question""" 

345 if self.current_qdef_id is None: 

346 raise NoParentQuestion( 

347 "end_choice_set can only be called within a question context" 

348 ) 

349 

350 if self._current_choice_set is None: 

351 raise RuntimeError( 

352 "No active choice set context - call start_choice_set first" 

353 ) 

354 

355 # Retrieve the already correctly formatted list of choices 

356 choices_list = self._current_choice_set["choices"] 

357 is_combo = self._current_choice_set["is_combo"] 

358 el_code = ElementCode.combo_choices if is_combo else ElementCode.radio_choices 

359 

360 # Call add_element directly with the collected data 

361 self.add_element( 

362 el_code_enum=el_code, 

363 label=self._current_choice_set["label"], 

364 colspan=self._current_choice_set["colspan"], 

365 rowspan=self._current_choice_set["rowspan"], 

366 mandatory=self._current_choice_set["mandatory"], 

367 choices=choices_list, # Pass the correctly formatted list 

368 ) 

369 

370 # Clean up the temporary state 

371 self._current_choice_set = None 

372 return self 

373 

374 @contextmanager 

375 def choice_set_context( 

376 self, 

377 label: str = "", 

378 is_combo: bool = True, 

379 colspan: int = 1, 

380 rowspan: int = 1, 

381 mandatory: bool = False, 

382 ) -> Generator["QuestionnaireBuilder", None, None]: 

383 """Context manager for creating a choice set""" 

384 self.start_choice_set(label, is_combo, colspan, rowspan, mandatory) 

385 yield self # Yield self to allow adding choices within the 'with' block 

386 self.end_choice_set() 

387 

388 

389# Helper functions for importers 

390 

391 

392def parse_bool_attr(value: Optional[str], default: bool = False) -> bool: 

393 """Helper to convert string boolean attributes to Python booleans""" 

394 if value is None: 

395 return default 

396 try: 

397 return value.lower() == "true" 

398 except AttributeError: 

399 return bool(value)