Coverage for postrfp/ref/permissions.py: 80%
59 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2CEL-based authorization system for reference content management.
4This module provides CEL policy evaluation for content, content specs, and subjects.
5It replaces the complex permission checking system with flexible policy expressions.
6"""
8from typing import Dict, Any, Optional
9from dataclasses import dataclass, field
11from postrfp.shared.expression import evaluate_expression # CEL evaluation function
12from postrfp.model.ref import Content, ContentSpec, Subject
13from postrfp.model.humans import User
@dataclass
class AuthorizationResult:
    """Result of a CEL policy evaluation with detailed context.

    Instances can be used directly in boolean contexts: the truth value
    of a result is its ``granted`` field.
    """

    # Outcome of the policy evaluation.
    granted: bool
    # Short explanation of the decision, embedded in the user message.
    reason: str
    # The policy expression that produced this result, when known.
    policy_expression: Optional[str] = None
    # Variables that were made available to the policy expression.
    evaluation_context: Dict[str, Any] = field(default_factory=dict)
    # Optional hints; only rendered in the message when access is denied.
    suggested_actions: list[str] = field(default_factory=list)

    def __bool__(self) -> bool:
        """Allow using AuthorizationResult in boolean contexts."""
        return self.granted

    def get_user_friendly_message(self) -> str:
        """Generate a user-friendly message about the authorization decision.

        Returns:
            ``"Access granted: <reason>"`` when granted; otherwise
            ``"Access denied: <reason>"`` followed, if any suggestions
            exist, by a numbered "Suggested actions" list.
        """
        if self.granted:
            return f"Access granted: {self.reason}"

        # Collect the pieces and join once instead of growing a string
        # with repeated ``+=``.
        parts = [f"Access denied: {self.reason}"]
        if self.suggested_actions:
            parts.append("\n\nSuggested actions:")
            parts.extend(
                f"\n {i}. {action}"
                for i, action in enumerate(self.suggested_actions, 1)
            )

        return "".join(parts)

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary for API responses.

        The ``evaluation_context`` and ``suggested_actions`` values are
        the instance's own objects, not copies.
        """
        return {
            "granted": self.granted,
            "reason": self.reason,
            "policy_expression": self.policy_expression,
            "evaluation_context": self.evaluation_context,
            "suggested_actions": self.suggested_actions,
            "user_message": self.get_user_friendly_message(),
        }
class CELAuthorizationChecker:
    """
    Main authorization checker using CEL policies.

    This replaces the complex ContentPermissionChecker with a simpler
    CEL-based approach.
    """

    def __init__(self, entity: Any):
        """Initialize with an entity (Content, ContentSpec, or Subject).

        Args:
            entity: Object to authorize against. Anything that is not a
                Content, ContentSpec or Subject is labelled ``"unknown"``.
        """
        self.entity = entity

        # Determine entity type
        if isinstance(entity, Content):
            self.entity_type = "content"
        elif isinstance(entity, ContentSpec):
            self.entity_type = "content_spec"
        elif isinstance(entity, Subject):
            self.entity_type = "subject"
        else:
            self.entity_type = "unknown"

    def check_authorization(
        self, user: User, action: str, **context
    ) -> AuthorizationResult:
        """
        Check if user is authorized to perform action on entity.

        Args:
            user: User requesting access
            action: Action being attempted (view, edit, create, etc.)
            **context: Accepted for call compatibility. These values are
                not currently added to the evaluation context.

        Returns:
            AuthorizationResult with decision and details
        """
        # Build evaluation context - simplified for stub
        entity_context = {
            "author_org_id": getattr(self.entity, "author_org_id", None),
        }

        eval_context = {
            "entity": entity_context,
            "user": {
                "id": user.id,
                "org_id": user.org_id,
                "org": {"id": user.org_id},
            },
            "action": action,
        }

        # Use the entity's own policy when it has a non-empty one;
        # otherwise fall back to a default that only matches the
        # author's organisation.
        # NOTE(review): if the entity has no author_org_id and
        # user.org_id is also None, both sides of the default policy are
        # null - confirm evaluate_expression does not treat that as a
        # match.
        policy = (
            getattr(self.entity, "auth_policy", None)
            or "entity.author_org_id == user.org.id"
        )

        # Evaluate policy
        # NOTE(review): the result is stored as-is; presumably
        # evaluate_expression returns a plain bool - verify, since
        # AuthorizationResult.__bool__ returns this value directly.
        granted = evaluate_expression(policy, eval_context)

        return AuthorizationResult(
            granted=granted,
            reason="Access granted" if granted else "Access denied",
            policy_expression=policy,
            evaluation_context=eval_context,
        )
class AuthorizationDeniedError(Exception):
    """Exception raised when authorization is denied.

    The exception message is the authorization result's user-friendly
    message; the entity, result, attempted action and user are kept as
    attributes for handlers that need more detail.
    """

    def __init__(
        self,
        entity: Any,
        authorization_result: AuthorizationResult,
        attempted_action: str,
        user: Any,
    ):
        """Store the denial details and set the exception message.

        Args:
            entity: Entity the action was attempted on.
            authorization_result: Result describing the denial; its
                ``get_user_friendly_message()`` becomes the message.
            attempted_action: Name of the action that was refused.
            user: User who attempted the action.
        """
        self.entity = entity
        self.authorization_result = authorization_result
        self.attempted_action = attempted_action
        self.user = user

        super().__init__(authorization_result.get_user_friendly_message())
# Convenience functions for backward compatibility
def check_content_authorization(
    content: Content, user: Any, action: str, **context
) -> AuthorizationResult:
    """Check authorization for a Content entity.

    Args:
        content: Content entity to check against.
        user: User requesting access.
        action: Action being attempted.
        **context: Forwarded unchanged to the checker.

    Returns:
        The AuthorizationResult produced by CELAuthorizationChecker.
    """
    checker = CELAuthorizationChecker(content)
    return checker.check_authorization(user, action, **context)
def check_subject_authorization(
    subject: Subject, user: Any, action: str, **context
) -> AuthorizationResult:
    """Check authorization for a Subject entity.

    Args:
        subject: Subject entity to check against.
        user: User requesting access.
        action: Action being attempted.
        **context: Forwarded unchanged to the checker.

    Returns:
        The AuthorizationResult produced by CELAuthorizationChecker.
    """
    checker = CELAuthorizationChecker(subject)
    return checker.check_authorization(user, action, **context)
def check_content_spec_authorization(
    content_spec: ContentSpec, user: Any, action: str, **context
) -> AuthorizationResult:
    """Check authorization for a ContentSpec entity.

    Args:
        content_spec: ContentSpec entity to check against.
        user: User requesting access.
        action: Action being attempted.
        **context: Forwarded unchanged to the checker.

    Returns:
        The AuthorizationResult produced by CELAuthorizationChecker.
    """
    checker = CELAuthorizationChecker(content_spec)
    return checker.check_authorization(user, action, **context)