# Source code for ractogateway._models.stream

"""Streaming response models — every chunk the user sees is a ``StreamChunk``."""

from __future__ import annotations

from typing import Any

from pydantic import BaseModel, Field

from ractogateway.adapters.base import FinishReason, ToolCallResult


class StreamDelta(BaseModel):
    """Incremental content produced by a single streaming event.

    Every field has a default, so a delta can be built with only the
    parts that a given event actually carries.

    Attributes
    ----------
    text:
        Text fragment for this event; defaults to the empty string.
    thinking:
        Thinking fragment for this event; defaults to the empty string.
        NOTE(review): presumably the model's reasoning output — confirm
        against the provider adapters.
    tool_call_id:
        Identifier of the tool call this event relates to, or ``None``.
    tool_call_name:
        Name of the tool being called, or ``None``.
    tool_call_args_fragment:
        Piece of the tool-call arguments, or ``None``.
        NOTE(review): presumably a partial JSON string that callers
        concatenate across events — confirm against the adapters.
    """

    text: str = ""
    thinking: str = ""
    tool_call_id: str | None = None
    tool_call_name: str | None = None
    tool_call_args_fragment: str | None = None
class StreamChunk(BaseModel):
    """A single piece of a streaming response.

    Consumers iterate over ``StreamChunk`` objects — they never touch raw
    provider events directly.

    Attributes
    ----------
    delta:
        The incremental content for this chunk.
    accumulated_text:
        Running concatenation of all ``delta.text`` values so far.
    accumulated_thinking:
        Accumulated thinking content; defaults to the empty string.
        NOTE(review): presumably the running concatenation of all
        ``delta.thinking`` values, mirroring ``accumulated_text`` — confirm.
    is_thinking:
        Boolean flag, ``False`` by default.
        NOTE(review): presumably ``True`` while the chunk carries thinking
        rather than answer text — confirm against the adapters.
    finish_reason:
        ``None`` for intermediate chunks; set on the final chunk.
    tool_calls:
        Empty until the final chunk (``is_final=True``).
    usage:
        Token counts — populated on the final chunk only.
    is_final:
        ``True`` only for the very last chunk in the stream.
    parsed:
        Auto-parsed and validated JSON from ``accumulated_text`` on the
        final chunk. Populated only when ``ChatConfig.response_model`` is
        set and the stream completes; ``None`` otherwise.
    raw:
        The underlying provider event (escape-hatch for advanced users).
    """

    delta: StreamDelta = Field(default_factory=StreamDelta)
    accumulated_text: str = ""
    accumulated_thinking: str = ""
    is_thinking: bool = False
    finish_reason: FinishReason | None = None
    # default_factory gives every instance its own fresh list / dict.
    tool_calls: list[ToolCallResult] = Field(default_factory=list)
    usage: dict[str, int] = Field(default_factory=dict)
    is_final: bool = False
    parsed: dict[str, Any] | list[Any] | None = Field(
        default=None,
        description=(
            "Auto-parsed and validated JSON from accumulated_text on the final chunk. "
            "Populated only when ChatConfig.response_model is set and the stream completes."
        ),
    )
    raw: Any = Field(default=None)

    model_config = {"arbitrary_types_allowed": True}