Coverage for postrfp/buyer/api/authorise.py: 100%
118 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1from dataclasses import dataclass
3from postrfp.authorisation.errors import AuthorisationFailures
4from postrfp.authorisation.actions import (
5 ADMIN_ACTIONS,
6 ISSUE_ACTIONS,
7 ISSUE_STATUS_ACTIONS,
8 PROJECT_ACTIONS,
9 QUESTIONNAIRE_ACTIONS,
10)
11from postrfp.authorisation import perms
12from postrfp.shared.exceptions import AuthorizationFailure
13from postrfp.model.humans import (
14 ConsultantOrganisation,
15 Organisation,
16 OrganisationType,
17 User,
18)
19from postrfp.model.issue import Issue
20from postrfp.model.project import Project
21from postrfp.shared.types import PermString
24@dataclass
25class AuthContext:
26 user: User
27 errors: AuthorisationFailures
28 project: Project | None = None
29 issue: Issue | None = None
30 multiproject: bool = False
31 section_id: int | None = None
32 target_user: User | None = None
33 target_org: Organisation | None = None
34 deny_restricted: bool = True
37def check(
38 user: User,
39 action: PermString,
40 project: Project | None = None,
41 issue: Issue | None = None,
42 multiproject: bool = False,
43 section_id: int | None = None,
44 target_user: User | None = None,
45 target_org: Organisation | None = None,
46 deny_restricted: bool = True,
47) -> None:
48 """
49 Centralised statement of auth business logic, all API calls should invoke this
51 @param multiproject - some actions cover mutliple projects, thus it's not possible
52 to pass the Project into this function
53 @param deny_restricted - if True, immediately deny any action for Restricted users
54 @param target_user - the user being administered, if any
55 @param target_org - the organisation being administered, if any
56 """
58 errors = AuthorisationFailures(action)
59 context = AuthContext(
60 user=user,
61 errors=errors,
62 project=project,
63 issue=issue,
64 multiproject=multiproject,
65 section_id=section_id,
66 target_user=target_user,
67 target_org=target_org,
68 deny_restricted=deny_restricted,
69 )
71 apply_rules(action, context)
74def apply_rules(action: PermString, context: AuthContext) -> None:
75 """
76 Apply the relevant validation rules based on the context.
78 Some actions may exist in multiple _ACTIONS categories so more than one set of
79 rules may be applied.
81 Errors are collected in the context and raised as a single exception at the end.
82 """
84 # 1 Is the action a valid permission? (fail fast)
85 validate_action(action)
87 # 2 Is the user restricted and is this action allowed for restricted users?
88 enforce_restricted_user_guard(context)
90 # Successive checks may add errors to the context
92 # 3 Do the user's assigned roles include the correct permission for this action?
93 ensure_user_permission(action, context)
95 if action in PROJECT_ACTIONS:
96 # 4 Is the user's organisation a participant in the project with the required permission?
97 # 5 Is the action permitted at the project's current status?
98 # 6 Are there any project-specific business rules preventing this action?
99 enforce_project_rules(action, context)
101 if action in QUESTIONNAIRE_ACTIONS:
102 # 7 If the user is restricted, do they have permission to access the specified section?
103 # 8 If a section ID is provided, does it belong to the specified project?
104 enforce_questionnaire_rules(action, context)
106 if action in ISSUE_ACTIONS:
107 # 9 Is the action permitted at the issue's current status (or is it a new issue)?
108 enforce_issue_rules(action, context)
110 if action in ADMIN_ACTIONS:
111 # 10 Does the user have permission to administer the target organisation or user?
112 enforce_admin_rules(context)
114 if context.errors.has_errors:
115 raise AuthorizationFailure(errors=context.errors)
118def validate_action(action: PermString) -> None:
119 if action not in perms.ALL_PERMISSIONS:
120 raise ValueError(f"Action {action} is not associated with a valid permission")
123def enforce_restricted_user_guard(context: AuthContext) -> None:
124 if context.deny_restricted and context.user.is_restricted:
125 context.errors.auth_failure(
126 "Action not permitted for Restricted (Domain Expert) users"
127 )
130def ensure_user_permission(action: PermString, context: AuthContext) -> None:
131 if not context.user.has_permission(action):
132 context.errors.auth_failure(f"User lacks permission for {action}")
135def enforce_project_rules(action: PermString, context: AuthContext) -> None:
136 if context.multiproject:
137 return
139 project = require_project(context.project, action)
140 verify_project_membership(context.user, project, action, context.errors)
141 verify_restricted_project_access(context.user, project, context.errors)
142 verify_project_status_allows_action(project, action, context.errors)
143 if action == perms.ISSUE_VIEW_ANSWERS:
144 verify_can_view_answers(context.user, project, context.errors)
147def enforce_questionnaire_rules(action: PermString, context: AuthContext) -> None:
148 if context.user.is_restricted:
149 section_id = require_section_id(context.section_id)
150 if not context.user.can_view_section_id(section_id):
151 context.errors.auth_failure(
152 f"User has not been granted permission to section {section_id}"
153 )
155 if context.section_id is not None:
156 project = require_project(context.project, action)
157 if not project.contains_section_id(context.section_id):
158 context.errors.auth_failure(
159 f"Section ID {context.section_id} does not belong to project {project.id} ({project.title})"
160 )
163def enforce_issue_rules(action: PermString, context: AuthContext) -> None:
164 if context.issue is None:
165 allowed_actions = ISSUE_STATUS_ACTIONS["__new__"] | PROJECT_ACTIONS
166 if action not in allowed_actions:
167 context.errors.auth_failure(
168 f"Action '{action}' not permitted for a new Issue "
169 )
170 return
172 if action not in ISSUE_STATUS_ACTIONS.get(context.issue.status, set()):
173 context.errors.invalid_issue_status(context.issue.status)
176def enforce_admin_rules(context: AuthContext) -> None:
177 resolved_org = resolve_admin_target(context.target_org, context.target_user)
179 if context.user.organisation is resolved_org:
180 return
182 if not context.user.organisation.is_consultant:
183 context.errors.auth_failure(
184 "Only consultants can perform operations on users in a different org"
185 )
186 return
188 consultant_org = context.user.organisation
189 assert isinstance(consultant_org, ConsultantOrganisation)
191 if (
192 resolved_org is not consultant_org
193 and resolved_org.type is not OrganisationType.RESPONDENT
194 and resolved_org not in consultant_org.clients
195 ):
196 context.errors.auth_failure(
197 f"Organisation {resolved_org.id} is not a client of consultant {context.user.org_id}"
198 )
201def require_project(project: Project | None, action: PermString) -> Project:
202 if project is None:
203 raise ValueError(f"project must be provided if checking for action '{action}'")
204 return project
207def require_section_id(section_id: int | None) -> int:
208 if section_id is None:
209 raise ValueError(
210 "section_id must be provided to validate() for QUESTIONNAIRE ACTION with a Restricted User"
211 )
212 return section_id
215def verify_project_membership(
216 user: User, project: Project, action: PermString, errors: AuthorisationFailures
217) -> None:
218 if user.org_id == project.org_id:
219 return
221 if user.organisation not in project.participants:
222 errors.auth_failure(
223 f"{user.organisation} is not a participant in project {project.title}"
224 )
225 return
227 if action not in project.participant_role_permissions(user):
228 errors.auth_failure(
229 f"Participant {user.organisation} lacks permission to {action} in project {project}"
230 )
233def verify_restricted_project_access(
234 user: User, project: Project, errors: AuthorisationFailures
235) -> None:
236 if user.is_restricted and user not in project.restricted_users:
237 errors.auth_failure("User not granted permissions to this project")
240def verify_project_status_allows_action(
241 project: Project, action: PermString, errors: AuthorisationFailures
242) -> None:
243 if action not in project.actions_for_current_status():
244 errors.invalid_project_status(project.status_name)
247def resolve_admin_target(
248 target_org: Organisation | None, target_user: User | None
249) -> Organisation:
250 if target_org is None and target_user is None:
251 raise ValueError(
252 "Either target_org or target_user must be set for admin validation"
253 )
255 if target_org is None:
256 assert target_user is not None
257 return target_user.organisation
259 if target_user is not None and target_user.organisation is not target_org:
260 raise ValueError(
261 f"Target org '{target_org.id}' must be the same as that of user: {target_user.id}, ({target_user.org_id})"
262 )
264 return target_org
267def verify_can_view_answers(
268 user: User, project: Project, errors: AuthorisationFailures
269) -> None:
270 if project.lock_issues:
271 errors.auth_failure(
272 "Cannot view answers while the Project is set to Lock Responses"
273 )