Coverage for postrfp / shared / movenodes.py: 99%
96 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1"""
2This module provides helper classes to support the movement and reorganization of sections
3and questions within a questionnaire. These classes are designed to handle common tasks
4such as validation, updating relationships, handling orphaned items, and renumbering.
6The primary use case for these classes is to assist higher-level operations that involve
7moving sections or questions to new parent sections or reordering them within the same parent.
9### Using Node Mover Classes
11The `AbstractNodeMover` class and its concrete implementations (`SectionsMover`, `QuestionsMover`)
12encapsulate the logic for moving nodes.
141. Instantiate the appropriate mover class (`SectionsMover` or `QuestionsMover`) with
15 necessary parameters
2. Call the `execute` method on the instance. This method performs the main algorithm,
   including renumbering from the nearest common ancestor of the source and target sections.
3. `execute` itself finishes by calling `renumber_ancestor` with the ancestor number it
   computed, so renumbering is part of the same call.
23Execute Method Actions:
1. Identify Moved Nodes:
   - The provided IDs that were found in the project but are not currently children of the
     target parent are treated as the nodes being moved.
   - The nearest common ancestor used for renumbering is determined later, after the nodes
     have been moved (see step 6).
312. Find Previous Parents:
32 - The `_find_previous_parents` method (implemented by subclasses) is called to identify
33 the original parent(s) of the nodes being moved.
353. Handling Orphans:
36 - If the `delete_orphans` flag was set to `True` during instantiation, the `_handle_orphans`
37 method is called to identify and delete orphaned items (sections or questions) that are
38 no longer part of the provided IDs under the target parent.
404. Updating Positions and Relationships:
41 - The `_update_positions_and_relationships` method is called to update the positions and
42 parent-child relationships for the provided IDs. This ensures that the new order and hierarchy
43 are reflected in the database.
455. Resetting Positions for Previous Parents:
46 - The `_reset_previous_parent_positions` method (implemented by subclasses) is called to
47 reset the positions of items in the previous parents after their children have been moved
48 to a new parent. This ensures that the previous parents remain consistent.
6. Renumber From the Common Ancestor:
   - The `_find_renumbering_ancestor` method determines the nearest common ancestor of the
     source (previous parents) and target (new parent) sections, and its number is passed
     to `renumber_ancestor`.
53"""
55from abc import ABC, abstractmethod
56from typing import Type, Any, Sequence, TypeVar, Generic
58from sqlalchemy import select
59from sqlalchemy.orm import Session
60from sqlalchemy.orm.collections import InstrumentedList
62from postrfp.model import Section, QuestionInstance, Project, User
63from postrfp.model.questionnaire.b36 import nearest_common_ancestor
64from . import update
# Type of the "previous parents" value that a concrete mover produces in
# `_find_previous_parents` and consumes in `_reset_previous_parent_positions`.
T = TypeVar("T")


class AbstractNodeMover(ABC, Generic[T]):
    """
    Abstract base class implementing the Template pattern for moving nodes
    (sections or questions) within a project's questionnaire structure.

    Subclasses supply the entity-specific pieces (`initialise`,
    `_find_previous_parents`, `_reset_previous_parent_positions`,
    `_delete_orphan`); `execute` drives the shared workflow.
    """

    def __init__(
        self,
        session: Session,
        user: User,
        project: Project,
        parent: Section,
        provided_ids: list[int],
        delete_orphans: bool = False,
    ):
        """
        Store the move parameters and pre-compute the lookup structures.

        Args:
            session: Database session used for all queries and flushes.
            user: User on whose behalf orphans are deleted.
            project: Project that owns the nodes being moved.
            parent: Target section the provided nodes should end up under.
            provided_ids: Node IDs in the desired final order.
            delete_orphans: When True, current children of `parent` that are
                not in `provided_ids` are deleted during `execute`.
        """
        self.session = session
        self.user = user
        self.project = project
        self.parent = parent
        self.provided_ids = provided_ids
        self.delete_orphans = delete_orphans
        self.provided_id_set = set(provided_ids)

        # Initialize entity-specific attributes via the abstract method
        self.entity_class, self.parent_id_attr, self.children, self.found_entities = (
            self.initialise()
        )

        # Derive dependent fields based on initialized values
        self.found_ids = {e.id for e in self.found_entities}
        self.current_ids = {c.id for c in self.children}
        self.entity_lookup = {e.id: e for e in self.found_entities}

    @abstractmethod
    def initialise(
        self,
    ) -> tuple[Type[Any], str, "InstrumentedList", list[Any]]:  # pragma: no cover
        """
        Initialize entity-specific attributes.

        Returns:
            Tuple containing:
            - entity_class: The class of entities being managed
            - parent_id_attr: The name of the parent ID attribute
            - children: The current children collection from the parent
            - found_entities: The entities with the provided IDs
        """
        pass

    @abstractmethod
    def _find_previous_parents(self, moved_node_ids: set[int]) -> T:  # pragma: no cover
        """Find previous parents for moved nodes."""
        pass

    @abstractmethod
    def _reset_previous_parent_positions(
        self, previous_parents: T
    ) -> None:  # pragma: no cover
        """Reset positions for previous parents after moving their children."""
        pass

    @abstractmethod
    def _delete_orphan(self, orphan: Any) -> None:  # pragma: no cover
        """Delete an orphaned entity."""
        pass

    def execute(self) -> str:
        """
        Main template method that defines the workflow for moving nodes.

        Steps, in order: find the previous parents of the moved nodes,
        optionally delete orphans, update positions and parent links, reset
        positions in the previous parents, then renumber from the nearest
        common ancestor of the source and target sections.

        Returns:
            The ancestor number that renumbering started from ("" when the
            target parent has no b36_number).
        """
        # Find previous parents for moved nodes BEFORE updating relationships,
        # because the update overwrites each node's parent reference.
        moved_node_ids = self.found_ids - self.current_ids
        previous_parents = self._find_previous_parents(moved_node_ids)

        if self.delete_orphans:
            self._handle_orphans()

        self._update_positions_and_relationships()

        self._reset_previous_parent_positions(previous_parents)

        # Find the nearest common ancestor of source and target sections for renumbering
        ancestor_number = self._find_renumbering_ancestor(previous_parents)
        self.renumber_ancestor(ancestor_number)
        return ancestor_number

    def _find_renumbering_ancestor(self, previous_parents: T) -> str:
        """
        Find the nearest common ancestor of source (previous parents) and
        target (current parent) sections, so that both the source and target
        hierarchies get renumbered.

        Args:
            previous_parents: Either a set of Section objects (sections mover)
                or a sequence of section IDs (questions mover).

        Returns:
            The b36 number to renumber from, or "" when the target parent has
            no b36_number.
        """
        target_number = self.parent.b36_number
        if not target_number:
            # The root section has no b36_number, so renumber from there
            return ""

        # Target section (where items were moved TO) always contributes, so
        # the list below is never empty.
        b36_numbers = [target_number]

        # Add source sections (where items were moved FROM)
        if previous_parents:
            if isinstance(previous_parents, set):
                # For sections: previous_parents is a set of Section objects
                b36_numbers.extend(
                    parent.b36_number
                    for parent in previous_parents
                    if parent and parent.b36_number
                )
            else:
                # For questions: previous_parents contains section IDs
                source_sections = self.session.scalars(
                    select(Section).filter(Section.id.in_(previous_parents))
                ).all()
                b36_numbers.extend(
                    section.b36_number
                    for section in source_sections
                    if section.b36_number
                )

        # Use the existing nearest_common_ancestor function from b36.py
        return nearest_common_ancestor(b36_numbers)

    def _handle_orphans(self) -> None:
        """Delete current children of the parent that are not in the provided IDs."""
        orphan_ids = self.current_ids - self.provided_id_set
        for orphan_id in orphan_ids:
            orphan = self.session.get(self.entity_class, orphan_id)
            if orphan is not None:
                self._delete_orphan(orphan)

    def _update_positions_and_relationships(self) -> None:
        """
        Assign 1-based positions in the provided order and attach every
        provided entity to the target parent, then flush and refresh the parent.

        Raises:
            KeyError: If a provided ID is not among the entities found by
                `initialise`.
        """
        # dict.fromkeys drops duplicate IDs while keeping first-seen order
        deduplicated_ids = list(dict.fromkeys(self.provided_ids))

        for idx, entity_id in enumerate(deduplicated_ids, 1):
            entity = self.entity_lookup[entity_id]

            entity.position = idx
            setattr(entity, self.parent_id_attr, self.parent.id)

        self.session.flush()
        self.session.refresh(self.parent)

    def renumber_ancestor(self, ancestor_number: str) -> None:
        """
        Renumber sections starting from the nearest common ancestor.

        Looks up the section in this project whose b36_number equals
        `ancestor_number` and calls its `renumber()` method. The lookup uses
        `.one()`, so it fails unless exactly one such section exists.
        """
        ancestor = (
            self.session.query(Section)
            .filter_by(b36_number=ancestor_number, project_id=self.project.id)
            .one()
        )
        ancestor.renumber()
class SectionsMover(AbstractNodeMover):
    """Node mover for sections: re-parents sections under the target section."""

    def initialise(
        self,
    ) -> tuple[Type["Section"], str, InstrumentedList[Any], list["Section"]]:
        """
        Provide the section-specific configuration.

        Returns:
            The `Section` class, the parent attribute name ("parent_id"), the
            target parent's current subsections, and the project's sections
            whose IDs are among the provided IDs.
        """
        subsections = self.parent.subsections
        # Wrap anything that is not already an InstrumentedList so the
        # declared return type holds.
        if not isinstance(subsections, InstrumentedList):
            subsections = InstrumentedList(subsections)

        id_filter = Section.id.in_(self.provided_ids)
        matching_sections = self.project.sections.filter(id_filter).all()

        return Section, "parent_id", subsections, matching_sections

    def _find_previous_parents(self, moved_node_ids: set[int]) -> set["Section"]:
        """Return the distinct, non-None parent sections of the moved sections."""
        moved_stmt = select(Section).filter(Section.id.in_(moved_node_ids))

        former_parents: set["Section"] = set()
        for moved_section in self.session.scalars(moved_stmt):
            # Sections without a parent contribute nothing
            if moved_section.parent is not None:
                former_parents.add(moved_section.parent)
        return former_parents

    def _reset_previous_parent_positions(
        self, previous_parents: set["Section"]
    ) -> None:
        """Call `set_subsection_positions()` on every previous parent section."""
        for former_parent in previous_parents:
            if former_parent is None:
                continue
            former_parent.set_subsection_positions()

    def _delete_orphan(self, orphan: "Section") -> None:
        """Delete an orphaned section via `update.delete_project_section`."""
        update.delete_project_section(
            self.session,
            self.user,
            self.project,
            orphan,
        )
class QuestionsMover(AbstractNodeMover):
    """Node mover for questions: re-parents question instances under the target section."""

    def initialise(
        self,
    ) -> tuple[
        Type["QuestionInstance"], str, InstrumentedList[Any], list["QuestionInstance"]
    ]:
        """
        Provide the question-specific configuration.

        Returns:
            The `QuestionInstance` class, the parent attribute name
            ("section_id"), the target parent's current questions, and the
            project's questions whose IDs are among the provided IDs.
        """
        questions = self.parent.questions
        # Wrap anything that is not already an InstrumentedList so the
        # declared return type holds.
        if not isinstance(questions, InstrumentedList):
            questions = InstrumentedList(questions)

        id_filter = QuestionInstance.id.in_(self.provided_ids)
        matching_questions = self.project.questions.filter(id_filter).all()

        return QuestionInstance, "section_id", questions, matching_questions

    def _find_previous_parents(self, moved_node_ids: set[int]) -> Sequence[int]:
        """Return the `section_id` of each moved question (one entry per matching row)."""
        return self.session.scalars(
            select(QuestionInstance.section_id).where(
                QuestionInstance.id.in_(moved_node_ids)
            )
        ).all()

    def _reset_previous_parent_positions(self, ex_parent_ids: Sequence[int]) -> None:
        """Call `set_question_positions()` on every section whose ID is in `ex_parent_ids`."""
        former_sections = self.session.query(Section).filter(
            Section.id.in_(ex_parent_ids)
        )
        for former_section in former_sections:
            former_section.set_question_positions()

    def _delete_orphan(self, orphan: "QuestionInstance") -> None:
        """Delete an orphaned question via `update.delete_project_section_question`."""
        update.delete_project_section_question(
            self.session,
            self.user,
            self.project,
            self.parent,
            orphan,
        )