Coverage for postrfp/vendor/api/users.py: 98%
116 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2Manage user accounts and issue watchlists
3"""
5from postrfp.authorisation import perms
6from postrfp.shared.exceptions import AuthorizationFailure
7from postrfp.model.notify import IssueWatchList
8from typing import List
10from sqlalchemy import select
11from sqlalchemy.orm import Session
13from postrfp.shared import fetch
14from postrfp.shared.decorators import http
15from postrfp.authorisation.roles import ROLES
16from postrfp.model import AuditEvent, User, CustomRole, UserRole
17from postrfp.shared import serial
18from postrfp.vendor.validation import validate
@http
def get_users(session: Session, effective_user: User) -> List[serial.BaseUser]:
    """
    List the users that share the caller's organisation.

    Every matching ``User`` row is converted to a ``serial.BaseUser`` document.
    """
    same_org = User.organisation == effective_user.organisation
    org_users = session.query(User).filter(same_org)
    return list(map(serial.BaseUser.model_validate, org_users))
@http
def get_whoami(user: User) -> serial.FullUser:
    """
    Describe the authenticated user as a ``serial.FullUser`` document.
    """
    profile = serial.FullUser.model_validate(user)
    return profile
@http
def get_user(
    session: Session, effective_user: User, target_user: User
) -> serial.FullUser:
    """
    Return the full profile of ``target_user`` when they share the caller's organisation.

    Raises AuthorizationFailure if the two users' ``org_id`` values differ.
    """
    if target_user.org_id == effective_user.org_id:
        return serial.FullUser.model_validate(target_user)
    # Cross-organisation lookups are refused outright.
    m = f"User {target_user.id} does not belong to {effective_user.org_id}"
    raise AuthorizationFailure(m)
@http
def put_user(session: Session, effective_user: User, user_doc: serial.EditableUser):
    """
    Overwrite the fullname, email and role set of the user identified by ``user_doc.id``.

    Editing somebody else's record requires MANAGE_USERS; editing one's own does not.
    The target must be in the caller's organisation, otherwise AuthorizationFailure
    is raised. A USER_UPDATED audit event records each changed field and every role
    listed in the document.
    """
    editing_self = effective_user.id == user_doc.id
    if not editing_self:
        validate(effective_user, action=perms.MANAGE_USERS)
    # NOTE(review): the self-edit path also rewrites roles further down without a
    # MANAGE_USERS check - confirm that users assigning their own roles is intended.

    target: User = session.scalars(select(User).where(User.id == user_doc.id)).one()

    if target.org_id != effective_user.org_id:
        m = "It is not possible to manage users from another organisation"
        raise AuthorizationFailure(m)

    audit = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=target.id,
        user_id=effective_user.id,
        org_id=target.org_id,
    )
    session.add(audit)

    # Only fields whose value actually differs are written and audited.
    for field in ("fullname", "email"):
        current, incoming = getattr(target, field), getattr(user_doc, field)
        if current == incoming:
            continue
        audit.add_change(field, current, incoming)
        setattr(target, field, incoming)

    # Roles are replaced wholesale: existing UserRole rows are deleted and the
    # session flushed before the roles from the document are added back.
    session.query(UserRole).filter(UserRole.user == target).delete()
    session.flush()
    for role in user_doc.roles:
        audit.add_change("role assigned", "", role)
        target.add_role(role)
@http
def post_user(
    session: Session, effective_user: User, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Create a new user in the caller's organisation. Requires MANAGE_USERS.
    Emits USER_UPDATED audit event capturing initial attributes & roles.

    The new user takes its id, fullname, email and roles from ``user_doc`` and its
    organisation from ``effective_user``. Returns the new user's id wrapped in
    ``serial.StringId``.
    """
    validate(effective_user, action=perms.MANAGE_USERS)
    user = User(user_id=user_doc.id)
    user.organisation = effective_user.organisation
    user.fullname = user_doc.fullname
    user.email = user_doc.email

    # The new user has only had its ``organisation`` relationship assigned and has
    # not been flushed, so its ``org_id`` foreign key may still be unset here.
    # The caller's org_id is the same organisation and is already populated, so
    # use it for the audit record instead of ``user.org_id``.
    org_id = effective_user.org_id

    evt = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=user.id,
        user_id=effective_user.id,
        org_id=org_id,
    )
    session.add(evt)
    evt.add_change("fullname", "", user.fullname)
    evt.add_change("email", "", user.email)
    evt.add_change("org_id", "", org_id)
    for role in user_doc.roles:
        evt.add_change("role", "", role)
        user.add_role(role)
    session.add(user)

    return serial.StringId(id=user.id)
@http
def delete_user(
    session: Session, effective_user: User, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Remove the user identified by ``user_doc.id`` from the caller's organisation.

    Requires MANAGE_USERS. Raises AuthorizationFailure when the target belongs to a
    different organisation. A USER_DELETED audit event is added to the session and
    the deleted user's id is returned.
    """
    validate(effective_user, action=perms.MANAGE_USERS)

    doomed = session.query(User).filter_by(id=user_doc.id).one()
    if doomed.org_id != effective_user.org_id:
        raise AuthorizationFailure("Cannot delete users from a different organisation")

    session.delete(doomed)
    audit = AuditEvent.create(
        session,
        "USER_DELETED",
        object_id=doomed.id,
        user_id=effective_user.id,
        org_id=doomed.org_id,
    )
    session.add(audit)

    return serial.StringId(id=doomed.id)
@http
def get_roles(session: Session, effective_user: User):
    """
    Return the names of all assignable roles: the keys of ROLES followed by the
    names of the CustomRole rows belonging to the caller's organisation.
    """
    custom_roles = session.query(CustomRole).filter(
        CustomRole.org_id == effective_user.org_id
    )
    return [*ROLES.keys(), *(custom.name for custom in custom_roles)]
@http
def get_permissions() -> List[str]:
    """
    Return every entry of perms.ALL_PERMISSIONS, sorted.
    """
    names = list(perms.ALL_PERMISSIONS)
    names.sort()
    return names
@http
def get_issue_watchlist(
    session: Session, effective_user: User, issue_id: int
) -> serial.IssueWatchList:
    """
    Return the watch list of the given Issue.

    The caller must hold ISSUE_VIEW_ANSWERS on the Issue. Each watcher record is
    serialised as a ``serial.Watcher``.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue, action=perms.ISSUE_VIEW_ANSWERS)
    watchers = [
        serial.Watcher.model_validate(entry)
        for entry in fetch.issue_watchers(session, issue)
    ]
    return serial.IssueWatchList(issue_id=int(issue.id), watchlist=watchers)
def auth_wl_change(session, effective_user, issue_id, watchers_doc):
    """
    Authorise a watch-list change and return ``(user_ids, issue)``.

    ``user_ids`` is the set of ``targetUser`` values found in ``watchers_doc``.
    A request that names only the caller needs ISSUE_VIEW_ANSWERS on the issue;
    any other request needs MANAGE_USERS.
    """
    issue = fetch.issue(session, issue_id)

    user_ids = set()
    for entry in watchers_doc:
        user_ids.add(entry["targetUser"])

    only_self = user_ids == {effective_user.id}
    if only_self:
        # Any user can update their own watch list
        validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS)
    else:
        # NOTE(review): this branch validates with issue=None, so no issue-level
        # check is made here - confirm that is sufficient for bulk changes.
        validate(effective_user, issue=None, action=perms.MANAGE_USERS)
    return (user_ids, issue)
@http
def post_issue_watchers(
    session: Session, effective_user: User, issue_id: int, watchers_doc: dict
) -> List[int]:
    """
    Put the requested users on an Issue's watch list and return the IDs newly added.

    A user may add themselves; any other change requires MANAGE_USERS. Only users of
    the issue's respondent are considered, and users already watching are skipped.
    """
    user_ids, issue = auth_wl_change(session, effective_user, issue_id, watchers_doc)
    added_ids = []
    for candidate in issue.respondent.users:
        # Skip anyone who was not requested or who is already a watcher.
        if candidate.id not in user_ids or candidate in issue.watchers:
            continue
        issue.watch_list.append(IssueWatchList(user=candidate))
        added_ids.append(candidate.id)
    return added_ids
@http
def delete_issue_watchers(
    session: Session, effective_user: User, issue_id: int, watchers_doc: dict
) -> List[int]:
    """
    Remove users from an Issue's watch list. A user can remove themselves; bulk changes require MANAGE_USERS.
    Returns list of user IDs removed.

    Only users of the issue's respondent are considered. A requested user who is
    not currently on the watch list is skipped and left out of the returned list,
    so one stale entry does not abort the whole request.
    """
    user_ids, issue = auth_wl_change(session, effective_user, issue_id, watchers_doc)
    removed_ids = []
    for user in issue.respondent.users:
        if user.id not in user_ids:
            continue
        # one_or_none() returns None for a user who is not watching, where one()
        # would raise; more than one matching row still raises.
        wl = issue.watch_list.filter_by(user_id=user.id).one_or_none()
        if wl is None:
            continue
        session.delete(wl)
        removed_ids.append(user.id)
    return removed_ids