ractogateway.kafka._models

Pydantic models for Kafka integration primitives.

class ractogateway.kafka._models.KafkaProducerConfig(**data)[source]

Bases: BaseModel

Configuration for KafkaProducerClient.

Parameters:
  • bootstrap_servers (str | list[str]) – Kafka broker list (single host string or multiple hosts).

  • client_id (str) – Producer client ID reported to the broker.

  • acks (Literal['all', '0', '1'] | int) – Acknowledgement policy; "all" is safest for durability.

  • linger_ms (int) – Small batching delay to improve throughput.

  • compression_type (str | None) – Optional compression algorithm (e.g. "gzip", "snappy").

  • retries (int) – Retry attempts for transient broker/network failures.

  • request_timeout_ms (int) – Broker request timeout.

  • security_protocol (str) – Kafka transport protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).

  • sasl_mechanism (str | None) – SASL mechanism when using SASL security protocols.

  • sasl_plain_username (str | None) – Username for SASL PLAIN/SCRAM.

  • sasl_plain_password (str | None) – Password for SASL PLAIN/SCRAM.

  • extra (dict[str, Any]) – Additional kwargs forwarded directly to kafka.KafkaProducer.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

bootstrap_servers: str | list[str]
client_id: str
acks: Literal['all', '0', '1'] | int
linger_ms: int
compression_type: str | None
retries: int
request_timeout_ms: int
security_protocol: str
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.kafka._models.KafkaConsumerConfig(**data)[source]

Bases: BaseModel

Configuration for KafkaConsumerClient.

Parameters:
  • topics (list[str]) – Topic list to subscribe on startup.

  • bootstrap_servers (str | list[str]) – Kafka broker list.

  • group_id (str | None) – Consumer group ID. None disables group coordination.

  • auto_offset_reset (Literal['earliest', 'latest', 'none']) – Initial offset policy when no committed offset is present.

  • enable_auto_commit (bool) – Whether Kafka should auto-commit offsets in the background.

  • max_poll_records (int) – Upper bound on records returned per poll call.

  • poll_timeout_ms (int) – Default poll timeout in milliseconds.

  • session_timeout_ms (int) – Group session timeout.

  • client_id (str) – Consumer client ID.

  • security_protocol (str) – Kafka transport protocol.

  • sasl_mechanism (str | None) – SASL mechanism when applicable.

  • sasl_plain_username (str | None) – Username for SASL auth.

  • sasl_plain_password (str | None) – Password for SASL auth.

  • extra (dict[str, Any]) – Additional kwargs forwarded to kafka.KafkaConsumer.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

topics: list[str]
bootstrap_servers: str | list[str]
group_id: str | None
auto_offset_reset: Literal['earliest', 'latest', 'none']
enable_auto_commit: bool
max_poll_records: int
poll_timeout_ms: int
session_timeout_ms: int
client_id: str
security_protocol: str
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.kafka._models.KafkaStreamConfig(**data)[source]

Bases: BaseModel

Batch processing configuration for KafkaStreamProcessor.

Parameters:
  • batch_size (int) – Maximum messages to process per handler invocation.

  • max_wait_ms (int) – Max wall-clock wait to collect one batch.

  • poll_timeout_ms (int) – Per-poll timeout while collecting a batch.

  • commit_on_success (bool) – Commit offsets after successful handler + publish cycle.

  • wait_for_publish (bool) – If True, block for broker acknowledgement on each produced output.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

batch_size: int
max_wait_ms: int
poll_timeout_ms: int
commit_on_success: bool
wait_for_publish: bool
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.kafka._models.KafkaMessage(**data)[source]

Bases: BaseModel

Normalized Kafka record returned by KafkaConsumerClient.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

topic: str
partition: int
offset: int
timestamp_ms: int | None
key: str | bytes | None
value: Any
headers: dict[str, bytes | None]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.kafka._models.KafkaProduceResult(**data)[source]

Bases: BaseModel

Broker acknowledgement metadata returned by publish operations.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

topic: str
partition: int
offset: int
timestamp_ms: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.kafka._models.KafkaPublishRequest(**data)[source]

Bases: BaseModel

Explicit publish request used by stream routing handlers.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

topic: str
value: Any
key: str | bytes | None
headers: dict[str, str | bytes | None] | None
partition: int | None
timestamp_ms: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ractogateway.kafka._models.KafkaAuditEvent(**data)[source]

Bases: BaseModel

Immutable audit envelope for prompt/response traces.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

user_id: str
model: str
prompt: str
response: str
request_id: str | None
metadata: dict[str, Any]
timestamp_utc: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].