Coverage for postrfp / ref / handlers / administrators.py: 100%

36 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1""" 

2HTTP endpoints for administrative functions, including CEL policy management 

3""" 

4 

5from sqlalchemy.orm import Session 

6 

7from postrfp.authorisation import perms 

8from postrfp.shared.decorators import http 

9from postrfp.shared.serial.refmodels import PolicyRequest, PolicyResponse 

10from postrfp.model import User, Content, Subject 

11from postrfp.model.ref import ContentSpec 

12from ..service.auth_service import ( 

13 update_content_policy, 

14 update_subject_policy, 

15 update_content_spec_policy, 

16) 

17 

18 

@http
def post_policy(
    session: Session, user: User, grant_data: PolicyRequest
) -> PolicyResponse:
    """
    Set the CEL authorization policy of a content item, content spec, or subject.

    **Request Body:** Policy data including:
    - `entity`: Type of entity ("Content", "ContentSpec", or "Subject")
    - `entity_id`: ID of the entity whose policy is being updated
    - `policy`: CEL expression for authorization (required)

    **Returns:** Policy update response with success status and details

    **Example CEL Policies:**
    - `entity.author_org_id == user.org.id`: Author organization access
    - `'admin' in user.roles`: Role-based access

    **@permissions REF_MANAGE_PERMISSIONS**
    """
    # NOTE(review): the docstring names the request field `entity`, while this
    # handler reads `grant_data.entity_type` -- confirm whether PolicyRequest
    # exposes it under an alias.
    user.check_permission(perms.REF_MANAGE_PERMISSIONS)

    # A policy update without a policy expression is rejected up front.
    policy = grant_data.policy
    if policy is None:
        raise ValueError("Policy is required for policy updates")

    kind = grant_data.entity_type
    entity_id = grant_data.entity_id

    # Pick the display label and the service function for the entity type.
    # Anything that is neither "Content" nor "ContentSpec" is handled as a
    # Subject (presumably PolicyRequest validation restricts the value to
    # these three -- TODO confirm).
    if kind == "Content":
        label, apply_policy = "Content", update_content_policy
    elif kind == "ContentSpec":
        label, apply_policy = "ContentSpec", update_content_spec_policy
    else:
        label, apply_policy = "Subject", update_subject_policy

    # The second element of the service result is not used by this handler.
    succeeded, _detail = apply_policy(session, entity_id, policy)

    # The operation is only reported when the service call succeeded.
    if succeeded:
        operation = "policy_updated"
    else:
        operation = None

    return PolicyResponse(
        success=succeeded,
        entity_type=kind,
        entity_id=entity_id,
        entity_name=f"{label} {entity_id}",
        policy=policy,
        operation_performed=operation,
    )

72 

73 

@http
def delete_policy(
    session: Session, user: User, grant_data: PolicyRequest
) -> PolicyResponse:
    """
    Clear the CEL authorization policy of an entity (its policy is set to None).

    **Request Body:** Policy data including:
    - `entity`: Type of entity ("Content", "ContentSpec", or "Subject")
    - `entity_id`: ID of the entity whose policy is being removed

    **Returns:** Policy removal response with success status

    **@permissions REF_MANAGE_PERMISSIONS**
    """
    user.check_permission(perms.REF_MANAGE_PERMISSIONS)

    kind = grant_data.entity_type

    # Choose the model class to load and the attribute used as display name.
    # Content is labelled by its `title`; the other two by `name`. Any type
    # other than "Content" / "ContentSpec" is handled as a Subject.
    if kind == "Content":
        model, name_attr = Content, "title"
    elif kind == "ContentSpec":
        model, name_attr = ContentSpec, "name"
    else:
        model, name_attr = Subject, "name"

    # `Session.get_one` raises if no row exists for the given primary key.
    target = session.get_one(model, grant_data.entity_id)
    target.auth_policy = None
    display_name = getattr(target, name_attr)

    # No explicit flush/commit happens here; presumably the surrounding
    # request machinery finalises the session -- TODO confirm.
    return PolicyResponse(
        success=True,
        entity_type=kind,
        entity_id=grant_data.entity_id,
        entity_name=display_name,
    )

108 success=True, 

109 entity_type=grant_data.entity_type, 

110 entity_id=grant_data.entity_id, 

111 entity_name=entity_name, 

112 )