Coverage for postrfp/ref/handlers/administrators.py: 24%

46 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2HTTP endpoints for administrative functions, including CEL policy management 

3""" 

4 

5from sqlalchemy.orm import Session 

6 

7from postrfp.authorisation import perms 

8from postrfp.shared.decorators import http 

9from postrfp.shared.serial.refmodels import PolicyRequest, PolicyResponse 

10from postrfp.model import User, Content, Subject 

11from postrfp.model.ref import ContentSpec 

12from ..service.auth_service import ( 

13 update_content_policy, 

14 update_subject_policy, 

15 update_content_spec_policy, 

16) 

17 

18 

19@http 

20def post_policy( 

21 session: Session, user: User, grant_data: PolicyRequest 

22) -> PolicyResponse: 

23 """ 

24 Update a CEL authorization policy on a content item, content spec, or subject. 

25 

26 **Request Body:** Policy data including: 

27 - `entity`: Type of entity ("Content", "ContentSpec", or "Subject") 

28 - `entity_id`: ID of the entity to update policy for 

29 - `policy`: CEL expression for authorization 

30 

31 **Returns:** Policy update response with success status and details 

32 

33 **Example CEL Policies:** 

34 - `entity.author_org_id == user.org.id`: Author organization access 

35 - `'admin' in user.roles`: Role-based access 

36 

37 **@permissions REF_MANAGE_PERMISSIONS** 

38 """ 

39 user.check_permission(perms.REF_MANAGE_PERMISSIONS) 

40 

41 try: 

42 # Policy is required for post operations 

43 if grant_data.policy is None: 

44 raise ValueError("Policy is required for policy updates") 

45 

46 # Route to appropriate service function based on entity type 

47 if grant_data.entity_type == "Content": 

48 success, result = update_content_policy( 

49 session, grant_data.entity_id, grant_data.policy 

50 ) 

51 entity_name = f"Content {grant_data.entity_id}" 

52 

53 elif grant_data.entity_type == "ContentSpec": 

54 success, result = update_content_spec_policy( 

55 session, grant_data.entity_id, grant_data.policy 

56 ) 

57 entity_name = f"ContentSpec {grant_data.entity_id}" 

58 

59 elif grant_data.entity_type == "Subject": 

60 success, result = update_subject_policy( 

61 session, grant_data.entity_id, grant_data.policy 

62 ) 

63 entity_name = f"Subject {grant_data.entity_id}" 

64 

65 else: 

66 raise ValueError( 

67 f"Invalid entity type: {grant_data.entity_id}. " 

68 f"Valid types are: Content, ContentSpec, Subject" 

69 ) 

70 

71 return PolicyResponse( 

72 success=success, 

73 entity_type=grant_data.entity_type, 

74 entity_id=grant_data.entity_id, 

75 entity_name=entity_name, 

76 policy=grant_data.policy, 

77 operation_performed="policy_updated" if success else None, 

78 ) 

79 

80 except Exception as e: 

81 return PolicyResponse( 

82 success=False, 

83 error=str(e), 

84 entity_type=grant_data.entity_type, 

85 entity_id=grant_data.entity_id, 

86 ) 

87 

88 

89@http 

90def delete_policy( 

91 session: Session, user: User, grant_data: PolicyRequest 

92) -> PolicyResponse: 

93 """ 

94 Remove a CEL authorization policy from an entity (sets policy to None). 

95 

96 **Request Body:** Policy data including: 

97 - `entity`: Type of entity ("Content", "ContentSpec", or "Subject") 

98 - `entity_id`: ID of the entity to remove policy from 

99 

100 **Returns:** Policy removal response with success status 

101 

102 **@permissions REF_MANAGE_PERMISSIONS** 

103 """ 

104 user.check_permission(perms.REF_MANAGE_PERMISSIONS) 

105 

106 try: 

107 # Route to appropriate entity and remove policy 

108 if grant_data.entity_type == "Content": 

109 content = session.get_one(Content, grant_data.entity_id) 

110 content.auth_policy = None 

111 entity_name = content.title 

112 

113 elif grant_data.entity_type == "ContentSpec": 

114 content_spec = session.get_one(ContentSpec, grant_data.entity_id) 

115 content_spec.auth_policy = None 

116 entity_name = content_spec.name 

117 

118 elif grant_data.entity_type == "Subject": 

119 subject = session.get_one(Subject, grant_data.entity_id) 

120 subject.auth_policy = None 

121 entity_name = subject.name 

122 

123 else: 

124 raise ValueError( 

125 f"Invalid entity type: {grant_data.entity}. " 

126 f"Valid types are: Content, ContentSpec, Subject" 

127 ) 

128 

129 return PolicyResponse( 

130 success=True, 

131 entity_type=grant_data.entity_type, 

132 entity_id=grant_data.entity_id, 

133 entity_name=entity_name, 

134 ) 

135 

136 except Exception as e: 

137 return PolicyResponse( 

138 success=False, 

139 error=str(e), 

140 entity_type=grant_data.entity_type, 

141 entity_id=grant_data.entity_id, 

142 )