Coverage for postrfp / shared / movenodes.py: 99%

96 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1""" 

2This module provides helper classes to support the movement and reorganization of sections 

3and questions within a questionnaire. These classes are designed to handle common tasks 

4such as validation, updating relationships, handling orphaned items, and renumbering. 

5 

6The primary use case for these classes is to assist higher-level operations that involve 

7moving sections or questions to new parent sections or reordering them within the same parent. 

8 

9### Using Node Mover Classes 

10 

11The `AbstractNodeMover` class and its concrete implementations (`SectionsMover`, `QuestionsMover`) 

12encapsulate the logic for moving nodes. 

13 

141. Instantiate the appropriate mover class (`SectionsMover` or `QuestionsMover`) with 

15 necessary parameters 

16 

172. Call the `execute` method on the instance. This method performs the main algorithm 

18 and returns the nearest common ancestor's number, which is used for renumbering. 

19 

203. No separate renumbering call is needed afterwards: the `execute` method itself 

21 renumbers starting from that nearest common ancestor. 

22 

23Execute Method Actions: 

24 

251. Validation and Common Ancestor Identification: 

26 - The `_validate_and_find_common_ancestor` method is called to validate the provided IDs 

27 against the current structure and ensure that the structural integrity of the questionnaire 

28 is maintained. 

29 - This method also determines the nearest common ancestor, which is used later for renumbering. 

30 

312. Find Previous Parents: 

32 - The `_find_previous_parents` method (implemented by subclasses) is called to identify 

33 the original parent(s) of the nodes being moved. 

34 

353. Handling Orphans: 

36 - If the `delete_orphans` flag was set to `True` during instantiation, the `_handle_orphans` 

37 method is called to identify and delete orphaned items (sections or questions) that are 

38 no longer part of the provided IDs under the target parent. 

39 

404. Updating Positions and Relationships: 

41 - The `_update_positions_and_relationships` method is called to update the positions and 

42 parent-child relationships for the provided IDs. This ensures that the new order and hierarchy 

43 are reflected in the database. 

44 

455. Resetting Positions for Previous Parents: 

46 - The `_reset_previous_parent_positions` method (implemented by subclasses) is called to 

47 reset the positions of items in the previous parents after their children have been moved 

48 to a new parent. This ensures that the previous parents remain consistent. 

49 

506. Renumber From the Common Ancestor: 

51 - The `execute` method calls `_find_renumbering_ancestor` to determine the nearest common 

52 ancestor of the previous parents and the target parent, then passes that number to `renumber_ancestor`. 

53""" 

54 

55from abc import ABC, abstractmethod 

56from typing import Type, Any, Sequence, TypeVar, Generic 

57 

58from sqlalchemy import select 

59from sqlalchemy.orm import Session 

60from sqlalchemy.orm.collections import InstrumentedList 

61 

62from postrfp.model import Section, QuestionInstance, Project, User 

63from postrfp.model.questionnaire.b36 import nearest_common_ancestor 

64from . import update 

65 

66 

T = TypeVar("T")


class AbstractNodeMover(ABC, Generic[T]):
    """
    Abstract base class implementing the Template pattern for moving nodes
    (sections or questions) within a project's questionnaire structure.

    Subclasses supply the entity-specific pieces (``initialise``,
    ``_find_previous_parents``, ``_reset_previous_parent_positions`` and
    ``_delete_orphan``); ``execute`` drives the overall workflow.

    ``T`` is the type used by a subclass to describe the previous parents of
    the moved nodes.
    """

    def __init__(
        self,
        session: "Session",
        user: "User",
        project: "Project",
        parent: "Section",
        provided_ids: list[int],
        delete_orphans: bool = False,
    ):
        """
        Store the move parameters and build the lookup structures.

        Args:
            session: Database session used for queries, flush and refresh.
            user: User performing the move (passed on when deleting orphans).
            project: Project whose questionnaire is being reorganised.
            parent: Target section that will own the provided nodes.
            provided_ids: Node IDs in the desired final order.
            delete_orphans: When True, current children of ``parent`` that
                are not in ``provided_ids`` are deleted by ``execute``.
        """
        self.session = session
        self.user = user
        self.project = project
        self.parent = parent
        self.provided_ids = provided_ids
        self.delete_orphans = delete_orphans
        self.provided_id_set = set(provided_ids)

        # Initialize entity-specific attributes via the abstract method
        self.entity_class, self.parent_id_attr, self.children, self.found_entities = (
            self.initialise()
        )

        # Derive dependent fields based on initialized values
        self.found_ids = {e.id for e in self.found_entities}
        self.current_ids = {c.id for c in self.children}
        self.entity_lookup = {e.id: e for e in self.found_entities}

    @abstractmethod
    def initialise(
        self,
    ) -> tuple[Type[Any], str, "InstrumentedList", list[Any]]:  # pragma: no cover
        """
        Initialize entity-specific attributes.

        Returns:
            Tuple containing:
            - entity_class: The class of entities being managed
            - parent_id_attr: The name of the parent ID attribute
            - children: The current children collection from the parent
            - found_entities: The entities with the provided IDs
        """
        pass

    @abstractmethod
    def _find_previous_parents(self, moved_node_ids: set[int]) -> T:  # pragma: no cover
        """Find previous parents for moved nodes."""
        pass

    @abstractmethod
    def _reset_previous_parent_positions(
        self, previous_parents: T
    ) -> None:  # pragma: no cover
        """Reset positions for previous parents after moving their children."""
        pass

    @abstractmethod
    def _delete_orphan(self, orphan: Any) -> None:  # pragma: no cover
        """Delete an orphaned entity."""
        pass

    def execute(self) -> str:
        """
        Main template method that defines the workflow for moving nodes.

        Returns:
            The number of the ancestor section from which renumbering was
            performed ("" denotes the root section).
        """
        # Find previous parents for moved nodes BEFORE updating relationships;
        # once the parent attribute is rewritten the old parent is no longer
        # reachable from the node.
        moved_node_ids = self.found_ids - self.current_ids
        previous_parents = self._find_previous_parents(moved_node_ids)

        if self.delete_orphans:
            self._handle_orphans()

        self._update_positions_and_relationships()

        self._reset_previous_parent_positions(previous_parents)

        # Find the nearest common ancestor of source and target sections for renumbering
        ancestor_number = self._find_renumbering_ancestor(previous_parents)
        self.renumber_ancestor(ancestor_number)
        return ancestor_number

    def _find_renumbering_ancestor(self, previous_parents) -> str:
        """
        Find the nearest common ancestor of source (previous parents) and
        target (current parent) sections, so that both hierarchies are
        renumbered.

        Args:
            previous_parents: Either a set of Section objects (sections mover)
                or a sequence of section IDs (questions mover); may be empty.

        Returns:
            The ancestor's b36 number, or "" when renumbering must start at
            the root section.
        """
        target_number = self.parent.b36_number
        if not target_number:
            # The root section has no b36_number, so renumber from there
            return ""

        if not previous_parents:
            source_sections = []
        elif isinstance(previous_parents, set):
            # For sections: previous_parents is a set of Section objects
            source_sections = [section for section in previous_parents if section]
        else:
            # For questions: previous_parents contains section IDs
            source_sections = self.session.scalars(
                select(Section).filter(Section.id.in_(previous_parents))
            ).all()

        b36_numbers = [target_number]
        for section in source_sections:
            if not section.b36_number:
                # Items were moved out of a section without a number (the
                # root). Its remaining children need renumbering too, and the
                # only ancestor shared with the root is the root itself.
                return ""
            b36_numbers.append(section.b36_number)

        # Use the existing nearest_common_ancestor function from b36.py
        return nearest_common_ancestor(b36_numbers)

    def _handle_orphans(self) -> None:
        """Delete current children of the target parent that were not provided."""
        orphan_ids = self.current_ids - self.provided_id_set
        for orphan_id in orphan_ids:
            orphan = self.session.get(self.entity_class, orphan_id)
            if orphan is not None:
                self._delete_orphan(orphan)

    def _update_positions_and_relationships(self) -> None:
        """
        Assign 1-based positions in the provided order and attach every
        provided node to the target parent.

        Raises:
            KeyError: If a provided ID is not among the found entities.
        """
        # dict.fromkeys drops repeated IDs while keeping first-seen order
        deduplicated_ids = list(dict.fromkeys(self.provided_ids))

        for idx, entity_id in enumerate(deduplicated_ids, 1):
            entity = self.entity_lookup[entity_id]

            entity.position = idx
            setattr(entity, self.parent_id_attr, self.parent.id)

        self.session.flush()
        self.session.refresh(self.parent)

    def renumber_ancestor(self, ancestor_number: str) -> None:
        """
        Renumber sections starting from the nearest common ancestor.

        The ancestor is looked up by ``b36_number`` within this project using
        ``.one()``, so exactly one matching section is expected.
        """
        ancestor = (
            self.session.query(Section)
            .filter_by(b36_number=ancestor_number, project_id=self.project.id)
            .one()
        )
        ancestor.renumber()

232 

233 

class SectionsMover(AbstractNodeMover):
    """Concrete mover that re-parents and reorders subsections under the target section."""

    def initialise(
        self,
    ) -> tuple[Type["Section"], str, InstrumentedList[Any], list["Section"]]:
        """
        Supply the section-specific configuration for the base class.

        Returns the Section class, the name of its parent foreign key
        attribute, the target parent's current subsections and the sections
        matching the provided IDs, looked up through the project's sections.
        """
        subsections = self.parent.subsections
        if not isinstance(subsections, InstrumentedList):
            # Wrap plain collections so the declared return type holds
            subsections = InstrumentedList(subsections)

        id_filter = Section.id.in_(self.provided_ids)
        matched_sections = self.project.sections.filter(id_filter).all()

        return Section, "parent_id", subsections, matched_sections

    def _find_previous_parents(self, moved_node_ids: set[int]) -> set["Section"]:
        """Collect the distinct current parents of the sections about to be moved."""
        moved_sections = self.session.scalars(
            select(Section).filter(Section.id.in_(moved_node_ids))
        )
        former_parents: set["Section"] = set()
        for section in moved_sections:
            # Sections without a parent contribute nothing
            if section.parent is not None:
                former_parents.add(section.parent)
        return former_parents

    def _reset_previous_parent_positions(
        self, previous_parents: set["Section"]
    ) -> None:
        """Ask each former parent to reset the positions of its subsections."""
        for former_parent in previous_parents:
            if former_parent is None:
                continue
            former_parent.set_subsection_positions()

    def _delete_orphan(self, orphan: "Section") -> None:
        """Delete an orphaned section via the shared update helper."""
        update.delete_project_section(self.session, self.user, self.project, orphan)

269 

270 

class QuestionsMover(AbstractNodeMover):
    """Concrete mover that re-parents and reorders question instances under the target section."""

    def initialise(
        self,
    ) -> tuple[
        Type["QuestionInstance"], str, InstrumentedList[Any], list["QuestionInstance"]
    ]:
        """
        Supply the question-specific configuration for the base class.

        Returns the QuestionInstance class, the name of its section foreign
        key attribute, the target parent's current questions and the question
        instances matching the provided IDs, looked up through the project's
        questions.
        """
        questions = self.parent.questions
        if not isinstance(questions, InstrumentedList):
            # Wrap plain collections so the declared return type holds
            questions = InstrumentedList(questions)

        id_filter = QuestionInstance.id.in_(self.provided_ids)
        matched_questions = self.project.questions.filter(id_filter).all()

        return QuestionInstance, "section_id", questions, matched_questions

    def _find_previous_parents(self, moved_node_ids: set[int]) -> Sequence[int]:
        """
        Return the current section IDs of the questions about to be moved.

        The query selects one section ID per moved question without
        de-duplicating, so the same ID can appear more than once.
        """
        section_id_query = select(QuestionInstance.section_id).where(
            QuestionInstance.id.in_(moved_node_ids)
        )
        previous_section_ids = self.session.scalars(section_id_query).all()
        return previous_section_ids

    def _reset_previous_parent_positions(self, ex_parent_ids: Sequence[int]) -> None:
        """Ask each former parent section to reset the positions of its questions."""
        former_sections = self.session.query(Section).filter(
            Section.id.in_(ex_parent_ids)
        )
        for section in former_sections:
            section.set_question_positions()

    def _delete_orphan(self, orphan: "QuestionInstance") -> None:
        """Delete an orphaned question from the target section via the shared update helper."""
        update.delete_project_section_question(
            self.session, self.user, self.project, self.parent, orphan
        )