Coverage for postrfp/ref/service/reference_validator.py: 89%

85 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2JSON Pointer reference validation against JSON Schema. 

3 

4This module provides validation for JSON Pointer expressions used in ContentQElementPair 

5mappings, ensuring they reference valid paths within a ContentSpec's JSON Schema. 

6""" 

7 

8from typing import Optional 

9from difflib import get_close_matches 

10 

11from postrfp.model.ref import ContentSpec 

12 

13 

class ContentReferenceValidator:
    """
    Validates JSON Pointer references against a JSON Schema.

    The validator walks the schema held in ``content_spec.spec_doc`` once,
    collects every JSON Pointer path the schema describes, caches that set,
    and then checks references against it.

    Array elements are represented by the canonical index ``0``: a schema
    array at ``/items`` contributes ``/items/0`` (plus whatever lies below
    it), and any reference using another valid index (``/items/5``) is
    normalized to that canonical form before lookup.

    Usage:
        validator = ContentReferenceValidator(content_spec)
        references = ["/company/name", "/sla/uptime", "/invalid/path"]
        valid, invalid = validator.validate_references_batch(references)
    """

    def __init__(self, content_spec: "ContentSpec"):
        """
        Initialize validator with a ContentSpec.

        Args:
            content_spec: Object whose ``spec_doc`` attribute holds the JSON
                Schema (as a dict) to validate against
        """
        self.spec = content_spec
        # Filled lazily by get_valid_paths(); None means "not computed yet".
        self._valid_paths: Optional[set[str]] = None

    def get_valid_paths(self) -> set[str]:
        """
        Extract all valid JSON Pointer paths from the schema (cached).

        Returns:
            Set of all valid JSON Pointer paths in the schema. The same set
            object is returned on every call.
        """
        if self._valid_paths is None:
            self._valid_paths = self._extract_paths_from_schema(self.spec.spec_doc)
        return self._valid_paths

    @staticmethod
    def _is_array_index(token: str) -> bool:
        """Return True if *token* is an RFC 6901 array index (``0`` or no leading zero)."""
        # str.isdigit() alone also accepts characters such as "²" or "٣".
        if not (token.isascii() and token.isdigit()):
            return False
        return token == "0" or not token.startswith("0")

    @staticmethod
    def _escape_token(token: str) -> str:
        """Escape a property name for use as a JSON Pointer reference token."""
        # "~" must be escaped first so the "~1" produced for "/" is not re-escaped.
        return token.replace("~", "~0").replace("/", "~1")

    @staticmethod
    def _unescape_token(token: str) -> str:
        """Reverse _escape_token: turn ``~1`` into ``/`` and ``~0`` into ``~``."""
        # "~1" must be decoded first so that "~01" correctly becomes "~1".
        return token.replace("~1", "/").replace("~0", "~")

    def _extract_paths_from_schema(
        self,
        schema: dict,
        prefix: str = "",
        _active_refs: frozenset[str] = frozenset(),
    ) -> set[str]:
        """
        Recursively extract all JSON Pointer paths from a JSON Schema.

        Handles:
        - Objects with properties (``type`` may be a string, a list of
          types, or omitted)
        - Arrays with items (single schema or tuple-style list of schemas)
        - Required vs optional fields (all paths are valid for references)
        - Nested structures
        - oneOf/anyOf/allOf combinators
        - $ref references to definitions, including recursive ones

        Boolean or otherwise non-dict sub-schemas contribute no paths.
        A ``$ref`` that is already being expanded further up the current
        recursion branch is not expanded again, so self-referential schemas
        terminate instead of recursing forever; deeper repetitions of such
        a recursive structure are therefore not part of the result.

        Args:
            schema: JSON Schema or sub-schema
            prefix: Current path prefix for recursive extraction
            _active_refs: Internal. ``$ref`` strings currently being expanded
                on this recursion branch (cycle guard)

        Returns:
            Set of all JSON Pointer paths in this schema
        """
        paths: set[str] = set()

        # Boolean schemas (true/false) and malformed nodes carry no paths.
        if not isinstance(schema, dict):
            return paths

        # A $ref node is replaced by its target; sibling keywords are ignored.
        if "$ref" in schema:
            ref_path = schema["$ref"]
            if not isinstance(ref_path, str) or ref_path in _active_refs:
                return paths
            resolved_schema = self._resolve_ref(ref_path)
            if resolved_schema:
                paths.update(
                    self._extract_paths_from_schema(
                        resolved_schema, prefix, _active_refs | {ref_path}
                    )
                )
            return paths

        # "type" may be absent (keyword applies to any instance), a single
        # type name, or a list of type names.
        declared_type = schema.get("type")
        if declared_type is None:
            allows_object = allows_array = True
        elif isinstance(declared_type, list):
            allows_object = "object" in declared_type
            allows_array = "array" in declared_type
        else:
            allows_object = declared_type == "object"
            allows_array = declared_type == "array"

        # Object properties: one path per property, then descend.
        properties = schema.get("properties")
        if allows_object and isinstance(properties, dict):
            for prop_name, prop_schema in properties.items():
                path = f"{prefix}/{self._escape_token(str(prop_name))}"
                paths.add(path)
                paths.update(
                    self._extract_paths_from_schema(prop_schema, path, _active_refs)
                )

        # Array items: /0 is the canonical pattern for "any element".
        if allows_array and "items" in schema:
            array_item_path = f"{prefix}/0"
            paths.add(array_item_path)
            items = schema["items"]
            # Tuple-style items list: every positional schema is merged under
            # the canonical index, since all indices normalize to /0.
            item_schemas = items if isinstance(items, list) else [items]
            for item_schema in item_schemas:
                paths.update(
                    self._extract_paths_from_schema(
                        item_schema, array_item_path, _active_refs
                    )
                )

        # Combinators allow multiple alternative schemas at the same
        # location, so paths from every branch are accepted.
        for combinator in ("oneOf", "anyOf", "allOf"):
            sub_schemas = schema.get(combinator)
            if isinstance(sub_schemas, list):
                for sub_schema in sub_schemas:
                    paths.update(
                        self._extract_paths_from_schema(
                            sub_schema, prefix, _active_refs
                        )
                    )

        return paths

    def _resolve_ref(self, ref_path: str) -> Optional[dict]:
        """
        Resolve a $ref reference within the schema.

        Supports document-local references like:
        - "#" (the schema root)
        - "#/$defs/executive"
        - "#/definitions/insurance_policy"
        - "#/allOf/0" (stepping through a list by index)

        Reference tokens are unescaped per JSON Pointer rules (``~1`` -> ``/``,
        ``~0`` -> ``~``). External references are not supported.

        Args:
            ref_path: The $ref path to resolve

        Returns:
            The referenced schema, or None if it cannot be found or is not a dict
        """
        if not isinstance(ref_path, str):
            return None

        document = self.spec.spec_doc

        if ref_path == "#":
            return document if isinstance(document, dict) else None

        if not ref_path.startswith("#/"):
            # External references not supported
            return None

        # Walk the document one reference token at a time.
        current = document
        for raw_token in ref_path[2:].split("/"):
            token = self._unescape_token(raw_token)
            if isinstance(current, dict) and token in current:
                current = current[token]
            elif (
                isinstance(current, list)
                and self._is_array_index(token)
                and int(token) < len(current)
            ):
                current = current[int(token)]
            else:
                return None

        return current if isinstance(current, dict) else None

    def validate_references_batch(
        self, references: list[str]
    ) -> tuple[set[str], dict[str, str]]:
        """
        Batch validate multiple JSON Pointer references.

        The schema's path set is computed (or fetched from cache) once and
        each distinct reference is checked against it, first as an exact
        match and then with array indices normalized to ``0``. Duplicates
        are checked once, and errors are reported in first-seen input order.

        Args:
            references: List of JSON Pointer strings to validate

        Returns:
            Tuple of (valid_refs, invalid_refs_with_errors):
            - valid_refs: Set of references that are valid
            - invalid_refs_with_errors: Dict mapping invalid refs to error
              messages. A non-string reference (e.g. None) is reported here
              rather than raising.
        """
        valid_paths = self.get_valid_paths()
        valid_refs: set[str] = set()
        invalid_refs: dict[str, str] = {}

        # dict.fromkeys de-duplicates while keeping input order.
        for ref in dict.fromkeys(references or ()):
            if not isinstance(ref, str):
                invalid_refs[ref] = "Reference must be a JSON Pointer string"
            elif ref in valid_paths or self._is_valid_array_reference(
                ref, valid_paths
            ):
                valid_refs.add(ref)
            else:
                # Generate helpful error message with suggestions
                invalid_refs[ref] = self._get_error_message(ref, valid_paths)

        return valid_refs, invalid_refs

    def validate_pairs(self, pairs: list) -> tuple[bool, list[str]]:
        """
        Validate content_reference fields for a list of pairs.

        Convenience wrapper that validates every pair's ``content_reference``
        in one batch and prefixes each error with the pair's
        ``question_element_id``.

        Args:
            pairs: List of objects with content_reference and
                question_element_id attributes

        Returns:
            Tuple of (is_valid, error_messages):
            - is_valid: True if all references are valid
            - error_messages: List of formatted error strings, one per
              offending pair in input order (empty if valid)
        """
        if not pairs:
            return True, []

        references = [pair.content_reference for pair in pairs]
        _, invalid_refs = self.validate_references_batch(references)

        error_messages = [
            f"Question element {pair.question_element_id}: {invalid_refs[pair.content_reference]}"
            for pair in pairs
            if pair.content_reference in invalid_refs
        ]

        return not error_messages, error_messages

    def _is_valid_array_reference(self, ref: str, valid_paths: set[str]) -> bool:
        """
        Check if a reference is to a valid array element.

        Every segment that is a valid JSON Pointer array index (ASCII digits,
        no leading zero except ``0`` itself) is replaced by ``0`` and the
        result is looked up in ``valid_paths``.

        Examples:
            /items/5 is valid if /items/0 exists in valid_paths
            /data/records/42/name is valid if /data/records/0/name exists

        Note that a property whose name is itself numeric is normalized too;
        such a path can only match through an exact lookup done by the caller.

        Args:
            ref: JSON Pointer reference to validate
            valid_paths: Set of known valid paths from schema

        Returns:
            True if the reference is a valid array access pattern
        """
        normalized = "/".join(
            "0" if self._is_array_index(part) else part for part in ref.split("/")
        )
        return normalized in valid_paths

    def _get_error_message(self, ref: str, valid_paths: set[str]) -> str:
        """
        Generate helpful error message with path suggestions.

        Uses difflib to find up to three similar paths (similarity >= 0.6)
        that might have been intended.

        Args:
            ref: The invalid reference
            valid_paths: Set of valid paths for suggestions

        Returns:
            Error message string with suggestions if available
        """
        suggestions = get_close_matches(ref, valid_paths, n=3, cutoff=0.6)

        if suggestions:
            return f"Path not found in schema. Did you mean: {', '.join(suggestions)}?"
        return "Path not found in schema"

    def validate_reference(self, reference: str) -> tuple[bool, Optional[str]]:
        """
        Validate a single JSON Pointer reference.

        Convenience method for validating individual references.
        For multiple references, use validate_references_batch().

        Args:
            reference: JSON Pointer string to validate

        Returns:
            Tuple of (is_valid, error_message):
            - is_valid: True if reference is valid
            - error_message: None if valid, error string if invalid
        """
        valid_refs, invalid_refs = self.validate_references_batch([reference])

        if reference in valid_refs:
            return True, None
        return False, invalid_refs.get(reference, "Path not found in schema")