Coverage for postrfp/buyer/api/authorise.py: 100%

118 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1from dataclasses import dataclass 

2 

3from postrfp.authorisation.errors import AuthorisationFailures 

4from postrfp.authorisation.actions import ( 

5 ADMIN_ACTIONS, 

6 ISSUE_ACTIONS, 

7 ISSUE_STATUS_ACTIONS, 

8 PROJECT_ACTIONS, 

9 QUESTIONNAIRE_ACTIONS, 

10) 

11from postrfp.authorisation import perms 

12from postrfp.shared.exceptions import AuthorizationFailure 

13from postrfp.model.humans import ( 

14 ConsultantOrganisation, 

15 Organisation, 

16 OrganisationType, 

17 User, 

18) 

19from postrfp.model.issue import Issue 

20from postrfp.model.project import Project 

21from postrfp.shared.types import PermString 

22 

23 

24@dataclass 

25class AuthContext: 

26 user: User 

27 errors: AuthorisationFailures 

28 project: Project | None = None 

29 issue: Issue | None = None 

30 multiproject: bool = False 

31 section_id: int | None = None 

32 target_user: User | None = None 

33 target_org: Organisation | None = None 

34 deny_restricted: bool = True 

35 

36 

37def check( 

38 user: User, 

39 action: PermString, 

40 project: Project | None = None, 

41 issue: Issue | None = None, 

42 multiproject: bool = False, 

43 section_id: int | None = None, 

44 target_user: User | None = None, 

45 target_org: Organisation | None = None, 

46 deny_restricted: bool = True, 

47) -> None: 

48 """ 

49 Centralised statement of auth business logic, all API calls should invoke this 

50 

51 @param multiproject - some actions cover mutliple projects, thus it's not possible 

52 to pass the Project into this function 

53 @param deny_restricted - if True, immediately deny any action for Restricted users 

54 @param target_user - the user being administered, if any 

55 @param target_org - the organisation being administered, if any 

56 """ 

57 

58 errors = AuthorisationFailures(action) 

59 context = AuthContext( 

60 user=user, 

61 errors=errors, 

62 project=project, 

63 issue=issue, 

64 multiproject=multiproject, 

65 section_id=section_id, 

66 target_user=target_user, 

67 target_org=target_org, 

68 deny_restricted=deny_restricted, 

69 ) 

70 

71 apply_rules(action, context) 

72 

73 

74def apply_rules(action: PermString, context: AuthContext) -> None: 

75 """ 

76 Apply the relevant validation rules based on the context. 

77 

78 Some actions may exist in multiple _ACTIONS categories so more than one set of 

79 rules may be applied. 

80 

81 Errors are collected in the context and raised as a single exception at the end. 

82 """ 

83 

84 # 1 Is the action a valid permission? (fail fast) 

85 validate_action(action) 

86 

87 # 2 Is the user restricted and is this action allowed for restricted users? 

88 enforce_restricted_user_guard(context) 

89 

90 # Successive checks may add errors to the context 

91 

92 # 3 Do the user's assigned roles include the correct permission for this action? 

93 ensure_user_permission(action, context) 

94 

95 if action in PROJECT_ACTIONS: 

96 # 4 Is the user's organisation a participant in the project with the required permission? 

97 # 5 Is the action permitted at the project's current status? 

98 # 6 Are there any project-specific business rules preventing this action? 

99 enforce_project_rules(action, context) 

100 

101 if action in QUESTIONNAIRE_ACTIONS: 

102 # 7 If the user is restricted, do they have permission to access the specified section? 

103 # 8 If a section ID is provided, does it belong to the specified project? 

104 enforce_questionnaire_rules(action, context) 

105 

106 if action in ISSUE_ACTIONS: 

107 # 9 Is the action permitted at the issue's current status (or is it a new issue)? 

108 enforce_issue_rules(action, context) 

109 

110 if action in ADMIN_ACTIONS: 

111 # 10 Does the user have permission to administer the target organisation or user? 

112 enforce_admin_rules(context) 

113 

114 if context.errors.has_errors: 

115 raise AuthorizationFailure(errors=context.errors) 

116 

117 

118def validate_action(action: PermString) -> None: 

119 if action not in perms.ALL_PERMISSIONS: 

120 raise ValueError(f"Action {action} is not associated with a valid permission") 

121 

122 

123def enforce_restricted_user_guard(context: AuthContext) -> None: 

124 if context.deny_restricted and context.user.is_restricted: 

125 context.errors.auth_failure( 

126 "Action not permitted for Restricted (Domain Expert) users" 

127 ) 

128 

129 

130def ensure_user_permission(action: PermString, context: AuthContext) -> None: 

131 if not context.user.has_permission(action): 

132 context.errors.auth_failure(f"User lacks permission for {action}") 

133 

134 

135def enforce_project_rules(action: PermString, context: AuthContext) -> None: 

136 if context.multiproject: 

137 return 

138 

139 project = require_project(context.project, action) 

140 verify_project_membership(context.user, project, action, context.errors) 

141 verify_restricted_project_access(context.user, project, context.errors) 

142 verify_project_status_allows_action(project, action, context.errors) 

143 if action == perms.ISSUE_VIEW_ANSWERS: 

144 verify_can_view_answers(context.user, project, context.errors) 

145 

146 

147def enforce_questionnaire_rules(action: PermString, context: AuthContext) -> None: 

148 if context.user.is_restricted: 

149 section_id = require_section_id(context.section_id) 

150 if not context.user.can_view_section_id(section_id): 

151 context.errors.auth_failure( 

152 f"User has not been granted permission to section {section_id}" 

153 ) 

154 

155 if context.section_id is not None: 

156 project = require_project(context.project, action) 

157 if not project.contains_section_id(context.section_id): 

158 context.errors.auth_failure( 

159 f"Section ID {context.section_id} does not belong to project {project.id} ({project.title})" 

160 ) 

161 

162 

163def enforce_issue_rules(action: PermString, context: AuthContext) -> None: 

164 if context.issue is None: 

165 allowed_actions = ISSUE_STATUS_ACTIONS["__new__"] | PROJECT_ACTIONS 

166 if action not in allowed_actions: 

167 context.errors.auth_failure( 

168 f"Action '{action}' not permitted for a new Issue " 

169 ) 

170 return 

171 

172 if action not in ISSUE_STATUS_ACTIONS.get(context.issue.status, set()): 

173 context.errors.invalid_issue_status(context.issue.status) 

174 

175 

176def enforce_admin_rules(context: AuthContext) -> None: 

177 resolved_org = resolve_admin_target(context.target_org, context.target_user) 

178 

179 if context.user.organisation is resolved_org: 

180 return 

181 

182 if not context.user.organisation.is_consultant: 

183 context.errors.auth_failure( 

184 "Only consultants can perform operations on users in a different org" 

185 ) 

186 return 

187 

188 consultant_org = context.user.organisation 

189 assert isinstance(consultant_org, ConsultantOrganisation) 

190 

191 if ( 

192 resolved_org is not consultant_org 

193 and resolved_org.type is not OrganisationType.RESPONDENT 

194 and resolved_org not in consultant_org.clients 

195 ): 

196 context.errors.auth_failure( 

197 f"Organisation {resolved_org.id} is not a client of consultant {context.user.org_id}" 

198 ) 

199 

200 

201def require_project(project: Project | None, action: PermString) -> Project: 

202 if project is None: 

203 raise ValueError(f"project must be provided if checking for action '{action}'") 

204 return project 

205 

206 

207def require_section_id(section_id: int | None) -> int: 

208 if section_id is None: 

209 raise ValueError( 

210 "section_id must be provided to validate() for QUESTIONNAIRE ACTION with a Restricted User" 

211 ) 

212 return section_id 

213 

214 

215def verify_project_membership( 

216 user: User, project: Project, action: PermString, errors: AuthorisationFailures 

217) -> None: 

218 if user.org_id == project.org_id: 

219 return 

220 

221 if user.organisation not in project.participants: 

222 errors.auth_failure( 

223 f"{user.organisation} is not a participant in project {project.title}" 

224 ) 

225 return 

226 

227 if action not in project.participant_role_permissions(user): 

228 errors.auth_failure( 

229 f"Participant {user.organisation} lacks permission to {action} in project {project}" 

230 ) 

231 

232 

233def verify_restricted_project_access( 

234 user: User, project: Project, errors: AuthorisationFailures 

235) -> None: 

236 if user.is_restricted and user not in project.restricted_users: 

237 errors.auth_failure("User not granted permissions to this project") 

238 

239 

240def verify_project_status_allows_action( 

241 project: Project, action: PermString, errors: AuthorisationFailures 

242) -> None: 

243 if action not in project.actions_for_current_status(): 

244 errors.invalid_project_status(project.status_name) 

245 

246 

247def resolve_admin_target( 

248 target_org: Organisation | None, target_user: User | None 

249) -> Organisation: 

250 if target_org is None and target_user is None: 

251 raise ValueError( 

252 "Either target_org or target_user must be set for admin validation" 

253 ) 

254 

255 if target_org is None: 

256 assert target_user is not None 

257 return target_user.organisation 

258 

259 if target_user is not None and target_user.organisation is not target_org: 

260 raise ValueError( 

261 f"Target org '{target_org.id}' must be the same as that of user: {target_user.id}, ({target_user.org_id})" 

262 ) 

263 

264 return target_org 

265 

266 

267def verify_can_view_answers( 

268 user: User, project: Project, errors: AuthorisationFailures 

269) -> None: 

270 if project.lock_issues: 

271 errors.auth_failure( 

272 "Cannot view answers while the Project is set to Lock Responses" 

273 )