Coverage for postrfp / ref / json_migration / patches.py: 100%

115 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1from pydantic import BaseModel 

2 

3from postrfp.shared.serial.refmodels import JsonPatchOp 

4 

5from postrfp.shared.serial.refmodels import ( 

6 AddOptionalFieldRequest, 

7 AddRequiredFieldRequest, 

8 DeleteFieldRequest, 

9 RenameFieldRequest, 

10 MoveFieldRequest, 

11) 

12 

13 

class DocPatches(BaseModel):
    """Pair of JSON Patch lists produced by one migration operation.

    One list is meant for the JSON schema, the other for the documents
    that conform to that schema. Both default to an empty list.
    """

    # Patch operations targeting the schema document.
    schema_patches: list[JsonPatchOp] = []
    # Patch operations targeting each data document.
    data_patches: list[JsonPatchOp] = []

19 

20 

class PatchBuilder:
    """
    Build JSON Patch operations for JSON schema migrations.

    Every public method returns a ``DocPatches`` holding the patches for the
    schema itself and the patches for documents conforming to that schema.
    Schema locations are JSON Pointer paths that alternate between the
    ``properties`` keyword and a field name, for example
    ``/properties/company/properties/name``; nested paths are supported by
    all methods.
    """

    def __init__(self, schema: dict) -> None:
        # The schema is only read by the methods below, never modified.
        self.schema = schema

    def _parse_schema_path(self, path: str) -> tuple[str, str, list[str]]:
        """
        Split a schema path into field name, parent path and segments.

        Args:
            path: JSON Pointer path (e.g., '/properties/company/properties/name')

        Returns:
            Tuple of (field_name, parent_path, path_segments)
            - field_name: The final path segment
            - parent_path: The path with the final segment removed
            - path_segments: All segments of the path, in order

        Raises:
            ValueError: If the path has fewer than two segments or does not
                start with 'properties'.
        """
        segments = path.strip("/").split("/")

        if len(segments) < 2 or segments[0] != "properties":
            raise ValueError(
                f"Invalid schema path: {path}. Must start with /properties/"
            )

        # segments[0] is always "properties" at this point, so the join also
        # yields "/properties" for a top-level field.
        parent_path = "/" + "/".join(segments[:-1])
        return segments[-1], parent_path, segments

    @staticmethod
    def _required_path(segments: list[str]) -> str:
        """
        Return the path of the ``required`` array that governs a field.

        Args:
            segments: Segments of the field's schema path, as returned by
                ``_parse_schema_path``.

        Returns:
            '/required' for a top-level field (two segments); otherwise the
            path of the enclosing object schema followed by '/required',
            e.g. '/properties/company/required'.
        """
        if len(segments) == 2:
            return "/required"
        # Drop the trailing "properties/<field>" pair to reach the owner.
        return "/" + "/".join(segments[:-2]) + "/required"

    def _parse_data_path(self, schema_path: str) -> str:
        """
        Convert a schema path to a data path.

        Only ``properties`` segments in keyword position are dropped. A
        ``properties`` segment that directly follows such a keyword is a
        field that happens to be called "properties" and is kept.

        Args:
            schema_path: Schema JSON Pointer (e.g., '/properties/company/properties/name')

        Returns:
            Data JSON Pointer (e.g., '/company/name'), or '/' when no
            segment remains.
        """
        data_segments: list[str] = []
        previous_was_keyword = False
        for segment in schema_path.strip("/").split("/"):
            if segment == "properties" and not previous_was_keyword:
                previous_was_keyword = True
                continue
            data_segments.append(segment)
            previous_was_keyword = False
        return "/" + "/".join(data_segments) if data_segments else "/"

    def _get_schema_value_at_path(self, path: str) -> dict | list | None:
        """
        Look up the value stored in the schema at a given path.

        Args:
            path: JSON Pointer path in the schema (leading/trailing slashes
                are ignored)

        Returns:
            The value found at that path. An empty dict is returned when a
            segment is empty, when a key is missing, or when the walk hits a
            non-dict value before the path is exhausted.
        """
        current = self.schema
        for segment in path.strip("/").split("/"):
            if not segment or not isinstance(current, dict):
                return {}
            # A missing key yields {} so deeper segments resolve to {} too.
            current = current.get(segment, {})
        return current

    def _build_field_schema(
        self, field_type: str, additional_schema: dict | None = None
    ) -> dict:
        """
        Build a field schema from a type and optional extra keywords.

        Args:
            field_type: Value stored under the 'type' key
            additional_schema: Optional extra schema keywords; they are
                merged last and therefore win on key collisions

        Returns:
            A new dict; ``additional_schema`` itself is not modified.
        """
        schema = {"type": field_type}
        if additional_schema:
            schema.update(additional_schema)
        return schema

    def _required_add_patch(self, required_path: str, field_name: str) -> JsonPatchOp:
        """
        Build the patch that lists ``field_name`` in a ``required`` array.

        Args:
            required_path: Path of the required array in the schema
            field_name: Field name to list as required

        Returns:
            An 'add' op appending to the array when the schema already holds
            a list at ``required_path``, otherwise an 'add' op creating the
            array with the field as its only entry.
        """
        existing = self._get_schema_value_at_path(required_path.strip("/"))
        if isinstance(existing, list):
            return JsonPatchOp(op="add", path=f"{required_path}/-", value=field_name)
        return JsonPatchOp(op="add", path=required_path, value=[field_name])

    def _handle_required_field_move(
        self,
        from_required_path: str,
        from_field_name: str,
        to_required_path: str,
        to_field_name: str,
        schema_patches: list[JsonPatchOp],
    ) -> None:
        """
        Update required arrays when a required field is moved.

        Does nothing unless the source required array is a list containing
        ``from_field_name``.

        Args:
            from_required_path: Path to source required array
            from_field_name: Name of field in source
            to_required_path: Path to destination required array
            to_field_name: Name of field in destination
            schema_patches: List to append patches to (modified in place)
        """
        source_required = self._get_schema_value_at_path(
            from_required_path.strip("/")
        )
        if (
            not isinstance(source_required, list)
            or from_field_name not in source_required
        ):
            return

        # Remove the entry by its index in the source array ...
        field_index = source_required.index(from_field_name)
        schema_patches.append(
            JsonPatchOp(op="remove", path=f"{from_required_path}/{field_index}")
        )
        # ... then list the field as required at the destination.
        schema_patches.append(
            self._required_add_patch(to_required_path, to_field_name)
        )

    def add_optional_field(self, request: AddOptionalFieldRequest) -> DocPatches:
        """
        Add a new optional field to a JSON schema at the specified path.

        The field schema is built from ``request.field_type`` and
        ``request.field_schema`` and is marked with ``"nullable": True``.
        No data patches are produced.

        Supports nested paths like '/properties/company/properties/newField'.

        Raises:
            ValueError: If ``request.path`` is not a valid schema path.
        """
        # Called for validation only: raises ValueError on a malformed path.
        self._parse_schema_path(request.path)

        field_schema = self._build_field_schema(
            request.field_type, request.field_schema
        )
        field_schema["nullable"] = True

        return DocPatches(
            schema_patches=[
                JsonPatchOp(op="add", path=request.path, value=field_schema)
            ]
        )

    def add_required_field(self, request: AddRequiredFieldRequest) -> DocPatches:
        """
        Add a new required field to a JSON schema and update existing documents.

        The schema patches add the field and list it in the governing
        required array (creating that array if the schema has none). The
        data patch adds ``request.default_value`` at the matching data path.

        Raises:
            ValueError: If ``request.path`` is not a valid schema path.
        """
        field_name, _, segments = self._parse_schema_path(request.path)
        field_schema = self._build_field_schema(
            request.field_type, request.field_schema
        )
        required_path = self._required_path(segments)
        data_path = self._parse_data_path(request.path)

        schema_patches = [
            JsonPatchOp(op="add", path=request.path, value=field_schema),
            self._required_add_patch(required_path, field_name),
        ]

        return DocPatches(
            schema_patches=schema_patches,
            data_patches=[
                JsonPatchOp(op="add", path=data_path, value=request.default_value)
            ],
        )

    def delete_field(self, request: DeleteFieldRequest) -> DocPatches:
        """
        Delete a field from a JSON schema and existing documents.

        Supports nested paths. The field is also removed from the governing
        required array when that array exists and lists it.

        Raises:
            ValueError: If ``request.path`` is not a valid schema path.
        """
        field_name, _, segments = self._parse_schema_path(request.path)
        required_path = self._required_path(segments)
        data_path = self._parse_data_path(request.path)

        schema_patches = [JsonPatchOp(op="remove", path=request.path)]

        required_array = self._get_schema_value_at_path(required_path.strip("/"))
        if isinstance(required_array, list) and field_name in required_array:
            # JSON Patch removes array entries by index, not by value.
            field_index = required_array.index(field_name)
            schema_patches.append(
                JsonPatchOp(op="remove", path=f"{required_path}/{field_index}")
            )

        return DocPatches(
            schema_patches=schema_patches,
            data_patches=[JsonPatchOp(op="remove", path=data_path)],
        )

    def rename_field(self, request: RenameFieldRequest) -> DocPatches:
        """
        Rename a field in a JSON schema and existing documents.

        Both old and new paths must be at the same nesting level.

        Raises:
            ValueError: If either path is not a valid schema path, if the
                two paths have different parents, or if they are identical.
        """
        old_field_name, old_parent_path, old_segments = self._parse_schema_path(
            request.old_path
        )
        new_field_name, new_parent_path, new_segments = self._parse_schema_path(
            request.new_path
        )

        if old_parent_path != new_parent_path:
            raise ValueError(
                f"Cannot rename across different parent paths. Old: {old_parent_path}, New: {new_parent_path}"
            )

        # Adding and then removing the same schema path would delete the
        # field from the schema, so an identity rename is rejected.
        if old_segments == new_segments:
            raise ValueError(f"Cannot rename a field to itself: {request.old_path}")

        old_field_schema = self._get_schema_value_at_path(request.old_path)
        required_path = self._required_path(old_segments)
        required_array = self._get_schema_value_at_path(required_path.strip("/"))

        schema_patches = [
            JsonPatchOp(op="add", path=request.new_path, value=old_field_schema),
            JsonPatchOp(op="remove", path=request.old_path),
        ]

        # The whole required array is replaced whenever it exists; when the
        # old name was not listed the replacement equals the current array.
        if isinstance(required_array, list):
            updated_required = [
                new_field_name if name == old_field_name else name
                for name in required_array
            ]
            schema_patches.append(
                JsonPatchOp(op="replace", path=required_path, value=updated_required)
            )

        old_data_path = self._parse_data_path(request.old_path)
        new_data_path = self._parse_data_path(request.new_path)

        return DocPatches(
            schema_patches=schema_patches,
            data_patches=[
                JsonPatchOp(
                    op="move",
                    path=new_data_path,
                    # "from" is a Python keyword, hence the dict unpacking.
                    **{"from": old_data_path},
                ),
            ],
        )

    def move_field(self, request: MoveFieldRequest) -> DocPatches:
        """
        Move a field to a different location in the schema.

        This is useful for restructuring schemas, like moving a top-level field
        into a nested object. If the field is listed in the source required
        array it is removed there and listed in the destination's array.

        Raises:
            ValueError: If either path is not a valid schema path, or if
                source and destination are identical.
        """
        from_field_name, _, from_segments = self._parse_schema_path(
            request.from_path
        )
        to_field_name, _, to_segments = self._parse_schema_path(request.to_path)

        # Adding and then removing the same schema path would delete the
        # field from the schema, so an identity move is rejected.
        if from_segments == to_segments:
            raise ValueError(f"Cannot move a field onto itself: {request.from_path}")

        field_schema = self._get_schema_value_at_path(request.from_path)

        schema_patches = [
            JsonPatchOp(op="add", path=request.to_path, value=field_schema),
            JsonPatchOp(op="remove", path=request.from_path),
        ]

        self._handle_required_field_move(
            self._required_path(from_segments),
            from_field_name,
            self._required_path(to_segments),
            to_field_name,
            schema_patches,
        )

        from_data_path = self._parse_data_path(request.from_path)
        to_data_path = self._parse_data_path(request.to_path)

        return DocPatches(
            schema_patches=schema_patches,
            data_patches=[
                JsonPatchOp(
                    op="move",
                    path=to_data_path,
                    # "from" is a Python keyword, hence the dict unpacking.
                    **{"from": from_data_path},
                ),
            ],
        )