Coverage for postrfp/ref/permissions.py: 80%

59 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2CEL-based authorization system for reference content management. 

3 

4This module provides CEL policy evaluation for content, content specs, and subjects. 

5It replaces the complex permission checking system with flexible policy expressions. 

6""" 

7 

8from typing import Dict, Any, Optional 

9from dataclasses import dataclass, field 

10 

11from postrfp.shared.expression import evaluate_expression # CEL evaluation function 

12from postrfp.model.ref import Content, ContentSpec, Subject 

13from postrfp.model.humans import User 

14 

15 

16@dataclass 

17class AuthorizationResult: 

18 """Result of a CEL policy evaluation with detailed context.""" 

19 

20 granted: bool 

21 reason: str 

22 policy_expression: Optional[str] = None 

23 evaluation_context: Dict[str, Any] = field(default_factory=dict) 

24 suggested_actions: list[str] = field(default_factory=list) 

25 

26 def __bool__(self) -> bool: 

27 """Allow using AuthorizationResult in boolean contexts.""" 

28 return self.granted 

29 

30 def get_user_friendly_message(self) -> str: 

31 """Generate a user-friendly message about the authorization decision.""" 

32 if self.granted: 

33 return f"Access granted: {self.reason}" 

34 

35 message = f"Access denied: {self.reason}" 

36 if self.suggested_actions: 

37 message += "\n\nSuggested actions:" 

38 for i, action in enumerate(self.suggested_actions, 1): 

39 message += f"\n {i}. {action}" 

40 

41 return message 

42 

43 def to_dict(self) -> Dict[str, Any]: 

44 """Convert to dictionary for API responses.""" 

45 return { 

46 "granted": self.granted, 

47 "reason": self.reason, 

48 "policy_expression": self.policy_expression, 

49 "evaluation_context": self.evaluation_context, 

50 "suggested_actions": self.suggested_actions, 

51 "user_message": self.get_user_friendly_message(), 

52 } 

53 

54 

55class CELAuthorizationChecker: 

56 """ 

57 Main authorization checker using CEL policies. 

58 

59 This replaces the complex ContentPermissionChecker with a simpler 

60 CEL-based approach. 

61 """ 

62 

63 def __init__(self, entity: Any): 

64 """Initialize with an entity (Content, ContentSpec, or Subject).""" 

65 self.entity = entity 

66 

67 # Determine entity type 

68 if isinstance(entity, Content): 

69 self.entity_type = "content" 

70 elif isinstance(entity, ContentSpec): 

71 self.entity_type = "content_spec" 

72 elif isinstance(entity, Subject): 

73 self.entity_type = "subject" 

74 else: 

75 self.entity_type = "unknown" 

76 

77 def check_authorization( 

78 self, user: User, action: str, **context 

79 ) -> AuthorizationResult: 

80 """ 

81 Check if user is authorized to perform action on entity. 

82 

83 Args: 

84 user: User requesting access 

85 action: Action being attempted (view, edit, create, etc.) 

86 **context: Additional context for evaluation 

87 

88 Returns: 

89 AuthorizationResult with decision and details 

90 """ 

91 # Build evaluation context - simplified for stub 

92 entity_context = { 

93 "author_org_id": getattr(self.entity, "author_org_id", None), 

94 } 

95 

96 eval_context = { 

97 "entity": entity_context, 

98 "user": { 

99 "id": user.id, 

100 "org_id": user.org_id, 

101 "org": {"id": user.org_id}, 

102 }, 

103 "action": action, 

104 } 

105 

106 # Get policy to evaluate 

107 policy = getattr(self.entity, "auth_policy", None) 

108 

109 if not policy: 

110 # Use default policy that allows public content or author org access 

111 policy = "entity.author_org_id == user.org.id" 

112 

113 # Evaluate policy 

114 granted = evaluate_expression(policy, eval_context) 

115 

116 return AuthorizationResult( 

117 granted=granted, 

118 reason="Access granted" if granted else "Access denied", 

119 policy_expression=policy, 

120 evaluation_context=eval_context, 

121 ) 

122 

123 

124class AuthorizationDeniedError(Exception): 

125 """Exception raised when authorization is denied.""" 

126 

127 def __init__( 

128 self, 

129 entity: Any, 

130 authorization_result: AuthorizationResult, 

131 attempted_action: str, 

132 user: Any, 

133 ): 

134 self.entity = entity 

135 self.authorization_result = authorization_result 

136 self.attempted_action = attempted_action 

137 self.user = user 

138 

139 super().__init__(authorization_result.get_user_friendly_message()) 

140 

141 

142# Convenience functions for backward compatibility 

143 

144 

145def check_content_authorization( 

146 content: Content, user: Any, action: str, **context 

147) -> AuthorizationResult: 

148 """Check authorization for a Content entity.""" 

149 checker = CELAuthorizationChecker(content) 

150 return checker.check_authorization(user, action, **context) 

151 

152 

153def check_subject_authorization( 

154 subject: Subject, user: Any, action: str, **context 

155) -> AuthorizationResult: 

156 """Check authorization for a Subject entity.""" 

157 checker = CELAuthorizationChecker(subject) 

158 return checker.check_authorization(user, action, **context) 

159 

160 

161def check_content_spec_authorization( 

162 content_spec: ContentSpec, user: Any, action: str, **context 

163) -> AuthorizationResult: 

164 """Check authorization for a ContentSpec entity.""" 

165 checker = CELAuthorizationChecker(content_spec) 

166 return checker.check_authorization(user, action, **context)