Coverage for postrfp/ref/service/content_service.py: 92%
64 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1from typing import TypeVar
4from jsonpath.patch import JSONPatch
5from sqlalchemy import Row
6from sqlalchemy.orm import Session
9from postrfp.model.tags import Tag
10from postrfp.model.ref import (
11 ContentQElementPair,
12 ContentSpecMap,
13 ContentSpec,
14 Content,
15 Subject,
16)
17from postrfp.shared.serial.refmodels import ContentDocument, TagSummary, SubjectSummary
18from postrfp.ref.service.helpers import fetch_related_items
21T = TypeVar("T")
def _update_content_tags(
    session: Session,
    content: Content,
    tag_refs: list[int] | list[TagSummary] | None,
    replace: bool = False,
) -> None:
    """
    Attach Tag rows to *content* from a list of ids or TagSummary objects.

    ``None`` is a no-op. With ``replace=True`` the fetched tags overwrite
    the existing collection (an empty list therefore clears it); otherwise
    they are appended to whatever tags the content already has.
    """
    if tag_refs is None:
        return
    # Normalise the heterogeneous input into a plain list of tag ids.
    if tag_refs and isinstance(tag_refs[0], TagSummary):
        ids = [ref.id for ref in tag_refs if isinstance(ref, TagSummary)]
    else:
        ids = tag_refs  # type: ignore
    fetched: list[Tag] = fetch_related_items(session, Tag, ids, "Tags")
    if replace:
        content.tags = fetched
    else:
        content.tags.extend(fetched)
def _update_content_subjects(
    session: Session,
    content: Content,
    subject_refs: list[int] | list[SubjectSummary] | None,
    replace: bool = False,
) -> None:
    """
    Attach Subject rows to *content* from a list of ids or SubjectSummary objects.

    ``None`` is a no-op. With ``replace=True`` the fetched subjects overwrite
    the existing collection (an empty list therefore clears it); otherwise
    they are appended to the content's current subjects.
    """
    if subject_refs is None:
        return
    # Normalise the heterogeneous input into a plain list of subject ids.
    if subject_refs and isinstance(subject_refs[0], SubjectSummary):
        ids = [ref.id for ref in subject_refs if isinstance(ref, SubjectSummary)]
    else:
        ids = subject_refs  # type: ignore
    fetched: list[Subject] = fetch_related_items(session, Subject, ids, "Subjects")
    if replace:
        content.subjects = fetched
    else:
        content.subjects.extend(fetched)
def create_content(
    session: Session,
    content_doc: ContentDocument,
    author_org_id: str,
    created_by_id: str,
) -> Content:
    """
    Create and persist a new Content item from a ContentDocument.

    Args:
        session: SQLAlchemy session
        content_doc: ContentDocument carrying the new content values
        author_org_id: Organization ID that owns this content
        created_by_id: User ID creating the content

    Returns:
        The newly created Content object (flushed, not yet committed)
    """
    content = Content(
        title=content_doc.title,
        content_doc=content_doc.content_doc,
        schema_id=content_doc.schema_id,
        auth_policy=content_doc.auth_policy,
        author_org_id=author_org_id,
        last_updated_by_id=created_by_id,
        primary_subject_id=content_doc.primary_subject_id,
    )

    # Attach any tag / subject references supplied on the document.
    _update_content_tags(session, content, content_doc.tags, replace=False)
    _update_content_subjects(session, content, content_doc.subjects, replace=False)

    session.add(content)
    session.flush()

    # Validate *after* flush so the ContentSpec relationship is loaded
    content.jsonschema_validate(content_doc.content_doc)
    return content
def update_content(
    session: Session,
    content: Content,
    content_doc: ContentDocument,
    updated_by_id: str,
) -> Content:
    """
    Update a Content item in place with values from a ContentDocument.

    Args:
        session: SQLAlchemy session
        content: Content object to update
        content_doc: ContentDocument with new values
        updated_by_id: User ID performing the update

    Returns:
        The updated Content object
    """
    # Validate the incoming document against the ContentSpec schema first,
    # before any fields are mutated.
    content.jsonschema_validate(content_doc.content_doc)

    # Basic scalar fields.
    content.title = content_doc.title
    if content_doc.content_doc is not None:
        content.content_doc = content_doc.content_doc
    content.last_updated_by_id = updated_by_id
    content.auth_policy = content_doc.auth_policy

    # Only overwrite the primary subject when a truthy value was supplied.
    if content_doc.primary_subject_id:
        content.primary_subject_id = content_doc.primary_subject_id

    # Tag and subject collections are replaced (not merged) when provided.
    _update_content_tags(session, content, content_doc.tags, replace=True)
    _update_content_subjects(session, content, content_doc.subjects, replace=True)

    return content
def fetch_answer_references(
    content_spec_map: ContentSpecMap, issue_id: int
) -> list[Row[tuple[str, int, str]]]:
    """
    Return (answer, element_id, content_reference) rows for the given
    issue and ContentSpecMap.

    Raises:
        ValueError: if *content_spec_map* is not attached to a session.
    """
    # Imported locally — presumably to avoid a circular import with the
    # questionnaire models; confirm before moving to module level.
    from postrfp.model.questionnaire.answering import Answer
    from postrfp.model.questionnaire.qelements import QElement

    session = Session.object_session(content_spec_map)
    if session is None:
        raise ValueError("content_spec_map is not attached to a session")

    query = (
        session.query(
            Answer.answer, Answer.element_id, ContentQElementPair.content_reference
        )
        .join(QElement, QElement.id == Answer.element_id)
        .join(ContentQElementPair)
        .filter(
            ContentQElementPair.content_map_id == content_spec_map.id,
            Answer.issue_id == issue_id,
        )
    )
    return query.all()
def fetch_content_spec(session: Session, content_spec_id: int) -> ContentSpec:
    """
    Retrieve a ContentSpec by primary key; ``Session.get_one`` raises if no
    row exists, so callers always receive a ContentSpec.

    Kept as a thin wrapper so it can be mocked in tests.
    """
    return session.get_one(ContentSpec, content_spec_id)
def jsonpatch_from_answers(
    session: Session, content_specmap_id: int, issue_id: int
) -> JSONPatch:
    """
    Build a JSONPatch (collection of patches) for the given ContentSpecMap
    and Issue: one "add" operation per non-empty answer, targeting the
    answer's content_reference path.
    """
    spec_map = session.get_one(ContentSpecMap, content_specmap_id)

    patch = JSONPatch()
    for answer, _element_id, reference in fetch_answer_references(
        spec_map, issue_id
    ):
        # Strip string answers; skip anything empty (including
        # whitespace-only strings) so no blank patches are emitted.
        value = answer.strip() if isinstance(answer, str) else answer
        if value:
            patch.add(reference, value)

    return patch
209"""
210FUTURE OPTIMIZATION: Progressive authorization filtering for content search.
212def get_contents_with_progressive_authorization_filtering(
213 session: Session,
214 user: User,
215 q_name: str | None = None,
216 q_spec_id: int | None = None,
217 target_count: int = 50,
218 batch_size: int = 50,
219) -> list[Content]: # type: ignore
220 \"\"\"
222 This approach uses "online learning" during search execution to progressively
223 eliminate content patterns that fail authorization, dramatically reducing
224 CEL evaluations for large result sets.
226 The Algorithm:
227 1. Fetch initial batch of search results
228 2. Evaluate CEL authorization on each item
229 3. For items that fail authorization, record their "auth_criteria_hash"
230 4. On next batch, exclude items with known-failing hash patterns
231 5. Repeat until we have enough authorized results
233 Performance Benefits:
234 - Reduces CEL evaluations from O(total_results) to O(unique_auth_patterns)
235 - Learns authorization patterns on-the-fly (no pre-caching required)
236 - Especially effective when many items share common failing criteria
238 Example Scenario:
239 - Search for "budget" returns 1000 items
240 - 400 items have tag "classified" which user can't access
241 - After evaluating ~50 items, we learn this pattern and exclude remaining 350
242 - Total CEL evaluations: ~150 instead of 1000
244 Prerequisites for Implementation:
245 - Add auth_criteria_hash column to Content table:
246 ALTER TABLE ref_contents ADD COLUMN auth_criteria_hash VARCHAR(64) INDEX;
247 - Populate hash on content create/update based on authorization-relevant attributes:
248 {author_org_id, visibility, subject_types, tags, etc.}
250 For possible policy hashing:
251 - Add policy_hash VARCHAR(64) INDEX column to Content table
252 - Populate on save: hashlib.sha256(policy_text.encode()).hexdigest()
253 - Track failed (auth_criteria_hash, policy_hash) tuples
254 - Use compound SQL filtering for both hash types
256 \"\"\"
257 authorized_results: list[Content] = []
258 failed_authorization_hashes = set() # type: ignore
259 failed_policy_patterns = set() # type: ignore
260 offset = 0
262 while len(authorized_results) < target_count:
263 # Build base query with search filters
264 query = session.query(Content).order_by(Content.date_updated.desc())
266 if q_name:
267 query = query.filter(Content.title.ilike(f"%{q_name}%"))
268 if q_spec_id:
269 query = query.filter(Content.schema_id == q_spec_id)
271 # OPTIMIZATION: Exclude content with auth patterns we know will fail
272 # This is the key performance improvement - each failed CEL evaluation
273 # eliminates entire classes of future content from consideration
274 if failed_authorization_hashes:
275 query = query.filter(
276 ~Content.auth_criteria_hash.in_(failed_authorization_hashes) # type: ignore
277 )
279 # Fetch next batch of candidates
280 batch = query.offset(offset).limit(batch_size).all()
281 if not batch:
282 break # No more content to evaluate
284 # Evaluate authorization for this batch
285 batch_authorized_count = 0
286 for content in batch:
287 from ..permissions import check_content_authorization
289 authorization_result = check_content_authorization(content, user, "view")
291 if authorization_result.granted:
292 authorized_results.append(content)
293 batch_authorized_count += 1
294 else:
295 if content.auth_policy is None:
296 # Record this authorization pattern as "failing" for future batches
297 # This prevents us from evaluating similar content again
298 # Only track failures for content without custom policies
299 failed_authorization_hashes.add(content.auth_criteria_hash) # type: ignore
300 else:
301 # For custom policies, we could also create a hash column of the content policy
302 # so we can eliminate matching auth_criteria_hash and policy_hash combinations
303 # This would require another step in the preceding algorithm
304 failed_policy_patterns.add(content.policy_hash) # type: ignore
305 pass
307 offset += batch_size
309 # Safety valve: if we're not finding any authorized content in recent batches,
310 # we're probably in a scenario where most/all remaining content is unauthorized
311 if batch_authorized_count == 0 and len(batch) < batch_size:
312 break
314 # Return results in the same format as current implementation
315 final_results = authorized_results[:target_count]
316 return final_results
318 """