"""PrometheusExporter — HTTP server exposing a ``/metrics`` endpoint.
Requires: ``pip install ractogateway[prometheus]``
Example::
from ractogateway.telemetry import GatewayMetricsMiddleware, PrometheusExporter
metrics = GatewayMetricsMiddleware()
exporter = PrometheusExporter(port=8000)
exporter.start()
# Prometheus can now scrape http://localhost:8000/metrics
# ...
exporter.stop()
"""
from __future__ import annotations
import threading
from typing import Any
def _require_prometheus() -> Any:
try:
import prometheus_client
except ImportError as exc:
raise ImportError(
"prometheus_client is required for PrometheusExporter. "
"Install with: pip install ractogateway[prometheus]"
) from exc
return prometheus_client
[docs]
class PrometheusExporter:
"""Start an HTTP server that exposes all registered Prometheus metrics.
The server listens on ``http://0.0.0.0:<port>/metrics`` and responds
with the standard Prometheus text exposition format. This is designed
to be used alongside :class:`~ractogateway.telemetry.GatewayMetricsMiddleware`
but works with any metrics registered in the global registry.
Parameters
----------
port:
HTTP port to listen on. Defaults to ``8000``.
registry:
Custom ``prometheus_client.CollectorRegistry``. When ``None``
the global ``REGISTRY`` is used.
Example::
from ractogateway.telemetry import GatewayMetricsMiddleware, PrometheusExporter
metrics = GatewayMetricsMiddleware()
exporter = PrometheusExporter(port=9090)
exporter.start()
# Prometheus can now scrape http://localhost:9090/metrics
exporter.stop()
Requires: ``pip install ractogateway[prometheus]``
"""
def __init__(
self,
port: int = 8000,
*,
registry: Any | None = None,
) -> None:
self._port = port
self._registry = registry
self._server: Any | None = None
self._lock: threading.Lock = threading.Lock()
# ------------------------------------------------------------------
# Lifecycle
# ------------------------------------------------------------------
[docs]
def start(self) -> None:
"""Start the metrics HTTP server in a background daemon thread.
Calling ``start()`` a second time on an already-running exporter
is a no-op.
"""
prom = _require_prometheus()
with self._lock:
if self._server is not None:
return
kw: dict[str, Any] = {"port": self._port, "addr": ""}
if self._registry is not None:
kw["registry"] = self._registry
server, _ = prom.start_http_server(**kw)
self._server = server
[docs]
def stop(self) -> None:
"""Shut down the metrics HTTP server.
Safe to call even if the server was never started.
"""
with self._lock:
if self._server is not None:
self._server.shutdown()
self._server = None
# ------------------------------------------------------------------
# Convenience
# ------------------------------------------------------------------
@property
def port(self) -> int:
"""The configured HTTP port."""
return self._port
@property
def is_running(self) -> bool:
"""``True`` when the HTTP server is active."""
with self._lock:
return self._server is not None
[docs]
def generate_latest(self) -> str:
"""Return current metrics in Prometheus text format.
No HTTP server required — useful for testing or embedding the
metrics in a custom endpoint::
text = exporter.generate_latest()
assert "process_resident_memory_bytes" in text
Returns
-------
str
UTF-8 decoded Prometheus text exposition string.
"""
prom = _require_prometheus()
if self._registry is not None:
raw: bytes = prom.generate_latest(self._registry)
else:
raw = prom.generate_latest()
return raw.decode()