Coverage for postrfp / ref / json_migration / patches.py: 100%
115 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1from pydantic import BaseModel
3from postrfp.shared.serial.refmodels import JsonPatchOp
5from postrfp.shared.serial.refmodels import (
6 AddOptionalFieldRequest,
7 AddRequiredFieldRequest,
8 DeleteFieldRequest,
9 RenameFieldRequest,
10 MoveFieldRequest,
11)
14class DocPatches(BaseModel):
15 """Result of a migration operation containing patches for schema and data"""
17 schema_patches: list[JsonPatchOp] = []
18 data_patches: list[JsonPatchOp] = []
21class PatchBuilder:
22 """
23 A class to handle JSON schema migrations.
25 Methods prepare JSON patches for modifying both the JSON schemas
26 and json documents conforming to those schemas. All methods support
27 nested JSON Pointer paths for complex schema structures.
28 """
30 def __init__(self, schema: dict) -> None:
31 self.schema = schema
33 def _parse_schema_path(self, path: str) -> tuple[str, str, list[str]]:
34 """
35 Parse a schema path to extract field name, parent path, and path segments.
37 Args:
38 path: JSON Pointer path (e.g., '/properties/company/properties/name')
40 Returns:
41 Tuple of (field_name, parent_path, path_segments)
42 - field_name: The final field name
43 - parent_path: The schema path to the parent properties object
44 - path_segments: List of path segments for navigation
45 """
46 # Remove leading slash and split
47 segments = path.strip("/").split("/")
49 # For schema paths, we need to identify the field name
50 # Paths typically alternate: properties/field/properties/nested/...
51 if len(segments) < 2 or segments[0] != "properties":
52 raise ValueError(
53 f"Invalid schema path: {path}. Must start with /properties/"
54 )
56 field_name = segments[-1]
58 # Build parent path (everything except the last segment)
59 if len(segments) > 2:
60 parent_path = "/" + "/".join(segments[:-1])
61 else:
62 parent_path = "/properties"
64 return field_name, parent_path, segments
66 def _parse_data_path(self, schema_path: str) -> str:
67 """
68 Convert a schema path to a data path.
70 Args:
71 schema_path: Schema JSON Pointer (e.g., '/properties/company/properties/name')
73 Returns:
74 Data JSON Pointer (e.g., '/company/name')
75 """
76 segments = schema_path.strip("/").split("/")
77 # Remove 'properties' keywords
78 data_segments = [seg for seg in segments if seg != "properties"]
79 return "/" + "/".join(data_segments) if data_segments else "/"
81 def _get_schema_value_at_path(self, path: str) -> dict | list | None:
82 """
83 Get the schema value at a given path.
85 Args:
86 path: JSON Pointer path in the schema
88 Returns:
89 The schema object at that path (dict, list, or None for missing paths)
90 """
91 segments = path.strip("/").split("/")
92 current = self.schema
94 for segment in segments:
95 if segment and isinstance(current, dict):
96 current = current.get(segment, {})
97 else:
98 return {}
100 # Return the value as-is (could be dict, list, or other types)
101 return current
103 def _build_field_schema(
104 self, field_type: str, additional_schema: dict | None = None
105 ) -> dict:
106 """
107 Build a complete field schema from type and additional properties.
109 Args:
110 field_type: The JSON Schema type
111 additional_schema: Optional additional schema properties
113 Returns:
114 Complete field schema dict
115 """
116 schema = {"type": field_type}
117 if additional_schema:
118 schema.update(additional_schema)
119 return schema
121 def _handle_required_field_move(
122 self,
123 from_required_path: str,
124 from_field_name: str,
125 to_required_path: str,
126 to_field_name: str,
127 schema_patches: list[JsonPatchOp],
128 ) -> None:
129 """
130 Handle updating required arrays when moving a required field.
132 Args:
133 from_required_path: Path to source required array
134 from_field_name: Name of field in source
135 to_required_path: Path to destination required array
136 to_field_name: Name of field in destination
137 schema_patches: List to append patches to (modified in place)
138 """
139 # Get source required array and check if field is required
140 from_required_array = self._get_schema_value_at_path(
141 from_required_path.strip("/")
142 )
143 was_required = (
144 isinstance(from_required_array, list)
145 and from_field_name in from_required_array
146 )
148 if not was_required:
149 return
151 # Remove from source required array
152 field_index = from_required_array.index(from_field_name) # type: ignore[union-attr]
153 schema_patches.append(
154 JsonPatchOp(
155 op="remove",
156 path=f"{from_required_path}/{field_index}",
157 )
158 )
160 # Add to destination required array
161 to_required_array = self._get_schema_value_at_path(to_required_path.strip("/"))
163 if isinstance(to_required_array, list):
164 # Required array exists, append to it
165 schema_patches.append(
166 JsonPatchOp(
167 op="add",
168 path=f"{to_required_path}/-",
169 value=to_field_name,
170 )
171 )
172 else:
173 # Required array doesn't exist, create it
174 schema_patches.append(
175 JsonPatchOp(
176 op="add",
177 path=to_required_path,
178 value=[to_field_name],
179 )
180 )
182 def add_optional_field(self, request: AddOptionalFieldRequest) -> DocPatches:
183 """
184 Add a new optional field to a JSON schema at the specified path.
186 Supports nested paths like '/properties/company/properties/newField'.
187 """
188 field_name, parent_path, _ = self._parse_schema_path(request.path)
190 # Build the complete schema for the new field
191 field_schema = self._build_field_schema(
192 request.field_type, request.field_schema
193 )
194 field_schema["nullable"] = True
196 return DocPatches(
197 schema_patches=[
198 JsonPatchOp(
199 op="add",
200 path=request.path,
201 value=field_schema,
202 )
203 ]
204 )
206 def add_required_field(self, request: AddRequiredFieldRequest) -> DocPatches:
207 """
208 Add a new required field to a JSON schema and update existing documents.
210 Supports nested paths and automatically updates the appropriate required array.
211 Creates the required array if it doesn't exist.
212 """
213 field_name, parent_path, segments = self._parse_schema_path(request.path)
215 # Build the complete schema for the new field
216 field_schema = self._build_field_schema(
217 request.field_type, request.field_schema
218 )
220 # Determine the required array path based on nesting level
221 # For /properties/field -> /required
222 # For /properties/company/properties/field -> /properties/company/required
223 if len(segments) == 2: # Top-level field
224 required_path = "/required"
225 else:
226 # Nested field - get parent object path and add /required
227 parent_object_path = "/" + "/".join(segments[:-2])
228 required_path = f"{parent_object_path}/required"
230 # Get data path for updating documents
231 data_path = self._parse_data_path(request.path)
233 schema_patches = [
234 JsonPatchOp(
235 op="add",
236 path=request.path,
237 value=field_schema,
238 )
239 ]
241 # Check if required array exists
242 required_array = self._get_schema_value_at_path(required_path.strip("/"))
243 if isinstance(required_array, list):
244 # Required array exists, append to it
245 schema_patches.append(
246 JsonPatchOp(
247 op="add",
248 path=f"{required_path}/-",
249 value=field_name,
250 )
251 )
252 else:
253 # Required array doesn't exist, create it with the field
254 schema_patches.append(
255 JsonPatchOp(
256 op="add",
257 path=required_path,
258 value=[field_name],
259 )
260 )
262 return DocPatches(
263 schema_patches=schema_patches,
264 data_patches=[
265 JsonPatchOp(
266 op="add",
267 path=data_path,
268 value=request.default_value,
269 )
270 ],
271 )
273 def delete_field(self, request: DeleteFieldRequest) -> DocPatches:
274 """
275 Delete a field from a JSON schema and existing documents.
277 Supports nested paths and removes from the appropriate required array if present.
278 """
279 field_name, parent_path, segments = self._parse_schema_path(request.path)
281 # Determine the required array path
282 if len(segments) == 2: # Top-level field
283 required_path = "/required"
284 else:
285 parent_object_path = "/" + "/".join(segments[:-2])
286 required_path = f"{parent_object_path}/required"
288 data_path = self._parse_data_path(request.path)
290 schema_patches = [JsonPatchOp(op="remove", path=request.path)]
292 # Only try to remove from required array if it exists and contains the field
293 required_array = self._get_schema_value_at_path(required_path.strip("/"))
294 if isinstance(required_array, list) and field_name in required_array:
295 # Find the index of the field in the required array
296 field_index = required_array.index(field_name)
297 schema_patches.append(
298 JsonPatchOp(
299 op="remove",
300 path=f"{required_path}/{field_index}",
301 )
302 )
304 return DocPatches(
305 schema_patches=schema_patches,
306 data_patches=[JsonPatchOp(op="remove", path=data_path)],
307 )
309 def rename_field(self, request: RenameFieldRequest) -> DocPatches:
310 """
311 Rename a field in a JSON schema and existing documents.
313 Both old and new paths must be at the same nesting level.
314 """
315 old_field_name, old_parent_path, old_segments = self._parse_schema_path(
316 request.old_path
317 )
318 new_field_name, new_parent_path, new_segments = self._parse_schema_path(
319 request.new_path
320 )
322 # Validate that paths are at the same level
323 if old_parent_path != new_parent_path:
324 raise ValueError(
325 f"Cannot rename across different parent paths. Old: {old_parent_path}, New: {new_parent_path}"
326 )
328 # Get the existing field schema
329 old_field_schema = self._get_schema_value_at_path(request.old_path)
331 # Determine required array path
332 if len(old_segments) == 2:
333 required_path = "/required"
334 else:
335 parent_object_path = "/" + "/".join(old_segments[:-2])
336 required_path = f"{parent_object_path}/required"
338 # Get required array and update it
339 required_array = self._get_schema_value_at_path(required_path.strip("/"))
340 if isinstance(required_array, list):
341 updated_required = [
342 new_field_name if f == old_field_name else f for f in required_array
343 ]
344 else:
345 updated_required = None
347 old_data_path = self._parse_data_path(request.old_path)
348 new_data_path = self._parse_data_path(request.new_path)
350 schema_patches = [
351 JsonPatchOp(
352 op="add",
353 path=request.new_path,
354 value=old_field_schema,
355 ),
356 JsonPatchOp(op="remove", path=request.old_path),
357 ]
359 # Only update required array if it exists and the field was in it
360 if updated_required is not None:
361 schema_patches.append(
362 JsonPatchOp(
363 op="replace",
364 path=required_path,
365 value=updated_required,
366 )
367 )
369 return DocPatches(
370 schema_patches=schema_patches,
371 data_patches=[
372 JsonPatchOp(
373 op="move",
374 path=new_data_path,
375 **{"from": old_data_path},
376 ),
377 ],
378 )
380 def move_field(self, request: MoveFieldRequest) -> DocPatches:
381 """
382 Move a field to a different location in the schema.
384 This is useful for restructuring schemas, like moving a top-level field
385 into a nested object.
386 """
387 from_field_name, from_parent_path, from_segments = self._parse_schema_path(
388 request.from_path
389 )
390 to_field_name, to_parent_path, to_segments = self._parse_schema_path(
391 request.to_path
392 )
394 # Get the field schema from the old location
395 field_schema = self._get_schema_value_at_path(request.from_path)
397 # Determine required array paths
398 if len(from_segments) == 2:
399 from_required_path = "/required"
400 else:
401 from_parent_object_path = "/" + "/".join(from_segments[:-2])
402 from_required_path = f"{from_parent_object_path}/required"
404 if len(to_segments) == 2:
405 to_required_path = "/required"
406 else:
407 to_parent_object_path = "/" + "/".join(to_segments[:-2])
408 to_required_path = f"{to_parent_object_path}/required"
410 schema_patches = [
411 JsonPatchOp(op="add", path=request.to_path, value=field_schema),
412 JsonPatchOp(op="remove", path=request.from_path),
413 ]
415 # Handle required array updates if field was required
416 self._handle_required_field_move(
417 from_required_path,
418 from_field_name,
419 to_required_path,
420 to_field_name,
421 schema_patches,
422 )
424 from_data_path = self._parse_data_path(request.from_path)
425 to_data_path = self._parse_data_path(request.to_path)
427 return DocPatches(
428 schema_patches=schema_patches,
429 data_patches=[
430 JsonPatchOp(
431 op="move",
432 path=to_data_path,
433 **{"from": from_data_path},
434 ),
435 ],
436 )