Coverage for postrfp / web / apps / internal.py: 71%

28 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 01:35 +0000

1from webob.response import Response 

2 

3from postrfp.auth.policy import PassthroughPolicy 

4from postrfp.web.suxint import Sux 

5from postrfp.web.base import WSGIApp 

6from postrfp.templates import get_template 

7from postrfp import conf 

8 

9 

# Module-level Sux instance shared by InternalApp.build_sux(). The two
# positional arguments are dotted module paths (endpoints, then adaptors);
# the keyword arguments supply the schema module and the API metadata.
internal_sux = Sux(
    "postrfp.jobs.internal.endpoints",
    "postrfp.web.adaptors.internal",
    models_module="postrfp.jobs.internal.schemas",
    api_name="PostRFP Internal API",
    description="Internal API Endpoints ",
)

17 

18 

class InternalApp(WSGIApp):
    """
    WSGI Web Application for internal PostRFP API endpoints.

    Falls back to a ``PassthroughPolicy`` when the caller supplies no
    (or a falsy) ``auth_policy``, performs no user validation, and serves
    the module-level ``internal_sux`` instance.
    """

    # Class-level route table for this app.
    # NOTE(review): presumably populated by the ``InternalApp.route``
    # decorator inherited from WSGIApp - confirm against the base class.
    routes = {}

    def __init__(self, **kwargs):
        """Initialise the app, defaulting ``auth_policy`` to a passthrough.

        All keyword arguments are forwarded unchanged to ``WSGIApp``,
        except that a missing or falsy ``auth_policy`` is replaced with a
        fresh ``PassthroughPolicy()``.
        """
        kwargs["auth_policy"] = kwargs.get("auth_policy") or PassthroughPolicy()
        super().__init__(**kwargs)

    def validate_user(self, request):
        """Do nothing: no user validation is performed for this app."""
        return None

    def build_sux(self):
        """Attach the shared module-level ``internal_sux`` to this app."""
        self.sux_instance = internal_sux

36 

37 

@InternalApp.route("/eventconfig")
def get_eventconfig(request):
    """Health check endpoint for Dagu to verify service is up.

    Renders ``tools/event_config.html`` with a summary dict containing a
    fixed ``"ok"`` status, the class name of the current executor, the
    registered handler keys and the webhook-enabled event types, and
    returns it as a ``text/html`` response.
    """
    # Imported at call time rather than at module import.
    # NOTE(review): presumably to avoid an import cycle or start-up cost -
    # confirm before hoisting these to the top of the file.
    from postrfp.jobs.executor import get_executor
    from postrfp.jobs.events.action import registry, webhook_evt_types

    event_config = {
        "status": "ok",
        "executor": get_executor().__class__.__name__,
        "registered_handlers": list(registry.keys()),
        "webhook_enabled_types": list(webhook_evt_types()),
    }
    template = get_template("tools/event_config.html")
    html = template.render(confdata=event_config, request=request)
    return Response(html, content_type="text/html")

55 

56 

@InternalApp.route("/config")
def config(request):
    """Show config information for server - db name etc.

    Renders ``tools/conf_info.html`` with the global ``conf.CONF`` object,
    the current request and ``user=None``, and wraps the resulting markup
    in a ``Response`` using the response class's default content type.
    """
    # NOTE(review): this page exposes server configuration; confirm the
    # internal app is not reachable from untrusted networks.
    page = get_template("tools/conf_info.html").render(
        conf=conf.CONF, request=request, user=None
    )
    return Response(page)

62 return Response(html)