Coverage for postrfp/auth/policy.py: 98%

94 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import abc 

2import logging 

3from contextlib import contextmanager 

4 

5import jwt 

6import webob 

7import webob.exc 

8from sqlalchemy.orm.exc import NoResultFound 

9 

10 

11import postrfp.conf 

12from postrfp.shared import fetch 

13from postrfp.shared.exceptions import NotLoggedIn 

14 

15 

16log = logging.getLogger(__name__) 

17 

18 

19class AbstractIdentityPolicy: # pragma: no cover 

20 __metaclass__ = abc.ABCMeta 

21 

22 @abc.abstractmethod 

23 def identify(self, request): 

24 """Set remote_user(id) user(object) on the request""" 

25 pass 

26 

27 @abc.abstractmethod 

28 def remember(self, request, response): 

29 """Remember the users identity for subsequent requests""" 

30 pass 

31 

32 

33def set_user_object(request): 

34 user_id = request.remote_user 

35 try: 

36 if not user_id: 

37 raise NotLoggedIn("User ID Not Found for Request") 

38 request.user = fetch.user(request.session, user_id) 

39 

40 except NoResultFound as nre: 

41 log.warning('User ID "%s" not in database ', user_id) 

42 raise NotLoggedIn(f'User ID "{user_id}" not found ') from nre 

43 

44 

45@contextmanager 

46def jwt_exceptions(): 

47 """Convert JWT exceptions to HTTP error responses""" 

48 try: 

49 yield 

50 except jwt.PyJWTError as jwt_err: 

51 match jwt_err: 

52 case jwt.ExpiredSignatureError(): 

53 raise webob.exc.HTTPForbidden( 

54 "Authentication token has expired" 

55 ) from jwt_err 

56 case jwt.MissingRequiredClaimError(): 

57 raise webob.exc.HTTPBadRequest(f"'{jwt_err}'") from jwt_err 

58 case jwt.InvalidAlgorithmError(): 

59 raise webob.exc.HTTPBadRequest( 

60 "Invalid algorithm specified in JWT token" 

61 ) from jwt_err 

62 case jwt.DecodeError(): 

63 raise webob.exc.HTTPBadRequest( 

64 f"Failed to decode JWT token: '{jwt_err}'" 

65 ) from jwt_err 

66 case jwt.InvalidTokenError(): 

67 raise webob.exc.HTTPBadRequest( 

68 f"Invalid JWT token: '{jwt_err}'" 

69 ) from jwt_err 

70 case _: 

71 raise jwt_err 

72 

73 

74class JwtBearerPolicy(AbstractIdentityPolicy): 

75 def identify(self, request: webob.Request): 

76 if request.remote_user is not None: 

77 msg = "REMOTE_USER illegally set to [%s] upstream" 

78 raise webob.exc.HTTPBadRequest(msg % request.remote_user) 

79 

80 if request.authorization is None or request.authorization.authtype != "Bearer": 

81 raise webob.exc.HTTPUnauthorized("Bearer Authentication token required") 

82 

83 token: str = request.authorization.params # type: ignore 

84 jwt_doc = self._decode_jwt_token(token) 

85 

86 request.remote_user = jwt_doc["user_id"] 

87 set_user_object(request) 

88 

89 def remember(self, request, response): 

90 """Client is responsible for storing token""" 

91 pass 

92 

93 def _decode_jwt_token(self, token: str) -> dict: 

94 """ 

95 Helper method to decode JWT token and handle exceptions. 

96 """ 

97 with jwt_exceptions(): 

98 decoded = jwt.decode( 

99 token, 

100 postrfp.conf.CONF.crypt_key, 

101 algorithms=["HS256"], 

102 options={"require": ["exp", "iat"]}, 

103 ) 

104 

105 # Verify token type if present 

106 # For backward compatibility, only check token_type if it exists 

107 token_type = decoded.get("token_type") 

108 if token_type is not None and token_type != "access": 

109 raise webob.exc.HTTPForbidden( 

110 f"Invalid token type: '{token_type}'. Expected 'access'" 

111 ) 

112 

113 return decoded 

114 

115 

116class PassthroughPolicy(AbstractIdentityPolicy): 

117 """ 

118 Identity Policies purpose is to identify authenticated 

119 users for a given request. For login on we are dealing 

120 with unauthenticated users, so this is implementation does 

121 nothing. 

122 """ 

123 

124 def identify(self, request): 

125 pass 

126 

127 def remember(self, request, response): 

128 pass 

129 

130 

131class DevHeaderPolicy(AbstractIdentityPolicy): 

132 def __init__(self, default_user=None): 

133 super().__init__() 

134 self.default_user = default_user 

135 if default_user: 

136 log.warning( 

137 "Initialised DevHeaderPolicy with default user '%s'", default_user 

138 ) 

139 

140 def identify(self, request): 

141 try: 

142 if "POSTRFP-TEST-USER" in request.headers: 

143 user_id = request.headers["POSTRFP-TEST-USER"] 

144 log.info( 

145 "Unsafely Authenticated user [%s] via POSTRFP-TEST-USER HTTP Header", 

146 user_id, 

147 ) 

148 else: 

149 user_id = request.environ["POSTRFP-TEST-USER"] 

150 log.info( 

151 "Unsafely Authenticated user [%s] via POSTRFP-TEST-USER wsgi Environ key", 

152 user_id, 

153 ) 

154 except KeyError: 

155 if self.default_user is None: 

156 raise webob.exc.HTTPUnauthorized( 

157 "POSTRFP-TEST-USER header not set and no default" 

158 ) 

159 user_id = self.default_user 

160 log.info( 

161 "Unsafely Authenticated user [%s] as default user for DevHeaderPolicy", 

162 user_id, 

163 ) 

164 request.remote_user = str(user_id) 

165 set_user_object(request) 

166 

167 def remember(self, request, response): 

168 pass 

169 

170 

171class FallbackPolicy(AbstractIdentityPolicy): 

172 def __init__(self, default_user=None): 

173 self.default_user = None 

174 self.header_policy = DevHeaderPolicy(default_user=default_user) 

175 self.bearer_policy = JwtBearerPolicy() 

176 

177 def identify(self, request): 

178 try: 

179 self.bearer_policy.identify(request) 

180 log.warning( 

181 f"Authenticated user {request.remote_user} with JWT Bearer policy" 

182 ) 

183 except webob.exc.HTTPUnauthorized: 

184 self.header_policy.identify(request) 

185 log.warning( 

186 f"Authenticated user {request.remote_user} with HTTP Test Header policy" 

187 ) 

188 

189 def remember(self, request, response): 

190 pass