Coverage for postrfp/shared/movenodes.py: 99%

103 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2This module provides helper classes to support the movement and reorganization of sections 

3and questions within a questionnaire. These classes are designed to handle common tasks 

4such as validation, updating relationships, handling orphaned items, and renumbering. 

5 

6The primary use case for these classes is to assist higher-level operations that involve 

7moving sections or questions to new parent sections or reordering them within the same parent. 

8 

9### Using Node Mover Classes 

10 

11The `AbstractNodeMover` class and its concrete implementations (`SectionsMover`, `QuestionsMover`) 

12encapsulate the logic for moving nodes. 

13 

141. Instantiate the appropriate mover class (`SectionsMover` or `QuestionsMover`) with 

15 necessary parameters 

16 

172. Call the `execute` method on the instance. This method performs the main algorithm 

18 and returns the nearest common ancestor's number, which is used for renumbering. 

19 

203. After the `execute` method, renumbering should typically be performed starting from 

21 the returned `ancestor_number`: 

22 

23Execute Method Actions: 

24 

251. Validation and Common Ancestor Identification: 

26 - The `_validate_and_find_common_ancestor` method is called to validate the provided IDs 

27 against the current structure and ensure that the structural integrity of the questionnaire 

28 is maintained. 

29 - This method also determines the nearest common ancestor, which is used later for renumbering. 

30 

312. Find Previous Parents: 

32 - The `_find_previous_parents` method (implemented by subclasses) is called to identify 

33 the original parent(s) of the nodes being moved. 

34 

353. Handling Orphans: 

36 - If the `delete_orphans` flag was set to `True` during instantiation, the `_handle_orphans` 

37 method is called to identify and delete orphaned items (sections or questions) that are 

38 no longer part of the provided IDs under the target parent. 

39 

404. Updating Positions and Relationships: 

41 - The `_update_positions_and_relationships` method is called to update the positions and 

42 parent-child relationships for the provided IDs. This ensures that the new order and hierarchy 

43 are reflected in the database. 

44 

455. Resetting Positions for Previous Parents: 

46 - The `_reset_previous_parent_positions` method (implemented by subclasses) is called to 

47 reset the positions of items in the previous parents after their children have been moved 

48 to a new parent. This ensures that the previous parents remain consistent. 

49 

506. Renumber Ancestor Number: 

51 - The `execute` method uses the `ancestor_number` identified in the first step, 

52 which is then used to call `renumber_from_common_ancestor`. 

53""" 

54 

55from abc import ABC, abstractmethod 

56from typing import Type, Protocol, Any, runtime_checkable, Sequence, TypeVar, Generic 

57 

58from sqlalchemy import select 

59from sqlalchemy.orm import Session 

60from sqlalchemy.orm.collections import InstrumentedList 

61 

62from postrfp.model import Section, QuestionInstance, Project, User 

63from postrfp.model.questionnaire.b36 import nearest_common_ancestor 

64from . import update 

65 

66 

67@runtime_checkable 

68class EntityWithId(Protocol): 

69 id: Any 

70 

71 

72@runtime_checkable 

73class PositionableEntity(Protocol): 

74 id: Any 

75 position: Any 

76 

77 

78T = TypeVar("T") 

79 

80 

81class AbstractNodeMover(ABC, Generic[T]): 

82 """ 

83 Abstract base class implementing the Template pattern for moving nodes 

84 (sections or questions) within a project's questionnaire structure. 

85 """ 

86 

87 def __init__( 

88 self, 

89 session: Session, 

90 user: User, 

91 project: Project, 

92 parent: Section, 

93 provided_ids: list[int], 

94 delete_orphans: bool = False, 

95 ): 

96 self.session = session 

97 self.user = user 

98 self.project = project 

99 self.parent = parent 

100 self.provided_ids = provided_ids 

101 self.delete_orphans = delete_orphans 

102 self.provided_id_set = set(provided_ids) 

103 

104 # Initialize entity-specific attributes via the abstract method 

105 self.entity_class, self.parent_id_attr, self.children, self.found_entities = ( 

106 self.initialise() 

107 ) 

108 

109 # Derive dependent fields based on initialized values 

110 self.found_ids = {e.id for e in self.found_entities} 

111 self.current_ids = {c.id for c in self.children} 

112 self.entity_lookup = {e.id: e for e in self.found_entities} 

113 

114 @abstractmethod 

115 def initialise( 

116 self, 

117 ) -> tuple[Type[Any], str, "InstrumentedList", list[Any]]: # pragma: no cover 

118 """ 

119 Initialize entity-specific attributes. 

120 

121 Returns: 

122 Tuple containing: 

123 - entity_class: The class of entities being managed 

124 - parent_id_attr: The name of the parent ID attribute 

125 - children: The current children collection from the parent 

126 - found_entities: The entities with the provided IDs 

127 """ 

128 pass 

129 

130 @abstractmethod 

131 def _find_previous_parents(self, moved_node_ids: set[int]) -> T: # pragma: no cover 

132 """Find previous parents for moved nodes.""" 

133 pass 

134 

135 @abstractmethod 

136 def _reset_previous_parent_positions( 

137 self, previous_parents: T 

138 ) -> None: # pragma: no cover 

139 """Reset positions for previous parents after moving their children.""" 

140 pass 

141 

142 @abstractmethod 

143 def _delete_orphan(self, orphan: Any) -> None: # pragma: no cover 

144 """Delete an orphaned entity.""" 

145 pass 

146 

147 def execute(self): 

148 """ 

149 Main template method that defines the workflow for moving nodes. 

150 Returns the ancestor number for renumbering. 

151 """ 

152 # Find previous parents for moved nodes BEFORE updating relationships 

153 moved_node_ids = self.found_ids - self.current_ids 

154 previous_parents = self._find_previous_parents(moved_node_ids) 

155 

156 if self.delete_orphans: 

157 self._handle_orphans() 

158 

159 self._update_positions_and_relationships() 

160 

161 self._reset_previous_parent_positions(previous_parents) 

162 

163 # Find the nearest common ancestor of source and target sections for renumbering 

164 ancestor_number = self._find_renumbering_ancestor(previous_parents) 

165 self.renumber_ancestor(ancestor_number) 

166 

167 def _find_renumbering_ancestor(self, previous_parents) -> str: 

168 """ 

169 Find the nearest common ancestor of source (previous parents) and target (current parent) sections. 

170 This ensures both source and target hierarchies get renumbered correctly. 

171 """ 

172 

173 # Collect b36_numbers from both source and target sections 

174 b36_numbers = [] 

175 

176 # Add target section (where items were moved TO) 

177 if self.parent.b36_number: 

178 b36_numbers.append(self.parent.b36_number) 

179 else: 

180 return "" # The root section has no b36_number, so renumber from there 

181 

182 # Add source sections (where items were moved FROM) 

183 if previous_parents: 

184 if isinstance(previous_parents, set): 

185 # For sections: previous_parents is a set of Section objects 

186 b36_numbers.extend( 

187 [ 

188 parent.b36_number 

189 for parent in previous_parents 

190 if parent and parent.b36_number 

191 ] 

192 ) 

193 else: 

194 # For questions: previous_parents contains section IDs 

195 if previous_parents: 

196 source_sections = self.session.scalars( 

197 select(Section).filter(Section.id.in_(previous_parents)) 

198 ).all() 

199 b36_numbers.extend( 

200 [ 

201 section.b36_number 

202 for section in source_sections 

203 if section.b36_number 

204 ] 

205 ) 

206 

207 if not b36_numbers: 

208 # Fallback to root section if no valid numbers found 

209 return "" 

210 

211 # Use the existing nearest_common_ancestor function from b36.py 

212 return nearest_common_ancestor(b36_numbers) 

213 

214 def _handle_orphans(self) -> None: 

215 orphan_ids = self.current_ids - self.provided_id_set 

216 for orphan_id in orphan_ids: 

217 orphan = self.session.get(self.entity_class, orphan_id) 

218 if orphan is not None: 

219 self._delete_orphan(orphan) 

220 

221 def _update_positions_and_relationships(self) -> None: 

222 deduplicated_ids = list(dict.fromkeys(self.provided_ids)) 

223 

224 for idx, entity_id in enumerate(deduplicated_ids, 1): 

225 entity = self.entity_lookup[entity_id] 

226 

227 entity.position = idx 

228 setattr(entity, self.parent_id_attr, self.parent.id) 

229 

230 self.session.flush() 

231 self.session.refresh(self.parent) 

232 

233 def renumber_ancestor(self, ancestor_number: str) -> None: 

234 """ 

235 Renumber sections starting from the nearest common ancestor. 

236 """ 

237 ancestor = ( 

238 self.session.query(Section) 

239 .filter_by(b36_number=ancestor_number, project_id=self.project.id) 

240 .one() 

241 ) 

242 ancestor.renumber() 

243 

244 

245class SectionsMover(AbstractNodeMover): 

246 def initialise( 

247 self, 

248 ) -> tuple[Type["Section"], str, InstrumentedList[Any], list["Section"]]: 

249 entity_class = Section 

250 parent_id_attr = "parent_id" 

251 children = self.parent.subsections 

252 

253 if not isinstance(children, InstrumentedList): 

254 children = InstrumentedList(children) 

255 

256 found_entities = self.project.sections.filter( 

257 Section.id.in_(self.provided_ids) 

258 ).all() 

259 

260 return entity_class, parent_id_attr, children, found_entities 

261 

262 def _find_previous_parents(self, moved_node_ids: set[int]) -> set["Section"]: 

263 return { 

264 s.parent 

265 for s in self.session.scalars( 

266 select(Section).filter(Section.id.in_(moved_node_ids)) 

267 ) 

268 if s.parent is not None # Filter out None values 

269 } 

270 

271 def _reset_previous_parent_positions( 

272 self, previous_parents: set["Section"] 

273 ) -> None: 

274 for ex_parent in previous_parents: 

275 if ex_parent is not None: 

276 ex_parent.set_subsection_positions() 

277 

278 def _delete_orphan(self, orphan: "Section") -> None: 

279 update.delete_project_section(self.session, self.user, self.project, orphan) 

280 

281 

282class QuestionsMover(AbstractNodeMover): 

283 def initialise( 

284 self, 

285 ) -> tuple[ 

286 Type["QuestionInstance"], str, InstrumentedList[Any], list["QuestionInstance"] 

287 ]: 

288 entity_class = QuestionInstance 

289 parent_id_attr = "section_id" 

290 children = self.parent.questions 

291 if not isinstance(children, InstrumentedList): 

292 children = InstrumentedList(children) 

293 found_entities = self.project.questions.filter( 

294 QuestionInstance.id.in_(self.provided_ids) 

295 ).all() 

296 

297 return entity_class, parent_id_attr, children, found_entities 

298 

299 def _find_previous_parents(self, moved_node_ids: set[int]) -> Sequence[int]: 

300 stmt = select(QuestionInstance.section_id).where( 

301 QuestionInstance.id.in_(moved_node_ids) 

302 ) 

303 return self.session.scalars(stmt).all() 

304 

305 def _reset_previous_parent_positions(self, ex_parent_ids: Sequence[int]) -> None: 

306 for ex_parent in self.session.query(Section).filter( 

307 Section.id.in_(ex_parent_ids) 

308 ): 

309 ex_parent.set_question_positions() 

310 

311 def _delete_orphan(self, orphan: "QuestionInstance") -> None: 

312 update.delete_project_section_question( 

313 self.session, self.user, self.project, self.parent, orphan 

314 )