Coverage for postrfp / auth / policy.py: 98%
91 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1import abc
2import logging
3from contextlib import contextmanager
5import jwt
6import webob.exc
7from webob.request import Request
10import postrfp.conf
12from postrfp.shared.exceptions import NotLoggedIn
15log = logging.getLogger(__name__)
class AbstractIdentityPolicy(abc.ABC):  # pragma: no cover
    """Interface implemented by every identity policy.

    The class derives from ``abc.ABC`` so that the ``@abc.abstractmethod``
    markers below are actually enforced: a subclass must override both
    ``identify`` and ``remember`` before it can be instantiated. (A
    ``__metaclass__`` class attribute is Python 2 syntax and is silently
    ignored by Python 3.)
    """

    @abc.abstractmethod
    def identify(self, request):
        """Set remote_user(id) user(object) on the request"""

    @abc.abstractmethod
    def remember(self, request, response):
        """Remember the users identity for subsequent requests"""
def set_user_object(request):
    """Attach the user record matching ``request.remote_user`` as ``request.user``.

    The record is fetched with ``request.session.get(User, <id>)``
    (presumably a database session; confirm against the request class).

    Raises:
        NotLoggedIn: if ``request.remote_user`` is empty/falsy, or if the
            lookup yields ``None``.
    """
    # Imported lazily inside the function, as in the original code.
    from postrfp.model.humans import User

    remote_id = request.remote_user
    if not remote_id:
        raise NotLoggedIn("User ID Not Found for Request")

    request.user = request.session.get(User, remote_id)
    if request.user is not None:
        return

    raise NotLoggedIn(f'User ID "{remote_id}" not found in database ')
@contextmanager
def jwt_exceptions():
    """Translate PyJWT errors raised inside the ``with`` body into HTTP errors.

    Mapping (checked in this order):
        * ``jwt.ExpiredSignatureError``     -> ``HTTPForbidden``
        * ``jwt.MissingRequiredClaimError`` -> ``HTTPBadRequest``
        * ``jwt.InvalidAlgorithmError``     -> ``HTTPBadRequest``
        * ``jwt.DecodeError``               -> ``HTTPBadRequest``
        * ``jwt.InvalidTokenError``         -> ``HTTPBadRequest``

    Any other ``jwt.PyJWTError`` is re-raised unchanged. Translated errors
    are chained to the original JWT exception.
    """
    try:
        yield
    except jwt.PyJWTError as jwt_err:
        # NOTE(review): InvalidTokenError is tested last, presumably because
        # the more specific error types derive from it - keep this order.
        if isinstance(jwt_err, jwt.ExpiredSignatureError):
            http_error = webob.exc.HTTPForbidden("Authentication token has expired")
        elif isinstance(jwt_err, jwt.MissingRequiredClaimError):
            http_error = webob.exc.HTTPBadRequest(f"'{jwt_err}'")
        elif isinstance(jwt_err, jwt.InvalidAlgorithmError):
            http_error = webob.exc.HTTPBadRequest(
                "Invalid algorithm specified in JWT token"
            )
        elif isinstance(jwt_err, jwt.DecodeError):
            http_error = webob.exc.HTTPBadRequest(
                f"Failed to decode JWT token: '{jwt_err}'"
            )
        elif isinstance(jwt_err, jwt.InvalidTokenError):
            http_error = webob.exc.HTTPBadRequest(f"Invalid JWT token: '{jwt_err}'")
        else:
            # Not a token-validation problem we know how to report.
            raise jwt_err

        raise http_error from jwt_err
class JwtBearerPolicy(AbstractIdentityPolicy):
    """Identify the user from a JWT sent as a ``Bearer`` Authorization header."""

    def identify(self, request: Request) -> None:
        """Decode the bearer token and set ``remote_user`` / ``user`` on the request.

        Raises:
            webob.exc.HTTPBadRequest: if ``request.remote_user`` is already
                set, if the decoded token carries no ``user_id`` claim, or
                if the token is rejected while decoding.
            webob.exc.HTTPUnauthorized: if there is no Authorization header
                or its scheme is not ``Bearer``.
            webob.exc.HTTPForbidden: if the token has expired or has a
                ``token_type`` other than ``access``.
        """
        # remote_user must only ever be set by this policy, never upstream.
        if request.remote_user is not None:
            msg = "REMOTE_USER illegally set to [%s] upstream"
            raise webob.exc.HTTPBadRequest(msg % request.remote_user)

        if request.authorization is None or request.authorization.authtype != "Bearer":
            raise webob.exc.HTTPUnauthorized("Bearer Authentication token required")

        token: str = request.authorization.params  # type: ignore
        jwt_doc = self._decode_jwt_token(token)

        # A correctly signed token may still lack the claim we need; report
        # that as a client error instead of letting a KeyError escape.
        if "user_id" not in jwt_doc:
            raise webob.exc.HTTPBadRequest(
                "JWT token is missing the required 'user_id' claim"
            )

        request.remote_user = jwt_doc["user_id"]
        set_user_object(request)

    def remember(self, request, response):
        """Client is responsible for storing token"""
        pass

    def _decode_jwt_token(self, token: str) -> dict:
        """
        Helper method to decode JWT token and handle exceptions.

        The token is verified with ``postrfp.conf.CONF.crypt_key`` using
        HS256, and must contain the ``exp`` and ``iat`` claims. JWT errors
        are converted to HTTP errors by ``jwt_exceptions``.

        Returns:
            The decoded token payload.

        Raises:
            webob.exc.HTTPForbidden: if ``token_type`` is present and is
                not ``"access"``.
        """
        with jwt_exceptions():
            decoded = jwt.decode(
                token,
                postrfp.conf.CONF.crypt_key,
                algorithms=["HS256"],
                options={"require": ["exp", "iat"]},
            )

        # Verify token type if present
        # For backward compatibility, only check token_type if it exists
        token_type = decoded.get("token_type")
        if token_type is not None and token_type != "access":
            raise webob.exc.HTTPForbidden(
                f"Invalid token type: '{token_type}'. Expected 'access'"
            )

        return decoded
class PassthroughPolicy(AbstractIdentityPolicy):
    """
    Identity policy that deliberately does nothing.

    The purpose of an identity policy is to identify the authenticated
    user for a given request. At login we are dealing with users who are
    not authenticated yet, so this implementation is a no-op.
    """

    def identify(self, request):
        """Leave the request untouched."""
        return None

    def remember(self, request, response):
        """Nothing to remember for an unauthenticated request."""
        return None
class DevHeaderPolicy(AbstractIdentityPolicy):
    """Take the user id from a ``POSTRFP-TEST-USER`` header or environ key.

    The id is accepted without any verification (the log messages call this
    "unsafe" authentication). An optional default user is used when the
    request names no user at all.
    """

    def __init__(self, default_user=None):
        super().__init__()
        self.default_user = default_user
        if not default_user:
            return
        log.warning("Initialised DevHeaderPolicy with default user '%s'", default_user)

    def identify(self, request):
        """Resolve the user id and load the matching user onto the request.

        Lookup order: HTTP header, then WSGI environ key, then the
        configured default user.

        Raises:
            webob.exc.HTTPUnauthorized: if neither the header nor the
                environ key is present and no default user is configured.
        """
        key = "POSTRFP-TEST-USER"
        try:
            if key not in request.headers:
                # Raises KeyError when the environ key is absent too.
                user_id = request.environ[key]
                log.info(
                    "Unsafely Authenticated user [%s] via POSTRFP-TEST-USER wsgi Environ key",
                    user_id,
                )
            else:
                user_id = request.headers[key]
                log.info(
                    "Unsafely Authenticated user [%s] via POSTRFP-TEST-USER HTTP Header",
                    user_id,
                )
        except KeyError:
            user_id = self.default_user
            if user_id is None:
                raise webob.exc.HTTPUnauthorized(
                    "POSTRFP-TEST-USER header not set and no default"
                )
            log.info(
                "Unsafely Authenticated user [%s] as default user for DevHeaderPolicy",
                user_id,
            )

        request.remote_user = str(user_id)
        set_user_object(request)

    def remember(self, request, response):
        """Nothing is stored between requests."""
        return None
class FallbackPolicy(AbstractIdentityPolicy):
    """Try JWT bearer authentication first, then fall back to the dev header.

    The fallback is used only when the bearer policy raises
    ``webob.exc.HTTPUnauthorized``; any other error raised by the bearer
    policy propagates to the caller.
    """

    def __init__(self, default_user=None):
        super().__init__()
        # Keep the configured value rather than discarding it, so the
        # attribute agrees with what the wrapped DevHeaderPolicy was given.
        self.default_user = default_user
        self.header_policy = DevHeaderPolicy(default_user=default_user)
        self.bearer_policy = JwtBearerPolicy()

    def identify(self, request):
        """Identify the user via the bearer policy, else via the header policy."""
        try:
            self.bearer_policy.identify(request)
            # Logger arguments are passed lazily; the rendered text is the
            # same as before.
            log.warning(
                "Authenticated user %s with JWT Bearer policy", request.remote_user
            )
        except webob.exc.HTTPUnauthorized:
            self.header_policy.identify(request)
            log.warning(
                "Authenticated user %s with HTTP Test Header policy",
                request.remote_user,
            )

    def remember(self, request, response):
        """Nothing is stored between requests."""
        pass