Coverage for postrfp / auth / policy.py: 98%

91 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1import abc 

2import logging 

3from contextlib import contextmanager 

4 

5import jwt 

6import webob.exc 

7from webob.request import Request 

8 

9 

10import postrfp.conf 

11 

12from postrfp.shared.exceptions import NotLoggedIn 

13 

14 

15log = logging.getLogger(__name__) 

16 

17 

18class AbstractIdentityPolicy: # pragma: no cover 

19 __metaclass__ = abc.ABCMeta 

20 

21 @abc.abstractmethod 

22 def identify(self, request): 

23 """Set remote_user(id) user(object) on the request""" 

24 pass 

25 

26 @abc.abstractmethod 

27 def remember(self, request, response): 

28 """Remember the users identity for subsequent requests""" 

29 pass 

30 

31 

32def set_user_object(request): 

33 from postrfp.model.humans import User 

34 

35 user_id = request.remote_user 

36 if not user_id: 

37 raise NotLoggedIn("User ID Not Found for Request") 

38 request.user = request.session.get(User, user_id) 

39 if request.user is None: 

40 raise NotLoggedIn(f'User ID "{user_id}" not found in database ') 

41 

42 

43@contextmanager 

44def jwt_exceptions(): 

45 """Convert JWT exceptions to HTTP error responses""" 

46 try: 

47 yield 

48 except jwt.PyJWTError as jwt_err: 

49 match jwt_err: 

50 case jwt.ExpiredSignatureError(): 

51 raise webob.exc.HTTPForbidden( 

52 "Authentication token has expired" 

53 ) from jwt_err 

54 case jwt.MissingRequiredClaimError(): 

55 raise webob.exc.HTTPBadRequest(f"'{jwt_err}'") from jwt_err 

56 case jwt.InvalidAlgorithmError(): 

57 raise webob.exc.HTTPBadRequest( 

58 "Invalid algorithm specified in JWT token" 

59 ) from jwt_err 

60 case jwt.DecodeError(): 

61 raise webob.exc.HTTPBadRequest( 

62 f"Failed to decode JWT token: '{jwt_err}'" 

63 ) from jwt_err 

64 case jwt.InvalidTokenError(): 

65 raise webob.exc.HTTPBadRequest( 

66 f"Invalid JWT token: '{jwt_err}'" 

67 ) from jwt_err 

68 case _: 

69 raise jwt_err 

70 

71 

72class JwtBearerPolicy(AbstractIdentityPolicy): 

73 def identify(self, request: Request) -> None: 

74 if request.remote_user is not None: 

75 msg = "REMOTE_USER illegally set to [%s] upstream" 

76 raise webob.exc.HTTPBadRequest(msg % request.remote_user) 

77 

78 if request.authorization is None or request.authorization.authtype != "Bearer": 

79 raise webob.exc.HTTPUnauthorized("Bearer Authentication token required") 

80 

81 token: str = request.authorization.params # type: ignore 

82 jwt_doc = self._decode_jwt_token(token) 

83 

84 request.remote_user = jwt_doc["user_id"] 

85 set_user_object(request) 

86 

87 def remember(self, request, response): 

88 """Client is responsible for storing token""" 

89 pass 

90 

91 def _decode_jwt_token(self, token: str) -> dict: 

92 """ 

93 Helper method to decode JWT token and handle exceptions. 

94 """ 

95 with jwt_exceptions(): 

96 decoded = jwt.decode( 

97 token, 

98 postrfp.conf.CONF.crypt_key, 

99 algorithms=["HS256"], 

100 options={"require": ["exp", "iat"]}, 

101 ) 

102 

103 # Verify token type if present 

104 # For backward compatibility, only check token_type if it exists 

105 token_type = decoded.get("token_type") 

106 if token_type is not None and token_type != "access": 

107 raise webob.exc.HTTPForbidden( 

108 f"Invalid token type: '{token_type}'. Expected 'access'" 

109 ) 

110 

111 return decoded 

112 

113 

114class PassthroughPolicy(AbstractIdentityPolicy): 

115 """ 

116 Identity Policies purpose is to identify authenticated 

117 users for a given request. For login on we are dealing 

118 with unauthenticated users, so this is implementation does 

119 nothing. 

120 """ 

121 

122 def identify(self, request): 

123 pass 

124 

125 def remember(self, request, response): 

126 pass 

127 

128 

129class DevHeaderPolicy(AbstractIdentityPolicy): 

130 def __init__(self, default_user=None): 

131 super().__init__() 

132 self.default_user = default_user 

133 if default_user: 

134 log.warning( 

135 "Initialised DevHeaderPolicy with default user '%s'", default_user 

136 ) 

137 

138 def identify(self, request): 

139 try: 

140 if "POSTRFP-TEST-USER" in request.headers: 

141 user_id = request.headers["POSTRFP-TEST-USER"] 

142 log.info( 

143 "Unsafely Authenticated user [%s] via POSTRFP-TEST-USER HTTP Header", 

144 user_id, 

145 ) 

146 else: 

147 user_id = request.environ["POSTRFP-TEST-USER"] 

148 log.info( 

149 "Unsafely Authenticated user [%s] via POSTRFP-TEST-USER wsgi Environ key", 

150 user_id, 

151 ) 

152 except KeyError: 

153 if self.default_user is None: 

154 raise webob.exc.HTTPUnauthorized( 

155 "POSTRFP-TEST-USER header not set and no default" 

156 ) 

157 user_id = self.default_user 

158 log.info( 

159 "Unsafely Authenticated user [%s] as default user for DevHeaderPolicy", 

160 user_id, 

161 ) 

162 request.remote_user = str(user_id) 

163 set_user_object(request) 

164 

165 def remember(self, request, response): 

166 pass 

167 

168 

169class FallbackPolicy(AbstractIdentityPolicy): 

170 def __init__(self, default_user=None): 

171 self.default_user = None 

172 self.header_policy = DevHeaderPolicy(default_user=default_user) 

173 self.bearer_policy = JwtBearerPolicy() 

174 

175 def identify(self, request): 

176 try: 

177 self.bearer_policy.identify(request) 

178 log.warning( 

179 f"Authenticated user {request.remote_user} with JWT Bearer policy" 

180 ) 

181 except webob.exc.HTTPUnauthorized: 

182 self.header_policy.identify(request) 

183 log.warning( 

184 f"Authenticated user {request.remote_user} with HTTP Test Header policy" 

185 ) 

186 

187 def remember(self, request, response): 

188 pass