Coverage for postrfp/buyer/api/io/streaming.py: 99%

97 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1from typing import Any 

2 

3from postrfp.model import ( 

4 Section, 

5 Project, 

6 QuestionInstance, 

7 QuestionDefinition, 

8 QElement, 

9) 

10from postrfp.shared.fetch import weightq 

11 

12 

class StreamingVisitor:
    """
    Depth-first walker over a project's section tree.

    Each section, its questions and (optionally) their definitions and
    elements are queried one node at a time and expunged from the session
    once handled, so the full hierarchy is never held in memory together.
    Subclasses customise behaviour by overriding the ``process_*``,
    ``finish_section`` and ``should_load_question_details`` hooks.
    """

    def __init__(self, session: Any, project_id: int) -> None:
        # Both attributes refer to the very same session object.
        self.session = self.streaming_session = session
        self.project_id = project_id

    def visit(self) -> None:
        """Look up the project's root section id and walk the tree from it"""
        root_query = self.streaming_session.query(Project.section_id)
        root_section_id = root_query.filter(Project.id == self.project_id).scalar()

        # The root has an empty ancestor path and sits at depth 0
        self._visit_section(root_section_id, [], 0)

    def _visit_section(
        self, section_id: int, parent_path: list[int], depth: int
    ) -> None:
        """Handle one section: the section itself, its questions, then its children"""
        section_query = self.streaming_session.query(Section)
        section = section_query.filter(Section.id == section_id).one()

        # Ancestor ids followed by this section's own id
        current_path = parent_path + [section_id]

        self.process_section(section, current_path, depth)

        # Questions belonging directly to this section, in position order
        question_query = self.streaming_session.query(QuestionInstance)
        question_query = question_query.filter(
            QuestionInstance.section_id == section_id
        )
        questions = question_query.order_by(QuestionInstance.position).all()

        for qinstance in questions:
            self._visit_question(qinstance, current_path, depth)

        # Only the ids of the children are fetched here, not the Section objects
        child_query = self.streaming_session.query(Section.id)
        child_query = child_query.filter(Section.parent_id == section_id)
        child_query = child_query.filter(Section.project_id == self.project_id)
        child_rows = child_query.order_by(Section.position).all()

        # Detach this section from the session before descending
        self.streaming_session.expunge(section)

        for row in child_rows:
            (child_id,) = row
            self._visit_section(child_id, current_path, depth + 1)

        # Called after every descendant has been visited
        self.finish_section(section)

    def _visit_question(
        self, qinstance: QuestionInstance, section_path: list[int], depth: int
    ) -> None:
        """Run the question hooks for one question instance, then expunge it"""
        self.process_question(qinstance, section_path, depth)

        if self.should_load_question_details(qinstance):
            qdef_query = self.streaming_session.query(QuestionDefinition)
            qdef = qdef_query.filter(
                QuestionDefinition.id == qinstance.question_def_id
            ).one()

            element_query = self.streaming_session.query(QElement)
            element_query = element_query.filter(
                QElement.question_id == qinstance.question_def_id
            )
            elements = element_query.order_by(QElement.row, QElement.col).all()

            self.process_question_details(
                qinstance, qdef, elements, section_path, depth
            )

            # Detach the elements first, then the definition they belong to
            for element in elements:
                self.streaming_session.expunge(element)
            self.streaming_session.expunge(qdef)

        # The question instance is detached whether or not details were loaded
        self.streaming_session.expunge(qinstance)

    def finish_section(self, section: Section) -> None:
        """Hook called once a section and all of its descendants are done"""
        pass

    def process_section(self, section: Section, path: list[int], depth: int) -> None:
        """Hook called for each section before its questions and children"""
        pass

    def process_question(
        self, question: QuestionInstance, section_path: list[int], depth: int
    ) -> None:
        """Hook called for each question instance of a section"""
        pass

    def process_question_details(
        self,
        question: QuestionInstance,
        qdef: QuestionDefinition,
        elements: list[QElement],
        section_path: list[int],
        depth: int,
    ) -> None:
        """Hook called with a question's definition and its ordered elements"""
        pass

    def should_load_question_details(self, question: QuestionInstance) -> bool:
        """Hook deciding whether definition and elements are queried; default yes"""
        return True

    def cleanup(self) -> None:
        """Close the session used for the traversal"""
        self.streaming_session.close()

138 

139 

class XmlExportVisitor(StreamingVisitor):
    """
    Streaming visitor that writes a project's questionnaire to an XML file.

    The document root is ``<questionnaire project_id="...">``. Each section
    becomes a ``<section id number weight>`` element containing a ``<title>``,
    an optional ``<description>``, its ``<question id number weight>``
    elements and then its child sections. Each question contains a
    ``<title>`` and, when it has elements, an ``<elements>`` wrapper around
    one ``<element id type [multichoice]>`` per element.
    """

    def __init__(self, session: Any, project_id: int, file_path: str) -> None:
        """
        Load the project and build the weight lookup tables.

        :param session: database session used for all queries
        :param project_id: id of the project to export
        :param file_path: path of the XML file that ``visit`` will write
        """
        super().__init__(session, project_id)
        self.file_path = file_path
        # Both are set by visit(); None until then.
        self.xml_file: Any = None
        self.xml: Any = None

        # Load project and weights up front so the per-node hooks only need
        # dictionary lookups.
        self.project: Project = (
            session.query(Project).filter(Project.id == project_id).one()
        )
        weightset_id = self.project.get_or_create_default_weighting_set_id()

        weights_data = weightq.total_weightings_dict(self.project, weightset_id)
        # question instance id -> weight
        self.question_weights: dict[int, float] = {
            q["question_instance_id"]: q["weight"] for q in weights_data["questions"]
        }
        # section id -> weight
        self.section_weights: dict[int, float] = {
            s["section_id"]: s["weight"] for s in weights_data["sections"]
        }

    def visit(self) -> None:
        """
        Process the entire questionnaire into XML.

        Opens ``file_path`` for binary writing, emits the document and always
        runs ``cleanup`` afterwards. If the traversal or the XML writing
        raises, the exception propagates to the caller, but the output file
        and the session are still closed instead of being leaked.
        """
        from xml.sax.saxutils import XMLGenerator

        try:
            # Set up XML file
            self.xml_file = open(self.file_path, "wb")
            self.xml = XMLGenerator(self.xml_file, "utf-8")
            self.xml.startDocument()
            self.xml.startElement(
                "questionnaire", {"project_id": str(self.project_id)}
            )

            # Traverse the structure
            super().visit()

            # Close XML document
            self.xml.endElement("questionnaire")
            self.xml.endDocument()
        finally:
            # Runs on success and on failure alike, so no handle is left open
            self.cleanup()

    def process_section(self, section: Section, _path: list[int], depth: int) -> None:
        """
        Write the opening ``<section>`` tag plus its title and description.

        The element is left open so that questions and child sections nest
        inside it; ``finish_section`` writes the closing tag.
        """
        attrs = {
            "id": str(section.id),
            "number": section.number or "",
            # Sections missing from the weight table are written with weight 1
            "weight": str(self.section_weights.get(section.id, 1)),
        }
        self.xml.startElement("section", attrs)

        # Write title and description immediately
        self.xml.startElement("title", {})
        self.xml.characters(section.title)
        self.xml.endElement("title")

        if section.description:
            self.xml.startElement("description", {})
            self.xml.characters(section.description)
            self.xml.endElement("description")

    def finish_section(self, section: Section) -> None:
        """Close the ``<section>`` element opened by ``process_section``"""
        self.xml.endElement("section")

    def process_question(
        self, question: QuestionInstance, _section_path: list[int], depth: int
    ) -> None:
        """
        Write the opening ``<question>`` tag.

        The matching closing tag is written by ``process_question_details``,
        so a well-formed document relies on ``should_load_question_details``
        returning True for every question.
        """
        attrs = {
            "id": str(question.id),
            "number": question.number or "",
            # Questions missing from the weight table are written with weight 1
            "weight": str(self.question_weights.get(question.id, 1)),
        }
        self.xml.startElement("question", attrs)

    def process_question_details(
        self,
        question: QuestionInstance,
        qdef: QuestionDefinition,
        elements: list[QElement],
        _section_path: list[int],
        _depth: int,
    ) -> None:
        """Write the question title and elements, then close ``<question>``"""
        # Write title
        self.xml.startElement("title", {})
        self.xml.characters(qdef.title)
        self.xml.endElement("title")

        # Write elements; the wrapper is omitted when there are none
        if elements:
            self.xml.startElement("elements", {})
            for element in elements:
                self._write_element(element)
            self.xml.endElement("elements")

        # Close the question element opened by process_question
        self.xml.endElement("question")

    def _write_element(self, element: QElement) -> None:
        """Write one ``<element>`` with its id, type and label text"""
        attrs = {"id": str(element.id), "type": element.el_type}

        # Only elements exposing a ``multichoice`` attribute get that XML attribute
        if hasattr(element, "multichoice"):
            attrs["multichoice"] = str(element.multichoice)

        self.xml.startElement("element", attrs)
        if element.label:
            self.xml.characters(element.label)
        self.xml.endElement("element")

    def cleanup(self) -> None:
        """Close the output file if it is still open, then run the base cleanup"""
        # xml_file is None when visit() never got as far as opening it
        if self.xml_file and not self.xml_file.closed:
            self.xml_file.close()

        super().cleanup()