Coverage for postrfp / ref / permissions.py: 91%

46 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1""" 

2CEL-based authorization system for reference content management. 

3 

4This module provides CEL policy evaluation for content, content specs, and subjects. 

5It replaces the complex permission checking system with flexible policy expressions. 

6""" 

7 

8from typing import Any 

9from dataclasses import dataclass, field 

10 

11from postrfp.shared.expression import evaluate_expression # CEL evaluation function 

12from postrfp.model.ref import Content, ContentSpec, Subject 

13from postrfp.model.humans import User 

14 

15 

16@dataclass 

17class AuthorizationResult: 

18 """Result of a CEL policy evaluation with detailed context.""" 

19 

20 granted: bool 

21 reason: str 

22 policy_expression: str | None = None 

23 evaluation_context: dict[str, Any] = field(default_factory=dict) 

24 

25 def get_user_friendly_message(self) -> str: 

26 """Generate a user-friendly message about the authorization decision.""" 

27 if self.granted: 

28 return f"Access granted: {self.reason}" 

29 

30 message = f"Access denied: {self.reason}" 

31 

32 return message 

33 

34 def to_dict(self) -> dict[str, Any]: 

35 """Convert to dictionary for API responses.""" 

36 return { 

37 "granted": self.granted, 

38 "reason": self.reason, 

39 "policy_expression": self.policy_expression, 

40 "evaluation_context": self.evaluation_context, 

41 "user_message": self.get_user_friendly_message(), 

42 } 

43 

44 

45class CELAuthorizationChecker: 

46 """ 

47 Main authorization checker using CEL policies. 

48 

49 This replaces the complex ContentPermissionChecker with a simpler 

50 CEL-based approach. 

51 """ 

52 

53 def __init__(self, entity: Any): 

54 """Initialize with an entity (Content, ContentSpec, or Subject).""" 

55 self.entity = entity 

56 

57 # Determine entity type 

58 if isinstance(entity, Content): 

59 self.entity_type = "content" 

60 elif isinstance(entity, ContentSpec): 

61 self.entity_type = "content_spec" 

62 elif isinstance(entity, Subject): 

63 self.entity_type = "subject" 

64 else: 

65 self.entity_type = "unknown" 

66 

67 def check_authorization( 

68 self, user: User, action: str, **context 

69 ) -> AuthorizationResult: 

70 """ 

71 Check if user is authorized to perform action on entity. 

72 

73 Args: 

74 user: User requesting access 

75 action: Action being attempted (view, edit, create, etc.) 

76 **context: Additional context for evaluation 

77 

78 Returns: 

79 AuthorizationResult with decision and details 

80 """ 

81 # Build evaluation context - simplified for stub 

82 entity_context = { 

83 "author_org_id": getattr(self.entity, "author_org_id", None), 

84 } 

85 

86 eval_context = { 

87 "entity": entity_context, 

88 "user": { 

89 "id": user.id, 

90 "org_id": user.org_id, 

91 "org": {"id": user.org_id}, 

92 }, 

93 "action": action, 

94 } 

95 

96 # Get policy to evaluate 

97 policy = getattr(self.entity, "auth_policy", None) 

98 

99 if not policy: 

100 # Use default policy that allows public content or author org access 

101 policy = "entity.author_org_id == user.org.id" 

102 

103 # Evaluate policy 

104 granted = evaluate_expression(policy, eval_context) 

105 

106 return AuthorizationResult( 

107 granted=granted, 

108 reason="Access granted" if granted else "Access denied", 

109 policy_expression=policy, 

110 evaluation_context=eval_context, 

111 ) 

112 

113 

114class AuthorizationDeniedError(Exception): 

115 """Exception raised when authorization is denied.""" 

116 

117 def __init__( 

118 self, 

119 entity: Any, 

120 authorization_result: AuthorizationResult, 

121 attempted_action: str, 

122 user: Any, 

123 ): 

124 self.entity = entity 

125 self.authorization_result = authorization_result 

126 self.attempted_action = attempted_action 

127 self.user = user 

128 

129 super().__init__(authorization_result.get_user_friendly_message()) 

130 

131 

132# Convenience function for backward compatibility 

133 

134 

135def check_content_authorization( 

136 content: Content, user: Any, action: str, **context 

137) -> AuthorizationResult: 

138 """Check authorization for a Content entity.""" 

139 checker = CELAuthorizationChecker(content) 

140 return checker.check_authorization(user, action, **context)