Coverage for postrfp/web/suxint/sux.py: 100%

148 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import re 

2import logging 

3from types import ModuleType 

4import inspect 

5from inspect import Parameter 

6import importlib 

7from typing import Callable, Any, Generator, Iterable 

8 

9from webob.request import Request 

10 

11from .handler import Handler 

12from .validate import validate_handlers 

13from .openapi import create_spec 

14from .extractors import PathArg 

15from postrfp.shared.exceptions import RoutingError 

16from postrfp.shared.constants import HANDLER_PREFIX 

17from postrfp.shared.types import Authoriser, SuxHandler 

18 

19 

20ARG_REGEX = r"/(\d+|:\w+)" 

21 

22log = logging.getLogger(__name__) 

23 

24 

25class Sux: 

26 """ 

27 Sux is a dispatcher. It inspects an http request, finds a corresponding callable, 

28 and invokes it. The motivation is to avoid having a routes table which matches 

29 regular expresions to handler functions. 

30 

31 Handler Matching 

32 Handlers are matched by simply converting slashes in a URL into underscores 

33 

34 GET /this/that/ -> get_this_that 

35 POST /ding/dong/ -> post_ding_dong 

36 

37 A simple heuristic / stupid shortcut is used to match URL path arguments: 

38 anything that looks like an argument is stripped out before the function is matched. 

39 This stripping is done by applying a regular expresion (self.arg_regex). 

40 

41 The default regex is r'/(\\d+|:\\w+)' - i.e. extract digits or strings that 

42 start with a colon from the URL path. 

43 

44 GET /this/23/that/ -> get_this_that(23) 

45 GET /some/:cake -> get_some(cake) 

46 

47 Function arguments and invocation 

48 

49 Sux introspects the function arguments. For each argument, Sux will look for an "adaptor" 

50 function. An adaptor function takes an http request as its argument and extracts a value 

51 from the request. Thus the handler is decoupled from http. 

52 

53 Example 

54 1 http request GET /this/23/that?user_name=bob 

55 

56 2 lookup matching handler get_this_that(this_id, user_name) 

57 

58 3 extract argument names this_id, user_name 

59 

60 4 find corresponding adaptors functions this_id(request), user_name(request) 

61 

62 5 call adaptors and collect return vals [23, 'bob'] 

63 

64 6 invoke handler get_this_that(23, user_name='bob')) 

65 

66 

67 @param handler_module - string module name or actual module containing 

68 handler (end point) functions or a package reference, the __init__.py of which 

69 must import the modules that Sux should find. 

70 

71 @param adaptor_module - string module name or actual module containing adaptor 

72 functions. If None, then the handler_module is assumed to also contain 

73 adaptor funtions. 

74 

75 @params models_module - string name of module providing Pydantic schema models 

76 

77 @param arg_regex - regular expression (string) defining which url path name elements 

78 should be treated as arguments. By default any numerical name_elements 

79 are captured by the regex r'\\d+'. So /a/23/b/ would yield 23 

80 

81 @param mount_depth - Sux inspects the path_info element of the URL to resolve the matching 

82 handler method. If sux is mounted at a given path, e.g. '/url/' then 

83 this path element is ignored. The mount_depth argument indicates how many 

84 path arguments should be chopped. Default is 1, 

85 e.g. GET '/api/things/23' > get_api(things_id) 

86 @param api_name - The name of this API for documentation purposes 

87 

88 @param validate - boolean to determine whether Sux should check that the handler module 

89 contains at least one valid handler. Set to False for easier unit testing. 

90 

91 """ 

92 

93 def __init__( 

94 self, 

95 handler_module: str | object, 

96 adaptor_module: str | object | None = None, 

97 models_module: str = "postrfp.shared.serial", 

98 arg_regex: str | None = None, 

99 mount_depth: int = 1, 

100 api_name: str = "PostRFP API", 

101 validate: bool = True, 

102 description: str = "API for managing counterparty risk and intelligence", 

103 authoriser: Authoriser | None = None, 

104 ) -> None: 

105 if arg_regex is None: 

106 arg_regex = ARG_REGEX 

107 self.models_module = models_module 

108 self.arg_regex = re.compile(arg_regex) 

109 self.mount_depth = mount_depth 

110 self.api_name = api_name 

111 self.description = description 

112 self.authoriser = authoriser 

113 self.handler_module: ModuleType | None = None 

114 self.adaptor_module: ModuleType | None = None 

115 self.modules: dict[ 

116 str, ModuleType 

117 ] = {} # set of modules in package containing valid handlers 

118 

119 self._spec_dict: dict | None = None 

120 

121 self._handler_cache: dict[ 

122 str, Callable 

123 ] = {} # cache of all func names to handlers 

124 self._handler_argspec_cache: dict[ 

125 Callable, list[str] 

126 ] = {} # cache results of inspecting functions 

127 

128 self.load_handler_module(handler_module) 

129 self.load_adaptors(adaptor_module) 

130 self.load_handler_cache() 

131 

132 if validate: 

133 validate_handlers(self) 

134 if not len(self._handler_cache) > 0: 

135 raise RoutingError( 

136 "handler module does not contain any valid handler functions" 

137 ) 

138 

139 def load_handler_module(self, handler_module: str | object) -> None: 

140 if inspect.ismodule(handler_module): 

141 self.handler_module = handler_module 

142 elif isinstance(handler_module, str): 

143 self.handler_module = importlib.import_module(handler_module) 

144 else: 

145 raise RoutingError("handler_module must be a module or string module name") 

146 

147 def load_handler_cache(self) -> None: 

148 for handler in self.iter_handlers(): 

149 self._handler_cache[handler.name] = handler.func 

150 

151 def load_adaptors(self, adaptor_module: str | object | None) -> None: 

152 if adaptor_module is None: 

153 self.adaptor_module = self.handler_module 

154 elif inspect.ismodule(adaptor_module): 

155 self.adaptor_module = adaptor_module 

156 elif isinstance(adaptor_module, str): 

157 self.adaptor_module = importlib.import_module(adaptor_module) 

158 else: 

159 raise RoutingError("adaptor_module must be a module or string module name") 

160 

161 def extract_handler_name(self, request: Request) -> tuple[str, int]: 

162 """ 

163 Converts an http request path into a method name 

164 

165 - Replaces '/' with '_' characters 

166 - Removes anything that looks like a path argument (according to arg_regex) 

167 

168 e.g /get/project/1/people/ => get_project_people(project_id) 

169 """ 

170 method = request.method.lower() 

171 # strip digits and preceding slashes from the name 

172 bare_path, num_path_args = self.arg_regex.subn("", request.path_info) 

173 handler_suffix = bare_path.strip("/").replace("/", "_") 

174 

175 if not handler_suffix: 

176 # assume a call to the root '/', i.e. GET / maps to def get(): 

177 return (method, num_path_args) 

178 

179 derived_func_name = "%s_%s" % (method, handler_suffix) 

180 

181 return (derived_func_name, num_path_args) 

182 

183 def no_matching_handler(self, handler_name: str, request: Request) -> dict: 

184 if request.path_info_peek() == "openapi.json": 

185 if self._spec_dict is None: 

186 path = request.script_name.rstrip("/") + "/" 

187 spec = create_spec(self, path=path) 

188 self._spec_dict = spec.to_dict() 

189 return self._spec_dict 

190 

191 raise RoutingError( 

192 'Handler "%s" not found in module "%s" given path "%s"' 

193 % ( 

194 handler_name, 

195 self.handler_module.__name__ if self.handler_module else "Unknown", 

196 request.path, 

197 ) 

198 ) 

199 

200 def args_for_handler(self, handler_func: Callable) -> list[str]: 

201 """ 

202 Extract the names of the function arguments parameters declared by 

203 the given callable 

204 """ 

205 cache = self._handler_argspec_cache 

206 

207 if handler_func not in cache: 

208 sig = inspect.signature(handler_func) 

209 # skip the first handler_func argument if instance method(allow for self) 

210 skip = 0 if inspect.isfunction(handler_func) else 1 

211 arg_names = [ 

212 p.name 

213 for p in sig.parameters.values() 

214 if p.kind is Parameter.POSITIONAL_OR_KEYWORD 

215 ][skip:] 

216 cache[handler_func] = arg_names 

217 

218 return cache[handler_func] 

219 

220 def __call__(self, original_request: Request) -> Any: 

221 request = original_request.copy() 

222 for _ in range(self.mount_depth): 

223 request.path_info_pop() 

224 

225 handler_name, num_path_args = self.extract_handler_name(request) 

226 

227 try: 

228 handler = self._handler_cache[handler_name] 

229 except KeyError: 

230 return self.no_matching_handler(handler_name, request) 

231 

232 if not getattr(handler, "http_exposed", False): 

233 raise RoutingError("Handler %s is not http accessible" % handler) 

234 if getattr(handler, "etag", False): 

235 original_request.generate_etag = True 

236 

237 params = [] 

238 

239 handler_args = self.args_for_handler(handler) 

240 for handler_arg in handler_args: 

241 try: 

242 adaptor = getattr(self.adaptor_module, handler_arg) 

243 except AttributeError: 

244 raise AttributeError( 

245 'Adaptor "%s" not found in module "%s"' 

246 % ( 

247 handler_arg, 

248 self.adaptor_module.__name__ 

249 if self.adaptor_module 

250 else "Unknown", 

251 ) 

252 ) 

253 if num_path_args > 0: 

254 if isinstance(adaptor, PathArg): 

255 num_path_args -= 1 

256 elif num_path_args == 0 and isinstance(adaptor, PathArg): 

257 msg = ( 

258 f'Path argument "{adaptor.arg_name}" not found' 

259 f" for function {handler.__name__}" 

260 ) 

261 log.error(msg) 

262 raise RoutingError(msg) 

263 

264 command = adaptor(request) 

265 params.append(command) 

266 

267 if num_path_args > 0: 

268 # check that get_example_path() doesn't answer to GET /example/path/1/ 

269 # or GET /example/2/path 

270 raise RoutingError( 

271 f'Handler function for Request "{request.path}" ' 

272 "must consume all path arguments" 

273 ) 

274 

275 self.run_authoriser(request, handler, params, handler_args) 

276 

277 return self.execute_handler(handler, params) 

278 

279 def run_authoriser( 

280 self, request: Request, handler: Callable, params: list, handler_args: list 

281 ) -> None: 

282 """ 

283 Invoke the authoriser callable if one has been provided 

284 """ 

285 if self.authoriser is None: 

286 return 

287 request_args = { 

288 handler_arg: param for handler_arg, param in zip(handler_args, params) 

289 } 

290 request_args["deny_restricted"] = getattr(handler, "deny_restricted", False) 

291 

292 required_permission = getattr(handler, "required_permission", None) 

293 if required_permission is not None: 

294 self.authoriser( 

295 request.method, 

296 request.path_info, 

297 required_permission, 

298 request_args, 

299 ) 

300 else: 

301 log.warning( 

302 f'No permission associated with handler "{handler.__name__}"' 

303 " so no authorisation check performed" 

304 ) 

305 

306 def execute_handler(self, handler: Callable, params: list) -> Any: 

307 """Invoke the resolved handler. 

308 

309 Split into its own method so tests can patch the execution step when 

310 isolating authorization or validation behaviour. 

311 """ 

312 

313 return handler(*params) 

314 

315 def iter_handler_modules(self) -> Generator[tuple[str, ModuleType], None, None]: 

316 if self.handler_module is not None: 

317 yield ("__root__", self.handler_module) 

318 for name, module_member in inspect.getmembers( 

319 self.handler_module, inspect.ismodule 

320 ): 

321 yield (name, module_member) 

322 

323 def iter_handlers(self) -> Iterable[SuxHandler]: 

324 seen_funcs = set() 

325 for mod_name, mod in self.iter_handler_modules(): 

326 for func_name, func in inspect.getmembers(mod, inspect.isfunction): 

327 if func in seen_funcs: 

328 continue 

329 

330 if HANDLER_PREFIX.match(func_name) and getattr( 

331 func, "http_exposed", False 

332 ): 

333 if mod_name != "__root__" and mod_name not in self.modules: 

334 self.modules[mod_name] = mod 

335 tag = mod_name.capitalize() if mod_name != "__root__" else None 

336 seen_funcs.add(func) 

337 if self.adaptor_module is not None: 

338 yield Handler(func_name, func, self.adaptor_module, tag=tag) 

339 

340 def handler_by_name(self, handler_name: str) -> SuxHandler | None: 

341 for handler in self.iter_handlers(): 

342 if handler.name == handler_name: 

343 return handler 

344 return None