Coverage for postrfp/web/suxint/handler.py: 100%

105 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-10-22 21:34 +0000

1import re 

2import inspect 

3import textwrap 

4from itertools import chain 

5from types import ModuleType 

6from inspect import Parameter, Signature 

7from typing import Callable, Any, Generator, Iterator, Iterable 

8 

9from .extractors import PathArg, ArgExtractor 

10from postrfp.shared.types import Adaptor 

11 

12 

13class Handler: 

14 """ 

15 Provides a wrapper around a real api endpoint (function or method) 

16 

17 Used primarily for generating documentation 

18 """ 

19 

20 def __init__( 

21 self, name: str, func: Callable, adaptor_mod: ModuleType, tag: str | None = None 

22 ) -> None: 

23 self.name = name 

24 self.func = func 

25 self.adaptor_mod = adaptor_mod 

26 self.name_elements = name.split("_") 

27 self._signature: Signature | None = None 

28 self._retval: Any = None 

29 self._docs = DocString(func) 

30 self.tag = tag 

31 

32 @property 

33 def method(self) -> str: 

34 """HTTP method, e.g. 'GET' or 'POST""" 

35 return self.name_elements[0].upper() 

36 

37 @property 

38 def path(self) -> str: 

39 """ 

40 Show the documentation path for this Handler 

41 

42 This effectively reverses the process that Sux follows - it constructs the 

43 'real' HTTP path by interleaving the function name and the arguments to build 

44 a URL. so get_this_that(this_id) becomes /this/{this_id}/that/ 

45 

46 adaptor.name is used to match incoming http request path elements 

47 adaptor_map will look like { 

48 'project': project_id() - an ArgExtractor function which extracts 

49 the project id from the path 

50 'issue': issue_id() 

51 } 

52 """ 

53 adaptor_map = { 

54 adaptor.arg_name: adaptor for arg_name, adaptor in self.path_params() 

55 } 

56 

57 def lookup(match): 

58 path_part = match.groups()[0] 

59 if path_part in adaptor_map: 

60 adaptor = adaptor_map[path_part] 

61 # Interleave the parameter name into the function name derived path 

62 return "/%s/{%s}" % (path_part, adaptor.doc_name) 

63 return "/" + path_part 

64 

65 unparameterised_path = "/" + "/".join(self.name_elements[1:]) 

66 return re.sub(r"/(\w+)", lookup, unparameterised_path) 

67 

68 def path_params(self) -> Generator[tuple[str, PathArg], None, None]: 

69 """Generator which yields all PathArg arguments for this handler""" 

70 for arg_name in self.required_arguments: 

71 adaptor = getattr(self.adaptor_mod, arg_name, None) 

72 if adaptor is not None and isinstance(adaptor, PathArg): 

73 yield arg_name, adaptor 

74 

75 def adaptors(self) -> Iterable[Adaptor]: 

76 for arg_name in chain(self.required_arguments, self.optional_arguments): 

77 adaptor = getattr(self.adaptor_mod, arg_name, None) 

78 if adaptor is not None and isinstance(adaptor, ArgExtractor): 

79 yield adaptor 

80 

81 @property 

82 def parameters(self) -> Iterator[Parameter]: 

83 """Returns a list in inspect.Parameter objects for this handler's function""" 

84 if self._signature is None: 

85 self._signature = inspect.signature(self.func) 

86 return iter(self._signature.parameters.values()) 

87 

88 @property 

89 def required_arguments(self) -> list[str]: 

90 """The list of non kwarg argument names for the wrapped func""" 

91 return [p.name for p in self.parameters if p.default is Signature.empty] 

92 

93 @property 

94 def optional_arguments(self) -> list[str]: 

95 """A list of optional (keyword) arguments for the wrapped function""" 

96 return [p.name for p in self.parameters if p.default is not Signature.empty] 

97 

98 @property 

99 def docs(self) -> str: 

100 info = self._docs.intro 

101 perms = self.permissions 

102 if perms: 

103 info += "\n\nPermissions: " + perms 

104 errs = self.errors 

105 if errs: 

106 info += "\n\nErrors: " + errs 

107 return info 

108 

109 @property 

110 def permissions(self) -> str | None: 

111 if not self._docs.permissions: 

112 return None 

113 return ", ".join(self._docs.permissions) 

114 

115 @property 

116 def errors(self) -> str | None: 

117 if not self._docs.permissions: 

118 return None 

119 return ", ".join(self._docs.errors) 

120 

121 @property 

122 def return_annotation(self) -> Any: 

123 from typing import get_args 

124 

125 retval = self.return_value 

126 try: 

127 return get_args(retval)[1] 

128 except IndexError: 

129 return None 

130 

131 @property 

132 def return_value(self) -> Any: 

133 if self._retval is None: 

134 self._retval = inspect.get_annotations(self.func).get("return", None) 

135 return self._retval 

136 

137 def __repr__(self): 

138 return ( 

139 f"<suxint.Handler '{self.name}' Method: {self.method}, Path: {self.path}>" 

140 ) 

141 

142 

class DocString:
    """Pulls ``@tag value`` annotations out of a callable's docstring.

    The docstring is dedented and stripped, then every ``@word rest-of-line``
    occurrence is collected as a ``(tag, value)`` pair in ``param_pairs``.
    A callable without a docstring is treated as having an empty one.
    """

    def __init__(self, func: Callable) -> None:
        raw = func.__doc__
        self.docstring = "" if raw is None else textwrap.dedent(raw).strip()
        self.param_pairs = re.findall(r"@(\w+)\s+(.*)$", self.docstring, re.MULTILINE)

    def _key(self, key: str) -> list[str]:
        """Return, in docstring order, the values of every pair tagged ``key``."""
        values = []
        for tag, value in self.param_pairs:
            if tag == key:
                values.append(value)
        return values

    @property
    def intro(self) -> str:
        """The docstring text preceding the first ``@`` character, stripped."""
        return self.docstring.partition("@")[0].strip()

    @property
    def permissions(self) -> list[str]:
        """Values of all ``@permissions`` tags."""
        return self._key("permissions")

    @property
    def errors(self) -> list[str]:
        """Values of all ``@raises`` tags."""
        return self._key("raises")