Coverage for postrfp/auth/endpoints.py: 100%
152 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1"""
2JWT-based Authentication Endpoints.
4Implements a standard token-based authentication flow:
5- `post_login`: Validates credentials, returns a short-lived JWT access token in the response body
6 and sets a long-lived, secure, HttpOnly refresh token cookie (`/api/auth` path). Handles account lockout logic.
7- `post_refresh_token`: Accepts a valid refresh token cookie and returns a new JWT access token.
8- `post_logout`: Revokes the refresh token(s) associated with the user and clears the refresh token cookie.
9- `post_password_emailkey`: Sends a password reset email containing a short-lived JWT reset token.
10- `post_password_update`: Updates the user's password using a valid JWT reset token.
12Access tokens are expected in the `Authorization: Bearer <token>` header for protected endpoints.
13Refresh tokens are handled automatically by the browser via cookies.
14"""
16from datetime import datetime, timezone, timedelta
18import jwt
19import webob
20import secrets
21import webob.exc
22from sqlalchemy import and_
23from sqlalchemy.orm import Session
24from sqlalchemy.orm.exc import NoResultFound
26from postrfp.shared.decorators import http
27from postrfp.shared import fetch
28from postrfp import conf
29from postrfp.model.audit import AuditEvent, evt_types
30from postrfp.model.humans import RefreshToken, User
31from postrfp.mail.schemas import PostmarkSimpleMessage
32from postrfp.mail.postmark import api_headers, send_simple_email
33from postrfp.shared.password import create_signature
34from postrfp.shared.serial.authmodels import Login, ResetDoc, UserDoc
35from postrfp.shared.serial.authmodels import Token
38# TODO fix this to be configurable. The auth api path should not be hardcoded
39AUTH_API_PATH = "/auth/api"
42@http
43def post_login(session: Session, login_doc: Login):
44 """
45 Authenticate user, handle lockout, return access token in body, refresh token as cookie.
46 """
47 try:
48 user = fetch.user_by_password(session, login_doc.user_id, login_doc.password)
49 now = datetime.now(timezone.utc)
51 _check_and_clear_lockout(session, user, now)
52 _update_user_on_login(session, user, now)
54 access_token, access_expiry_minutes = _generate_access_token(user.id, now)
55 refresh_token_value, refresh_max_age = _generate_and_store_refresh_token(
56 session, user.id, now
57 )
59 return _create_login_response(
60 access_token, refresh_token_value, access_expiry_minutes, refresh_max_age
61 )
63 except NoResultFound:
64 _handle_failed_login(session, login_doc.user_id)
65 # Reraise as HTTPForbidden regardless of whether user existed
66 raise webob.exc.HTTPForbidden("No matching user found or invalid password")
69def _lock_user_account(session: Session, user):
70 """Locks the user account and creates an audit event."""
71 user.locked_until = datetime.now(timezone.utc) + timedelta(
72 minutes=conf.CONF.lockout_duration_minutes
73 )
74 user.failed_login_attempts = 0 # Reset counter after locking
75 AuditEvent.create(
76 session,
77 evt_types.USER_ACCOUNT_LOCKED,
78 user=user,
79 )
80 # ToDo - send email to user about lockout
83def _handle_failed_login(session: Session, user_id: str):
84 """
85 Handle a failed login attempt: increment counters, log event, potentially lock account.
86 """
87 try:
88 failed_user = fetch.user(session, user_id)
89 failed_user.failed_login_attempts += 1
90 failed_user.total_failed_logins += 1
92 AuditEvent.create(session, evt_types.USER_LOGIN_FAILED, user=failed_user)
94 if failed_user.failed_login_attempts >= conf.CONF.max_failed_login_attempts:
95 _lock_user_account(session, failed_user)
97 session.commit() # Commit changes for the found user
99 except NoResultFound:
100 # If the user is not found, perform dummy hash to mitigate timing attacks.
101 # No database writes occur in this case.
102 fetch.dummy_hash(session)
105RESET_MESSAGE = """
106A password reset has been requested for user ID <user_id> at <hostname>.
108To reset your password follow this link within 15 minutes:
110https://<hostname>/vue/#/reset?key=<key>&user=<user_id>
112If you did not request this reset, please ignore this email.
114"""
116# Define a constant for the password reset token purpose
117PASSWORD_RESET_PURPOSE = "password_reset"
120@http
121def post_password_emailkey(session: Session, user_doc: UserDoc):
122 """
123 Trigger a password reset email to be sent to the user with the provided user_id.
125 The generated email will provide a URL link containing a short-lived JWT
126 for the user to reset their password.
127 """
128 user = fetch.user(session, user_doc.user_id)
129 if not user.email:
130 raise webob.exc.HTTPBadRequest("User has no email address")
132 # Generate a short-lived JWT for password reset
133 now = datetime.now(timezone.utc)
134 reset_expiry_minutes = 15 # Make reset link valid for 15 minutes
135 reset_expiry = now + timedelta(minutes=reset_expiry_minutes)
137 reset_payload = {
138 "user_id": user.id,
139 "exp": int(reset_expiry.timestamp()),
140 "iat": int(now.timestamp()),
141 "purpose": PASSWORD_RESET_PURPOSE, # Specific claim for reset token
142 }
143 reset_token = jwt.encode(reset_payload, conf.CONF.crypt_key, algorithm="HS256")
145 msg = (
146 RESET_MESSAGE.replace("<user_id>", user.id)
147 .replace("<key>", reset_token) # Use JWT as the key
148 .replace("<hostname>", conf.CONF.webapp_hostname)
149 )
151 model = PostmarkSimpleMessage(
152 To=user.email,
153 From=conf.CONF.email_from_address, # Use configured from address
154 Subject=f"{conf.CONF.system_name} Password Reset Request", # Use configured system name
155 TextBody=msg,
156 Tag="PasswordReset", # Use a specific tag
157 )
158 headers = api_headers(conf.CONF.postmark_api_key)
159 send_simple_email(model, headers)
162@http
163def post_password_update(session: Session, reset_doc: ResetDoc):
164 """
165 Reset the user's password using a JWT key value provided via email.
166 """
167 try:
168 # Decode and validate the JWT reset token
169 payload = jwt.decode(
170 reset_doc.key,
171 conf.CONF.crypt_key,
172 algorithms=["HS256"],
173 # Verify audience if you add it during encoding
174 # options={"verify_aud": True, "require": ["exp", "iat", "purpose"]}
175 )
177 if payload.get("purpose") != PASSWORD_RESET_PURPOSE:
178 raise jwt.InvalidTokenError("Invalid token purpose")
180 # Check if the user ID in the token matches the one in the request
181 token_user_id = payload.get("user_id")
182 if not token_user_id or token_user_id != reset_doc.user_id:
183 raise jwt.InvalidTokenError("User ID mismatch")
185 user = fetch.user(session, token_user_id)
187 user.password = create_signature(reset_doc.new_password)
188 # Optionally, log password reset event
189 AuditEvent.create(session, evt_types.USER_PASSWORD_RECOVERED, user=user)
191 except jwt.ExpiredSignatureError:
192 raise webob.exc.HTTPForbidden("Password reset link has expired")
193 except jwt.InvalidTokenError as e:
194 raise webob.exc.HTTPForbidden(f"Invalid password reset link: {e}")
195 except NoResultFound:
196 # Should not happen if token validation passes, but handle defensively
197 raise webob.exc.HTTPNotFound("User specified in reset token not found")
200@http
201def post_refresh_token(session: Session, request: webob.Request):
202 """
203 Exchange a valid refresh token cookie for a new access token.
204 """
205 refresh_token_value = request.cookies.get("refresh_token")
207 if not refresh_token_value:
208 raise webob.exc.HTTPForbidden("No refresh token provided")
210 refresh_token = (
211 session.query(RefreshToken)
212 .filter(
213 and_(
214 RefreshToken.token == refresh_token_value,
215 RefreshToken.revoked == False, # noqa E712
216 RefreshToken.expires_at > datetime.now(timezone.utc),
217 )
218 )
219 .first()
220 )
222 response: webob.Response
224 if not refresh_token:
225 response = webob.exc.HTTPForbidden("Invalid or expired refresh token")
226 response.delete_cookie("refresh_token", path=AUTH_API_PATH)
227 return response
229 user = fetch.user(session, refresh_token.user_id)
231 # Generate new access token
232 now = datetime.now(timezone.utc)
233 jwt_expiry_minutes = max(5, conf.CONF.jwt_expiry_minutes)
234 access_expiry = now + timedelta(minutes=jwt_expiry_minutes)
236 auth_doc = {
237 "user_id": user.id,
238 "exp": int(access_expiry.timestamp()),
239 "iat": int(now.timestamp()),
240 "token_type": "access",
241 }
242 access_token = jwt.encode(auth_doc, conf.CONF.crypt_key, algorithm="HS256")
244 token = Token(
245 access_token=access_token,
246 token_type="Bearer",
247 expires_in=jwt_expiry_minutes * 60,
248 )
250 response = webob.Response(
251 token.model_dump_json(), # Use model_dump_json() instead of json.dumps()
252 content_type="application/json",
253 charset="utf-8",
254 )
256 return response
259@http
260def post_logout(session: Session, request: webob.Request):
261 """
262 Logout by revoking the refresh token and clearing the associated http cookie.
263 """
264 refresh_token_value = request.cookies.get("refresh_token")
266 if not refresh_token_value:
267 # No refresh token to revoke, but still return success
268 result = {"result": "ok"}
269 response = webob.Response(
270 json_body=result,
271 content_type="application/json",
272 charset="utf-8",
273 )
274 response.delete_cookie("refresh_token", path=AUTH_API_PATH)
275 return response
277 # Find and revoke the refresh token, getting user_id from it
278 refresh_token = (
279 session.query(RefreshToken)
280 .filter(RefreshToken.token == refresh_token_value)
281 .first()
282 )
284 if refresh_token:
285 # Revoke this specific token
286 refresh_token.revoked = True
288 # Create audit event using the user from the token
289 try:
290 user = fetch.user(session, refresh_token.user_id)
291 AuditEvent.create(session, evt_types.USER_LOGGED_OUT, user=user)
292 except NoResultFound:
293 # User was deleted but token still exists - just revoke token
294 pass
296 result = {"result": "ok"}
297 response = webob.Response(
298 json_body=result,
299 content_type="application/json",
300 charset="utf-8",
301 )
303 response.delete_cookie("refresh_token", path=AUTH_API_PATH)
304 return response
307def _check_and_clear_lockout(session: Session, user: User, now: datetime):
308 """Checks if user is locked out, clears if expired, raises if still locked."""
309 if user.locked_until is not None:
310 if user.locked_until.tzinfo is None:
311 user_locked_until = user.locked_until.replace(tzinfo=timezone.utc)
312 else:
313 user_locked_until = user.locked_until
315 if user_locked_until > now:
316 raise webob.exc.HTTPForbidden("Account is locked")
317 else:
318 # Lockout expired
319 user.locked_until = None
320 AuditEvent.create(
321 session,
322 evt_types.USER_ACCOUNT_UNLOCKED,
323 user=user,
324 )
327def _generate_access_token(user_id: str, now: datetime) -> tuple[str, int]:
328 """Generates JWT access token string and expiry minutes."""
329 jwt_expiry_minutes = max(5, conf.CONF.jwt_expiry_minutes)
330 access_expiry = now + timedelta(minutes=jwt_expiry_minutes)
331 auth_doc = {
332 "user_id": user_id,
333 "exp": int(access_expiry.timestamp()),
334 "iat": int(now.timestamp()),
335 "token_type": "access",
336 }
337 access_token = jwt.encode(auth_doc, conf.CONF.crypt_key, algorithm="HS256")
338 return access_token, jwt_expiry_minutes
341def _generate_and_store_refresh_token(
342 session: Session, user_id: str, now: datetime
343) -> tuple[str, int]:
344 """Generates, stores, and returns refresh token value and max_age seconds."""
345 refresh_expiry = now + timedelta(hours=conf.CONF.jwt_refresh_token_expiry_hours)
346 refresh_token_value = secrets.token_urlsafe(32)
347 max_age = int(conf.CONF.jwt_refresh_token_expiry_hours * 3600)
349 # Store refresh token in database
350 refresh_token_record = RefreshToken(
351 user_id=user_id,
352 token=refresh_token_value,
353 issued_at=now,
354 expires_at=refresh_expiry,
355 revoked=False,
356 )
357 session.add(refresh_token_record)
358 return refresh_token_value, max_age
361def _update_user_on_login(session: Session, user, now: datetime):
362 """Updates user state and logs audit event on successful login."""
363 # Reset failed attempts count on successful login
364 if user.failed_login_attempts > 0:
365 user.failed_login_attempts = 0
367 if hasattr(user, "previous_login_date"):
368 user.previous_login_date = now
370 AuditEvent.create(session, evt_types.USER_LOGGED_IN, user=user)
373def _create_login_response(
374 access_token: str,
375 refresh_token_value: str,
376 access_expiry_minutes: int,
377 refresh_max_age: int,
378):
379 """Creates the webob.Response for a successful login."""
380 token_payload = Token(
381 access_token=access_token,
382 token_type="Bearer",
383 expires_in=access_expiry_minutes * 60,
384 )
386 response = webob.Response(
387 token_payload.model_dump_json(),
388 content_type="application/json",
389 charset="utf-8",
390 )
392 # NB - HttpOnly
393 response.set_cookie(
394 "refresh_token",
395 refresh_token_value,
396 max_age=refresh_max_age,
397 path=AUTH_API_PATH,
398 httponly=True,
399 secure=True,
400 samesite="Lax",
401 )
402 return response