Coverage for postrfp/buyer/api/endpoints/auth.py: 95%

148 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2Users, roles and authentication 

3""" 

4 

5from typing import Optional 

6from uuid import uuid4 

7 

8from sqlalchemy.orm import Session 

9 

10from postrfp.authorisation import actions, perms 

11from postrfp.shared.exceptions import AuthorizationFailure 

12from postrfp.shared import fetch, serial 

13from postrfp.shared.decorators import http 

14from postrfp.authorisation.roles import ROLES 

15from postrfp.buyer.api import authorise 

16from postrfp.model.humans import ( 

17 BuyerOrganisation, 

18 ConsultantOrganisation, 

19 User, 

20 CustomRole, 

21 UserRole, 

22 Organisation, 

23 OrganisationType, 

24) 

25from postrfp.model.audit import AuditEvent 

26from postrfp.model.exc import DuplicateDataProvided 

27 

28 

29@http 

30def get_role(session: Session, role_id: str) -> list[str]: 

31 """ 

32 List permissions associated with the provided Role 

33 """ 

34 if role_id in ROLES: 

35 return sorted(ROLES[role_id]) 

36 custom_role = session.query(CustomRole).filter(CustomRole.name == role_id).one() 

37 return sorted(p.permission_id for p in custom_role.permissions) 

38 

39 

40@http 

41def get_roles(session: Session, user: User) -> list[str]: 

42 """ 

43 List all available roles - built-in and custom (defined by the current users organisation) 

44 """ 

45 roles = list(ROLES.keys()) 

46 for role in session.query(CustomRole).filter(CustomRole.org_id == user.org_id): 

47 roles.append(role.name) 

48 return roles 

49 

50 

51@http 

52def get_permissions(entity: Optional[str] = None) -> list[str]: 

53 """ 

54 List all available permissions, optionally filtered by the entity type provided. 

55 Valid entity types are 'project', 'issue' and 'admin'. If no entity type is provided 

56 then all permissions are returned. 

57 """ 

58 if entity is None: 

59 return sorted(perms.ALL_PERMISSIONS) 

60 

61 match entity: 

62 case "project": 

63 return sorted(actions.PROJECT_ACTIONS) 

64 case "issue": 

65 return sorted(actions.ISSUE_ACTIONS) 

66 case "admin": 

67 return sorted(actions.ADMIN_ACTIONS) 

68 case _: 

69 return sorted(perms.ALL_PERMISSIONS) 

70 

71 

72@http 

73def get_users( 

74 session: Session, user: User, q_org_id: Optional[str] = None 

75) -> list[serial.BaseUser]: 

76 """ 

77 List users belonging to the Organisation given by orgId. If this parameter is not provided 

78 then users are returned for the current logged-in user's organisation. 

79 """ 

80 if q_org_id is None or q_org_id == user.org_id: 

81 org = user.organisation 

82 else: 

83 org = fetch.organisation(session, q_org_id) 

84 authorise.check(user, perms.MANAGE_USERS, target_org=org) 

85 return [serial.User.model_validate(u) for u in org.users] 

86 

87 

88@http 

89def get_user(session: Session, user: User, user_id: Optional[str]) -> serial.FullUser: 

90 """ 

91 Fetch an existing User by ID 

92 """ 

93 if not user_id: 

94 target_user = user 

95 else: 

96 target_user = fetch.user(session, user_id) 

97 if target_user.org_id != user.org_id: 

98 authorise.check(user, perms.MANAGE_USERS, target_user=target_user) 

99 return serial.FullUser.model_validate(target_user) 

100 

101 

102@http 

103def put_user(session: Session, user: User, user_doc: serial.EditableUser): 

104 """ 

105 Update an existing user account 

106 """ 

107 target_user = fetch.user(session, user_doc.id) 

108 if user is not target_user: 

109 authorise.check(user, action=perms.MANAGE_USERS, target_user=target_user) 

110 

111 evt = AuditEvent.create( 

112 session, 

113 "USER_UPDATED", 

114 object_id=target_user.id, 

115 user_id=user.id, 

116 org_id=user.org_id, 

117 ) 

118 session.add(evt) 

119 for key in ("fullname", "email", "type"): 

120 doc_val = getattr(user_doc, key) 

121 if getattr(target_user, key) != doc_val: 

122 evt.add_change(key, getattr(target_user, key), doc_val) 

123 setattr(target_user, key, doc_val) 

124 

125 session.query(UserRole).filter(UserRole.user == target_user).delete() 

126 session.flush() 

127 for role in user_doc.roles: 

128 evt.add_change("role assigned", "", role) 

129 target_user.add_role(role) 

130 

131 

132@http 

133def post_user( 

134 session: Session, user: User, user_doc: serial.EditableUser 

135) -> serial.StringId: 

136 """ 

137 Create a new user account. 

138 

139 User ID must be unique across the system. An HTTP 409 response is returned 

140 if the user ID provided already exists in the database; 

141 """ 

142 

143 target_org_id = user_doc.org_id 

144 if target_org_id is None: 

145 organisation = user.organisation 

146 else: 

147 organisation = fetch.organisation(session, target_org_id) 

148 

149 authorise.check(user, action=perms.MANAGE_USERS, target_org=organisation) 

150 

151 new_user_id = user_doc.id 

152 existing_user = session.get(User, new_user_id) 

153 if existing_user is not None: 

154 raise DuplicateDataProvided(f"User '{new_user_id}' already exits") 

155 user = User(new_user_id) 

156 user.fullname = user_doc.fullname 

157 user.type = user_doc.type 

158 user.email = user_doc.email 

159 user.org_id = organisation.id 

160 

161 evt = AuditEvent.create( 

162 session, 

163 "USER_UPDATED", 

164 object_id=user.id, 

165 user_id=user.id, 

166 org_id=user.org_id, 

167 ) 

168 session.add(evt) 

169 evt.add_change("fullname", "", user.fullname) 

170 evt.add_change("email", "", user.email) 

171 evt.add_change("org_id", "", organisation.id) 

172 evt.add_change("type", "", user.type) 

173 for role in user_doc.roles: 

174 evt.add_change("role", "", role) 

175 user.add_role(role) 

176 session.add(user) 

177 return serial.StringId(id=user.id) 

178 

179 

180@http 

181def delete_user(session: Session, user: User, user_id_doc: serial.UserId): 

182 """ 

183 Delete the user account given by 'id' in the body document. 

184 

185 If the user ID is that of the current user an HTTP 400 Error reponse is returned - 

186 a user cannot delete themselves via this method. 

187 """ 

188 user_id = user_id_doc.id 

189 target_user = fetch.user(session, user_id) 

190 if user is target_user: 

191 raise ValueError("A user cannot delete their own account") 

192 authorise.check(user, action=perms.MANAGE_USERS, target_user=target_user) 

193 

194 session.delete(target_user) 

195 evt = AuditEvent.create( 

196 session, 

197 "USER_DELETED", 

198 object_id=target_user.id, 

199 user_id=user.id, 

200 org_id=user.org_id, 

201 ) 

202 evt.add_change("User", user_id, None) 

203 session.add(evt) 

204 

205 

206@http 

207def get_organisations( 

208 session: Session, user: User, org_type: str 

209) -> list[serial.BaseOrganisation]: 

210 """ 

211 Get an array of organisations. The type of organisation returned depends on the value of the 

212 query parameter 'orgType': 

213 - 'vendors', the default, provides an array of vendor(supplier) organisations that have been 

214 invited to respond to Projects issued by the current users' organisation. 

215 - 'clients' - get buyer organisations that are clients of current user's organisation. This 

216 option is only valid for Consultant organisations. 

217 """ 

218 authorise.check( 

219 user, action=perms.MANAGE_ORGANISATION, target_org=user.organisation 

220 ) 

221 fo = serial.BaseOrganisation.model_validate 

222 if org_type == "vendors": 

223 return [ 

224 fo(org) 

225 for org in user.organisation.suppliers.filter( 

226 Organisation.type == OrganisationType.RESPONDENT 

227 ) 

228 ] 

229 if org_type == "clients": 

230 if not user.organisation.is_consultant: 

231 raise AuthorizationFailure("Action only permitted for Consultant Users") 

232 assert isinstance(user.organisation, ConsultantOrganisation) 

233 return [fo(org) for org in user.organisation.clients] 

234 

235 

236@http 

237def get_organisation( 

238 session: Session, user: User, q_org_id: Optional[str] 

239) -> serial.OrgWithUsers: 

240 """ 

241 Fetch organisation details together with an array of users. 

242 

243 Consultant users can provide an "orgId" query parameter to fetch details for a client 

244 organisation. 

245 """ 

246 if not q_org_id: 

247 org = user.organisation 

248 else: 

249 org = fetch.organisation(session, q_org_id) 

250 authorise.check(user, action=perms.MANAGE_ORGANISATION, target_org=org) 

251 return serial.OrgWithUsers.model_validate(org) 

252 

253 

254@http 

255def post_client( 

256 session: Session, user: User, new_client_doc: serial.NewClient 

257) -> serial.OrgWithUsers: 

258 """ 

259 Create a new organisation. This operation is only valid for Consultant users creating 

260 Buyer organisations for their clients. 

261 """ 

262 authorise.check( 

263 user, action=perms.MANAGE_ORGANISATION, target_org=user.organisation 

264 ) 

265 assert isinstance(user.organisation, ConsultantOrganisation) 

266 org = BuyerOrganisation(str(uuid4())) 

267 org.name = new_client_doc.org_name 

268 org.domain_name = new_client_doc.domain_name 

269 client_user = User( 

270 new_client_doc.administrator_email, 

271 fullname=new_client_doc.administrator_name, 

272 email=new_client_doc.administrator_email, 

273 ) 

274 client_user.add_role("Administrator") 

275 org.users.append(client_user) 

276 user.organisation.clients.append(org) 

277 return get_organisation(session, user, q_org_id=org.id) 

278 

279 

280@http 

281def put_organisation( 

282 session: Session, user: User, org_doc: serial.Organisation 

283) -> serial.BaseOrganisation: 

284 """ 

285 Update name, password_expiry and public fields for the given organisation. 

286 """ 

287 if org_doc.id == user.org_id: 

288 org = user.organisation 

289 else: 

290 org = fetch.organisation(session, org_doc.id) 

291 authorise.check(user, action=perms.MANAGE_ORGANISATION, target_org=org) 

292 org.name = org_doc.name 

293 org.domain_name = org_doc.domain_name 

294 org.password_expiry = org_doc.password_expiry 

295 org.public = org_doc.public 

296 return serial.BaseOrganisation.model_validate(org)