Coverage for postrfp / web / apps / internal.py: 71%
28 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 01:35 +0000
1from webob.response import Response
3from postrfp.auth.policy import PassthroughPolicy
4from postrfp.web.suxint import Sux
5from postrfp.web.base import WSGIApp
6from postrfp.templates import get_template
7from postrfp import conf
# Module-level Sux instance shared by InternalApp.build_sux().
# The two positional arguments are dotted module paths (endpoints module and
# adaptors module, going by their names — confirm against Sux's signature).
internal_sux = Sux(
    "postrfp.jobs.internal.endpoints",
    "postrfp.web.adaptors.internal",
    api_name="PostRFP Internal API",
    description="Internal API Endpoints ",
    models_module="postrfp.jobs.internal.schemas",
)
class InternalApp(WSGIApp):
    """
    WSGI web application serving the internal PostRFP API endpoints.

    Falls back to a ``PassthroughPolicy`` auth policy when the caller does
    not supply one, performs no user validation, and uses the module-level
    ``internal_sux`` instance as its Sux.
    """

    # Route table declared on this class rather than inherited.
    # NOTE(review): presumably this keeps routes registered through
    # ``InternalApp.route`` separate from other WSGIApp subclasses — confirm
    # against WSGIApp.route.
    routes = {}

    def __init__(self, **kwargs):
        """Create the app, defaulting ``auth_policy`` to ``PassthroughPolicy``.

        Any missing or falsy ``auth_policy`` keyword is replaced before the
        keyword arguments are handed to ``WSGIApp.__init__``.
        """
        kwargs["auth_policy"] = kwargs.get("auth_policy") or PassthroughPolicy()
        super().__init__(**kwargs)

    def validate_user(self, request):
        """Do nothing: this app applies no user validation to ``request``."""
        return None

    def build_sux(self):
        """Attach the shared module-level ``internal_sux`` as ``sux_instance``."""
        self.sux_instance = internal_sux
@InternalApp.route("/eventconfig")
def get_eventconfig(request):
    """Render the event configuration page as an HTML response.

    The page reports a fixed ``"ok"`` status, the class name of the current
    job executor, the keys of the handler registry and the webhook-enabled
    event types. Because a successful render always carries status ``"ok"``,
    it can double as a liveness check (reportedly polled by Dagu — confirm).
    """
    # Function-scoped imports, kept in their original order.
    # NOTE(review): presumably local to avoid import cycles or to defer
    # loading the jobs machinery — confirm before hoisting to module level.
    from postrfp.jobs.executor import get_executor
    from postrfp.jobs.events.action import registry, webhook_evt_types

    executor = get_executor()
    handler_keys = list(registry.keys())
    webhook_types = list(webhook_evt_types())

    confdata = {
        "status": "ok",
        "executor": executor.__class__.__name__,
        "registered_handlers": handler_keys,
        "webhook_enabled_types": webhook_types,
    }

    template = get_template("tools/event_config.html")
    page = template.render(confdata=confdata, request=request)
    return Response(page, content_type="text/html")
@InternalApp.route("/config")
def config(request):
    """Render the server configuration page (db name etc.).

    The template receives the global ``conf.CONF`` object, the current
    request, and ``user=None``.
    """
    context = {"conf": conf.CONF, "request": request, "user": None}
    page = get_template("tools/conf_info.html").render(**context)
    return Response(page)