Coverage for postrfp/auth/policy.py: 98%
94 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import abc
2import logging
3from contextlib import contextmanager
5import jwt
6import webob
7import webob.exc
8from sqlalchemy.orm.exc import NoResultFound
11import postrfp.conf
12from postrfp.shared import fetch
13from postrfp.shared.exceptions import NotLoggedIn
16log = logging.getLogger(__name__)
class AbstractIdentityPolicy(abc.ABC):  # pragma: no cover
    """Interface that every identity policy must implement.

    The class derives from ``abc.ABC`` so that the ``@abc.abstractmethod``
    markers below are actually enforced: a subclass that omits ``identify``
    or ``remember`` cannot be instantiated. (A ``__metaclass__`` class
    attribute is ignored on Python 3, which left the abstract markers inert.)
    """

    @abc.abstractmethod
    def identify(self, request):
        """Set remote_user(id) user(object) on the request"""

    @abc.abstractmethod
    def remember(self, request, response):
        """Remember the users identity for subsequent requests"""
def set_user_object(request):
    """Attach the user identified by ``request.remote_user`` as ``request.user``.

    The user is looked up with ``fetch.user`` using ``request.session``.

    Raises:
        NotLoggedIn: if ``request.remote_user`` is empty, or if the lookup
            raises ``NoResultFound`` (chained as the cause).
    """
    uid = request.remote_user
    if not uid:
        raise NotLoggedIn("User ID Not Found for Request")

    try:
        found = fetch.user(request.session, uid)
    except NoResultFound as missing:
        log.warning('User ID "%s" not in database ', uid)
        raise NotLoggedIn(f'User ID "{uid}" not found ') from missing

    request.user = found
@contextmanager
def jwt_exceptions():
    """Translate PyJWT errors raised inside the ``with`` body into HTTP errors.

    The handlers are ordered from most to least specific; the first match wins:

    * expired signature      -> ``HTTPForbidden``
    * missing required claim -> ``HTTPBadRequest``
    * invalid algorithm      -> ``HTTPBadRequest``
    * decode failure         -> ``HTTPBadRequest``
    * any other invalid token -> ``HTTPBadRequest``

    Any other exception (including other ``jwt.PyJWTError`` subclasses)
    propagates unchanged. The original JWT error is always chained as the
    cause of the HTTP exception.
    """
    try:
        yield
    except jwt.ExpiredSignatureError as expired:
        raise webob.exc.HTTPForbidden("Authentication token has expired") from expired
    except jwt.MissingRequiredClaimError as missing_claim:
        raise webob.exc.HTTPBadRequest(f"'{missing_claim}'") from missing_claim
    except jwt.InvalidAlgorithmError as bad_algorithm:
        raise webob.exc.HTTPBadRequest(
            "Invalid algorithm specified in JWT token"
        ) from bad_algorithm
    except jwt.DecodeError as undecodable:
        raise webob.exc.HTTPBadRequest(
            f"Failed to decode JWT token: '{undecodable}'"
        ) from undecodable
    except jwt.InvalidTokenError as invalid:
        raise webob.exc.HTTPBadRequest(f"Invalid JWT token: '{invalid}'") from invalid
class JwtBearerPolicy(AbstractIdentityPolicy):
    """Identify the caller from a JWT sent as an HTTP ``Bearer`` credential."""

    def identify(self, request: webob.Request):
        """Decode the bearer token and populate ``remote_user`` / ``user``.

        Raises:
            webob.exc.HTTPBadRequest: if ``request.remote_user`` is already
                set, if the token is malformed, or if the decoded token has
                no ``user_id`` claim.
            webob.exc.HTTPUnauthorized: if there is no ``Authorization``
                header or its scheme is not ``Bearer``.
            webob.exc.HTTPForbidden: if the token has expired or carries a
                ``token_type`` other than ``access``.
        """
        # This policy must be the only source of remote_user for the request.
        if request.remote_user is not None:
            msg = "REMOTE_USER illegally set to [%s] upstream"
            raise webob.exc.HTTPBadRequest(msg % request.remote_user)

        if request.authorization is None or request.authorization.authtype != "Bearer":
            raise webob.exc.HTTPUnauthorized("Bearer Authentication token required")

        token: str = request.authorization.params  # type: ignore
        jwt_doc = self._decode_jwt_token(token)

        # A correctly signed token may still lack the claim we depend on;
        # report that as a client error rather than leaking a KeyError.
        user_id = jwt_doc.get("user_id")
        if user_id is None:
            raise webob.exc.HTTPBadRequest("JWT token is missing the 'user_id' claim")

        request.remote_user = user_id
        set_user_object(request)

    def remember(self, request, response):
        """Client is responsible for storing token"""
        pass

    def _decode_jwt_token(self, token: str) -> dict:
        """
        Helper method to decode JWT token and handle exceptions.

        The token is verified with ``postrfp.conf.CONF.crypt_key`` using
        HS256, and must contain the ``exp`` and ``iat`` claims. PyJWT errors
        are converted to HTTP errors by ``jwt_exceptions``.
        """
        with jwt_exceptions():
            decoded = jwt.decode(
                token,
                postrfp.conf.CONF.crypt_key,
                algorithms=["HS256"],
                options={"require": ["exp", "iat"]},
            )

            # Verify token type if present
            # For backward compatibility, only check token_type if it exists
            token_type = decoded.get("token_type")
            if token_type is not None and token_type != "access":
                raise webob.exc.HTTPForbidden(
                    f"Invalid token type: '{token_type}'. Expected 'access'"
                )

            return decoded
class PassthroughPolicy(AbstractIdentityPolicy):
    """
    No-op identity policy.

    An identity policy exists to identify the authenticated user for a
    request. During login the user is not yet authenticated, so this
    implementation deliberately does nothing.
    """

    def identify(self, request):
        """Leave the request untouched; always returns ``None``."""
        return None

    def remember(self, request, response):
        """Store nothing; always returns ``None``."""
        return None
class DevHeaderPolicy(AbstractIdentityPolicy):
    """Identity policy that takes the user id from ``POSTRFP-TEST-USER``.

    The id is read from the HTTP header of that name, else from the WSGI
    environ key of that name, else from the configured default user. The
    log messages themselves describe this as unsafe authentication.
    """

    _USER_KEY = "POSTRFP-TEST-USER"

    def __init__(self, default_user=None):
        super().__init__()
        if default_user:
            log.warning(
                "Initialised DevHeaderPolicy with default user '%s'", default_user
            )
        self.default_user = default_user

    def identify(self, request):
        """Resolve the user id and populate ``remote_user`` / ``user``.

        Raises:
            webob.exc.HTTPUnauthorized: if neither the header nor the environ
                key is present and no default user was configured.
        """
        key = self._USER_KEY
        if key in request.headers:
            user_id = request.headers[key]
            log.info(
                "Unsafely Authenticated user [%s] via POSTRFP-TEST-USER HTTP Header",
                user_id,
            )
        else:
            try:
                user_id = request.environ[key]
            except KeyError:
                if self.default_user is None:
                    raise webob.exc.HTTPUnauthorized(
                        "POSTRFP-TEST-USER header not set and no default"
                    )
                user_id = self.default_user
                log.info(
                    "Unsafely Authenticated user [%s] as default user for DevHeaderPolicy",
                    user_id,
                )
            else:
                log.info(
                    "Unsafely Authenticated user [%s] via POSTRFP-TEST-USER wsgi Environ key",
                    user_id,
                )

        request.remote_user = str(user_id)
        set_user_object(request)

    def remember(self, request, response):
        """Nothing is persisted between requests."""
        return None
class FallbackPolicy(AbstractIdentityPolicy):
    """Try JWT bearer authentication first, then the dev header policy.

    The fallback to ``DevHeaderPolicy`` happens only when the bearer policy
    raises ``webob.exc.HTTPUnauthorized``; any other error from the bearer
    policy propagates to the caller.
    """

    def __init__(self, default_user=None):
        """Build both delegate policies.

        Args:
            default_user: user id handed to ``DevHeaderPolicy`` as its
                default; also kept on this instance.
        """
        super().__init__()
        # Keep the configured value instead of discarding it, so the
        # attribute agrees with what the header policy was given.
        self.default_user = default_user
        self.header_policy = DevHeaderPolicy(default_user=default_user)
        self.bearer_policy = JwtBearerPolicy()

    def identify(self, request):
        """Identify the user via bearer token, else via the dev header policy."""
        try:
            self.bearer_policy.identify(request)
            log.warning(
                f"Authenticated user {request.remote_user} with JWT Bearer policy"
            )
        except webob.exc.HTTPUnauthorized:
            self.header_policy.identify(request)
            log.warning(
                f"Authenticated user {request.remote_user} with HTTP Test Header policy"
            )

    def remember(self, request, response):
        """Nothing is persisted between requests."""
        pass