Coverage for postrfp/ref/service/content_service.py: 98% (83 statements)

from typing import TypeVar

from jsonpath.patch import JSONPatch
from sqlalchemy import Row
from sqlalchemy.orm import Session
from sqlalchemy.orm.attributes import flag_modified

from postrfp.model.tags import Tag
from postrfp.model.audit import AuditEvent, evt_types
from postrfp.model.humans import User
from postrfp.model.ref import (
    ContentQElementPair,
    ContentRevision,
    ContentSpecMap,
    ContentSpec,
    Content,
    Subject,
)
from postrfp.shared.exceptions import UpdateConflict
from postrfp.shared.serial.refmodels import (
    ContentDocument,
    JsonPatchOp,
    TagSummary,
    SubjectSummary,
)
from postrfp.ref.service.helpers import fetch_related_items

T = TypeVar("T")


def _update_content_tags(
    session: Session,
    content: Content,
    tag_refs: list[int] | list[TagSummary] | None,
    replace: bool = False,
) -> None:
    if tag_refs is not None:
        # Extract IDs from TagSummary objects or use IDs directly
        if tag_refs and isinstance(tag_refs[0], TagSummary):
            tag_ids = [tag.id for tag in tag_refs if isinstance(tag, TagSummary)]
        else:
            tag_ids = tag_refs  # type: ignore

        tags: list[Tag] = fetch_related_items(session, Tag, tag_ids, "Tags")
        if replace:
            content.tags = tags
        else:
            content.tags.extend(tags)


def _update_content_subjects(
    session: Session,
    content: Content,
    subject_refs: list[int] | list[SubjectSummary] | None,
    replace: bool = False,
) -> None:
    if subject_refs is not None:
        # Extract IDs from SubjectSummary objects or use IDs directly
        if subject_refs and isinstance(subject_refs[0], SubjectSummary):
            subject_ids = [
                subject.id
                for subject in subject_refs
                if isinstance(subject, SubjectSummary)
            ]
        else:
            subject_ids = subject_refs  # type: ignore

        subjects: list[Subject] = fetch_related_items(
            session, Subject, subject_ids, "Subjects"
        )
        if replace:
            content.subjects = subjects
        else:
            content.subjects.extend(subjects)
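

# Hedged usage sketch for the two private helpers above: they accept either
# bare IDs or TagSummary/SubjectSummary objects, and either extend or replace
# the existing collection. The IDs below are placeholders for illustration.
def _example_tag_and_subject_update(session: Session, content: Content) -> None:
    # Append tags 1 and 2 by ID, keeping whatever tags are already attached
    _update_content_tags(session, content, [1, 2], replace=False)
    # replace=True swaps out the whole collection; a list of SubjectSummary
    # objects would be accepted here in place of the bare IDs
    _update_content_subjects(session, content, [3], replace=True)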


def create_content(
    session: Session,
    content_doc: ContentDocument,
    author_org_id: str,
    created_by_id: str,
) -> Content:
    """
    Create a new Content item from a ContentDocument.

    Args:
        session: SQLAlchemy session
        content_doc: ContentDocument with content values
        author_org_id: Organization ID that owns this content
        created_by_id: User ID creating the content

    Returns:
        Newly created Content object
    """
    # Create new content
    new_content = Content(
        title=content_doc.title,
        content_doc=content_doc.content_doc,
        schema_id=content_doc.schema_id,
        auth_policy=content_doc.auth_policy,
        author_org_id=author_org_id,
        last_updated_by_id=created_by_id,
        primary_subject_id=content_doc.primary_subject_id,
    )

    # Add tags if provided
    _update_content_tags(session, new_content, content_doc.tags, replace=False)

    # Add subjects if provided
    _update_content_subjects(session, new_content, content_doc.subjects, replace=False)

    session.add(new_content)
    session.flush()

    # Validate *after* flush so the ContentSpec relationship is loaded
    new_content.jsonschema_validate(content_doc.content_doc, raise_on_error=False)
    return new_content
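

# Illustrative sketch of creating content from a ContentDocument. It assumes
# ContentDocument is a Pydantic model that can be constructed directly from
# these keyword fields; every value below is a placeholder, not real data.
def _example_create_content(session: Session) -> Content:
    doc = ContentDocument(
        title="Security questionnaire boilerplate",
        content_doc={"summary": "Standard answers for SOC 2 questionnaires"},
        schema_id=1,  # placeholder ContentSpec ID
        auth_policy=None,  # fall back to the default authorization policy
        primary_subject_id=None,
        tags=[1, 2],  # bare tag IDs; TagSummary objects also work
        subjects=None,
    )
    return create_content(
        session, doc, author_org_id="org-123", created_by_id="user-456"
    )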


def update_content(
    session: Session,
    content: Content,
    content_doc: ContentDocument,
    updated_by_id: str,
) -> Content:
    """
    Update a Content item with values from a ContentDocument.

    Args:
        session: SQLAlchemy session
        content: Content object to update
        content_doc: ContentDocument with new values
        updated_by_id: User ID performing the update

    Returns:
        Updated Content object
    """
    # Validate the content against the ContentSpec schema
    content.jsonschema_validate(content_doc.content_doc, raise_on_error=False)

    # Update basic fields
    content.title = content_doc.title
    if content_doc.content_doc is not None:
        content.content_doc = content_doc.content_doc
    content.last_updated_by_id = updated_by_id
    content.auth_policy = content_doc.auth_policy

    if content_doc.primary_subject_id:
        content.primary_subject_id = content_doc.primary_subject_id

    # Update tags if provided (replace existing)
    _update_content_tags(session, content, content_doc.tags, replace=True)

    # Update subjects if provided (replace existing)
    _update_content_subjects(session, content, content_doc.subjects, replace=True)

    return content
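

# Sketch of an update call. Unlike create_content, update_content *replaces*
# the tag and subject collections supplied on the ContentDocument rather than
# appending to them; the user ID below is a placeholder.
def _example_update_content(
    session: Session, content: Content, doc: ContentDocument
) -> Content:
    return update_content(session, content, doc, updated_by_id="user-456")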


def patch_content_doc(
    session: Session,
    content: Content,
    json_patch_dicts: list[JsonPatchOp],
    if_match: str | None,
    updated_by_user: User,
    comment: str | None = None,
    skip_etag_check: bool = False,
) -> None:
173 """
174 Apply a JSONPatch to a Content item
176 Args:
177 session: SQLAlchemy session
178 content: Content object to patch
179 json_patch_dicts: List of JSONPatch operation dicts
180 if_match: ETag for optimistic locking (optional)
181 updated_by_user: User performing the update
182 comment: Optional comment describing the change
183 skip_etag_check: Skip ETag validation (for schema migrations)
184 """
    # Check ETag match if provided and not skipping
    if not skip_etag_check and if_match and if_match != content.etag:
        raise UpdateConflict(
            "ETag mismatch: content has been modified since last retrieval"
        )

    # Save the original version number for the revision record
    original_version_number = content.version

    patches = [p.model_dump() for p in json_patch_dicts]

    json_patch = JSONPatch(patches)
    # Apply the JSONPatch to the content document (in-place modification)
    json_patch.apply(content.content_doc)

    # Validate the patched content against the ContentSpec schema
    content.jsonschema_validate(content.content_doc)
    # Update the content document and last updated by
    flag_modified(content, "content_doc")
    content.last_updated_by_id = updated_by_user.id

    revision = ContentRevision(
        content_id=content.id,
        entity_type="Document",
        user_id=updated_by_user.id,
        patch_operations=json_patch.asdicts(),
        version_number=original_version_number,
        comment=comment,
    )
    # Add the revision to the Content's revisions relationship,
    # thus adding it to the session and populating the revisions list
    # for testing convenience
    content.revisions.append(revision)
    # Flush to get the Revision's ID for the audit event
    session.flush()

    updated_event = AuditEvent.create(
        session,
        evt_types.REF_CONTENT_UPDATED,
        object_id=revision.id,
        user_id=updated_by_user.id,
        org_id=updated_by_user.org_id,
    )

    session.add(updated_event)
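

# Hedged sketch of applying a single JSON Patch operation with optimistic
# locking. It assumes JsonPatchOp exposes the usual RFC 6902 fields
# (op/path/value); the path and comment are placeholders.
def _example_patch_content(session: Session, content: Content, user: User) -> None:
    ops = [JsonPatchOp(op="replace", path="/summary", value="Updated summary")]
    patch_content_doc(
        session,
        content,
        ops,
        if_match=content.etag,  # raises UpdateConflict if the ETag is stale
        updated_by_user=user,
        comment="Tidy up the summary wording",
    )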


def fetch_answer_references(
    content_spec_map: ContentSpecMap, issue_id: int
) -> list[Row[tuple[str, int, str]]]:
    """
    Get answer, element_id, content_reference tuples for the given issue and ContentSpecMap.
    """
    from postrfp.model.questionnaire.answering import Answer
    from postrfp.model.questionnaire.qelements import QElement

    session = Session.object_session(content_spec_map)
    if session is None:
        raise ValueError("content_spec_map is not attached to a session")
    return (
        session.query(
            Answer.answer, Answer.element_id, ContentQElementPair.content_reference
        )
        .join(QElement, QElement.id == Answer.element_id)
        .join(ContentQElementPair)
        .filter(
            ContentQElementPair.content_map_id == content_spec_map.id,
            Answer.issue_id == issue_id,
        )
        .all()
    )


def fetch_content_spec(session: Session, content_spec_id: int) -> ContentSpec:
    """
    Retrieve a ContentSpec by ID, raising an error if not found.
    (useful for mocking in tests)
    """
    return session.get_one(ContentSpec, content_spec_id)


def jsonpatch_from_answers(
    session: Session, content_specmap_id: int, issue_id: int
) -> JSONPatch:
    """
    Build a JSONPatch (collection of patches) object for the given ContentSpecMap and Issue.
    """
    content_spec_map = session.get_one(ContentSpecMap, content_specmap_id)

    json_patch = JSONPatch()

    for answer, element_id, reference in fetch_answer_references(
        content_spec_map, issue_id
    ):
        trimmed_answer = answer.strip() if isinstance(answer, str) else answer
        if trimmed_answer:  # Skip empty answers
            json_patch.add(reference, trimmed_answer)

    return json_patch
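

# Illustrative sketch: build a patch from questionnaire answers and apply it
# straight to a content document, mirroring what patch_content_doc does. The
# ContentSpecMap and Issue IDs are placeholders.
def _example_apply_answers(session: Session, content: Content) -> None:
    patch = jsonpatch_from_answers(session, content_specmap_id=10, issue_id=20)
    patch.apply(content.content_doc)  # in-place modification
    # Tell SQLAlchemy the mutable JSON column changed so it gets persisted
    flag_modified(content, "content_doc")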
288"""
289FUTURE OPTIMIZATION: Progressive authorization filtering for content search.
291def get_contents_with_progressive_authorization_filtering(
292 session: Session,
293 user: User,
294 q_name: str | None = None,
295 q_spec_id: int | None = None,
296 target_count: int = 50,
297 batch_size: int = 50,
298) -> list[Content]: # type: ignore
299 \"\"\"
301 This approach uses "online learning" during search execution to progressively
302 eliminate content patterns that fail authorization, dramatically reducing
303 CEL evaluations for large result sets.
305 The Algorithm:
306 1. Fetch initial batch of search results
307 2. Evaluate CEL authorization on each item
308 3. For items that fail authorization, record their "auth_criteria_hash"
309 4. On next batch, exclude items with known-failing hash patterns
310 5. Repeat until we have enough authorized results
312 Performance Benefits:
313 - Reduces CEL evaluations from O(total_results) to O(unique_auth_patterns)
314 - Learns authorization patterns on-the-fly (no pre-caching required)
315 - Especially effective when many items share common failing criteria
317 Example Scenario:
318 - Search for "budget" returns 1000 items
319 - 400 items have tag "classified" which user can't access
320 - After evaluating ~50 items, we learn this pattern and exclude remaining 350
321 - Total CEL evaluations: ~150 instead of 1000
323 Prerequisites for Implementation:
324 - Add auth_criteria_hash column to Content table:
325 ALTER TABLE ref_contents ADD COLUMN auth_criteria_hash VARCHAR(64) INDEX;
326 - Populate hash on content create/update based on authorization-relevant attributes:
327 {author_org_id, visibility, subject_types, tags, etc.}
329 For possible policy hashing:
330 - Add policy_hash VARCHAR(64) INDEX column to Content table
331 - Populate on save: hashlib.sha256(policy_text.encode()).hexdigest()
332 - Track failed (auth_criteria_hash, policy_hash) tuples
333 - Use compound SQL filtering for both hash types
335 \"\"\"
    authorized_results: list[Content] = []
    failed_authorization_hashes = set()  # type: ignore
    failed_policy_patterns = set()  # type: ignore
    offset = 0

    while len(authorized_results) < target_count:
        # Build base query with search filters
        query = session.query(Content).order_by(Content.date_updated.desc())

        if q_name:
            query = query.filter(Content.title.ilike(f"%{q_name}%"))
        if q_spec_id:
            query = query.filter(Content.schema_id == q_spec_id)

        # OPTIMIZATION: Exclude content with auth patterns we know will fail.
        # This is the key performance improvement - each failed CEL evaluation
        # eliminates entire classes of future content from consideration
        if failed_authorization_hashes:
            query = query.filter(
                ~Content.auth_criteria_hash.in_(failed_authorization_hashes)  # type: ignore
            )

        # Fetch next batch of candidates
        batch = query.offset(offset).limit(batch_size).all()
        if not batch:
            break  # No more content to evaluate

        # Evaluate authorization for this batch
        batch_authorized_count = 0
        for content in batch:
            from ..permissions import check_content_authorization

            authorization_result = check_content_authorization(content, user, "view")

            if authorization_result.granted:
                authorized_results.append(content)
                batch_authorized_count += 1
            else:
                if content.auth_policy is None:
                    # Record this authorization pattern as "failing" for future batches
                    # This prevents us from evaluating similar content again
                    # Only track failures for content without custom policies
                    failed_authorization_hashes.add(content.auth_criteria_hash)  # type: ignore
                else:
                    # For custom policies, we could also create a hash column of the content policy
                    # so we can eliminate matching auth_criteria_hash and policy_hash combinations
                    # This would require another step in the preceding algorithm
                    failed_policy_patterns.add(content.policy_hash)  # type: ignore

        offset += batch_size

        # Safety valve: if we're not finding any authorized content in recent batches,
        # we're probably in a scenario where most/all remaining content is unauthorized
        if batch_authorized_count == 0 and len(batch) < batch_size:
            break

    # Return results in the same format as current implementation
    final_results = authorized_results[:target_count]
    return final_results
"""