Coverage for postrfp/buyer/api/io/qbuilder.py: 100%
159 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1from contextlib import contextmanager
2from typing import Optional, Generator, TypedDict, Any, Literal
3import logging
5from sqlalchemy import insert
6from sqlalchemy.orm import Session
8from postrfp.model.questionnaire.qelements import ElementCode
9from postrfp.model import Section, QuestionInstance, QuestionDefinition, QElement
11log = logging.getLogger(__name__)
class ChoiceSetData(TypedDict):
    """Working state for a choice set that is still being assembled.

    Holds the arguments given when the choice set was started, plus the
    choices collected so far, until they are turned into a single element.
    """

    # Label text of the resulting choice element.
    label: str
    # True selects the combo element code, False the radio element code.
    is_combo: bool
    # Layout span of the element, in columns and rows.
    colspan: int
    rowspan: int
    # Whether the element is flagged as mandatory.
    mandatory: bool
    # Choice dictionaries accumulated so far, in insertion order.
    choices: list[dict[str, Any]]
class NoParentQuestion(RuntimeError):
    """Raised when an element or choice is added while no question is open."""
class QuestionnaireBuilder:
    """
    Stateful helper for creating questionnaire tree structures.

    It provides a convenient, safe way to build a questionnaire which maps
    onto iterative parsing of input documents such as XML.

    The main benefit is that it maintains state (current section, current
    question, row and position counters) so the importer functions using it
    don't have to.

    Context managers are provided for nested content, e.g.

        with builder.section_context("Section title"):
            # add some questions
            with builder.question_context("Question title"):
                # add some question elements
                builder.add_text_input("Name")

    Elements are queued in ``pending_elements`` and written in batches by
    ``flush_elements()``, which runs whenever a section ends and when the
    builder is finalised.
    """

    def __init__(self, session: Session, section: Section):
        """
        Args:
            session: Session used for flushing new rows and batch inserts.
            section: Root section under which content is built. It must
                already have both an ``id`` and a ``project_id``.
        """
        self.session = session

        assert section.id is not None
        assert section.project_id is not None

        self.project_id = section.project_id
        # Stack of open sections; the root is first, the current one last.
        self.ancestors: list[Section] = [section]
        # Id of the question definition currently open, if any.
        self.current_qdef_id: Optional[int] = None
        # NOTE(review): starts at 1 here but start_section() resets it to 0,
        # and it is not restored when a subsection ends - confirm the
        # intended base/behaviour (finalise() renumbers the tree anyway).
        self.question_position = 1
        # Position counter for subsections within a parent section
        self.section_positions: dict[int, int] = {}
        # Row used for the next element when no explicit row is given.
        self.assumed_el_row = 1

        # Element value dicts waiting to be batch inserted.
        self.pending_elements: list[dict] = []
        # Optional attribute for choice set context
        self._current_choice_set: Optional[ChoiceSetData] = None

    def __enter__(self) -> "QuestionnaireBuilder":
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        _exc_val: BaseException | None,
        _exc_tb: Any,
    ) -> Literal[False]:
        """Finalise on a clean exit; never suppress an exception."""
        if exc_type is None:
            self.finalise()

        return False

    @property
    def current_section(self) -> Section:
        """The innermost section that is currently open."""
        return self.ancestors[-1]

    @property
    def root_section(self) -> Section:
        """The section the builder was created with."""
        return self.ancestors[0]

    def finalise(self) -> None:
        """Renumber the section tree and write any queued elements."""
        root = self.root_section
        # Calling renumber is redundant if the @validator on
        # Section.subsections and Section.questions is deriving numbers on
        # the fly, but it is left here for future-proofing.
        root.renumber()
        self.flush_elements()

    def start_section(self, title: str, description: Optional[str] = None) -> Section:
        """
        Create a new section under the current section and make it current.

        Returns:
            The newly created section.
        """

        self.question_position = 0

        parent_id = self.current_section.id
        new_position = self.section_positions.get(parent_id, 1)

        sec = Section(
            title=title,
            description=description,
            parent_id=parent_id,
            project_id=self.project_id,
            position=new_position,
            b36_number="LL",  # set to arbitrary value to stop auto-numbering in Section class
        )
        self.current_section.subsections.append(sec)
        # Flush so the new section has an id before children reference it.
        self.session.flush()
        assert sec.id is not None

        # Increment the position counter for the *next* child of this parent
        self.section_positions[parent_id] = new_position + 1

        # Add to ancestors list
        self.ancestors.append(sec)
        return sec

    def end_section(self, description: Optional[str] = None) -> None:
        """
        End the current section context and write any queued elements.

        Args:
            description: If given, replaces the current section's
                description before the section is closed.
        """
        if description is not None:
            self.current_section.description = description
        self.ancestors.pop()
        self.flush_elements()

    @contextmanager
    def section_context(
        self, title: str, description: Optional[str] = None
    ) -> Generator[Section, None, None]:
        """
        Context manager for creating a section.

        Yields the new section. On a clean exit the section is ended via
        ``end_section()``. If the body raises, the section is only removed
        from the ancestor stack (no database work is attempted) and the
        exception propagates.
        """
        section = self.start_section(title, description=description)
        try:
            yield section
        except BaseException:
            # Restore nesting so the builder is not left inside a dead
            # section; skip the flush because the session may be unusable.
            self.ancestors.pop()
            raise
        else:
            self.end_section()

    def start_question(self, title: str) -> int:
        """
        Create a new question definition and instance in the current section.

        Returns:
            The id of the new question definition.
        """
        assert title is not None

        self.assumed_el_row = 1

        qdef = QuestionDefinition(title=title)

        qi = QuestionInstance(
            project_id=self.project_id,
            question_def=qdef,
            position=self.question_position,
        )
        self.current_section.questions.append(qi)
        # Flush so the question definition has an id for its elements.
        self.session.flush()
        assert qdef.id is not None
        self.current_qdef_id = qdef.id
        return qdef.id

    def end_question(self) -> None:
        """End the current question context and advance the position."""
        self.question_position += 1
        self.current_qdef_id = None
        self.assumed_el_row = 1

    @contextmanager
    def question_context(self, title: str) -> Generator[int, None, None]:
        """
        Context manager for creating a question.

        Yields the id of the new question definition. The question is always
        closed on exit, even if the body raises, so a failed question cannot
        leave a stale question id or reuse its position.
        """
        qdef_id = self.start_question(title)
        try:
            yield qdef_id
        finally:
            self.end_question()

    def flush_elements(self) -> None:
        """Execute batch insert for any pending elements"""
        if not self.pending_elements:
            return
        # Batch insert
        self.session.execute(insert(QElement), self.pending_elements)
        self.pending_elements.clear()

    def add_element(
        self,
        el_code_enum: ElementCode,
        label: Optional[str] = None,
        row: Optional[int] = None,
        col: int = 1,
        rowspan: int = 1,
        colspan: int = 1,
        mandatory: bool = False,
        choices: Optional[list[dict]] = None,
        width: Optional[int] = None,
        height: Optional[int] = None,
    ) -> int:
        """
        Queue a new question element for the current question.

        If row is omitted, the next assumed row is used and then
        incremented. An explicit row also resets the assumed row to the one
        after it.

        Note: Elements are batched and not inserted until flush_elements()
        runs, which happens when a section ends or the builder is finalised.

        Returns:
            The number of elements currently queued, including this one.

        Raises:
            NoParentQuestion: If no question is currently open.
        """
        if self.current_qdef_id is None:
            raise NoParentQuestion(
                "add_element can only be called with a new_question context"
            )

        if row is None:
            rowval = self.assumed_el_row
            self.assumed_el_row += 1
        else:
            rowval = row
            self.assumed_el_row = row + 1

        # Prepare element values for batch insert
        element_values = {
            "question_id": self.current_qdef_id,
            "el_type": el_code_enum.value,  # This is the polymorphic discriminator
            "label": label,
            "row": rowval,
            "col": col,
            "rowspan": rowspan,
            "colspan": colspan,
            "mandatory": mandatory,
            "choices": choices,
            "height": height,
            "width": width,
        }
        # Store for batch insert
        self.pending_elements.append(element_values)
        return len(self.pending_elements)

    def _element(
        self, el_code_enum: ElementCode, label: str = "", **kwargs: Any
    ) -> int:
        """Base helper for adding elements with common defaults."""
        kwargs.setdefault("colspan", 1)
        kwargs.setdefault("rowspan", 1)

        return self.add_element(el_code_enum=el_code_enum, label=label, **kwargs)

    def add_text_input(self, label: str = "", **kwargs: Any) -> int:
        """Queue a text input; width defaults to 40 and height to 1."""
        kwargs.setdefault("width", 40)
        kwargs.setdefault("height", 1)

        return self._element(ElementCode.text_input, label, **kwargs)

    def add_checkbox(self, label: str = "", **kwargs: Any) -> int:
        """Queue a checkbox element."""
        return self._element(ElementCode.checkbox, label, **kwargs)

    def add_label(self, label: str = "", **kwargs: Any) -> int:
        """Queue a label element."""
        return self._element(ElementCode.label, label, **kwargs)

    def add_attachment(self, label: str = "", **kwargs: Any) -> int:
        """Queue an answerable attachment element."""
        return self._element(ElementCode.answerable_attachment, label, **kwargs)

    def add_question_attachment(
        self, label: str = "", filename: Optional[str] = None, **kwargs: Any
    ) -> int:
        """Convert question attachments to simple labels for backward compatibility"""
        # Create a descriptive label that includes the filename if provided
        if filename:
            display_label = (
                f"{label}: {filename}" if label else f"Attachment: {filename}"
            )
        else:
            display_label = label

        # Just create a label element instead
        return self.add_label(label=display_label, **kwargs)

    def add_choice_set(
        self,
        choices: list[dict],
        label: str = "",
        is_combo: bool = True,
        colspan: int = 1,
        rowspan: int = 1,
        mandatory: bool = False,
    ) -> int:
        """
        Convenience method for adding a complete choice set element in one call.
        Expects choices to be in the final list[dict] format.
        """
        el_code = ElementCode.combo_choices if is_combo else ElementCode.radio_choices
        return self.add_element(
            el_code_enum=el_code,
            label=label,
            colspan=colspan,
            rowspan=rowspan,
            mandatory=mandatory,
            choices=choices,  # Expects list[dict] format
        )

    def start_choice_set(
        self,
        label: str = "",
        is_combo: bool = True,
        colspan: int = 1,
        rowspan: int = 1,
        mandatory: bool = False,
    ) -> "QuestionnaireBuilder":
        """
        Begin a choice set context to which choices can be added.

        Returns:
            The builder itself, so calls can be chained.

        Raises:
            NoParentQuestion: If no question is currently open.
        """
        if self.current_qdef_id is None:
            raise NoParentQuestion(
                "start_choice_set can only be called within a question context"
            )

        # Start with an empty list of choices
        choice_set: ChoiceSetData = {
            "label": label,
            "is_combo": is_combo,
            "colspan": colspan,
            "rowspan": rowspan,
            "mandatory": mandatory,
            "choices": [],
        }
        self._current_choice_set = choice_set
        return self

    def add_choice(
        self, value: str, label: Optional[str] = None
    ) -> "QuestionnaireBuilder":
        """
        Add a choice to the current choice set.

        Appends a dictionary to the internal list. ``value`` is used as the
        label when no (or an empty) explicit label is given. ``autoscore``
        is added as None, as it's part of the expected structure in QElement
        but not provided by the basic XML import.

        Raises:
            NoParentQuestion: If no question is currently open.
            RuntimeError: If no choice set has been started.
        """
        if self.current_qdef_id is None:
            raise NoParentQuestion(
                "add_choice can only be called within a question context"
            )

        if self._current_choice_set is None:
            raise RuntimeError(
                "No active choice set context - call start_choice_set first"
            )

        choice_label = label or value  # Fall back to value for a missing label
        # Append the choice dictionary in the desired format
        self._current_choice_set["choices"].append(
            {"label": choice_label, "autoscore": None}
        )
        return self

    def end_choice_set(self) -> "QuestionnaireBuilder":
        """
        Finalize and add the choice set element to the question.

        Raises:
            NoParentQuestion: If no question is currently open.
            RuntimeError: If no choice set has been started.
        """
        if self.current_qdef_id is None:
            raise NoParentQuestion(
                "end_choice_set can only be called within a question context"
            )

        if self._current_choice_set is None:
            raise RuntimeError(
                "No active choice set context - call start_choice_set first"
            )

        pending = self._current_choice_set
        el_code = (
            ElementCode.combo_choices
            if pending["is_combo"]
            else ElementCode.radio_choices
        )

        # Call add_element directly with the collected data
        self.add_element(
            el_code_enum=el_code,
            label=pending["label"],
            colspan=pending["colspan"],
            rowspan=pending["rowspan"],
            mandatory=pending["mandatory"],
            choices=pending["choices"],  # Already in list[dict] format
        )

        # Clean up the temporary state
        self._current_choice_set = None
        return self

    @contextmanager
    def choice_set_context(
        self,
        label: str = "",
        is_combo: bool = True,
        colspan: int = 1,
        rowspan: int = 1,
        mandatory: bool = False,
    ) -> Generator["QuestionnaireBuilder", None, None]:
        """
        Context manager for creating a choice set.

        Yields the builder so choices can be added inside the ``with`` block.
        On a clean exit the collected choices are queued as one element. If
        the body raises, the partial choice set is discarded and the
        exception propagates.
        """
        self.start_choice_set(label, is_combo, colspan, rowspan, mandatory)
        try:
            yield self  # Yield self to allow adding choices within the 'with' block
        except BaseException:
            # Drop the half-built set so it cannot leak into later calls.
            self._current_choice_set = None
            raise
        else:
            self.end_choice_set()
389# Helper functions for importers
def parse_bool_attr(value: Optional[str], default: bool = False) -> bool:
    """
    Convert a string boolean attribute to a Python boolean.

    Args:
        value: Attribute value, or None when the attribute is absent.
        default: Result to use when ``value`` is None.

    Returns:
        ``default`` if ``value`` is None. For strings, True only when the
        text equals "true", ignoring case and surrounding whitespace. Any
        other non-string value falls back to its truthiness.
    """
    if value is None:
        return default
    try:
        # Strip first so padded attribute values such as " true " still match.
        return value.strip().lower() == "true"
    except AttributeError:
        # Not string-like (e.g. already a bool or int): use its truthiness.
        return bool(value)