
"""
CEL-based authorization service for reference content management.

This module provides simplified services for managing authorization policies,
replacing the earlier, more complex permission management system.
"""

7 

8from typing import Optional, Tuple 

9from sqlalchemy.orm import Session 

10 

11from postrfp.shared.serial.refmodels import PermissionUpdated 

12from postrfp.model import Subject, Content 

13from postrfp.model.ref import ContentSpec 

14 

15 

def update_content_policy(
    session: Session, content_id: int, policy_expression: str
) -> Tuple[bool, PermissionUpdated]:
    """
    Replace the CEL authorization policy stored on a Content row.

    The row is loaded with ``session.get_one`` (which raises when no row has
    ``content_id``) and its ``auth_policy`` attribute is overwritten with
    ``policy_expression``. The expression is stored exactly as given: no CEL
    syntax check or normalisation is done here, and the session is neither
    flushed nor committed, so persisting the change is up to the caller that
    owns ``session``.

    Args:
        session: Database session
        content_id: Content ID
        policy_expression: CEL expression for authorization

    Returns:
        tuple of (success flag, PermissionUpdated object); the flag is always
        ``True`` because a failed lookup surfaces as an exception instead
    """
    row = session.get_one(Content, content_id)
    row.auth_policy = policy_expression

    # Summary record handed back to the caller. The policy is a system-level
    # attribute rather than an org-scoped grant, so target_org is fixed.
    summary = PermissionUpdated(
        operation_performed="updated",
        permission_name="policy_expression",
        entity_type="content",
        entity_name=row.title,
        target_org="system",
    )
    return True, summary

44 

45 

def update_subject_policy(
    session: Session, subject_id: int, policy_expression: str
) -> Tuple[bool, PermissionUpdated]:
    """
    Replace the CEL authorization policy stored on a Subject row.

    Loads the Subject via ``session.get_one`` (raises when ``subject_id`` does
    not exist) and overwrites its ``auth_policy`` attribute. The expression is
    written through unchanged -- this function performs no CEL validation --
    and nothing is flushed or committed here; the owner of ``session`` decides
    when the change is persisted.

    Args:
        session: Database session
        subject_id: Subject ID
        policy_expression: CEL expression for authorization

    Returns:
        tuple of (success flag, PermissionUpdated object); the flag is always
        ``True`` since lookup failures raise rather than return
    """
    target = session.get_one(Subject, subject_id)
    target.auth_policy = policy_expression

    # Policies are system-level, hence the fixed target_org value.
    report = PermissionUpdated(
        operation_performed="updated",
        permission_name="policy_expression",
        entity_type="subject",
        entity_name=target.name,
        target_org="system",
    )
    return (True, report)

73 

74 

def update_content_spec_policy(
    session: Session, content_spec_id: int, policy_expression: str
) -> Tuple[bool, PermissionUpdated]:
    """
    Replace the CEL authorization policy stored on a ContentSpec row.

    The ContentSpec is fetched with ``session.get_one`` (raises when
    ``content_spec_id`` is unknown) and its ``auth_policy`` attribute is set
    to ``policy_expression`` verbatim. No syntax check of the expression is
    performed here and no flush/commit is issued; persistence is left to the
    caller that owns ``session``.

    Args:
        session: Database session
        content_spec_id: ContentSpec ID
        policy_expression: CEL expression for authorization

    Returns:
        tuple of (success flag, PermissionUpdated object); the flag is always
        ``True`` because a missing row raises instead of returning ``False``
    """
    spec = session.get_one(ContentSpec, content_spec_id)
    spec.auth_policy = policy_expression

    # Fixed target_org: the policy is a system-level setting, not an org grant.
    outcome = PermissionUpdated(
        operation_performed="updated",
        permission_name="policy_expression",
        entity_type="content_spec",
        entity_name=spec.name,
        target_org="system",
    )
    return True, outcome

102 

103 

def get_content_policy(session: Session, content_id: int) -> Optional[str]:
    """
    Return the CEL authorization policy stored on a Content row.

    Args:
        session: Database session
        content_id: Content ID

    Returns:
        CEL policy expression or None if not set

    Raises:
        whatever ``session.get_one`` raises when no Content has ``content_id``
    """
    # The stored value is returned as-is; it is not parsed or evaluated here.
    return session.get_one(Content, content_id).auth_policy

117 

118 

def get_subject_policy(session: Session, subject_id: int) -> Optional[str]:
    """
    Return the CEL authorization policy stored on a Subject row.

    Args:
        session: Database session
        subject_id: Subject ID

    Returns:
        CEL policy expression or None if not set

    Raises:
        whatever ``session.get_one`` raises when no Subject has ``subject_id``
    """
    found = session.get_one(Subject, subject_id)
    # Raw stored expression; no parsing or evaluation happens here.
    return found.auth_policy

132 

133 

def get_content_spec_policy(session: Session, content_spec_id: int) -> Optional[str]:
    """
    Return the CEL authorization policy stored on a ContentSpec row.

    Args:
        session: Database session
        content_spec_id: ContentSpec ID

    Returns:
        CEL policy expression or None if not set

    Raises:
        whatever ``session.get_one`` raises when no ContentSpec has
        ``content_spec_id``
    """
    # Stored expression is returned untouched -- not parsed, not evaluated.
    return session.get_one(ContentSpec, content_spec_id).auth_policy

147 

148 

def get_policy_info() -> dict[str, list[str]]:
    """
    Describe the CEL policy vocabulary this service advertises.

    The result is a static literal -- it is not derived from the policy
    evaluator at runtime, so it must be kept in step with it by hand.

    Returns:
        Dictionary with three keys: ``available_variables`` (identifiers a
        policy may reference), ``example_policies`` (sample expressions) and
        ``supported_actions`` (the action names listed for policies)
    """
    variables = [
        "entity.author_org_id",
        "entity.visibility",
        "entity.managing_org_id",
        "user.org.id",
        "user.roles",
        "action",
        "now",
    ]
    # NOTE(review): the last example references entity.subject_type, which is
    # not listed under available_variables above -- confirm against the
    # evaluator which list is authoritative.
    examples = [
        "entity.author_org_id == user.org.id",
        "entity.visibility == 'public'",
        "entity.author_org_id == user.org.id || entity.visibility == 'public'",
        "'content_admin' in user.roles",
        "entity.managing_org_id == user.org.id || entity.subject_type == 'country'",
    ]
    actions = [
        "view",
        "edit",
        "create",
        "delete",
        "comment",
    ]
    return {
        "available_variables": variables,
        "example_policies": examples,
        "supported_actions": actions,
    }