Coverage for postrfp/shared/movenodes.py: 99%
103 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2This module provides helper classes to support the movement and reorganization of sections
3and questions within a questionnaire. These classes are designed to handle common tasks
4such as validation, updating relationships, handling orphaned items, and renumbering.
6The primary use case for these classes is to assist higher-level operations that involve
7moving sections or questions to new parent sections or reordering them within the same parent.
9### Using Node Mover Classes
11The `AbstractNodeMover` class and its concrete implementations (`SectionsMover`, `QuestionsMover`)
12encapsulate the logic for moving nodes.
141. Instantiate the appropriate mover class (`SectionsMover` or `QuestionsMover`) with
15 necessary parameters
172. Call the `execute` method on the instance. This method performs the main algorithm,
18 including the renumbering that starts from the nearest common ancestor.
203. No separate renumbering call is needed after `execute`: it finishes by passing the
21 ancestor number it computed to `renumber_ancestor`.
23Execute Method Actions:
251. Identifying Moved Nodes:
26 - `execute` treats every provided ID that was found in the project but is not already a
27 child of the target parent as a moved node. The provided IDs are not validated here; an ID
28 that was not found in the project raises `KeyError` when positions are updated.
29 - The nearest common ancestor is determined at the end of `execute` by `_find_renumbering_ancestor`.
312. Find Previous Parents:
32 - The `_find_previous_parents` method (implemented by subclasses) is called to identify
33 the original parent(s) of the nodes being moved.
353. Handling Orphans:
36 - If the `delete_orphans` flag was set to `True` during instantiation, the `_handle_orphans`
37 method is called to identify and delete orphaned items (sections or questions) that are
38 no longer part of the provided IDs under the target parent.
404. Updating Positions and Relationships:
41 - The `_update_positions_and_relationships` method is called to update the positions and
42 parent-child relationships for the provided IDs. This ensures that the new order and hierarchy
43 are reflected in the database.
455. Resetting Positions for Previous Parents:
46 - The `_reset_previous_parent_positions` method (implemented by subclasses) is called to
47 reset the positions of items in the previous parents after their children have been moved
48 to a new parent. This ensures that the previous parents remain consistent.
506. Renumber From the Common Ancestor:
51 - The `execute` method calls `_find_renumbering_ancestor` to get the `ancestor_number` of the
52 previous and target parents, which is then used to call `renumber_ancestor`.
53"""
55from abc import ABC, abstractmethod
56from typing import Type, Protocol, Any, runtime_checkable, Sequence, TypeVar, Generic
58from sqlalchemy import select
59from sqlalchemy.orm import Session
60from sqlalchemy.orm.collections import InstrumentedList
62from postrfp.model import Section, QuestionInstance, Project, User
63from postrfp.model.questionnaire.b36 import nearest_common_ancestor
64from . import update
@runtime_checkable
class EntityWithId(Protocol):
    """Structural type for any object that exposes an ``id`` attribute.

    Decorated with ``runtime_checkable`` so that ``isinstance`` checks
    against this protocol are permitted.
    """

    id: Any
@runtime_checkable
class PositionableEntity(Protocol):
    """Structural type for objects exposing both ``id`` and ``position``.

    Decorated with ``runtime_checkable`` so that ``isinstance`` checks
    against this protocol are permitted.
    """

    id: Any
    position: Any
# Type of the "previous parents" value that `_find_previous_parents` returns
# and `_reset_previous_parent_positions` consumes in `AbstractNodeMover`.
T = TypeVar("T")
class AbstractNodeMover(ABC, Generic[T]):
    """
    Abstract base class implementing the Template pattern for moving nodes
    (sections or questions) within a project's questionnaire structure.

    Subclasses provide the entity-specific hooks (`initialise`,
    `_find_previous_parents`, `_reset_previous_parent_positions` and
    `_delete_orphan`); `execute` drives the shared workflow. `T` is the type
    of the "previous parents" value handed from `_find_previous_parents` to
    `_reset_previous_parent_positions`.
    """

    def __init__(
        self,
        session: Session,
        user: User,
        project: Project,
        parent: Section,
        provided_ids: list[int],
        delete_orphans: bool = False,
    ):
        """
        Store the move parameters and pre-compute the lookup structures.

        Args:
            session: Session used for lookups, flushing and refreshing.
            user: Stored as `self.user` for use by subclass hooks.
            project: Project that owns the questionnaire being rearranged.
            parent: Target section that the provided nodes are assigned to.
            provided_ids: IDs of the nodes in their desired final order.
            delete_orphans: When true, `execute` deletes current children of
                `parent` whose IDs are not in `provided_ids`.
        """
        self.session = session
        self.user = user
        self.project = project
        self.parent = parent
        self.provided_ids = provided_ids
        self.delete_orphans = delete_orphans
        self.provided_id_set = set(provided_ids)

        # Initialize entity-specific attributes via the abstract method
        self.entity_class, self.parent_id_attr, self.children, self.found_entities = (
            self.initialise()
        )

        # Derive dependent fields based on initialized values
        self.found_ids = {e.id for e in self.found_entities}
        self.current_ids = {c.id for c in self.children}
        self.entity_lookup = {e.id: e for e in self.found_entities}

    @abstractmethod
    def initialise(
        self,
    ) -> tuple[Type[Any], str, "InstrumentedList", list[Any]]:  # pragma: no cover
        """
        Initialize entity-specific attributes.

        Returns:
            Tuple containing:
            - entity_class: The class of entities being managed
            - parent_id_attr: The name of the parent ID attribute
            - children: The current children collection from the parent
            - found_entities: The entities with the provided IDs
        """
        pass

    @abstractmethod
    def _find_previous_parents(self, moved_node_ids: set[int]) -> T:  # pragma: no cover
        """Find previous parents for moved nodes."""
        pass

    @abstractmethod
    def _reset_previous_parent_positions(
        self, previous_parents: T
    ) -> None:  # pragma: no cover
        """Reset positions for previous parents after moving their children."""
        pass

    @abstractmethod
    def _delete_orphan(self, orphan: Any) -> None:  # pragma: no cover
        """Delete an orphaned entity."""
        pass

    def execute(self) -> str:
        """
        Main template method that defines the workflow for moving nodes.

        Renumbering from the computed ancestor is performed here, so callers
        do not need to renumber again.

        Returns:
            The ancestor number that was passed to `renumber_ancestor`
            ("" when the target parent has no b36 number).
        """
        # Find previous parents for moved nodes BEFORE updating relationships;
        # afterwards the moved nodes already point at the new parent.
        moved_node_ids = self.found_ids - self.current_ids
        previous_parents = self._find_previous_parents(moved_node_ids)

        if self.delete_orphans:
            self._handle_orphans()

        self._update_positions_and_relationships()

        self._reset_previous_parent_positions(previous_parents)

        # Find the nearest common ancestor of source and target sections for renumbering
        ancestor_number = self._find_renumbering_ancestor(previous_parents)
        self.renumber_ancestor(ancestor_number)
        return ancestor_number

    def _find_renumbering_ancestor(self, previous_parents) -> str:
        """
        Find the nearest common ancestor of source (previous parents) and
        target (current parent) sections, so that both hierarchies are
        covered by the renumbering.

        Args:
            previous_parents: Either a set of Section objects or a sequence
                of section IDs, as returned by `_find_previous_parents`.

        Returns:
            "" when the target parent has no b36 number, otherwise the result
            of `nearest_common_ancestor` over the collected b36 numbers.
        """
        target_number = self.parent.b36_number
        if not target_number:
            # The root section has no b36_number, so renumber from there
            return ""

        # The target's number is always present, so the list is never empty.
        b36_numbers = [target_number]

        # Add source sections (where items were moved FROM)
        if previous_parents:
            if isinstance(previous_parents, set):
                # For sections: previous_parents is a set of Section objects
                b36_numbers.extend(
                    parent.b36_number
                    for parent in previous_parents
                    if parent and parent.b36_number
                )
            else:
                # For questions: previous_parents contains section IDs
                source_sections = self.session.scalars(
                    select(Section).filter(Section.id.in_(previous_parents))
                ).all()
                b36_numbers.extend(
                    section.b36_number
                    for section in source_sections
                    if section.b36_number
                )

        # Use the existing nearest_common_ancestor function from b36.py
        return nearest_common_ancestor(b36_numbers)

    def _handle_orphans(self) -> None:
        """
        Delete current children of the parent that are not in the provided IDs.

        Each orphan is loaded with `session.get`; IDs that no longer resolve
        to an entity are skipped.
        """
        orphan_ids = self.current_ids - self.provided_id_set
        for orphan_id in orphan_ids:
            orphan = self.session.get(self.entity_class, orphan_id)
            if orphan is not None:
                self._delete_orphan(orphan)

    def _update_positions_and_relationships(self) -> None:
        """
        Assign 1-based positions in provided order and attach every provided
        entity to the target parent, then flush and refresh the parent.

        Raises:
            KeyError: If a provided ID is not among the found entities.
        """
        # dict.fromkeys drops duplicate IDs while keeping first-seen order.
        deduplicated_ids = list(dict.fromkeys(self.provided_ids))

        for idx, entity_id in enumerate(deduplicated_ids, 1):
            entity = self.entity_lookup[entity_id]

            entity.position = idx
            setattr(entity, self.parent_id_attr, self.parent.id)

        self.session.flush()
        # Presumably reloads the parent's child collections after the flush.
        self.session.refresh(self.parent)

    def renumber_ancestor(self, ancestor_number: str) -> None:
        """
        Renumber sections starting from the nearest common ancestor.

        Looks up the section of this project whose b36 number equals
        `ancestor_number` and calls its `renumber` method. The lookup uses
        `Query.one()`, which raises unless exactly one section matches.
        """
        ancestor = (
            self.session.query(Section)
            .filter_by(b36_number=ancestor_number, project_id=self.project.id)
            .one()
        )
        ancestor.renumber()
class SectionsMover(AbstractNodeMover):
    """Node mover whose nodes are sections placed under a parent section."""

    def initialise(
        self,
    ) -> tuple[Type["Section"], str, InstrumentedList[Any], list["Section"]]:
        """
        Return the Section class, the "parent_id" attribute name, the parent's
        current subsections and the project's sections matching the provided IDs.
        """
        subsections = self.parent.subsections
        if not isinstance(subsections, InstrumentedList):
            subsections = InstrumentedList(subsections)

        id_filter = Section.id.in_(self.provided_ids)
        matching_sections = self.project.sections.filter(id_filter).all()

        return Section, "parent_id", subsections, matching_sections

    def _find_previous_parents(self, moved_node_ids: set[int]) -> set["Section"]:
        """Collect the distinct, non-None parents of the sections being moved."""
        moved_sections = self.session.scalars(
            select(Section).filter(Section.id.in_(moved_node_ids))
        )
        former_parents = set()
        for moved in moved_sections:
            former = moved.parent
            if former is not None:  # Sections without a parent contribute nothing
                former_parents.add(former)
        return former_parents

    def _reset_previous_parent_positions(
        self, previous_parents: set["Section"]
    ) -> None:
        """Call `set_subsection_positions` on every former parent section."""
        for former_parent in previous_parents:
            if former_parent is None:
                continue
            former_parent.set_subsection_positions()

    def _delete_orphan(self, orphan: "Section") -> None:
        """Delete an orphaned section via `update.delete_project_section`."""
        update.delete_project_section(self.session, self.user, self.project, orphan)
class QuestionsMover(AbstractNodeMover):
    """Node mover whose nodes are question instances placed in a section."""

    def initialise(
        self,
    ) -> tuple[
        Type["QuestionInstance"], str, InstrumentedList[Any], list["QuestionInstance"]
    ]:
        """
        Return the QuestionInstance class, the "section_id" attribute name, the
        parent's current questions and the project's questions matching the
        provided IDs.
        """
        questions = self.parent.questions
        if not isinstance(questions, InstrumentedList):
            questions = InstrumentedList(questions)

        id_filter = QuestionInstance.id.in_(self.provided_ids)
        matching_questions = self.project.questions.filter(id_filter).all()

        return QuestionInstance, "section_id", questions, matching_questions

    def _find_previous_parents(self, moved_node_ids: set[int]) -> Sequence[int]:
        """Return the `section_id` of each question instance being moved."""
        section_id_query = select(QuestionInstance.section_id).where(
            QuestionInstance.id.in_(moved_node_ids)
        )
        former_section_ids = self.session.scalars(section_id_query).all()
        return former_section_ids

    def _reset_previous_parent_positions(self, ex_parent_ids: Sequence[int]) -> None:
        """Call `set_question_positions` on every section with a given ID."""
        affected_sections = self.session.query(Section).filter(
            Section.id.in_(ex_parent_ids)
        )
        for section in affected_sections:
            section.set_question_positions()

    def _delete_orphan(self, orphan: "QuestionInstance") -> None:
        """Delete an orphaned question via `update.delete_project_section_question`."""
        update.delete_project_section_question(
            self.session, self.user, self.project, self.parent, orphan
        )