"""RactoTracer — OpenTelemetry integration for RactoGateway.

Pass a ``RactoTracer`` instance as ``tracer=`` to any developer kit to
automatically emit OTEL spans for every LLM call.

Requires: ``pip install ractogateway[telemetry]``

Example::

    from ractogateway import openai_developer_kit as opd
    from ractogateway.telemetry import RactoTracer

    tracer = RactoTracer(
        otlp_endpoint="http://localhost:4317",
        console=True,
    )
    kit = opd.OpenAIDeveloperKit(
        model="gpt-4o",
        default_prompt=my_prompt,
        tracer=tracer,
    )
    response = kit.chat(opd.ChatConfig(user_message="Hello"))
    # A span named "llm.chat" is now in your OTEL backend.
"""
from __future__ import annotations
import threading
import time
from typing import Any
from ractogateway.telemetry._models import ModelPricing, SpanRecord
from ractogateway.telemetry._pricing import DEFAULT_COST_TABLE
def _require_otel_sdk() -> Any:
try:
from opentelemetry.sdk.trace import TracerProvider
except ImportError as exc:
raise ImportError(
"OpenTelemetry SDK is required for RactoTracer. "
"Install with: pip install ractogateway[telemetry]"
) from exc
return TracerProvider
def _require_otlp_grpc() -> Any:
try:
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
OTLPSpanExporter,
)
except ImportError as exc:
raise ImportError(
"OTLP gRPC exporter is required for otlp_endpoint. "
"Install with: pip install ractogateway[telemetry]"
) from exc
return OTLPSpanExporter
def _require_otlp_http() -> Any:
try:
from opentelemetry.exporter.otlp.proto.http.trace_exporter import (
OTLPSpanExporter,
)
except ImportError as exc:
raise ImportError(
"OTLP HTTP exporter is required for otlp_http_endpoint. "
"Install with: pip install ractogateway[telemetry]"
) from exc
return OTLPSpanExporter
class _InMemoryExporter:
"""Pure-Python in-memory span store — no OTEL dependency needed."""
def __init__(self) -> None:
self._records: list[SpanRecord] = []
self._lock: threading.Lock = threading.Lock()
def add(self, record: SpanRecord) -> None:
with self._lock:
self._records.append(record)
@property
def records(self) -> list[SpanRecord]:
with self._lock:
return list(self._records)
def clear(self) -> None:
with self._lock:
self._records.clear()
# Public tracer class (stray "[docs]" documentation-scrape marker removed from this line).
class RactoTracer:
    """OpenTelemetry tracer — pass as ``tracer=`` to any developer kit.

    Records one span per LLM call with attributes for latency, token
    usage, estimated cost, cache-hit type, and tool-call count.

    Supports OTLP gRPC (Jaeger / Grafana Tempo), OTLP HTTP, console
    stdout, in-memory capture (for tests), and any custom
    ``opentelemetry.sdk.trace.export.SpanExporter``.

    Parameters
    ----------
    service_name:
        OTEL ``service.name`` resource attribute.
        Defaults to ``"ractogateway"``.
    otlp_endpoint:
        OTLP **gRPC** endpoint (e.g. ``"http://localhost:4317"``).
        Requires ``pip install ractogateway[telemetry]``.
    otlp_http_endpoint:
        OTLP **HTTP** endpoint (e.g. ``"http://localhost:4318"``).
        Requires ``pip install ractogateway[telemetry]``.
    console:
        Also print spans to stdout — convenient during local development.
    in_memory:
        Capture spans internally in a thread-safe list.
        Access recorded spans via the :attr:`spans` property.
        Useful for unit tests — no external backend required.
    custom_exporter:
        Any ``opentelemetry.sdk.trace.export.SpanExporter`` instance.
    price_table:
        Override or extend the built-in :data:`~ractogateway.telemetry.DEFAULT_COST_TABLE`.
        Keys are model identifiers; values are :class:`ModelPricing` objects.

    Span attributes
    ---------------
    All spans carry the following OTEL attributes:

    * ``llm.provider``      — ``"openai"`` / ``"google"`` / ``"anthropic"``
    * ``llm.model``         — e.g. ``"gpt-4o"``
    * ``llm.operation``     — ``"chat"`` (from :meth:`record_chat_span`) or
      ``"embed"`` (from :meth:`record_embed_span`)
    * ``llm.latency_ms``    — wall-clock time in milliseconds (2 decimal places)
    * ``llm.input_tokens``  — prompt tokens consumed
    * ``llm.output_tokens`` — completion tokens produced
    * ``llm.cost_usd``      — estimated USD cost (8 decimal places)
    * ``llm.cache_hit``     — ``"exact"`` / ``"semantic"`` / ``"miss"``
    * ``llm.tool_calls``    — number of tool calls in the response
    * ``llm.error_type``    — exception class name on error (omitted on success)
    """

    def __init__(
        self,
        *,
        service_name: str = "ractogateway",
        otlp_endpoint: str | None = None,
        otlp_http_endpoint: str | None = None,
        console: bool = False,
        in_memory: bool = False,
        custom_exporter: Any | None = None,
        price_table: dict[str, ModelPricing] | None = None,
    ) -> None:
        """Store the configuration and build the requested span sinks."""
        self._service_name = service_name
        self._otlp_endpoint = otlp_endpoint
        self._otlp_http_endpoint = otlp_http_endpoint
        self._console = console
        self._in_memory = in_memory
        self._custom_exporter = custom_exporter

        # Caller-supplied prices win over the built-in table on key collisions.
        self._price_table: dict[str, ModelPricing] = {
            **DEFAULT_COST_TABLE,
            **(price_table or {}),
        }

        self._in_memory_exporter: _InMemoryExporter | None = (
            _InMemoryExporter() if in_memory else None
        )

        # Build the OTEL tracer only when at least one exporter is requested,
        # so in-memory-only use never imports opentelemetry.
        # ``custom_exporter`` is tested with ``is not None`` (as in
        # ``_build_otel_tracer``) so a falsy exporter object is not dropped.
        self._otel_tracer: Any | None = None
        wants_otel = (
            bool(otlp_endpoint)
            or bool(otlp_http_endpoint)
            or bool(console)
            or custom_exporter is not None
        )
        if wants_otel:
            self._otel_tracer = self._build_otel_tracer()

    # ------------------------------------------------------------------
    # Private — OTEL setup
    # ------------------------------------------------------------------
    def _build_otel_tracer(self) -> Any:
        """Construct a ``TracerProvider`` and return a tracer obtained from it.

        One ``BatchSpanProcessor`` is attached per configured exporter
        (OTLP gRPC, OTLP HTTP, console, custom — in that order).

        Raises
        ------
        ImportError
            If the OpenTelemetry SDK or a requested OTLP exporter is missing;
            the message includes the ``pip install`` hint.
        """
        # Resolve the SDK through the guarded helper first so a missing
        # install surfaces the install hint instead of a bare ImportError.
        tracer_provider_cls = _require_otel_sdk()
        from opentelemetry.sdk.resources import SERVICE_NAME, Resource
        from opentelemetry.sdk.trace.export import (
            BatchSpanProcessor,
            ConsoleSpanExporter,
        )

        # Instantiate every exporter before attaching any processor, so a
        # missing optional exporter fails before processors are created.
        exporters: list[Any] = []
        if self._otlp_endpoint:
            exporters.append(_require_otlp_grpc()(endpoint=self._otlp_endpoint))
        if self._otlp_http_endpoint:
            exporters.append(_require_otlp_http()(endpoint=self._otlp_http_endpoint))
        if self._console:
            exporters.append(ConsoleSpanExporter())
        if self._custom_exporter is not None:
            exporters.append(self._custom_exporter)

        resource = Resource.create({SERVICE_NAME: self._service_name})
        provider = tracer_provider_cls(resource=resource)
        for exporter in exporters:
            provider.add_span_processor(BatchSpanProcessor(exporter))
        return provider.get_tracer(self._service_name)

    # ------------------------------------------------------------------
    # Private — cost + OTEL span emission
    # ------------------------------------------------------------------
    def _compute_cost(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Estimate the USD cost of a call from the price table.

        Returns ``0.0`` when *model* has no entry in the price table.
        Prices are expressed per one million tokens.
        """
        pricing = self._price_table.get(model)
        if pricing is None:
            return 0.0
        return (
            input_tokens * pricing.input_per_million / 1_000_000
            + output_tokens * pricing.output_per_million / 1_000_000
        )

    def _emit_otel_span(
        self,
        name: str,
        provider: str,
        model: str,
        operation: str,
        latency_ms: float,
        input_tokens: int,
        output_tokens: int,
        cost_usd: float,
        cache_hit: str,
        tool_calls: int,
        status: str,
        error_type: str | None,
    ) -> None:
        """Emit one OTEL span carrying the ``llm.*`` attributes.

        Does nothing when no OTEL exporter was configured. The span is
        created after the LLM call has finished, so its start time is
        backdated by *latency_ms*; the exported duration then reflects the
        call latency rather than the time spent setting attributes.
        """
        if self._otel_tracer is None:
            return
        from opentelemetry.trace import StatusCode

        # ``start_time`` is nanoseconds since the epoch. Non-positive,
        # NaN, or infinite latencies fall back to "start now" (None).
        start_ns: int | None = None
        if 0 < latency_ms < float("inf"):
            start_ns = time.time_ns() - int(latency_ms * 1_000_000)

        with self._otel_tracer.start_as_current_span(name, start_time=start_ns) as span:
            span.set_attribute("llm.provider", provider)
            span.set_attribute("llm.model", model)
            span.set_attribute("llm.operation", operation)
            span.set_attribute("llm.latency_ms", round(latency_ms, 2))
            span.set_attribute("llm.input_tokens", input_tokens)
            span.set_attribute("llm.output_tokens", output_tokens)
            span.set_attribute("llm.cost_usd", round(cost_usd, 8))
            span.set_attribute("llm.cache_hit", cache_hit)
            span.set_attribute("llm.tool_calls", tool_calls)
            if status == "error":
                span.set_status(StatusCode.ERROR, error_type or "unknown")
                if error_type:
                    span.set_attribute("llm.error_type", error_type)

    def _record_span(
        self,
        *,
        name: str,
        provider: str,
        model: str,
        operation: str,
        latency_ms: float,
        input_tokens: int,
        output_tokens: int,
        cache_hit: str,
        tool_calls: int,
        status: str,
        error_type: str | None,
    ) -> None:
        """Price one call, emit it to OTEL, then store it in memory if enabled."""
        fields: dict[str, Any] = {
            "name": name,
            "provider": provider,
            "model": model,
            "operation": operation,
            "latency_ms": latency_ms,
            "input_tokens": input_tokens,
            "output_tokens": output_tokens,
            "cost_usd": self._compute_cost(model, input_tokens, output_tokens),
            "cache_hit": cache_hit,
            "tool_calls": tool_calls,
            "status": status,
            "error_type": error_type,
        }
        self._emit_otel_span(**fields)
        if self._in_memory_exporter is not None:
            # The record timestamp is the time of recording, not call start.
            self._in_memory_exporter.add(SpanRecord(**fields, timestamp=time.time()))

    # ------------------------------------------------------------------
    # Public recording API (called by developer kits)
    # ------------------------------------------------------------------
    def record_chat_span(
        self,
        *,
        provider: str,
        model: str,
        latency_ms: float,
        input_tokens: int = 0,
        output_tokens: int = 0,
        cache_hit: str = "miss",
        tool_calls: int = 0,
        status: str = "ok",
        error_type: str | None = None,
    ) -> None:
        """Record a completed chat or stream span.

        The span is always named ``"llm.chat"`` with ``llm.operation`` set
        to ``"chat"``.

        Parameters
        ----------
        provider:
            Provider string (``"openai"``, ``"google"``, ``"anthropic"``).
        model:
            Model identifier (e.g. ``"gpt-4o"``).
        latency_ms:
            Total wall-clock latency of the LLM call in milliseconds.
        input_tokens:
            Number of prompt tokens consumed (``0`` for cache hits).
        output_tokens:
            Number of completion tokens produced (``0`` for cache hits).
        cache_hit:
            ``"exact"``, ``"semantic"``, or ``"miss"``.
        tool_calls:
            Number of tool calls in the response.
        status:
            ``"ok"`` or ``"error"``.
        error_type:
            Exception class name when ``status == "error"``, else ``None``.
        """
        self._record_span(
            name="llm.chat",
            provider=provider,
            model=model,
            operation="chat",
            latency_ms=latency_ms,
            input_tokens=input_tokens,
            output_tokens=output_tokens,
            cache_hit=cache_hit,
            tool_calls=tool_calls,
            status=status,
            error_type=error_type,
        )

    def record_embed_span(
        self,
        *,
        provider: str,
        model: str,
        latency_ms: float,
        input_tokens: int = 0,
        status: str = "ok",
        error_type: str | None = None,
    ) -> None:
        """Record a completed embedding span.

        The span is named ``"llm.embed"`` and always reports zero output
        tokens, zero tool calls, and a ``"miss"`` cache status.

        Parameters
        ----------
        provider:
            Provider string (``"openai"`` or ``"google"``).
        model:
            Embedding model identifier.
        latency_ms:
            Total wall-clock latency in milliseconds.
        input_tokens:
            Number of tokens embedded.
        status:
            ``"ok"`` or ``"error"``.
        error_type:
            Exception class name when ``status == "error"``, else ``None``.
        """
        self._record_span(
            name="llm.embed",
            provider=provider,
            model=model,
            operation="embed",
            latency_ms=latency_ms,
            input_tokens=input_tokens,
            output_tokens=0,
            cache_hit="miss",
            tool_calls=0,
            status=status,
            error_type=error_type,
        )

    # ------------------------------------------------------------------
    # In-memory span access (for tests)
    # ------------------------------------------------------------------
    @property
    def spans(self) -> list[SpanRecord]:
        """Return all captured in-memory spans.

        Only populated when ``in_memory=True``. Thread-safe.

        Returns
        -------
        list[SpanRecord]
            Snapshot of all recorded spans (newest last).
        """
        if self._in_memory_exporter is None:
            return []
        return self._in_memory_exporter.records

    def clear_spans(self) -> None:
        """Clear all in-memory spans.

        Only has effect when ``in_memory=True``.
        """
        if self._in_memory_exporter is not None:
            self._in_memory_exporter.clear()