Coverage for postrfp/buyer/api/io/streaming.py: 99%
97 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1from typing import Any
3from postrfp.model import (
4 Section,
5 Project,
6 QuestionInstance,
7 QuestionDefinition,
8 QElement,
9)
10from postrfp.shared.fetch import weightq
13class StreamingVisitor:
14 """
15 A visitor that processes sections and questions incrementally without
16 loading the entire hierarchy into memory at once.
17 """
19 def __init__(self, session: Any, project_id: int) -> None:
20 self.session = session
21 self.project_id = project_id
22 self.streaming_session = session
24 def visit(self) -> None:
25 """Start the streaming traversal process"""
26 # Get the root section ID
27 root_id = (
28 self.streaming_session.query(Project.section_id)
29 .filter(Project.id == self.project_id)
30 .scalar()
31 )
33 # Begin traversal from root
34 self._visit_section(root_id, [], 0)
36 def _visit_section(
37 self, section_id: int, parent_path: list[int], depth: int
38 ) -> None:
39 """Visit a single section, its questions, then its subsections"""
40 # Load just this section
41 section = (
42 self.streaming_session.query(Section).filter(Section.id == section_id).one()
43 )
45 # Build current path
46 current_path = parent_path + [section_id]
48 # Process this section
49 self.process_section(section, current_path, depth)
51 # Load and process questions for this section only
52 question_instances = (
53 self.streaming_session.query(QuestionInstance)
54 .filter(QuestionInstance.section_id == section_id)
55 .order_by(QuestionInstance.position)
56 .all()
57 )
59 for qinstance in question_instances:
60 self.process_question(qinstance, current_path, depth)
62 # Optionally load question definition and elements if needed
63 if self.should_load_question_details(qinstance):
64 qdef = (
65 self.streaming_session.query(QuestionDefinition)
66 .filter(QuestionDefinition.id == qinstance.question_def_id)
67 .one()
68 )
70 elements = (
71 self.streaming_session.query(QElement)
72 .filter(QElement.question_id == qinstance.question_def_id)
73 .order_by(QElement.row, QElement.col)
74 .all()
75 )
77 self.process_question_details(
78 qinstance, qdef, elements, current_path, depth
79 )
81 # Expunge elements to free memory
82 for element in elements:
83 self.streaming_session.expunge(element)
84 self.streaming_session.expunge(qdef)
86 # Expunge the question to free memory
87 self.streaming_session.expunge(qinstance)
89 # Find child sections IDs (not loading the objects)
90 child_section_ids = (
91 self.streaming_session.query(Section.id)
92 .filter(Section.parent_id == section_id)
93 .filter(Section.project_id == self.project_id)
94 .order_by(Section.position)
95 .all()
96 )
98 # Expunge the current section before moving to children
99 self.streaming_session.expunge(section)
101 # Process child sections one at a time
102 for (child_id,) in child_section_ids:
103 self._visit_section(child_id, current_path, depth + 1)
105 self.finish_section(section)
107 def finish_section(self, section: Section) -> None:
108 pass
110 def process_section(self, section: Section, path: list[int], depth: int) -> None:
111 """Override this method to process each section"""
112 pass
114 def process_question(
115 self, question: QuestionInstance, section_path: list[int], depth: int
116 ) -> None:
117 """Override this method to process each question"""
118 pass
120 def process_question_details(
121 self,
122 question: QuestionInstance,
123 qdef: QuestionDefinition,
124 elements: list[QElement],
125 section_path: list[int],
126 depth: int,
127 ) -> None:
128 """Override this method to process question definitions and elements"""
129 pass
131 def should_load_question_details(self, question: QuestionInstance) -> bool:
132 """Override to control when to load question details"""
133 return True
135 def cleanup(self) -> None:
136 """Clean up resources"""
137 self.streaming_session.close()
140class XmlExportVisitor(StreamingVisitor):
141 def __init__(self, session: Any, project_id: int, file_path: str) -> None:
142 super().__init__(session, project_id)
143 self.file_path = file_path
144 self.xml_file: Any = None
145 self.xml: Any = None
147 # Load project and weights for efficient access
148 self.project: Project = (
149 session.query(Project).filter(Project.id == project_id).one()
150 )
151 weightset_id = self.project.get_or_create_default_weighting_set_id()
153 weights_data = weightq.total_weightings_dict(self.project, weightset_id)
154 # Convert to lookup dicts
155 self.question_weights: dict[int, float] = {
156 q["question_instance_id"]: q["weight"] for q in weights_data["questions"]
157 }
158 self.section_weights: dict[int, float] = {
159 s["section_id"]: s["weight"] for s in weights_data["sections"]
160 }
162 def visit(self) -> None:
163 """Process the entire questionnaire into XML"""
164 from xml.sax.saxutils import XMLGenerator
166 # Set up XML file
167 self.xml_file = open(self.file_path, "wb")
168 self.xml = XMLGenerator(self.xml_file, "utf-8")
169 self.xml.startDocument()
170 self.xml.startElement("questionnaire", {"project_id": str(self.project_id)})
172 # Traverse the structure
173 super().visit()
175 # Close XML document
176 self.xml.endElement("questionnaire")
177 self.xml.endDocument()
178 self.cleanup()
180 def process_section(self, section: Section, _path: list[int], depth: int) -> None:
181 """Write section start to XML"""
182 attrs = {
183 "id": str(section.id),
184 "number": section.number or "",
185 "weight": str(self.section_weights.get(section.id, 1)),
186 }
187 self.xml.startElement("section", attrs)
189 # Write title and description immediately
190 self.xml.startElement("title", {})
191 self.xml.characters(section.title)
192 self.xml.endElement("title")
194 if section.description:
195 self.xml.startElement("description", {})
196 self.xml.characters(section.description)
197 self.xml.endElement("description")
199 def finish_section(self, section: Section) -> None:
200 """Close section element immediately after processing all its content"""
201 self.xml.endElement("section")
203 def process_question(
204 self, question: QuestionInstance, _section_path: list[int], depth: int
205 ) -> None:
206 """Write complete question to XML"""
207 attrs = {
208 "id": str(question.id),
209 "number": question.number or "",
210 "weight": str(self.question_weights.get(question.id, 1)),
211 }
212 self.xml.startElement("question", attrs)
214 def process_question_details(
215 self,
216 question: QuestionInstance,
217 qdef: QuestionDefinition,
218 elements: list[QElement],
219 _section_path: list[int],
220 _depth: int,
221 ) -> None:
222 """Write question details and close question element"""
223 # Write title and description
224 self.xml.startElement("title", {})
225 self.xml.characters(qdef.title)
226 self.xml.endElement("title")
228 # Write elements
229 if elements:
230 self.xml.startElement("elements", {})
231 for element in elements:
232 self._write_element(element)
233 self.xml.endElement("elements")
235 # Close question element immediately
236 self.xml.endElement("question")
238 def _write_element(self, element: QElement) -> None:
239 """Write a question element to XML"""
240 attrs = {"id": str(element.id), "type": element.el_type}
242 # Add specific attributes based on element type
243 if hasattr(element, "multichoice"):
244 attrs["multichoice"] = str(element.multichoice)
246 self.xml.startElement("element", attrs)
247 if element.label:
248 self.xml.characters(element.label)
249 self.xml.endElement("element")
251 def cleanup(self) -> None:
252 """Ensure proper cleanup"""
253 # Close file if we exited early
254 if self.xml_file and not self.xml_file.closed:
255 self.xml_file.close()
257 super().cleanup()