Kafka
Models
Pydantic models for Kafka integration primitives.
- class ractogateway.kafka._models.KafkaProducerConfig(**data)[source]
Bases:
BaseModelConfiguration for
KafkaProducerClient.- Parameters:
bootstrap_servers (str | list[str]) – Kafka broker list (single host string or multiple hosts).
client_id (str) – Producer client ID reported to the broker.
acks (Literal['all', '0', '1'] | int) – Acknowledgement policy;
"all"is safest for durability.linger_ms (int) – Small batching delay to improve throughput.
compression_type (str | None) – Optional compression algorithm (e.g.
"gzip","snappy").retries (int) – Retry attempts for transient broker/network failures.
request_timeout_ms (int) – Broker request timeout.
security_protocol (str) – Kafka transport protocol (
PLAINTEXT,SSL,SASL_PLAINTEXT,SASL_SSL).sasl_mechanism (str | None) – SASL mechanism when using SASL security protocols.
sasl_plain_username (str | None) – Username for SASL PLAIN/SCRAM.
sasl_plain_password (str | None) – Password for SASL PLAIN/SCRAM.
extra (dict[str, Any]) – Additional kwargs forwarded directly to
kafka.KafkaProducer.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.kafka._models.KafkaConsumerConfig(**data)[source]
Bases:
BaseModelConfiguration for
KafkaConsumerClient.- Parameters:
group_id (str | None) – Consumer group ID.
Nonedisables group coordination.auto_offset_reset (Literal['earliest', 'latest', 'none']) – Initial offset policy when no committed offset is present.
enable_auto_commit (bool) – Whether Kafka should auto-commit offsets in the background.
max_poll_records (int) – Upper bound on records returned per poll call.
poll_timeout_ms (int) – Default poll timeout in milliseconds.
session_timeout_ms (int) – Group session timeout.
client_id (str) – Consumer client ID.
security_protocol (str) – Kafka transport protocol.
sasl_mechanism (str | None) – SASL mechanism when applicable.
sasl_plain_username (str | None) – Username for SASL auth.
sasl_plain_password (str | None) – Password for SASL auth.
extra (dict[str, Any]) – Additional kwargs forwarded to
kafka.KafkaConsumer.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.kafka._models.KafkaStreamConfig(**data)[source]
Bases:
BaseModelBatch processing configuration for
KafkaStreamProcessor.- Parameters:
batch_size (int) – Maximum messages to process per handler invocation.
max_wait_ms (int) – Max wall-clock wait to collect one batch.
poll_timeout_ms (int) – Per-poll timeout while collecting a batch.
commit_on_success (bool) – Commit offsets after successful handler + publish cycle.
wait_for_publish (bool) – If
True, block for broker acknowledgement on each produced output.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.kafka._models.KafkaMessage(**data)[source]
Bases:
BaseModelNormalized Kafka record returned by
KafkaConsumerClient.Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.kafka._models.KafkaProduceResult(**data)[source]
Bases:
BaseModelBroker acknowledgement metadata returned by publish operations.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.kafka._models.KafkaPublishRequest(**data)[source]
Bases:
BaseModelExplicit publish request used by stream routing handlers.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.kafka._models.KafkaAuditEvent(**data)[source]
Bases:
BaseModelImmutable audit envelope for prompt/response traces.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
Producer
Kafka producer wrapper for high-throughput event publishing.
- class ractogateway.kafka.producer.KafkaProducerClient(*, config=None, producer=None, key_serializer=None, value_serializer=None)[source]
Bases:
objectTyped facade over
kafka.KafkaProducer.- Parameters:
config (
KafkaProducerConfig|None) – Producer connection and reliability settings. Defaults are applied when omitted.producer (
Any|None) – Pre-built Kafka producer object. Useful for dependency injection in tests or when your runtime provides the producer lifecycle externally.key_serializer (
Callable[[Any],bytes] |None) – Optional override for key serialization.value_serializer (
Callable[[Any],bytes] |None) – Optional override for value serialization.
- publish(topic, value, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)[source]
Publish one event and optionally wait for broker acknowledgement.
- Return type:
- publish_json(topic, payload, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)[source]
Alias for
publish()when value is a JSON payload.- Return type:
Consumer
Kafka consumer wrapper that returns normalized typed records.
- class ractogateway.kafka.consumer.KafkaConsumerClient(*, config, consumer=None, key_deserializer=None, value_deserializer=None)[source]
Bases:
objectTyped facade over
kafka.KafkaConsumer.- Parameters:
config (
KafkaConsumerConfig) – Consumer connection and polling options.consumer (
Any|None) – Pre-built consumer object; useful for tests and dependency injection.key_deserializer (
Callable[[Any],Any] |None) – Optional override for key decoding.value_deserializer (
Callable[[Any],Any] |None) – Optional override for value decoding.
Stream Processor
Batched consume-process-publish utilities for real-time Kafka streams.
- class ractogateway.kafka.stream.KafkaStreamProcessor(*, consumer, producer=None, config=None)[source]
Bases:
objectReal-time stream loop helper for high-throughput event processing.
- Parameters:
consumer (
KafkaConsumerClient) – Source consumer used to read input events.producer (
KafkaProducerClient|None) – Optional producer used to publish handler outputs.config (
KafkaStreamConfig|None) – Batch and commit controls. Defaults are applied when omitted.
- process_once(handler, *, output_topic=None)[source]
Run one batch cycle and return the number of consumed messages.
- Return type:
Handler return options
None: consume-only, no output publish.Sequence[KafkaPublishRequest]: explicit per-message publish plans.Sequence[Any]: publish raw values tooutput_topic.
Audit Logger
Immutable prompt/response audit logging over Kafka.
- class ractogateway.kafka.audit.KafkaAuditLogger(producer, *, topic='ractogateway.audit')[source]
Bases:
objectAsynchronous audit logger for regulated AI workloads.
Writes immutable prompt/response records to a dedicated Kafka topic so audit traces survive web-process crashes and can be replayed later.