Coverage for postrfp / ref / service / content_service.py: 98%

83 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1from typing import TypeVar 

2 

3 

4from jsonpath.patch import JSONPatch 

5from sqlalchemy import Row 

6from sqlalchemy.orm import Session 

7from sqlalchemy.orm.attributes import flag_modified 

8 

9 

10from postrfp.model.tags import Tag 

11from postrfp.model.audit import AuditEvent, evt_types 

12from postrfp.model.humans import User 

13from postrfp.model.ref import ( 

14 ContentQElementPair, 

15 ContentRevision, 

16 ContentSpecMap, 

17 ContentSpec, 

18 Content, 

19 Subject, 

20) 

21from postrfp.shared.exceptions import UpdateConflict 

22from postrfp.shared.serial.refmodels import ( 

23 ContentDocument, 

24 JsonPatchOp, 

25 TagSummary, 

26 SubjectSummary, 

27) 

28from postrfp.ref.service.helpers import fetch_related_items 

29 

30 

31T = TypeVar("T") 

32 

33 

def _update_content_tags(
    session: Session,
    content: Content,
    tag_refs: list[int] | list[TagSummary] | None,
    replace: bool = False,
) -> None:
    """
    Attach Tag rows to *content* based on a list of tag references.

    Args:
        session: SQLAlchemy session used to load the Tag rows
        content: Content object whose ``tags`` collection is updated
        tag_refs: Tag IDs or TagSummary objects; ``None`` means "no change"
            (note: an empty list with ``replace=True`` clears the tags)
        replace: If True, replace the existing tags; otherwise append
    """
    if tag_refs is None:
        return

    # Normalise every reference to a plain ID. This handles homogeneous
    # lists exactly like before, and additionally copes with a mixed
    # list — the previous first-element sniffing silently dropped int
    # IDs that appeared after a TagSummary.
    tag_ids = [ref.id if isinstance(ref, TagSummary) else ref for ref in tag_refs]

    tags: list[Tag] = fetch_related_items(session, Tag, tag_ids, "Tags")
    if replace:
        content.tags = tags
    else:
        content.tags.extend(tags)

52 

53 

def _update_content_subjects(
    session: Session,
    content: Content,
    subject_refs: list[int] | list[SubjectSummary] | None,
    replace: bool = False,
) -> None:
    """
    Attach Subject rows to *content* based on a list of subject references.

    Args:
        session: SQLAlchemy session used to load the Subject rows
        content: Content object whose ``subjects`` collection is updated
        subject_refs: Subject IDs or SubjectSummary objects; ``None`` means
            "no change" (an empty list with ``replace=True`` clears subjects)
        replace: If True, replace the existing subjects; otherwise append
    """
    if subject_refs is None:
        return

    # Normalise every reference to a plain ID. Handles homogeneous lists
    # exactly like before, and additionally copes with a mixed list —
    # the previous first-element sniffing silently dropped int IDs that
    # appeared after a SubjectSummary.
    subject_ids = [
        ref.id if isinstance(ref, SubjectSummary) else ref for ref in subject_refs
    ]

    subjects: list[Subject] = fetch_related_items(
        session, Subject, subject_ids, "Subjects"
    )
    if replace:
        content.subjects = subjects
    else:
        content.subjects.extend(subjects)

78 

79 

def create_content(
    session: Session,
    content_doc: ContentDocument,
    author_org_id: str,
    created_by_id: str,
) -> Content:
    """
    Persist a new Content item built from a ContentDocument.

    Args:
        session: SQLAlchemy session
        content_doc: ContentDocument carrying the new values
        author_org_id: Organization ID that will own this content
        created_by_id: User ID creating the content

    Returns:
        The newly created (and flushed) Content object
    """
    content = Content(
        title=content_doc.title,
        content_doc=content_doc.content_doc,
        schema_id=content_doc.schema_id,
        auth_policy=content_doc.auth_policy,
        author_org_id=author_org_id,
        last_updated_by_id=created_by_id,
        primary_subject_id=content_doc.primary_subject_id,
    )

    # Attach any tags / subjects named in the document (no-ops on None)
    _update_content_tags(session, content, content_doc.tags, replace=False)
    _update_content_subjects(session, content, content_doc.subjects, replace=False)

    session.add(content)
    session.flush()

    # Validate *after* the flush so the ContentSpec relationship is loaded;
    # raise_on_error=False means failures do not abort the create
    content.jsonschema_validate(content_doc.content_doc, raise_on_error=False)
    return content

122 

123 

def update_content(
    session: Session,
    content: Content,
    content_doc: ContentDocument,
    updated_by_id: str,
) -> Content:
    """
    Update a Content item with values from a ContentDocument.

    Args:
        session: SQLAlchemy session
        content: Content object to update
        content_doc: ContentDocument with the new values
        updated_by_id: User ID performing the update

    Returns:
        The updated Content object
    """
    # Validate against the ContentSpec schema first
    # (raise_on_error=False, so validation does not abort the update)
    content.jsonschema_validate(content_doc.content_doc, raise_on_error=False)

    # Scalar fields
    content.title = content_doc.title
    if content_doc.content_doc is not None:
        content.content_doc = content_doc.content_doc
    content.last_updated_by_id = updated_by_id
    content.auth_policy = content_doc.auth_policy

    # Only overwrite the primary subject when a truthy value is supplied
    if content_doc.primary_subject_id:
        content.primary_subject_id = content_doc.primary_subject_id

    # Tag / subject associations are replaced (not appended) when provided
    _update_content_tags(session, content, content_doc.tags, replace=True)
    _update_content_subjects(session, content, content_doc.subjects, replace=True)

    return content

162 

163 

def patch_content_doc(
    session: Session,
    content: Content,
    json_patch_dicts: list[JsonPatchOp],
    if_match: str | None,
    updated_by_user: User,
    comment: str | None = None,
    skip_etag_check: bool = False,
) -> None:
    """
    Apply a JSONPatch to a Content item, recording a ContentRevision and
    an audit event for the change.

    Args:
        session: SQLAlchemy session
        content: Content object to patch
        json_patch_dicts: List of JsonPatchOp operations to apply
        if_match: ETag for optimistic locking (optional)
        updated_by_user: User performing the update
        comment: Optional comment describing the change
        skip_etag_check: Skip ETag validation (for schema migrations)

    Raises:
        UpdateConflict: if ``if_match`` is provided and does not match the
            content's current ETag (unless ``skip_etag_check`` is set)
    """

    # Optimistic-lock check: only enforced when a client supplied an ETag
    if not skip_etag_check and if_match and if_match != content.etag:
        raise UpdateConflict(
            "ETag mismatch: content has been modified since last retrieval"
        )

    # Save the original version number for the revision record
    # (captured before the in-place mutation below)
    original_version_number = content.version

    patches = [p.model_dump() for p in json_patch_dicts]

    json_patch = JSONPatch(patches)
    # Apply the JSONPatch to the content document, in-place modification
    json_patch.apply(content.content_doc)

    # Validate the patched content against the ContentSpec schema.
    # NOTE(review): unlike create/update this does not pass
    # raise_on_error=False — presumably intended to raise on an invalid
    # patched document; confirm against jsonschema_validate's default.
    content.jsonschema_validate(content.content_doc)
    # The dict was mutated in place, so SQLAlchemy must be told explicitly
    # that the JSON column changed, or the update would not be persisted
    flag_modified(content, "content_doc")
    content.last_updated_by_id = updated_by_user.id

    revision = ContentRevision(
        content_id=content.id,
        entity_type="Document",
        user_id=updated_by_user.id,
        patch_operations=json_patch.asdicts(),
        version_number=original_version_number,
        comment=comment,
    )
    # Add the revision to the Content's revisions relationship
    # thus adding to the session and populating the revisions list
    # for testing convenience
    content.revisions.append(revision)
    # Get the ID number of Revision for the audit event
    session.flush()

    updated_event = AuditEvent.create(
        session,
        evt_types.REF_CONTENT_UPDATED,
        object_id=revision.id,
        user_id=updated_by_user.id,
        org_id=updated_by_user.org_id,
    )

    session.add(updated_event)

231 

232 

def fetch_answer_references(
    content_spec_map: ContentSpecMap, issue_id: int
) -> list[Row[tuple[str, int, str]]]:
    """
    Return (answer, element_id, content_reference) rows for the given
    issue and ContentSpecMap.
    """
    # Imported locally — presumably to avoid an import cycle with the
    # questionnaire models
    from postrfp.model.questionnaire.answering import Answer
    from postrfp.model.questionnaire.qelements import QElement

    session = Session.object_session(content_spec_map)
    if session is None:
        raise ValueError("content_spec_map is not attached to a session")

    query = (
        session.query(
            Answer.answer, Answer.element_id, ContentQElementPair.content_reference
        )
        .join(QElement, QElement.id == Answer.element_id)
        .join(ContentQElementPair)
        .filter(
            ContentQElementPair.content_map_id == content_spec_map.id,
            Answer.issue_id == issue_id,
        )
    )
    return query.all()

257 

258 

def fetch_content_spec(session: Session, content_spec_id: int) -> ContentSpec:
    """
    Load a ContentSpec by primary key, raising if no row exists
    (``Session.get_one`` semantics).

    Kept as a standalone function so it is easy to mock in tests.
    """
    spec: ContentSpec = session.get_one(ContentSpec, content_spec_id)
    return spec

266 

267 

def jsonpatch_from_answers(
    session: Session, content_specmap_id: int, issue_id: int
) -> JSONPatch:
    """
    Build a JSONPatch (collection of patch operations) for the given
    ContentSpecMap and Issue, one 'add' per non-empty answer.
    """
    spec_map = session.get_one(ContentSpecMap, content_specmap_id)

    patch = JSONPatch()

    rows = fetch_answer_references(spec_map, issue_id)
    for answer, _element_id, reference in rows:
        value = answer.strip() if isinstance(answer, str) else answer
        # Skip blank/empty (falsy) answers
        if value:
            patch.add(reference, value)

    return patch

286 

287 

288""" 

289FUTURE OPTIMIZATION: Progressive authorization filtering for content search. 

290 

291def get_contents_with_progressive_authorization_filtering( 

292 session: Session, 

293 user: User, 

294 q_name: str | None = None, 

295 q_spec_id: int | None = None, 

296 target_count: int = 50, 

297 batch_size: int = 50, 

298) -> list[Content]: # type: ignore 

299 \"\"\" 

300 

301 This approach uses "online learning" during search execution to progressively 

302 eliminate content patterns that fail authorization, dramatically reducing 

303 CEL evaluations for large result sets. 

304 

305 The Algorithm: 

306 1. Fetch initial batch of search results 

307 2. Evaluate CEL authorization on each item 

308 3. For items that fail authorization, record their "auth_criteria_hash" 

309 4. On next batch, exclude items with known-failing hash patterns 

310 5. Repeat until we have enough authorized results 

311 

312 Performance Benefits: 

313 - Reduces CEL evaluations from O(total_results) to O(unique_auth_patterns) 

314 - Learns authorization patterns on-the-fly (no pre-caching required) 

315 - Especially effective when many items share common failing criteria 

316 

317 Example Scenario: 

318 - Search for "budget" returns 1000 items 

319 - 400 items have tag "classified" which user can't access 

 - After evaluating ~50 items, we learn this pattern and exclude the remaining ~350 

321 - Total CEL evaluations: ~150 instead of 1000 

322 

323 Prerequisites for Implementation: 

324 - Add auth_criteria_hash column to Content table: 

325 ALTER TABLE ref_contents ADD COLUMN auth_criteria_hash VARCHAR(64) INDEX; 

326 - Populate hash on content create/update based on authorization-relevant attributes: 

327 {author_org_id, visibility, subject_types, tags, etc.} 

328 

329 For possible policy hashing: 

330 - Add policy_hash VARCHAR(64) INDEX column to Content table 

331 - Populate on save: hashlib.sha256(policy_text.encode()).hexdigest() 

332 - Track failed (auth_criteria_hash, policy_hash) tuples 

333 - Use compound SQL filtering for both hash types 

334 

335 \"\"\" 

336 authorized_results: list[Content] = [] 

337 failed_authorization_hashes = set() # type: ignore 

338 failed_policy_patterns = set() # type: ignore 

339 offset = 0 

340 

341 while len(authorized_results) < target_count: 

342 # Build base query with search filters 

343 query = session.query(Content).order_by(Content.date_updated.desc()) 

344 

345 if q_name: 

346 query = query.filter(Content.title.ilike(f"%{q_name}%")) 

347 if q_spec_id: 

348 query = query.filter(Content.schema_id == q_spec_id) 

349 

350 # OPTIMIZATION: Exclude content with auth patterns we know will fail 

351 # This is the key performance improvement - each failed CEL evaluation 

352 # eliminates entire classes of future content from consideration 

353 if failed_authorization_hashes: 

354 query = query.filter( 

355 ~Content.auth_criteria_hash.in_(failed_authorization_hashes) # type: ignore 

356 ) 

357 

358 # Fetch next batch of candidates 

359 batch = query.offset(offset).limit(batch_size).all() 

360 if not batch: 

361 break # No more content to evaluate 

362 

363 # Evaluate authorization for this batch 

364 batch_authorized_count = 0 

365 for content in batch: 

366 from ..permissions import check_content_authorization 

367 

368 authorization_result = check_content_authorization(content, user, "view") 

369 

370 if authorization_result.granted: 

371 authorized_results.append(content) 

372 batch_authorized_count += 1 

373 else: 

374 if content.auth_policy is None: 

375 # Record this authorization pattern as "failing" for future batches 

376 # This prevents us from evaluating similar content again 

377 # Only track failures for content without custom policies 

378 failed_authorization_hashes.add(content.auth_criteria_hash) # type: ignore 

379 else: 

380 # For custom policies, we could also create a hash column of the content policy 

381 # so we can eliminate matching auth_criteria_hash and policy_hash combinations 

382 # This would require another step in the preceding algorithm 

383 failed_policy_patterns.add(content.policy_hash) # type: ignore 

384 pass 

385 

386 offset += batch_size 

387 

388 # Safety valve: if we're not finding any authorized content in recent batches, 

389 # we're probably in a scenario where most/all remaining content is unauthorized 

390 if batch_authorized_count == 0 and len(batch) < batch_size: 

391 break 

392 

393 # Return results in the same format as current implementation 

394 final_results = authorized_results[:target_count] 

395 return final_results 

396 

397 """