Coverage for postrfp/buyer/api/endpoints/auth.py: 95%
148 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2Users, roles and authentication
3"""
5from typing import Optional
6from uuid import uuid4
8from sqlalchemy.orm import Session
10from postrfp.authorisation import actions, perms
11from postrfp.shared.exceptions import AuthorizationFailure
12from postrfp.shared import fetch, serial
13from postrfp.shared.decorators import http
14from postrfp.authorisation.roles import ROLES
15from postrfp.buyer.api import authorise
16from postrfp.model.humans import (
17 BuyerOrganisation,
18 ConsultantOrganisation,
19 User,
20 CustomRole,
21 UserRole,
22 Organisation,
23 OrganisationType,
24)
25from postrfp.model.audit import AuditEvent
26from postrfp.model.exc import DuplicateDataProvided
@http
def get_role(session: Session, role_id: str) -> list[str]:
    """
    Return the sorted permissions granted by the role named ``role_id``.

    Names present in ``ROLES`` are answered from that mapping. Any other name is
    looked up as a ``CustomRole`` by name; ``Query.one()`` raises if the lookup does
    not match exactly one row.
    """
    if role_id not in ROLES:
        role = session.query(CustomRole).filter(CustomRole.name == role_id).one()
        granted = [perm.permission_id for perm in role.permissions]
    else:
        granted = ROLES[role_id]
    return sorted(granted)
@http
def get_roles(session: Session, user: User) -> list[str]:
    """
    Return the names of every role available to the current user.

    Built-in role names (the keys of ``ROLES``) come first, followed by the names of
    the custom roles whose ``org_id`` matches the current user's organisation.
    """
    org_custom_roles = session.query(CustomRole).filter(
        CustomRole.org_id == user.org_id
    )
    return [*ROLES.keys(), *(custom.name for custom in org_custom_roles)]
@http
def get_permissions(entity: Optional[str] = None) -> list[str]:
    """
    Return a sorted list of permissions, optionally narrowed to one entity type.

    Recognised entity types are 'project', 'issue' and 'admin'. When ``entity`` is
    omitted, or is any other value, every permission is returned.
    """
    if entity == "project":
        selected = actions.PROJECT_ACTIONS
    elif entity == "issue":
        selected = actions.ISSUE_ACTIONS
    elif entity == "admin":
        selected = actions.ADMIN_ACTIONS
    else:
        # Covers both "no filter supplied" and unrecognised entity names.
        selected = perms.ALL_PERMISSIONS
    return sorted(selected)
@http
def get_users(
    session: Session, user: User, q_org_id: Optional[str] = None
) -> list[serial.BaseUser]:
    """
    List the users of an organisation.

    The organisation is the one identified by ``q_org_id`` (the orgId parameter).
    When that is missing, or names the caller's own organisation, the caller's
    organisation is used without a further lookup.
    """
    wants_own_org = q_org_id is None or q_org_id == user.org_id
    if wants_own_org:
        org = user.organisation
    else:
        org = fetch.organisation(session, q_org_id)
    # NOTE(review): the MANAGE_USERS check is applied to both branches above;
    # confirm that listing one's own organisation is meant to require it too.
    authorise.check(user, perms.MANAGE_USERS, target_org=org)
    return [serial.User.model_validate(member) for member in org.users]
@http
def get_user(session: Session, user: User, user_id: Optional[str]) -> serial.FullUser:
    """
    Fetch a single user as a ``serial.FullUser``.

    With no ``user_id`` the current user is returned. Otherwise the user is loaded
    by ID; MANAGE_USERS is only checked when that user belongs to a different
    organisation from the caller.
    """
    target_user = fetch.user(session, user_id) if user_id else user
    same_org = target_user.org_id == user.org_id
    if not same_org:
        authorise.check(user, perms.MANAGE_USERS, target_user=target_user)
    return serial.FullUser.model_validate(target_user)
@http
def put_user(session: Session, user: User, user_doc: serial.EditableUser):
    """
    Update an existing user account from ``user_doc``.

    The fullname, email and type fields are copied across when they differ, and the
    user's roles are replaced wholesale by ``user_doc.roles``. Every change is
    recorded on a USER_UPDATED audit event.
    """
    target_user = fetch.user(session, user_doc.id)
    editing_self = user is target_user
    # NOTE(review): no permission check runs when a user edits their own account,
    # yet roles and type are still replaced from the document below - confirm that
    # self-service role changes are intended.
    if not editing_self:
        authorise.check(user, action=perms.MANAGE_USERS, target_user=target_user)

    audit = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=target_user.id,
        user_id=user.id,
        org_id=user.org_id,
    )
    session.add(audit)

    for field in ("fullname", "email", "type"):
        requested = getattr(user_doc, field)
        current = getattr(target_user, field)
        if current != requested:
            audit.add_change(field, current, requested)
            setattr(target_user, field, requested)

    # Drop all existing role rows and flush before re-adding the requested roles.
    existing_roles = session.query(UserRole).filter(UserRole.user == target_user)
    existing_roles.delete()
    session.flush()
    for role_name in user_doc.roles:
        audit.add_change("role assigned", "", role_name)
        target_user.add_role(role_name)
@http
def post_user(
    session: Session, user: User, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Create a new user account.

    The account is created in the organisation given by ``user_doc.org_id``, or in
    the current user's organisation when that is not set. The current user needs
    MANAGE_USERS on the target organisation.

    User ID must be unique across the system. An HTTP 409 response is returned
    if the user ID provided already exists in the database
    (``DuplicateDataProvided`` is raised).

    Returns the ID of the newly created user.
    """
    target_org_id = user_doc.org_id
    if target_org_id is None:
        organisation = user.organisation
    else:
        organisation = fetch.organisation(session, target_org_id)

    authorise.check(user, action=perms.MANAGE_USERS, target_org=organisation)

    new_user_id = user_doc.id
    if session.get(User, new_user_id) is not None:
        raise DuplicateDataProvided(f"User '{new_user_id}' already exits")

    # Keep the new account in its own name: rebinding `user` here previously made
    # the audit event record the created account as the actor instead of the caller.
    new_user = User(new_user_id)
    new_user.fullname = user_doc.fullname
    new_user.type = user_doc.type
    new_user.email = user_doc.email
    new_user.org_id = organisation.id

    # NOTE(review): creation is logged under the "USER_UPDATED" event type, left
    # unchanged here - confirm whether a dedicated creation event type exists.
    evt = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=new_user.id,
        user_id=user.id,
        org_id=user.org_id,
    )
    session.add(evt)
    evt.add_change("fullname", "", new_user.fullname)
    evt.add_change("email", "", new_user.email)
    evt.add_change("org_id", "", organisation.id)
    evt.add_change("type", "", new_user.type)
    for role in user_doc.roles:
        evt.add_change("role", "", role)
        new_user.add_role(role)
    session.add(new_user)
    return serial.StringId(id=new_user.id)
@http
def delete_user(session: Session, user: User, user_id_doc: serial.UserId):
    """
    Delete the user account given by 'id' in the body document.

    If the user ID is that of the current user an HTTP 400 Error response is
    returned - a user cannot delete themselves via this method. Otherwise the
    caller needs MANAGE_USERS over the target user, and the deletion is recorded
    on a USER_DELETED audit event.
    """
    doomed_id = user_id_doc.id
    doomed_user = fetch.user(session, doomed_id)
    # Reject self-deletion before any permission check is made.
    if doomed_user is user:
        raise ValueError("A user cannot delete their own account")
    authorise.check(user, action=perms.MANAGE_USERS, target_user=doomed_user)

    session.delete(doomed_user)
    audit = AuditEvent.create(
        session,
        "USER_DELETED",
        object_id=doomed_user.id,
        user_id=user.id,
        org_id=user.org_id,
    )
    audit.add_change("User", doomed_id, None)
    session.add(audit)
@http
def get_organisations(
    session: Session, user: User, org_type: str
) -> list[serial.BaseOrganisation]:
    """
    Get an array of organisations. The type of organisation returned depends on the value of the
    query parameter 'orgType':
    - 'vendors', the default, provides an array of vendor (supplier) organisations that have been
      invited to respond to Projects issued by the current user's organisation.
    - 'clients' - get buyer organisations that are clients of the current user's organisation.
      This option is only valid for Consultant organisations.

    The current user needs MANAGE_ORGANISATION on their own organisation.

    Raises AuthorizationFailure if 'clients' is requested by a non-consultant
    organisation, and ValueError for any other value of ``org_type`` (previously the
    function fell through and returned None despite its list return type).
    """
    authorise.check(
        user, action=perms.MANAGE_ORGANISATION, target_org=user.organisation
    )
    fo = serial.BaseOrganisation.model_validate
    if org_type == "vendors":
        vendors = user.organisation.suppliers.filter(
            Organisation.type == OrganisationType.RESPONDENT
        )
        return [fo(org) for org in vendors]
    if org_type == "clients":
        if not user.organisation.is_consultant:
            raise AuthorizationFailure("Action only permitted for Consultant Users")
        # Type narrowing only; the consultant check above is the real guard.
        assert isinstance(user.organisation, ConsultantOrganisation)
        return [fo(org) for org in user.organisation.clients]
    # Same exception type delete_user uses to reject an invalid request.
    raise ValueError(
        f"Unsupported orgType '{org_type}': expected 'vendors' or 'clients'"
    )
@http
def get_organisation(
    session: Session, user: User, q_org_id: Optional[str]
) -> serial.OrgWithUsers:
    """
    Fetch organisation details together with an array of users.

    Without an "orgId" query parameter the current user's organisation is returned.
    Consultant users can provide "orgId" to fetch details for a client organisation.
    """
    org = fetch.organisation(session, q_org_id) if q_org_id else user.organisation
    # NOTE(review): the MANAGE_ORGANISATION check is applied whichever way the
    # organisation was resolved; confirm that matches the intended policy.
    authorise.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)
    return serial.OrgWithUsers.model_validate(org)
@http
def post_client(
    session: Session, user: User, new_client_doc: serial.NewClient
) -> serial.OrgWithUsers:
    """
    Create a new organisation. This operation is only valid for Consultant users creating
    Buyer organisations for their clients.

    The new BuyerOrganisation gets a generated UUID as its ID and one initial user,
    identified by the administrator email address and given the "Administrator"
    role. The organisation is then attached to the consultant's list of clients.

    Raises AuthorizationFailure if the current user's organisation is not a
    Consultant organisation.
    """
    authorise.check(
        user, action=perms.MANAGE_ORGANISATION, target_org=user.organisation
    )
    # Enforce the consultant-only rule with a real exception: an assert is stripped
    # under `python -O` and would otherwise surface as an AssertionError.
    if not user.organisation.is_consultant:
        raise AuthorizationFailure("Action only permitted for Consultant Users")
    # Type narrowing only; the check above is the real guard.
    assert isinstance(user.organisation, ConsultantOrganisation)

    org = BuyerOrganisation(str(uuid4()))
    org.name = new_client_doc.org_name
    org.domain_name = new_client_doc.domain_name

    # The administrator's email address doubles as the new user's ID.
    client_user = User(
        new_client_doc.administrator_email,
        fullname=new_client_doc.administrator_name,
        email=new_client_doc.administrator_email,
    )
    client_user.add_role("Administrator")
    org.users.append(client_user)
    user.organisation.clients.append(org)
    return get_organisation(session, user, q_org_id=org.id)
@http
def put_organisation(
    session: Session, user: User, org_doc: serial.Organisation
) -> serial.BaseOrganisation:
    """
    Update the name, domain_name, password_expiry and public fields of the
    organisation identified by ``org_doc.id`` and return the updated organisation.
    """
    updating_own_org = org_doc.id == user.org_id
    if updating_own_org:
        org = user.organisation
    else:
        org = fetch.organisation(session, org_doc.id)
    # NOTE(review): the MANAGE_ORGANISATION check is applied whichever way the
    # organisation was resolved; confirm that matches the intended policy.
    authorise.check(user, action=perms.MANAGE_ORGANISATION, target_org=org)

    for field in ("name", "domain_name", "password_expiry", "public"):
        setattr(org, field, getattr(org_doc, field))
    return serial.BaseOrganisation.model_validate(org)