Coverage for postrfp/web/suxint/sux.py: 100%
148 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 21:34 +0000
1import re
2import logging
3from types import ModuleType
4import inspect
5from inspect import Parameter
6import importlib
7from typing import Callable, Any, Generator, Iterable
9from webob.request import Request
11from .handler import Handler
12from .validate import validate_handlers
13from .openapi import create_spec
14from .extractors import PathArg
15from postrfp.shared.exceptions import RoutingError
16from postrfp.shared.constants import HANDLER_PREFIX
17from postrfp.shared.types import Authoriser, SuxHandler
# Default pattern used to strip path arguments out of a URL before the
# handler name is derived: a slash followed by either a run of digits or a
# colon-prefixed word, e.g. "/23" or "/:cake".
ARG_REGEX = r"/(\d+|:\w+)"

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class Sux:
    """
    Sux is a dispatcher. It inspects an http request, finds a corresponding callable,
    and invokes it. The motivation is to avoid having a routes table which matches
    regular expressions to handler functions.

    Handler Matching

    Handlers are matched by simply converting slashes in a URL into underscores

        GET  /this/that/   ->  get_this_that
        POST /ding/dong/   ->  post_ding_dong

    A simple heuristic / stupid shortcut is used to match URL path arguments:
    anything that looks like an argument is stripped out before the function is matched.
    This stripping is done by applying a regular expression (self.arg_regex).

    The default regex is r'/(\\d+|:\\w+)' - i.e. extract digits or strings that
    start with a colon from the URL path.

        GET /this/23/that/  ->  get_this_that(23)
        GET /some/:cake     ->  get_some(cake)

    Function arguments and invocation

    Sux introspects the function arguments. For each argument, Sux will look for an "adaptor"
    function. An adaptor function takes an http request as its argument and extracts a value
    from the request. Thus the handler is decoupled from http.

    Example

        1  http request                          GET /this/23/that?user_name=bob
        2  lookup matching handler               get_this_that(this_id, user_name)
        3  extract argument names                this_id, user_name
        4  find corresponding adaptor functions  this_id(request), user_name(request)
        5  call adaptors and collect return vals [23, 'bob']
        6  invoke handler (positionally)         get_this_that(23, 'bob')

    @param handler_module - string module name or actual module containing
        handler (end point) functions or a package reference, the __init__.py of which
        must import the modules that Sux should find.

    @param adaptor_module - string module name or actual module containing adaptor
        functions. If None, then the handler_module is assumed to also contain
        adaptor functions.

    @param models_module - string name of module providing Pydantic schema models

    @param arg_regex - regular expression (string) defining which url path name elements
        should be treated as arguments. Defaults to ARG_REGEX, r'/(\\d+|:\\w+)', which
        captures numerical elements and colon-prefixed words. So /a/23/b/ would yield 23

    @param mount_depth - Sux inspects the path_info element of the URL to resolve the matching
        handler method. If sux is mounted at a given path, e.g. '/api/' then
        this path element is ignored. The mount_depth argument indicates how many
        leading path elements should be chopped. Default is 1,
        e.g. GET '/api/things/23' > get_things(things_id)

    @param api_name - The name of this API for documentation purposes

    @param validate - boolean to determine whether Sux should check that the handler module
        contains at least one valid handler. Set to False for easier unit testing.

    @param description - free text description of this API; stored on the instance
        as self.description

    @param authoriser - optional callable invoked before each handler that declares a
        "required_permission" attribute. It is called with
        (http method, path_info, required_permission, request_args).
    """

    def __init__(
        self,
        handler_module: str | object,
        adaptor_module: str | object | None = None,
        models_module: str = "postrfp.shared.serial",
        arg_regex: str | None = None,
        mount_depth: int = 1,
        api_name: str = "PostRFP API",
        validate: bool = True,
        description: str = "API for managing counterparty risk and intelligence",
        authoriser: Authoriser | None = None,
    ) -> None:
        if arg_regex is None:
            arg_regex = ARG_REGEX
        self.models_module = models_module
        self.arg_regex = re.compile(arg_regex)
        self.mount_depth = mount_depth
        self.api_name = api_name
        self.description = description
        self.authoriser = authoriser
        self.handler_module: ModuleType | None = None
        self.adaptor_module: ModuleType | None = None
        # modules in the handler package that contain at least one valid handler
        self.modules: dict[str, ModuleType] = {}

        # OpenAPI spec, built lazily on the first request for openapi.json
        self._spec_dict: dict | None = None

        # cache of all handler function names to handler callables
        self._handler_cache: dict[str, Callable] = {}
        # cache of argument names discovered by inspecting callables
        self._handler_argspec_cache: dict[Callable, list[str]] = {}

        # Order matters: iter_handlers() (used by load_handler_cache) needs both
        # the handler module and the adaptor module to be set.
        self.load_handler_module(handler_module)
        self.load_adaptors(adaptor_module)
        self.load_handler_cache()

        if validate:
            validate_handlers(self)
            if not self._handler_cache:
                raise RoutingError(
                    "handler module does not contain any valid handler functions"
                )

    @staticmethod
    def _resolve_module(module_ref: str | object, param_name: str) -> ModuleType:
        """
        Return module_ref itself if it is a module, or import it if it is a
        string module name. Raises RoutingError for any other type.
        """
        if inspect.ismodule(module_ref):
            return module_ref
        if isinstance(module_ref, str):
            return importlib.import_module(module_ref)
        raise RoutingError(f"{param_name} must be a module or string module name")

    def load_handler_module(self, handler_module: str | object) -> None:
        """
        Set self.handler_module from a module object or a dotted module name.

        Raises RoutingError if handler_module is neither a module nor a string.
        """
        self.handler_module = self._resolve_module(handler_module, "handler_module")

    def load_handler_cache(self) -> None:
        """Populate the name -> callable cache from iter_handlers()."""
        for handler in self.iter_handlers():
            self._handler_cache[handler.name] = handler.func

    def load_adaptors(self, adaptor_module: str | object | None) -> None:
        """
        Set self.adaptor_module from a module object or a dotted module name.
        If adaptor_module is None the handler module doubles as adaptor module.

        Raises RoutingError if adaptor_module is not None, a module or a string.
        """
        if adaptor_module is None:
            self.adaptor_module = self.handler_module
        else:
            self.adaptor_module = self._resolve_module(
                adaptor_module, "adaptor_module"
            )

    def extract_handler_name(self, request: Request) -> tuple[str, int]:
        """
        Converts an http request method and path into a handler function name

        - Removes anything that looks like a path argument (according to arg_regex)
        - Replaces '/' with '_' characters
        - Prefixes the lower-cased http method

        e.g GET /project/1/people/ => ("get_project_people", 1)

        Returns a tuple of (handler name, number of path arguments stripped).
        A request for the root path maps to the bare method name, e.g. "get".
        """
        method = request.method.lower()
        # strip path arguments (and their preceding slashes) from the path
        bare_path, num_path_args = self.arg_regex.subn("", request.path_info)
        handler_suffix = bare_path.strip("/").replace("/", "_")

        if not handler_suffix:
            # assume a call to the root '/', i.e. GET / maps to def get():
            return (method, num_path_args)

        return (f"{method}_{handler_suffix}", num_path_args)

    def no_matching_handler(self, handler_name: str, request: Request) -> dict:
        """
        Fallback used when no handler is cached under handler_name.

        A request whose next path segment is "openapi.json" is answered with the
        OpenAPI spec dict (built once via create_spec and then cached on the
        instance). Any other request raises RoutingError.
        """
        if request.path_info_peek() == "openapi.json":
            if self._spec_dict is None:
                path = request.script_name.rstrip("/") + "/"
                spec = create_spec(self, path=path)
                self._spec_dict = spec.to_dict()
            return self._spec_dict

        raise RoutingError(
            'Handler "%s" not found in module "%s" given path "%s"'
            % (
                handler_name,
                self.handler_module.__name__ if self.handler_module else "Unknown",
                request.path,
            )
        )

    def args_for_handler(self, handler_func: Callable) -> list[str]:
        """
        Extract the names of the positional-or-keyword parameters declared by
        the given callable. Results are cached per callable.

        inspect.signature() already omits the bound first argument (self/cls)
        of bound methods, so no parameter is skipped here; skipping one would
        silently drop the first real argument of any non-function callable.
        """
        cache = self._handler_argspec_cache

        if handler_func not in cache:
            sig = inspect.signature(handler_func)
            cache[handler_func] = [
                p.name
                for p in sig.parameters.values()
                if p.kind is Parameter.POSITIONAL_OR_KEYWORD
            ]

        return cache[handler_func]

    def __call__(self, original_request: Request) -> Any:
        """
        Dispatch a request: resolve the handler, build its arguments by calling
        the adaptor named after each handler parameter, run the authoriser and
        finally invoke the handler.

        Raises RoutingError if the handler is missing, is not http exposed, or
        if the path arguments in the URL do not match the PathArg adaptors the
        handler declares. Raises AttributeError if an adaptor is missing.
        """
        # Work on a copy so that popping the mount prefix does not alter the
        # caller's request.
        request = original_request.copy()
        for _ in range(self.mount_depth):
            request.path_info_pop()

        handler_name, num_path_args = self.extract_handler_name(request)

        try:
            handler = self._handler_cache[handler_name]
        except KeyError:
            return self.no_matching_handler(handler_name, request)

        if not getattr(handler, "http_exposed", False):
            raise RoutingError("Handler %s is not http accessible" % handler)
        if getattr(handler, "etag", False):
            # the flag is set on the caller's request, not on the working copy
            original_request.generate_etag = True

        params = []

        handler_args = self.args_for_handler(handler)
        for handler_arg in handler_args:
            try:
                adaptor = getattr(self.adaptor_module, handler_arg)
            except AttributeError as exc:
                raise AttributeError(
                    'Adaptor "%s" not found in module "%s"'
                    % (
                        handler_arg,
                        self.adaptor_module.__name__
                        if self.adaptor_module
                        else "Unknown",
                    )
                ) from exc

            if isinstance(adaptor, PathArg):
                if num_path_args == 0:
                    # the handler wants more path arguments than the URL holds
                    msg = (
                        f'Path argument "{adaptor.arg_name}" not found'
                        f" for function {handler.__name__}"
                    )
                    log.error(msg)
                    raise RoutingError(msg)
                num_path_args -= 1

            params.append(adaptor(request))

        if num_path_args > 0:
            # check that get_example_path() doesn't answer to GET /example/path/1/
            # or GET /example/2/path
            raise RoutingError(
                f'Handler function for Request "{request.path}" '
                "must consume all path arguments"
            )

        self.run_authoriser(request, handler, params, handler_args)

        return self.execute_handler(handler, params)

    def run_authoriser(
        self, request: Request, handler: Callable, params: list, handler_args: list
    ) -> None:
        """
        Invoke the authoriser callable if one has been provided.

        The authoriser receives the http method, path_info, the handler's
        "required_permission" attribute and a dict mapping handler argument
        names to their adapted values (plus a "deny_restricted" entry). If the
        handler declares no required_permission, a warning is logged and no
        check is performed.
        """
        if self.authoriser is None:
            return
        request_args = dict(zip(handler_args, params))
        request_args["deny_restricted"] = getattr(handler, "deny_restricted", False)

        required_permission = getattr(handler, "required_permission", None)
        if required_permission is not None:
            self.authoriser(
                request.method,
                request.path_info,
                required_permission,
                request_args,
            )
        else:
            log.warning(
                f'No permission associated with handler "{handler.__name__}"'
                " so no authorisation check performed"
            )

    def execute_handler(self, handler: Callable, params: list) -> Any:
        """Invoke the resolved handler.

        Split into its own method so tests can patch the execution step when
        isolating authorization or validation behaviour.
        """
        return handler(*params)

    def iter_handler_modules(self) -> Generator[tuple[str, ModuleType], None, None]:
        """
        Yield (name, module) pairs: first ("__root__", handler_module), then
        every module that is a member of handler_module. Yields nothing if no
        handler module is loaded.
        """
        if self.handler_module is not None:
            yield ("__root__", self.handler_module)
            for name, module_member in inspect.getmembers(
                self.handler_module, inspect.ismodule
            ):
                yield (name, module_member)

    def iter_handlers(self) -> Iterable[SuxHandler]:
        """
        Yield a Handler for each distinct function in the handler modules whose
        name matches HANDLER_PREFIX and which has a truthy "http_exposed"
        attribute.

        Side effect: each non-root module that contributes a handler is
        recorded in self.modules. Nothing is yielded when no adaptor module is
        loaded.
        """
        seen_funcs = set()
        for mod_name, mod in self.iter_handler_modules():
            for func_name, func in inspect.getmembers(mod, inspect.isfunction):
                # the same function may be visible from several modules
                if func in seen_funcs:
                    continue

                if HANDLER_PREFIX.match(func_name) and getattr(
                    func, "http_exposed", False
                ):
                    if mod_name != "__root__" and mod_name not in self.modules:
                        self.modules[mod_name] = mod
                    tag = mod_name.capitalize() if mod_name != "__root__" else None
                    seen_funcs.add(func)
                    if self.adaptor_module is not None:
                        yield Handler(func_name, func, self.adaptor_module, tag=tag)

    def handler_by_name(self, handler_name: str) -> SuxHandler | None:
        """Return the handler whose name equals handler_name, or None."""
        for handler in self.iter_handlers():
            if handler.name == handler_name:
                return handler
        return None