Coverage for postrfp/ref/service/content_service.py: 92%

64 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1from typing import TypeVar 

2 

3 

4from jsonpath.patch import JSONPatch 

5from sqlalchemy import Row 

6from sqlalchemy.orm import Session 

7 

8 

9from postrfp.model.tags import Tag 

10from postrfp.model.ref import ( 

11 ContentQElementPair, 

12 ContentSpecMap, 

13 ContentSpec, 

14 Content, 

15 Subject, 

16) 

17from postrfp.shared.serial.refmodels import ContentDocument, TagSummary, SubjectSummary 

18from postrfp.ref.service.helpers import fetch_related_items 

19 

20 

21T = TypeVar("T") 

22 

23 

def _update_content_tags(
    session: Session,
    content: Content,
    tag_refs: list[int] | list[TagSummary] | None,
    replace: bool = False,
) -> None:
    """
    Attach Tag rows to *content* based on a list of tag references.

    Args:
        session: SQLAlchemy session used to fetch the Tag rows
        content: Content object whose ``tags`` collection is updated
        tag_refs: Tag IDs or TagSummary objects; ``None`` means "leave unchanged"
        replace: If True, replace the existing tags; otherwise append
    """
    if tag_refs is None:
        return

    # Normalise per element rather than inspecting only the first item, so the
    # declared union (and even an accidentally mixed list) is handled uniformly.
    tag_ids = [ref.id if isinstance(ref, TagSummary) else ref for ref in tag_refs]

    tags: list[Tag] = fetch_related_items(session, Tag, tag_ids, "Tags")
    if replace:
        content.tags = tags
    else:
        content.tags.extend(tags)

42 

43 

def _update_content_subjects(
    session: Session,
    content: Content,
    subject_refs: list[int] | list[SubjectSummary] | None,
    replace: bool = False,
) -> None:
    """
    Attach Subject rows to *content* based on a list of subject references.

    Args:
        session: SQLAlchemy session used to fetch the Subject rows
        content: Content object whose ``subjects`` collection is updated
        subject_refs: Subject IDs or SubjectSummary objects; ``None`` means
            "leave unchanged"
        replace: If True, replace the existing subjects; otherwise append
    """
    if subject_refs is None:
        return

    # Normalise per element rather than inspecting only the first item, so the
    # declared union (and even an accidentally mixed list) is handled uniformly.
    subject_ids = [
        ref.id if isinstance(ref, SubjectSummary) else ref for ref in subject_refs
    ]

    subjects: list[Subject] = fetch_related_items(
        session, Subject, subject_ids, "Subjects"
    )
    if replace:
        content.subjects = subjects
    else:
        content.subjects.extend(subjects)

68 

69 

def create_content(
    session: Session,
    content_doc: ContentDocument,
    author_org_id: str,
    created_by_id: str,
) -> Content:
    """
    Persist a brand-new Content row built from a ContentDocument.

    Args:
        session: SQLAlchemy session
        content_doc: ContentDocument carrying the values for the new row
        author_org_id: Organization ID that owns this content
        created_by_id: User ID creating the content

    Returns:
        Newly created Content object
    """
    content = Content(
        title=content_doc.title,
        content_doc=content_doc.content_doc,
        schema_id=content_doc.schema_id,
        auth_policy=content_doc.auth_policy,
        author_org_id=author_org_id,
        last_updated_by_id=created_by_id,
        primary_subject_id=content_doc.primary_subject_id,
    )

    # Attach any tags / subjects supplied on the document (None means "none").
    _update_content_tags(session, content, content_doc.tags, replace=False)
    _update_content_subjects(session, content, content_doc.subjects, replace=False)

    session.add(content)
    session.flush()

    # Validate *after* flush so the ContentSpec relationship is loaded
    content.jsonschema_validate(content_doc.content_doc)
    return content

112 

113 

def update_content(
    session: Session,
    content: Content,
    content_doc: ContentDocument,
    updated_by_id: str,
) -> Content:
    """
    Apply the values from a ContentDocument onto an existing Content row.

    Args:
        session: SQLAlchemy session
        content: Content object to update
        content_doc: ContentDocument with new values
        updated_by_id: User ID performing the update

    Returns:
        Updated Content object
    """
    # Validate the document against the ContentSpec schema before mutating.
    content.jsonschema_validate(content_doc.content_doc)

    # Scalar fields.
    content.title = content_doc.title
    if content_doc.content_doc is not None:
        content.content_doc = content_doc.content_doc
    content.last_updated_by_id = updated_by_id
    content.auth_policy = content_doc.auth_policy
    if content_doc.primary_subject_id:
        content.primary_subject_id = content_doc.primary_subject_id

    # Relationship fields: supplied collections replace the existing ones.
    _update_content_tags(session, content, content_doc.tags, replace=True)
    _update_content_subjects(session, content, content_doc.subjects, replace=True)

    return content

152 

153 

def fetch_answer_references(
    content_spec_map: ContentSpecMap, issue_id: int
) -> list[Row[tuple[str, int, str]]]:
    """
    Return (answer, element_id, content_reference) rows for the given issue
    and ContentSpecMap.
    """
    # Imported locally rather than at module top — presumably to avoid an
    # import cycle with the questionnaire models; confirm before hoisting.
    from postrfp.model.questionnaire.answering import Answer
    from postrfp.model.questionnaire.qelements import QElement

    session = Session.object_session(content_spec_map)
    if session is None:
        raise ValueError("content_spec_map is not attached to a session")

    query = (
        session.query(
            Answer.answer, Answer.element_id, ContentQElementPair.content_reference
        )
        .join(QElement, QElement.id == Answer.element_id)
        .join(ContentQElementPair)
        .filter(
            ContentQElementPair.content_map_id == content_spec_map.id,
            Answer.issue_id == issue_id,
        )
    )
    return query.all()

178 

179 

def fetch_content_spec(session: Session, content_spec_id: int) -> ContentSpec:
    """
    Look up the ContentSpec with *content_spec_id*, raising if it does not
    exist. Kept as a tiny wrapper so it can be mocked in tests.
    """
    spec = session.get_one(ContentSpec, content_spec_id)
    return spec

187 

188 

def jsonpatch_from_answers(
    session: Session, content_specmap_id: int, issue_id: int
) -> JSONPatch:
    """
    Assemble a JSONPatch (a collection of patch operations) from the answers
    recorded for the given ContentSpecMap and Issue.
    """
    spec_map = session.get_one(ContentSpecMap, content_specmap_id)
    patch = JSONPatch()

    for answer, _element_id, reference in fetch_answer_references(
        spec_map, issue_id
    ):
        value = answer.strip() if isinstance(answer, str) else answer
        if not value:
            continue  # Skip empty answers
        patch.add(reference, value)

    return patch

207 

208 

209""" 

210FUTURE OPTIMIZATION: Progressive authorization filtering for content search. 

211 

212def get_contents_with_progressive_authorization_filtering( 

213 session: Session, 

214 user: User, 

215 q_name: str | None = None, 

216 q_spec_id: int | None = None, 

217 target_count: int = 50, 

218 batch_size: int = 50, 

219) -> list[Content]: # type: ignore 

220 \"\"\" 

221 

222 This approach uses "online learning" during search execution to progressively 

223 eliminate content patterns that fail authorization, dramatically reducing 

224 CEL evaluations for large result sets. 

225 

226 The Algorithm: 

227 1. Fetch initial batch of search results 

228 2. Evaluate CEL authorization on each item 

229 3. For items that fail authorization, record their "auth_criteria_hash" 

230 4. On next batch, exclude items with known-failing hash patterns 

231 5. Repeat until we have enough authorized results 

232 

233 Performance Benefits: 

234 - Reduces CEL evaluations from O(total_results) to O(unique_auth_patterns) 

235 - Learns authorization patterns on-the-fly (no pre-caching required) 

236 - Especially effective when many items share common failing criteria 

237 

238 Example Scenario: 

239 - Search for "budget" returns 1000 items 

240 - 400 items have tag "classified" which user can't access 

241 - After evaluating ~50 items, we learn this pattern and exclude remaining 350 

242 - Total CEL evaluations: ~150 instead of 1000 

243 

244 Prerequisites for Implementation: 

245 - Add auth_criteria_hash column to Content table: 

246 ALTER TABLE ref_contents ADD COLUMN auth_criteria_hash VARCHAR(64) INDEX; 

247 - Populate hash on content create/update based on authorization-relevant attributes: 

248 {author_org_id, visibility, subject_types, tags, etc.} 

249 

250 For possible policy hashing: 

251 - Add policy_hash VARCHAR(64) INDEX column to Content table 

252 - Populate on save: hashlib.sha256(policy_text.encode()).hexdigest() 

253 - Track failed (auth_criteria_hash, policy_hash) tuples 

254 - Use compound SQL filtering for both hash types 

255 

256 \"\"\" 

257 authorized_results: list[Content] = [] 

258 failed_authorization_hashes = set() # type: ignore 

259 failed_policy_patterns = set() # type: ignore 

260 offset = 0 

261 

262 while len(authorized_results) < target_count: 

263 # Build base query with search filters 

264 query = session.query(Content).order_by(Content.date_updated.desc()) 

265 

266 if q_name: 

267 query = query.filter(Content.title.ilike(f"%{q_name}%")) 

268 if q_spec_id: 

269 query = query.filter(Content.schema_id == q_spec_id) 

270 

271 # OPTIMIZATION: Exclude content with auth patterns we know will fail 

272 # This is the key performance improvement - each failed CEL evaluation 

273 # eliminates entire classes of future content from consideration 

274 if failed_authorization_hashes: 

275 query = query.filter( 

276 ~Content.auth_criteria_hash.in_(failed_authorization_hashes) # type: ignore 

277 ) 

278 

279 # Fetch next batch of candidates 

280 batch = query.offset(offset).limit(batch_size).all() 

281 if not batch: 

282 break # No more content to evaluate 

283 

284 # Evaluate authorization for this batch 

285 batch_authorized_count = 0 

286 for content in batch: 

287 from ..permissions import check_content_authorization 

288 

289 authorization_result = check_content_authorization(content, user, "view") 

290 

291 if authorization_result.granted: 

292 authorized_results.append(content) 

293 batch_authorized_count += 1 

294 else: 

295 if content.auth_policy is None: 

296 # Record this authorization pattern as "failing" for future batches 

297 # This prevents us from evaluating similar content again 

298 # Only track failures for content without custom policies 

299 failed_authorization_hashes.add(content.auth_criteria_hash) # type: ignore 

300 else: 

301 # For custom policies, we could also create a hash column of the content policy 

302 # so we can eliminate matching auth_criteria_hash and policy_hash combinations 

303 # This would require another step in the preceding algorithm 

304 failed_policy_patterns.add(content.policy_hash) # type: ignore 

305 pass 

306 

307 offset += batch_size 

308 

309 # Safety valve: if we're not finding any authorized content in recent batches, 

310 # we're probably in a scenario where most/all remaining content is unauthorized 

311 if batch_authorized_count == 0 and len(batch) < batch_size: 

312 break 

313 

314 # Return results in the same format as current implementation 

315 final_results = authorized_results[:target_count] 

316 return final_results 

317 

318 """