Coverage for postrfp/ref/service/reference_validator.py: 89%
85 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2JSON Pointer reference validation against JSON Schema.
4This module provides validation for JSON Pointer expressions used in ContentQElementPair
5mappings, ensuring they reference valid paths within a ContentSpec's JSON Schema.
6"""
8from typing import Optional
9from difflib import get_close_matches
11from postrfp.model.ref import ContentSpec
class ContentReferenceValidator:
    """
    Validates JSON Pointer references against a JSON Schema.

    The validator walks the schema once, collects every addressable JSON
    Pointer path (RFC 6901) and caches the result. References are then
    checked against that set. Array items are recorded under the canonical
    index ``0``; any other non-negative index is accepted in its place.

    Usage:
        validator = ContentReferenceValidator(content_spec)
        references = ["/company/name", "/sla/uptime", "/invalid/path"]
        valid, invalid = validator.validate_references_batch(references)
    """

    # A recursive schema describes infinitely many paths. Each distinct $ref
    # is expanded at most this many times along a single branch so that
    # extraction terminates while still exposing one level of self-nesting.
    _MAX_REF_EXPANSIONS = 2

    def __init__(self, content_spec: "ContentSpec"):
        """
        Initialize validator with a ContentSpec.

        Args:
            content_spec: Object whose ``spec_doc`` attribute holds the JSON
                Schema (a dict) to validate against
        """
        self.spec = content_spec
        self._valid_paths: Optional[set[str]] = None

    def get_valid_paths(self) -> set[str]:
        """
        Extract all valid JSON Pointer paths from the schema (cached).

        Returns:
            Set of all valid JSON Pointer paths in the schema
        """
        if self._valid_paths is None:
            self._valid_paths = self._extract_paths_from_schema(self.spec.spec_doc)
        return self._valid_paths

    @staticmethod
    def _escape_token(token: str) -> str:
        """Escape a property name for use as a JSON Pointer reference token."""
        # Order matters: "~" must be escaped before "/" introduces new "~".
        return str(token).replace("~", "~0").replace("/", "~1")

    @staticmethod
    def _unescape_token(token: str) -> str:
        """Reverse JSON Pointer escaping of a single reference token."""
        # Order matters: "~1" first, so "~01" decodes to "~1" and not "/".
        return token.replace("~1", "/").replace("~0", "~")

    @staticmethod
    def _is_array_index(token: str) -> bool:
        """Return True for an RFC 6901 array index: ASCII digits, no leading zero."""
        return (
            token.isascii()
            and token.isdigit()
            and (token == "0" or not token.startswith("0"))
        )

    @staticmethod
    def _type_allows(schema_type, wanted: str) -> bool:
        """Return True if a schema ``type`` value (str, list or absent) permits *wanted*."""
        if schema_type is None:
            return True
        if isinstance(schema_type, list):
            return wanted in schema_type
        return schema_type == wanted

    def _extract_paths_from_schema(
        self,
        schema: dict,
        prefix: str = "",
        _ref_stack: tuple[str, ...] = (),
    ) -> set[str]:
        """
        Recursively extract all JSON Pointer paths from a JSON Schema.

        Handles:
        - Objects with properties (``type`` may be a string, a list or absent)
        - Arrays with items (single schema, boolean schema, or positional list)
        - Required vs optional fields (all paths are valid for references)
        - Nested structures
        - oneOf/anyOf/allOf combinators
        - $ref references to definitions, including recursive ones
        - Boolean sub-schemas (``true``/``false``), which contribute no paths

        Args:
            schema: JSON Schema or sub-schema
            prefix: Current path prefix for recursive extraction
            _ref_stack: Internal. The $ref strings already expanded on the
                current branch, used to bound recursive definitions

        Returns:
            Set of all JSON Pointer paths in this schema
        """
        paths: set[str] = set()

        # Boolean schemas and malformed nodes describe no nested structure.
        if not isinstance(schema, dict):
            return paths

        # Handle $ref - resolve the reference. Sibling keywords are ignored,
        # matching the original behaviour of this validator.
        if "$ref" in schema:
            ref_path = schema["$ref"]
            expandable = (
                isinstance(ref_path, str)
                and _ref_stack.count(ref_path) < self._MAX_REF_EXPANSIONS
            )
            if expandable:
                resolved_schema = self._resolve_ref(ref_path)
                if resolved_schema:
                    paths.update(
                        self._extract_paths_from_schema(
                            resolved_schema, prefix, _ref_stack + (ref_path,)
                        )
                    )
            return paths

        schema_type = schema.get("type")

        # Handle object types with properties
        properties = schema.get("properties")
        if isinstance(properties, dict) and self._type_allows(schema_type, "object"):
            for prop_name, prop_schema in properties.items():
                path = f"{prefix}/{self._escape_token(prop_name)}"
                paths.add(path)
                paths.update(
                    self._extract_paths_from_schema(prop_schema, path, _ref_stack)
                )

        # Handle array types
        if "items" in schema and self._type_allows(schema_type, "array"):
            items = schema["items"]
            if isinstance(items, list):
                # Positional (tuple) form: each index has its own schema, so
                # the literal indices are recorded.
                for index, item_schema in enumerate(items):
                    item_path = f"{prefix}/{index}"
                    paths.add(item_path)
                    paths.update(
                        self._extract_paths_from_schema(
                            item_schema, item_path, _ref_stack
                        )
                    )
            else:
                # Arrays support any numeric index; /0 is the canonical
                # pattern for array item access.
                array_item_path = f"{prefix}/0"
                paths.add(array_item_path)
                paths.update(
                    self._extract_paths_from_schema(items, array_item_path, _ref_stack)
                )

        # Handle schema combinators (oneOf, anyOf, allOf)
        # These allow multiple valid schemas, so we extract paths from all
        for combinator in ("oneOf", "anyOf", "allOf"):
            sub_schemas = schema.get(combinator)
            if isinstance(sub_schemas, list):
                for sub_schema in sub_schemas:
                    paths.update(
                        self._extract_paths_from_schema(sub_schema, prefix, _ref_stack)
                    )

        return paths

    def _resolve_ref(self, ref_path: str) -> Optional[dict]:
        """
        Resolve a $ref reference within the schema.

        Supports references like:
        - "#" (the schema root)
        - "#/$defs/executive"
        - "#/definitions/insurance_policy"
        - "#/allOf/0" (list indices) and escaped tokens such as "a~1b"

        Args:
            ref_path: The $ref path to resolve

        Returns:
            The referenced schema, or None if not found
        """
        if not isinstance(ref_path, str):
            return None

        root = self.spec.spec_doc
        if ref_path == "#":
            return root if isinstance(root, dict) else None

        if not ref_path.startswith("#/"):
            # External references not supported
            return None

        # Navigate the schema document one reference token at a time
        current = root
        for raw_part in ref_path[2:].split("/"):
            part = self._unescape_token(raw_part)
            if isinstance(current, dict) and part in current:
                current = current[part]
            elif (
                isinstance(current, list)
                and self._is_array_index(part)
                and int(part) < len(current)
            ):
                current = current[int(part)]
            else:
                return None

        return current if isinstance(current, dict) else None

    def validate_references_batch(
        self, references: list[str]
    ) -> tuple[set[str], dict[str, str]]:
        """
        Batch validate multiple JSON Pointer references.

        Each distinct reference is checked once, first for an exact match in
        the schema's path set and then with array index normalization.

        Args:
            references: List of JSON Pointer strings to validate

        Returns:
            Tuple of (valid_refs, invalid_refs_with_errors):
            - valid_refs: Set of references that are valid
            - invalid_refs_with_errors: Dict mapping invalid refs to error
              messages, ordered by first appearance in *references*
        """
        valid_paths = self.get_valid_paths()

        valid_refs: set[str] = set()
        invalid_refs: dict[str, str] = {}

        # dict.fromkeys de-duplicates while keeping the caller's order, so
        # the error dict is reproducible between runs.
        for ref in dict.fromkeys(references):
            if ref in valid_paths or self._is_valid_array_reference(ref, valid_paths):
                valid_refs.add(ref)
            else:
                # Generate helpful error message with suggestions
                invalid_refs[ref] = self._get_error_message(ref, valid_paths)

        return valid_refs, invalid_refs

    def validate_pairs(self, pairs: list) -> tuple[bool, list[str]]:
        """
        Validate content_reference fields for a list of pairs.

        This is a convenience method for validating ContentQElementPairDocument
        objects, building error messages that include question_element_id context.

        Args:
            pairs: List of objects with content_reference and question_element_id attributes

        Returns:
            Tuple of (is_valid, error_messages):
            - is_valid: True if all references are valid
            - error_messages: List of formatted error strings (empty if valid)
        """
        if not pairs:
            return True, []

        # Extract references and validate in batch
        references = [pair.content_reference for pair in pairs]
        _, invalid_refs = self.validate_references_batch(references)

        if not invalid_refs:
            return True, []

        # Build error messages with context, one per offending pair
        error_messages = [
            f"Question element {pair.question_element_id}: {invalid_refs[pair.content_reference]}"
            for pair in pairs
            if pair.content_reference in invalid_refs
        ]

        return False, error_messages

    def _is_valid_array_reference(self, ref: str, valid_paths: set[str]) -> bool:
        """
        Check if a reference is to a valid array element.

        The reference is walked one token at a time. A token that exists
        literally in the schema (including a numeric property name) is taken
        as-is; otherwise a token that is a valid array index is mapped onto
        the canonical ``/0`` item path. This relies on every prefix of a
        valid path also being present in *valid_paths*, which holds for sets
        produced by ``_extract_paths_from_schema``.

        Examples:
            /items/5 is valid if /items/0 exists in valid_paths
            /data/records/42/name is valid if /data/records/0/name exists
            /rows/3/2024 is valid if /rows/0/2024 exists

        Args:
            ref: JSON Pointer reference to validate
            valid_paths: Set of known valid paths from schema

        Returns:
            True if the reference is a valid array access pattern
        """
        # A non-empty JSON Pointer always starts with "/".
        if not ref.startswith("/"):
            return False

        current = ""
        for token in ref.split("/")[1:]:
            literal = f"{current}/{token}"
            canonical = f"{current}/0"
            if literal in valid_paths:
                current = literal
            elif self._is_array_index(token) and canonical in valid_paths:
                current = canonical
            else:
                return False

        return True

    def _get_error_message(self, ref: str, valid_paths: set[str]) -> str:
        """
        Generate helpful error message with path suggestions.

        Uses difflib to find similar paths that might have been intended,
        helping users correct typos or understand the schema structure.

        Args:
            ref: The invalid reference
            valid_paths: Set of valid paths for suggestions

        Returns:
            Error message string with suggestions if available
        """
        suggestions = get_close_matches(ref, valid_paths, n=3, cutoff=0.6)

        if suggestions:
            return f"Path not found in schema. Did you mean: {', '.join(suggestions)}?"
        return "Path not found in schema"

    def validate_reference(self, reference: str) -> tuple[bool, Optional[str]]:
        """
        Validate a single JSON Pointer reference.

        Convenience method for validating individual references.
        For multiple references, use validate_references_batch() for better performance.

        Args:
            reference: JSON Pointer string to validate

        Returns:
            Tuple of (is_valid, error_message):
            - is_valid: True if reference is valid
            - error_message: None if valid, error string if invalid
        """
        valid_refs, invalid_refs = self.validate_references_batch([reference])

        if reference in valid_refs:
            return True, None
        return False, invalid_refs.get(reference, "Path not found in schema")