Coverage for postrfp/vendor/api/users.py: 98%

116 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2Manage user accounts and issue watchlists 

3""" 

4 

5from postrfp.authorisation import perms 

6from postrfp.shared.exceptions import AuthorizationFailure 

7from postrfp.model.notify import IssueWatchList 

8from typing import List 

9 

10from sqlalchemy import select 

11from sqlalchemy.orm import Session 

12 

13from postrfp.shared import fetch 

14from postrfp.shared.decorators import http 

15from postrfp.authorisation.roles import ROLES 

16from postrfp.model import AuditEvent, User, CustomRole, UserRole 

17from postrfp.shared import serial 

18from postrfp.vendor.validation import validate 

19 

20 

@http
def get_users(session: Session, effective_user: User) -> List[serial.BaseUser]:
    """
    Return every user whose organisation matches the caller's organisation.

    Each matching User row is converted with serial.BaseUser.model_validate.
    """
    all_users = session.query(User)
    colleagues = all_users.filter(User.organisation == effective_user.organisation)
    return list(map(serial.BaseUser.model_validate, colleagues))

28 

29 

@http
def get_whoami(user: User) -> serial.FullUser:
    """
    Serialise the authenticated user as a serial.FullUser document.
    """
    profile = serial.FullUser.model_validate(user)
    return profile

36 

37 

@http
def get_user(
    session: Session, effective_user: User, target_user: User
) -> serial.FullUser:
    """
    Return the full profile of target_user.

    Raises AuthorizationFailure when target_user.org_id differs from the
    caller's org_id. The session argument is accepted but not used here.
    """
    if target_user.org_id == effective_user.org_id:
        return serial.FullUser.model_validate(target_user)
    raise AuthorizationFailure(
        f"User {target_user.id} does not belong to {effective_user.org_id}"
    )

49 

50 

@http
def put_user(session: Session, effective_user: User, user_doc: serial.EditableUser):
    """
    Apply user_doc (fullname, email, roles) to the user identified by user_doc.id.

    A caller editing their own record skips the permission check; editing anyone
    else requires MANAGE_USERS. Raises AuthorizationFailure if the target user
    is in a different organisation. Records a USER_UPDATED audit event holding
    one change entry per modified field and per assigned role.
    """
    # NOTE(review): on a self-update no MANAGE_USERS check runs, yet the roles
    # from user_doc are still applied below - confirm this is intended.
    editing_someone_else = effective_user.id != user_doc.id
    if editing_someone_else:
        validate(effective_user, action=perms.MANAGE_USERS)

    target: User = session.scalars(select(User).where(User.id == user_doc.id)).one()

    if target.org_id != effective_user.org_id:
        raise AuthorizationFailure(
            "It is not possible to manage users from another organisation"
        )

    audit = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=target.id,
        user_id=effective_user.id,
        org_id=target.org_id,
    )
    session.add(audit)

    # Only fields whose value actually differs are logged and written.
    for field in ("fullname", "email"):
        new_value = getattr(user_doc, field)
        old_value = getattr(target, field)
        if old_value != new_value:
            audit.add_change(field, old_value, new_value)
            setattr(target, field, new_value)

    # Roles are replaced wholesale: remove every existing UserRole row, flush so
    # the deletes are emitted first, then assign the roles listed in the document.
    session.query(UserRole).filter(UserRole.user == target).delete()
    session.flush()
    for role_name in user_doc.roles:
        audit.add_change("role assigned", "", role_name)
        target.add_role(role_name)

86 

87 

@http
def post_user(
    session: Session, effective_user: User, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Create a new user in the caller's organisation. Requires MANAGE_USERS.

    The new user takes its id, fullname, email and roles from user_doc and is
    attached to effective_user's organisation. Emits a USER_UPDATED audit event
    capturing the initial attributes and roles.

    Returns a serial.StringId carrying the new user's id.
    """
    validate(effective_user, action=perms.MANAGE_USERS)

    # The new user always joins the caller's organisation, so take the id from
    # the caller. The new object's own org_id may still be unset at this point:
    # only the relationship has been assigned and nothing has been flushed yet.
    org_id = effective_user.org_id

    user = User(user_id=user_doc.id)
    user.organisation = effective_user.organisation
    user.fullname = user_doc.fullname
    user.email = user_doc.email

    evt = AuditEvent.create(
        session,
        "USER_UPDATED",
        object_id=user.id,
        user_id=effective_user.id,
        org_id=org_id,
    )
    session.add(evt)
    evt.add_change("fullname", "", user.fullname)
    evt.add_change("email", "", user.email)
    evt.add_change("org_id", "", org_id)
    for role in user_doc.roles:
        evt.add_change("role", "", role)
        user.add_role(role)
    session.add(user)

    return serial.StringId(id=user.id)

119 

120 

@http
def delete_user(
    session: Session, effective_user: User, user_doc: serial.EditableUser
) -> serial.StringId:
    """
    Remove the user identified by user_doc.id. Requires MANAGE_USERS.

    Raises AuthorizationFailure when that user belongs to a different
    organisation from the caller. Records a USER_DELETED audit event and
    returns a serial.StringId carrying the deleted user's id.
    """
    validate(effective_user, action=perms.MANAGE_USERS)

    doomed = session.query(User).filter(User.id == user_doc.id).one()
    if doomed.org_id != effective_user.org_id:
        raise AuthorizationFailure("Cannot delete users from a different organisation")

    session.delete(doomed)
    session.add(
        AuditEvent.create(
            session,
            "USER_DELETED",
            object_id=doomed.id,
            user_id=effective_user.id,
            org_id=doomed.org_id,
        )
    )

    return serial.StringId(id=doomed.id)

144 

145 

@http
def get_roles(session: Session, effective_user: User):
    """
    Return assignable role names: the keys of ROLES followed by the names of
    the CustomRole rows whose org_id matches the caller's organisation.
    """
    builtin_names = list(ROLES.keys())
    org_custom_roles = session.query(CustomRole).filter(
        CustomRole.org_id == effective_user.org_id
    )
    return builtin_names + [custom.name for custom in org_custom_roles]

157 

158 

@http
def get_permissions() -> List[str]:
    """
    Return every entry of perms.ALL_PERMISSIONS as a sorted list.
    """
    permission_names = list(perms.ALL_PERMISSIONS)
    permission_names.sort()
    return permission_names

165 

166 

@http
def get_issue_watchlist(
    session: Session, effective_user: User, issue_id: int
) -> serial.IssueWatchList:
    """
    Return the watch list of the Issue identified by issue_id.

    The caller is validated against the issue with ISSUE_VIEW_ANSWERS. Each
    row from fetch.issue_watchers is converted to a serial.Watcher.
    """
    issue = fetch.issue(session, issue_id)
    validate(effective_user, issue, action=perms.ISSUE_VIEW_ANSWERS)

    watchers = []
    for entry in fetch.issue_watchers(session, issue):
        watchers.append(serial.Watcher.model_validate(entry))

    return serial.IssueWatchList(issue_id=int(issue.id), watchlist=watchers)

180 

181 

def auth_wl_change(session, effective_user, issue_id, watchers_doc):
    """
    Authorise a watch list change and return (user_ids, issue).

    user_ids is the set of "targetUser" values taken from each entry of
    watchers_doc. When that set contains only the caller's own id the caller is
    validated against the issue with ISSUE_VIEW_ANSWERS; any other set
    (including an empty one) requires MANAGE_USERS.
    """
    issue = fetch.issue(session, issue_id)

    user_ids = set()
    for entry in watchers_doc:
        user_ids.add(entry["targetUser"])

    only_self = user_ids == {effective_user.id}
    if only_self:
        # Any user can update their own watch list
        validate(effective_user, issue=issue, action=perms.ISSUE_VIEW_ANSWERS)
    else:
        # NOTE(review): this branch validates with issue=None, so the caller's
        # access to this particular issue is not checked here - confirm intended.
        validate(effective_user, issue=None, action=perms.MANAGE_USERS)

    return (user_ids, issue)

191 

192 

@http
def post_issue_watchers(
    session: Session, effective_user: User, issue_id: int, watchers_doc: dict
) -> List[int]:
    """
    Put the requested users on an Issue's watch list.

    Authorisation is delegated to auth_wl_change. Only users of the issue's
    respondent are considered, and users already among issue.watchers are
    skipped. Returns the ids of the users that were newly added.
    """
    user_ids, issue = auth_wl_change(session, effective_user, issue_id, watchers_doc)

    added_ids = []
    for candidate in issue.respondent.users:
        if candidate.id not in user_ids:
            continue
        if candidate in issue.watchers:
            continue
        issue.watch_list.append(IssueWatchList(user=candidate))
        added_ids.append(candidate.id)

    return added_ids

208 

209 

@http
def delete_issue_watchers(
    session: Session, effective_user: User, issue_id: int, watchers_doc: dict
) -> List[int]:
    """
    Remove users from an Issue's watch list. A user can remove themselves; bulk changes require MANAGE_USERS.

    Authorisation is delegated to auth_wl_change. Only users of the issue's
    respondent are considered. A requested user who is not currently on the
    watch list is skipped rather than aborting the whole request, mirroring
    how post_issue_watchers skips users who are already watching.

    Returns list of user IDs actually removed.
    """
    user_ids, issue = auth_wl_change(session, effective_user, issue_id, watchers_doc)
    removed_ids = []
    for user in issue.respondent.users:
        if user.id not in user_ids:
            continue
        # one_or_none() returns None when the user has no watch list entry,
        # where one() would raise and fail the entire (possibly bulk) request.
        wl = issue.watch_list.filter_by(user_id=user.id).one_or_none()
        if wl is None:
            continue
        session.delete(wl)
        removed_ids.append(user.id)
    return removed_ids