Coverage for postrfp/auth/endpoints.py: 100%

152 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1""" 

2JWT-based Authentication Endpoints. 

3 

4Implements a standard token-based authentication flow: 

5- `post_login`: Validates credentials, returns a short-lived JWT access token in the response body 

6 and sets a long-lived, secure, HttpOnly refresh token cookie (`/api/auth` path). Handles account lockout logic. 

7- `post_refresh_token`: Accepts a valid refresh token cookie and returns a new JWT access token. 

8- `post_logout`: Revokes the refresh token(s) associated with the user and clears the refresh token cookie. 

9- `post_password_emailkey`: Sends a password reset email containing a short-lived JWT reset token. 

10- `post_password_update`: Updates the user's password using a valid JWT reset token. 

11 

12Access tokens are expected in the `Authorization: Bearer <token>` header for protected endpoints. 

13Refresh tokens are handled automatically by the browser via cookies. 

14""" 

15 

16from datetime import datetime, timezone, timedelta 

17 

18import jwt 

19import webob 

20import secrets 

21import webob.exc 

22from sqlalchemy import and_ 

23from sqlalchemy.orm import Session 

24from sqlalchemy.orm.exc import NoResultFound 

25 

26from postrfp.shared.decorators import http 

27from postrfp.shared import fetch 

28from postrfp import conf 

29from postrfp.model.audit import AuditEvent, evt_types 

30from postrfp.model.humans import RefreshToken, User 

31from postrfp.mail.schemas import PostmarkSimpleMessage 

32from postrfp.mail.postmark import api_headers, send_simple_email 

33from postrfp.shared.password import create_signature 

34from postrfp.shared.serial.authmodels import Login, ResetDoc, UserDoc 

35from postrfp.shared.serial.authmodels import Token 

36 

37 

38# TODO fix this to be configurable. The auth api path should not be hardcoded 

39AUTH_API_PATH = "/auth/api" 

40 

41 

42@http 

43def post_login(session: Session, login_doc: Login): 

44 """ 

45 Authenticate user, handle lockout, return access token in body, refresh token as cookie. 

46 """ 

47 try: 

48 user = fetch.user_by_password(session, login_doc.user_id, login_doc.password) 

49 now = datetime.now(timezone.utc) 

50 

51 _check_and_clear_lockout(session, user, now) 

52 _update_user_on_login(session, user, now) 

53 

54 access_token, access_expiry_minutes = _generate_access_token(user.id, now) 

55 refresh_token_value, refresh_max_age = _generate_and_store_refresh_token( 

56 session, user.id, now 

57 ) 

58 

59 return _create_login_response( 

60 access_token, refresh_token_value, access_expiry_minutes, refresh_max_age 

61 ) 

62 

63 except NoResultFound: 

64 _handle_failed_login(session, login_doc.user_id) 

65 # Reraise as HTTPForbidden regardless of whether user existed 

66 raise webob.exc.HTTPForbidden("No matching user found or invalid password") 

67 

68 

69def _lock_user_account(session: Session, user): 

70 """Locks the user account and creates an audit event.""" 

71 user.locked_until = datetime.now(timezone.utc) + timedelta( 

72 minutes=conf.CONF.lockout_duration_minutes 

73 ) 

74 user.failed_login_attempts = 0 # Reset counter after locking 

75 AuditEvent.create( 

76 session, 

77 evt_types.USER_ACCOUNT_LOCKED, 

78 user=user, 

79 ) 

80 # ToDo - send email to user about lockout 

81 

82 

83def _handle_failed_login(session: Session, user_id: str): 

84 """ 

85 Handle a failed login attempt: increment counters, log event, potentially lock account. 

86 """ 

87 try: 

88 failed_user = fetch.user(session, user_id) 

89 failed_user.failed_login_attempts += 1 

90 failed_user.total_failed_logins += 1 

91 

92 AuditEvent.create(session, evt_types.USER_LOGIN_FAILED, user=failed_user) 

93 

94 if failed_user.failed_login_attempts >= conf.CONF.max_failed_login_attempts: 

95 _lock_user_account(session, failed_user) 

96 

97 session.commit() # Commit changes for the found user 

98 

99 except NoResultFound: 

100 # If the user is not found, perform dummy hash to mitigate timing attacks. 

101 # No database writes occur in this case. 

102 fetch.dummy_hash(session) 

103 

104 

105RESET_MESSAGE = """ 

106A password reset has been requested for user ID <user_id> at <hostname>. 

107 

108To reset your password follow this link within 15 minutes: 

109 

110https://<hostname>/vue/#/reset?key=<key>&user=<user_id> 

111 

112If you did not request this reset, please ignore this email. 

113 

114""" 

115 

116# Define a constant for the password reset token purpose 

117PASSWORD_RESET_PURPOSE = "password_reset" 

118 

119 

120@http 

121def post_password_emailkey(session: Session, user_doc: UserDoc): 

122 """ 

123 Trigger a password reset email to be sent to the user with the provided user_id. 

124 

125 The generated email will provide a URL link containing a short-lived JWT 

126 for the user to reset their password. 

127 """ 

128 user = fetch.user(session, user_doc.user_id) 

129 if not user.email: 

130 raise webob.exc.HTTPBadRequest("User has no email address") 

131 

132 # Generate a short-lived JWT for password reset 

133 now = datetime.now(timezone.utc) 

134 reset_expiry_minutes = 15 # Make reset link valid for 15 minutes 

135 reset_expiry = now + timedelta(minutes=reset_expiry_minutes) 

136 

137 reset_payload = { 

138 "user_id": user.id, 

139 "exp": int(reset_expiry.timestamp()), 

140 "iat": int(now.timestamp()), 

141 "purpose": PASSWORD_RESET_PURPOSE, # Specific claim for reset token 

142 } 

143 reset_token = jwt.encode(reset_payload, conf.CONF.crypt_key, algorithm="HS256") 

144 

145 msg = ( 

146 RESET_MESSAGE.replace("<user_id>", user.id) 

147 .replace("<key>", reset_token) # Use JWT as the key 

148 .replace("<hostname>", conf.CONF.webapp_hostname) 

149 ) 

150 

151 model = PostmarkSimpleMessage( 

152 To=user.email, 

153 From=conf.CONF.email_from_address, # Use configured from address 

154 Subject=f"{conf.CONF.system_name} Password Reset Request", # Use configured system name 

155 TextBody=msg, 

156 Tag="PasswordReset", # Use a specific tag 

157 ) 

158 headers = api_headers(conf.CONF.postmark_api_key) 

159 send_simple_email(model, headers) 

160 

161 

162@http 

163def post_password_update(session: Session, reset_doc: ResetDoc): 

164 """ 

165 Reset the user's password using a JWT key value provided via email. 

166 """ 

167 try: 

168 # Decode and validate the JWT reset token 

169 payload = jwt.decode( 

170 reset_doc.key, 

171 conf.CONF.crypt_key, 

172 algorithms=["HS256"], 

173 # Verify audience if you add it during encoding 

174 # options={"verify_aud": True, "require": ["exp", "iat", "purpose"]} 

175 ) 

176 

177 if payload.get("purpose") != PASSWORD_RESET_PURPOSE: 

178 raise jwt.InvalidTokenError("Invalid token purpose") 

179 

180 # Check if the user ID in the token matches the one in the request 

181 token_user_id = payload.get("user_id") 

182 if not token_user_id or token_user_id != reset_doc.user_id: 

183 raise jwt.InvalidTokenError("User ID mismatch") 

184 

185 user = fetch.user(session, token_user_id) 

186 

187 user.password = create_signature(reset_doc.new_password) 

188 # Optionally, log password reset event 

189 AuditEvent.create(session, evt_types.USER_PASSWORD_RECOVERED, user=user) 

190 

191 except jwt.ExpiredSignatureError: 

192 raise webob.exc.HTTPForbidden("Password reset link has expired") 

193 except jwt.InvalidTokenError as e: 

194 raise webob.exc.HTTPForbidden(f"Invalid password reset link: {e}") 

195 except NoResultFound: 

196 # Should not happen if token validation passes, but handle defensively 

197 raise webob.exc.HTTPNotFound("User specified in reset token not found") 

198 

199 

200@http 

201def post_refresh_token(session: Session, request: webob.Request): 

202 """ 

203 Exchange a valid refresh token cookie for a new access token. 

204 """ 

205 refresh_token_value = request.cookies.get("refresh_token") 

206 

207 if not refresh_token_value: 

208 raise webob.exc.HTTPForbidden("No refresh token provided") 

209 

210 refresh_token = ( 

211 session.query(RefreshToken) 

212 .filter( 

213 and_( 

214 RefreshToken.token == refresh_token_value, 

215 RefreshToken.revoked == False, # noqa E712 

216 RefreshToken.expires_at > datetime.now(timezone.utc), 

217 ) 

218 ) 

219 .first() 

220 ) 

221 

222 response: webob.Response 

223 

224 if not refresh_token: 

225 response = webob.exc.HTTPForbidden("Invalid or expired refresh token") 

226 response.delete_cookie("refresh_token", path=AUTH_API_PATH) 

227 return response 

228 

229 user = fetch.user(session, refresh_token.user_id) 

230 

231 # Generate new access token 

232 now = datetime.now(timezone.utc) 

233 jwt_expiry_minutes = max(5, conf.CONF.jwt_expiry_minutes) 

234 access_expiry = now + timedelta(minutes=jwt_expiry_minutes) 

235 

236 auth_doc = { 

237 "user_id": user.id, 

238 "exp": int(access_expiry.timestamp()), 

239 "iat": int(now.timestamp()), 

240 "token_type": "access", 

241 } 

242 access_token = jwt.encode(auth_doc, conf.CONF.crypt_key, algorithm="HS256") 

243 

244 token = Token( 

245 access_token=access_token, 

246 token_type="Bearer", 

247 expires_in=jwt_expiry_minutes * 60, 

248 ) 

249 

250 response = webob.Response( 

251 token.model_dump_json(), # Use model_dump_json() instead of json.dumps() 

252 content_type="application/json", 

253 charset="utf-8", 

254 ) 

255 

256 return response 

257 

258 

259@http 

260def post_logout(session: Session, request: webob.Request): 

261 """ 

262 Logout by revoking the refresh token and clearing the associated http cookie. 

263 """ 

264 refresh_token_value = request.cookies.get("refresh_token") 

265 

266 if not refresh_token_value: 

267 # No refresh token to revoke, but still return success 

268 result = {"result": "ok"} 

269 response = webob.Response( 

270 json_body=result, 

271 content_type="application/json", 

272 charset="utf-8", 

273 ) 

274 response.delete_cookie("refresh_token", path=AUTH_API_PATH) 

275 return response 

276 

277 # Find and revoke the refresh token, getting user_id from it 

278 refresh_token = ( 

279 session.query(RefreshToken) 

280 .filter(RefreshToken.token == refresh_token_value) 

281 .first() 

282 ) 

283 

284 if refresh_token: 

285 # Revoke this specific token 

286 refresh_token.revoked = True 

287 

288 # Create audit event using the user from the token 

289 try: 

290 user = fetch.user(session, refresh_token.user_id) 

291 AuditEvent.create(session, evt_types.USER_LOGGED_OUT, user=user) 

292 except NoResultFound: 

293 # User was deleted but token still exists - just revoke token 

294 pass 

295 

296 result = {"result": "ok"} 

297 response = webob.Response( 

298 json_body=result, 

299 content_type="application/json", 

300 charset="utf-8", 

301 ) 

302 

303 response.delete_cookie("refresh_token", path=AUTH_API_PATH) 

304 return response 

305 

306 

307def _check_and_clear_lockout(session: Session, user: User, now: datetime): 

308 """Checks if user is locked out, clears if expired, raises if still locked.""" 

309 if user.locked_until is not None: 

310 if user.locked_until.tzinfo is None: 

311 user_locked_until = user.locked_until.replace(tzinfo=timezone.utc) 

312 else: 

313 user_locked_until = user.locked_until 

314 

315 if user_locked_until > now: 

316 raise webob.exc.HTTPForbidden("Account is locked") 

317 else: 

318 # Lockout expired 

319 user.locked_until = None 

320 AuditEvent.create( 

321 session, 

322 evt_types.USER_ACCOUNT_UNLOCKED, 

323 user=user, 

324 ) 

325 

326 

327def _generate_access_token(user_id: str, now: datetime) -> tuple[str, int]: 

328 """Generates JWT access token string and expiry minutes.""" 

329 jwt_expiry_minutes = max(5, conf.CONF.jwt_expiry_minutes) 

330 access_expiry = now + timedelta(minutes=jwt_expiry_minutes) 

331 auth_doc = { 

332 "user_id": user_id, 

333 "exp": int(access_expiry.timestamp()), 

334 "iat": int(now.timestamp()), 

335 "token_type": "access", 

336 } 

337 access_token = jwt.encode(auth_doc, conf.CONF.crypt_key, algorithm="HS256") 

338 return access_token, jwt_expiry_minutes 

339 

340 

341def _generate_and_store_refresh_token( 

342 session: Session, user_id: str, now: datetime 

343) -> tuple[str, int]: 

344 """Generates, stores, and returns refresh token value and max_age seconds.""" 

345 refresh_expiry = now + timedelta(hours=conf.CONF.jwt_refresh_token_expiry_hours) 

346 refresh_token_value = secrets.token_urlsafe(32) 

347 max_age = int(conf.CONF.jwt_refresh_token_expiry_hours * 3600) 

348 

349 # Store refresh token in database 

350 refresh_token_record = RefreshToken( 

351 user_id=user_id, 

352 token=refresh_token_value, 

353 issued_at=now, 

354 expires_at=refresh_expiry, 

355 revoked=False, 

356 ) 

357 session.add(refresh_token_record) 

358 return refresh_token_value, max_age 

359 

360 

361def _update_user_on_login(session: Session, user, now: datetime): 

362 """Updates user state and logs audit event on successful login.""" 

363 # Reset failed attempts count on successful login 

364 if user.failed_login_attempts > 0: 

365 user.failed_login_attempts = 0 

366 

367 if hasattr(user, "previous_login_date"): 

368 user.previous_login_date = now 

369 

370 AuditEvent.create(session, evt_types.USER_LOGGED_IN, user=user) 

371 

372 

373def _create_login_response( 

374 access_token: str, 

375 refresh_token_value: str, 

376 access_expiry_minutes: int, 

377 refresh_max_age: int, 

378): 

379 """Creates the webob.Response for a successful login.""" 

380 token_payload = Token( 

381 access_token=access_token, 

382 token_type="Bearer", 

383 expires_in=access_expiry_minutes * 60, 

384 ) 

385 

386 response = webob.Response( 

387 token_payload.model_dump_json(), 

388 content_type="application/json", 

389 charset="utf-8", 

390 ) 

391 

392 # NB - HttpOnly 

393 response.set_cookie( 

394 "refresh_token", 

395 refresh_token_value, 

396 max_age=refresh_max_age, 

397 path=AUTH_API_PATH, 

398 httponly=True, 

399 secure=True, 

400 samesite="Lax", 

401 ) 

402 return response