Kafka

Models

Pydantic models for Kafka integration primitives.

class ractogateway.kafka._models.KafkaProducerConfig(**data)[source]

Bases: BaseModel

Configuration for KafkaProducerClient.

Parameters:
  • bootstrap_servers (str | list[str]) – Kafka broker list (single host string or multiple hosts).

  • client_id (str) – Producer client ID reported to the broker.

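  • acks (Literal['all', '0', '1'] | int) – Acknowledgement policy. "all" waits for every in-sync replica and is the safest choice for durability.

  • linger_ms (int) – Delay in milliseconds to wait for more records before sending a batch; a small value improves throughput at the cost of latency.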

  • compression_type (str | None) – Optional compression algorithm (e.g. "gzip", "snappy").

  • retries (int) – Retry attempts for transient broker/network failures.

  • request_timeout_ms (int) – Broker request timeout in milliseconds.

  • security_protocol (str) – Kafka transport protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).

  • sasl_mechanism (str | None) – SASL mechanism when using SASL security protocols.

  • sasl_plain_username (str | None) – Username for SASL PLAIN/SCRAM.

  • sasl_plain_password (str | None) – Password for SASL PLAIN/SCRAM.

  • extra (dict[str, Any]) – Additional kwargs forwarded directly to kafka.KafkaProducer.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

bootstrap_servers: str | list[str]
client_id: str
acks: Literal['all', '0', '1'] | int
linger_ms: int
compression_type: str | None
retries: int
request_timeout_ms: int
security_protocol: str
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
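
The fields above share their names with keyword arguments of kafka.KafkaProducer, so a config can be flattened into constructor kwargs. The following is a minimal sketch of one way that mapping might look, not the library's actual implementation. The helper names normalize_acks and to_producer_kwargs are hypothetical, and both the None-dropping and the "extra wins" merge order are assumptions.

```python
from typing import Any


def normalize_acks(acks: Any) -> Any:
    """Turn the strings "0" and "1" into ints; leave "all" and ints untouched."""
    if isinstance(acks, str) and acks != "all":
        return int(acks)
    return acks


def to_producer_kwargs(config: dict[str, Any]) -> dict[str, Any]:
    """Build keyword arguments for kafka.KafkaProducer from config fields."""
    # Assumption: unset (None) fields are dropped so the client defaults apply.
    kwargs = {k: v for k, v in config.items() if k != "extra" and v is not None}
    if "acks" in kwargs:
        kwargs["acks"] = normalize_acks(kwargs["acks"])
    # Assumption: entries in `extra` are merged last and override named fields.
    kwargs.update(config.get("extra") or {})
    return kwargs


example = {
    "bootstrap_servers": ["broker-1:9092", "broker-2:9092"],
    "client_id": "orders-service",
    "acks": "1",
    "linger_ms": 5,
    "compression_type": None,
    "extra": {"max_in_flight_requests_per_connection": 1},
}
producer_kwargs = to_producer_kwargs(example)
```

The broker addresses and client ID are placeholder values chosen for illustration.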

class ractogateway.kafka._models.KafkaConsumerConfig(**data)[source]

Bases: BaseModel

Configuration for KafkaConsumerClient.

Parameters:
  • topics (list[str]) – Topics to subscribe to on startup.

  • bootstrap_servers (str | list[str]) – Kafka broker list.

  • group_id (str | None) – Consumer group ID. None disables group coordination.

  • auto_offset_reset (Literal['earliest', 'latest', 'none']) – Initial offset policy when no committed offset is present.

  • enable_auto_commit (bool) – Whether Kafka should auto-commit offsets in the background.

  • max_poll_records (int) – Upper bound on records returned per poll call.

  • poll_timeout_ms (int) – Default poll timeout in milliseconds.

  • session_timeout_ms (int) – Group session timeout in milliseconds.

  • client_id (str) – Consumer client ID.

  • security_protocol (str) – Kafka transport protocol.

  • sasl_mechanism (str | None) – SASL mechanism when applicable.

  • sasl_plain_username (str | None) – Username for SASL auth.

  • sasl_plain_password (str | None) – Password for SASL auth.

  • extra (dict[str, Any]) – Additional kwargs forwarded to kafka.KafkaConsumer.

topics: list[str]
bootstrap_servers: str | list[str]
group_id: str | None
auto_offset_reset: Literal['earliest', 'latest', 'none']
enable_auto_commit: bool
max_poll_records: int
poll_timeout_ms: int
session_timeout_ms: int
client_id: str
security_protocol: str
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka._models.KafkaStreamConfig(**data)[source]

Bases: BaseModel

Batch processing configuration for KafkaStreamProcessor.

Parameters:
  • batch_size (int) – Maximum messages to process per handler invocation.

  • max_wait_ms (int) – Maximum wall-clock time in milliseconds to spend collecting one batch.

  • poll_timeout_ms (int) – Per-poll timeout while collecting a batch.

  • commit_on_success (bool) – Commit offsets after a successful handler and publish cycle.

  • wait_for_publish (bool) – If True, block for broker acknowledgement on each produced output.

batch_size: int
max_wait_ms: int
poll_timeout_ms: int
commit_on_success: bool
wait_for_publish: bool
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka._models.KafkaMessage(**data)[source]

Bases: BaseModel

Normalized Kafka record returned by KafkaConsumerClient.

topic: str
partition: int
offset: int
timestamp_ms: int | None
key: str | bytes | None
value: Any
headers: dict[str, bytes | None]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
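
kafka-python delivers record headers as a list of (name, bytes) tuples, while KafkaMessage.headers is a dict. The sketch below shows one plausible way a raw record could be normalized into the field layout listed above. RawRecord and normalize_record are hypothetical names, and keeping the last value of a repeated header name is an assumption, not confirmed library behaviour.

```python
from collections import namedtuple
from typing import Any, Optional

# Stand-in with the subset of kafka-python ConsumerRecord fields used here.
RawRecord = namedtuple(
    "RawRecord", "topic partition offset timestamp key value headers"
)


def normalize_record(record: Any) -> dict[str, Any]:
    """Map a raw consumer record onto the KafkaMessage field names."""
    headers: dict[str, Optional[bytes]] = {}
    for name, raw in record.headers or []:
        # Assumption: a repeated header name keeps its last value.
        headers[name] = raw
    return {
        "topic": record.topic,
        "partition": record.partition,
        "offset": record.offset,
        "timestamp_ms": record.timestamp,
        "key": record.key,
        "value": record.value,
        "headers": headers,
    }


raw = RawRecord(
    topic="orders",
    partition=0,
    offset=42,
    timestamp=1700000000000,
    key=b"order-1",
    value=b'{"id": 1}',
    headers=[("trace", b"a"), ("trace", b"b"), ("empty", None)],
)
message_fields = normalize_record(raw)
```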

class ractogateway.kafka._models.KafkaProduceResult(**data)[source]

Bases: BaseModel

Broker acknowledgement metadata returned by publish operations.

topic: str
partition: int
offset: int
timestamp_ms: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka._models.KafkaPublishRequest(**data)[source]

Bases: BaseModel

Explicit publish request used by stream routing handlers.

topic: str
value: Any
key: str | bytes | None
headers: dict[str, str | bytes | None] | None
partition: int | None
timestamp_ms: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka._models.KafkaAuditEvent(**data)[source]

Bases: BaseModel

Immutable audit envelope for prompt/response traces.

user_id: str
model: str
prompt: str
response: str
request_id: str | None
metadata: dict[str, Any]
timestamp_utc: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

Producer

Kafka producer wrapper for high-throughput event publishing.

class ractogateway.kafka.producer.KafkaProducerClient(*, config=None, producer=None, key_serializer=None, value_serializer=None)[source]

Bases: object

Typed facade over kafka.KafkaProducer.

Parameters:
  • config (KafkaProducerConfig | None) – Producer connection and reliability settings. Defaults are applied when omitted.

  • producer (Any | None) – Pre-built Kafka producer object. Useful for dependency injection in tests or when your runtime provides the producer lifecycle externally.

  • key_serializer (Callable[[Any], bytes] | None) – Optional override for key serialization.

  • value_serializer (Callable[[Any], bytes] | None) – Optional override for value serialization.

publish(topic, value, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)[source]

Publish one event and optionally wait for broker acknowledgement.

Return type:

KafkaProduceResult | None

publish_json(topic, payload, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)[source]

Alias for publish() when value is a JSON payload.

Return type:

KafkaProduceResult | None

flush(*, timeout_s=None)[source]

Force all buffered events to be sent to brokers.

Return type:

None

close(*, timeout_s=5.0)[source]

Close the underlying producer and release network resources.

Return type:

None
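
To illustrate the wait flag and the KafkaProduceResult | None return type, here is a self-contained sketch of the publish flow against an injected fake producer. It does not import ractogateway. FakeProducer, FakeFuture, FakeMetadata, json_value_serializer and the free function publish are hypothetical stand-ins that imitate the send() and future.get() shape of kafka-python, and the compact JSON encoding is an assumed default.

```python
import json
from typing import Any, Optional


class FakeMetadata:
    """Mimics the attributes of kafka-python's RecordMetadata used below."""

    def __init__(self, topic, partition, offset, timestamp):
        self.topic = topic
        self.partition = partition
        self.offset = offset
        self.timestamp = timestamp


class FakeFuture:
    """Resolves immediately; a real future blocks until the broker replies."""

    def __init__(self, metadata):
        self._metadata = metadata

    def get(self, timeout: Optional[float] = None) -> FakeMetadata:
        return self._metadata


class FakeProducer:
    """In-memory producer suitable for dependency injection in tests."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value=None, key=None, headers=None,
             partition=None, timestamp_ms=None):
        offset = len(self.sent)
        self.sent.append((topic, key, value))
        return FakeFuture(FakeMetadata(topic, partition or 0, offset, timestamp_ms))


def json_value_serializer(value: Any) -> bytes:
    # Assumed default: compact UTF-8 JSON.
    return json.dumps(value, separators=(",", ":")).encode("utf-8")


def publish(producer, topic, value, *, key=None, wait=True, timeout_s=10.0):
    """Send one event; return ack metadata when wait is True, else None."""
    raw_key = key.encode("utf-8") if isinstance(key, str) else key
    future = producer.send(topic, value=json_value_serializer(value), key=raw_key)
    if not wait:
        return None  # fire-and-forget: the caller never sees the ack
    metadata = future.get(timeout=timeout_s)
    return {
        "topic": metadata.topic,
        "partition": metadata.partition,
        "offset": metadata.offset,
        "timestamp_ms": metadata.timestamp,
    }
```

With wait=False the call returns before any acknowledgement arrives, so a later flush() is what guarantees delivery of buffered events.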

Consumer

Kafka consumer wrapper that returns normalized typed records.

class ractogateway.kafka.consumer.KafkaConsumerClient(*, config, consumer=None, key_deserializer=None, value_deserializer=None)[source]

Bases: object

Typed facade over kafka.KafkaConsumer.

Parameters:
  • config (KafkaConsumerConfig) – Consumer connection and polling options.

  • consumer (Any | None) – Pre-built consumer object; useful for tests and dependency injection.

  • key_deserializer (Callable[[Any], Any] | None) – Optional override for key decoding.

  • value_deserializer (Callable[[Any], Any] | None) – Optional override for value decoding.

poll(*, timeout_ms=None, max_records=None)[source]

Poll records and return normalized KafkaMessage entries.

Return type:

list[KafkaMessage]

commit()[source]

Commit currently consumed offsets.

Return type:

None

close()[source]

Close the underlying consumer and release resources.

Return type:

None
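
A common pattern with poll() and commit() is to commit only after every polled record has been handled, which gives at-least-once processing when auto-commit is disabled. The sketch below illustrates that pattern with an in-memory fake. FakeConsumer, poll_flat and consume_once are hypothetical names, and the fake copies kafka-python's convention of returning a dict keyed by topic-partition from poll().

```python
from collections import namedtuple

TopicPartition = namedtuple("TopicPartition", "topic partition")
Record = namedtuple("Record", "topic partition offset value")


class FakeConsumer:
    """Replays pre-built poll results; counts commit() calls."""

    def __init__(self, batches):
        self._batches = list(batches)
        self.commits = 0

    def poll(self, timeout_ms=0, max_records=None):
        return self._batches.pop(0) if self._batches else {}

    def commit(self):
        self.commits += 1


def poll_flat(consumer, timeout_ms=1000, max_records=None):
    """Flatten the per-partition poll result into a single list."""
    result = consumer.poll(timeout_ms=timeout_ms, max_records=max_records)
    return [record for records in result.values() for record in records]


def consume_once(consumer, handle):
    """Handle one poll's worth of records, then commit."""
    records = poll_flat(consumer)
    for record in records:
        handle(record)
    if records:
        # Committing after handling means a crash mid-batch causes
        # redelivery rather than silent loss.
        consumer.commit()
    return len(records)
```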

Stream Processor

Batched consume-process-publish utilities for real-time Kafka streams.

class ractogateway.kafka.stream.KafkaStreamProcessor(*, consumer, producer=None, config=None)[source]

Bases: object

Real-time stream loop helper for high-throughput event processing.

collect_batch()[source]

Collect up to batch_size messages, waiting at most max_wait_ms milliseconds.

Return type:

list[KafkaMessage]
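
The interaction of batch_size, max_wait_ms and poll_timeout_ms can be sketched as a deadline-bounded loop. This is an illustrative reading of the documented behaviour, not the library's code. The free function collect_batch, its poll callable argument and the injectable clock are all assumptions made for the example.

```python
import time


def collect_batch(poll, *, batch_size, max_wait_ms, poll_timeout_ms,
                  clock=time.monotonic):
    """Gather up to batch_size items, giving up once max_wait_ms elapses.

    `poll` is any callable accepting timeout_ms and max_records and
    returning a list of messages.
    """
    deadline = clock() + max_wait_ms / 1000.0
    batch = []
    while len(batch) < batch_size:
        remaining_ms = (deadline - clock()) * 1000.0
        if remaining_ms <= 0:
            break  # time budget exhausted: return a partial batch
        # Never block in one poll for longer than the remaining budget.
        timeout_ms = int(min(poll_timeout_ms, remaining_ms))
        batch.extend(poll(timeout_ms=timeout_ms,
                          max_records=batch_size - len(batch)))
    return batch
```

A short poll_timeout_ms relative to max_wait_ms lets the loop re-check the deadline often, while a full batch returns immediately without waiting out the budget.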

process_once(handler, *, output_topic=None)[source]

Run one batch cycle and return the number of consumed messages.

Return type:

int

Handler return options

  • None: consume-only, no output publish.

  • Sequence[KafkaPublishRequest]: explicit per-message publish plans.

  • Sequence[Any]: publish raw values to output_topic.
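
The three handler return shapes can be turned into a uniform list of publish plans. The sketch below shows one way such a dispatch might work. PublishRequest is a stand-in for KafkaPublishRequest limited to three fields, plan_publishes is a hypothetical helper, and raising ValueError when raw values arrive without an output_topic is an assumption about error handling.

```python
from dataclasses import dataclass
from typing import Any, Optional, Sequence


@dataclass
class PublishRequest:
    """Stand-in for KafkaPublishRequest with only the fields used here."""
    topic: str
    value: Any
    key: Optional[str] = None


def plan_publishes(result: Optional[Sequence[Any]],
                   output_topic: Optional[str] = None):
    """Return (topic, key, value) tuples for one handler result."""
    if result is None:
        return []  # consume-only: nothing to publish
    plans = []
    for item in result:
        if isinstance(item, PublishRequest):
            # Explicit request: the handler chose topic and key itself.
            plans.append((item.topic, item.key, item.value))
        else:
            # Raw value: route to the processor-level output topic.
            if output_topic is None:
                raise ValueError("raw values need an output_topic")
            plans.append((output_topic, None, item))
    return plans
```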

run(handler, *, output_topic=None, iterations=None)[source]

Run repeated batch cycles. Returns total consumed message count.

Return type:

int

Audit Logger

Immutable prompt/response audit logging over Kafka.

class ractogateway.kafka.audit.KafkaAuditLogger(producer, *, topic='ractogateway.audit')[source]

Bases: object

Asynchronous audit logger for regulated AI workloads.

Writes immutable prompt/response records to a dedicated Kafka topic so audit traces survive web-process crashes and can be replayed later.

property topic: str

log(*, user_id, model, prompt, response, request_id=None, metadata=None, wait=False)[source]

Publish one immutable audit record.

Parameters:
  • wait (bool) – If True, block until Kafka acknowledges the write. For high-throughput request paths, False is usually preferred.

Return type:

KafkaAuditEvent
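
As an illustration of the audit envelope, the sketch below builds an immutable record with the KafkaAuditEvent field names and encodes it for publishing. It uses a frozen dataclass rather than the library's Pydantic model. AuditEvent and encode_audit_event are hypothetical names, and both the ISO 8601 timestamp format and the sorted-key JSON encoding are assumptions.

```python
import json
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone
from typing import Any, Optional


def _utc_now_iso() -> str:
    # Assumed format: ISO 8601 with an explicit UTC offset.
    return datetime.now(timezone.utc).isoformat()


@dataclass(frozen=True)
class AuditEvent:
    """Frozen stand-in mirroring the KafkaAuditEvent fields."""
    user_id: str
    model: str
    prompt: str
    response: str
    request_id: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)
    timestamp_utc: str = field(default_factory=_utc_now_iso)


def encode_audit_event(event: AuditEvent) -> bytes:
    """Serialize the envelope to UTF-8 JSON with stable key order."""
    return json.dumps(asdict(event), sort_keys=True).encode("utf-8")


event = AuditEvent(
    user_id="user-123",
    model="example-model",
    prompt="What is Kafka?",
    response="A distributed event streaming platform.",
    metadata={"tenant": "acme"},
)
payload = encode_audit_event(event)
```

Freezing the record means any later attempt to change a field raises an error, which matches the intent of an audit trail that is written once and replayed later.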