ractogateway.kafka
Kafka streaming integration for RactoGateway.
Four production patterns exposed through typed wrappers:
- KafkaProducerClient for durable event publishing.
- KafkaConsumerClient for typed polling.
- KafkaStreamProcessor for batched consume/transform/publish loops.
- KafkaAuditLogger for immutable prompt/response audit trails.
Quick start:
pip install ractogateway[kafka]
from ractogateway.kafka import (
    KafkaProducerClient,
    KafkaConsumerClient,
    KafkaProducerConfig,
    KafkaConsumerConfig,
    KafkaStreamProcessor,
    KafkaAuditLogger,
)

# Publish one event; publish() waits for broker acknowledgement by default (wait=True).
producer = KafkaProducerClient(
    config=KafkaProducerConfig(bootstrap_servers="localhost:9092")
)
producer.publish("agent.events", {"agent": "research", "status": "ready"})

# Join the "writer-agent" consumer group and poll for up to 1000 ms.
consumer = KafkaConsumerClient(
    config=KafkaConsumerConfig(
        topics=["agent.events"],
        bootstrap_servers="localhost:9092",
        group_id="writer-agent",
    )
)
messages = consumer.poll(timeout_ms=1000)

# Send a prompt/response record to the "ai.audit" topic.
audit = KafkaAuditLogger(producer, topic="ai.audit")
audit.log(
    user_id="u-42",
    model="gpt-4o",
    prompt="Summarize incident report",
    response="Summary ...",
)
- class ractogateway.kafka.KafkaAuditEvent(**data)
Bases: BaseModel
Immutable audit envelope for prompt/response traces.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- user_id: str
- model: str
- prompt: str
- response: str
- timestamp_utc: str
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
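KafkaAuditEvent itself is a pydantic model. The stdlib sketch below only illustrates what an "immutable envelope" over these five fields means in practice: once built, a record cannot be edited in place. `AuditEventSketch` and `make_event` are hypothetical names, not part of ractogateway, and the ISO-8601 format of timestamp_utc is an assumption the reference above does not state.

```python
from dataclasses import FrozenInstanceError, asdict, dataclass
from datetime import datetime, timezone


@dataclass(frozen=True)
class AuditEventSketch:
    """Hypothetical stand-in for KafkaAuditEvent: same fields, read-only once built."""

    user_id: str
    model: str
    prompt: str
    response: str
    timestamp_utc: str


def make_event(user_id: str, model: str, prompt: str, response: str) -> AuditEventSketch:
    """Hypothetical helper: stamp a prompt/response pair with the current UTC time.

    Assumption: the timestamp is an ISO-8601 string such as '2024-01-01T00:00:00+00:00'.
    """
    stamp = datetime.now(timezone.utc).isoformat()
    return AuditEventSketch(user_id, model, prompt, response, stamp)


event = make_event("u-42", "gpt-4o", "Summarize incident report", "Summary ...")
try:
    event.response = "edited after the fact"  # rejected: the instance is frozen
except FrozenInstanceError:
    pass
record = asdict(event)  # plain dict, ready for JSON serialization
```

In pydantic v2 the comparable switch on a model is `ConfigDict(frozen=True)`.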
- class ractogateway.kafka.KafkaAuditLogger(producer, *, topic='ractogateway.audit')
Bases: object
Asynchronous audit logger for regulated AI workloads.
Writes immutable prompt/response records to a dedicated Kafka topic so audit traces survive web-process crashes and can be replayed later.
- property topic: str
- class ractogateway.kafka.KafkaConsumerClient(*, config, consumer=None, key_deserializer=None, value_deserializer=None)
Bases: object
Typed facade over kafka.KafkaConsumer.
- Parameters:
config (KafkaConsumerConfig) – Consumer connection and polling options.
consumer (Any | None) – Pre-built consumer object; useful for tests and dependency injection.
key_deserializer (Callable[[Any], Any] | None) – Optional override for key decoding.
value_deserializer (Callable[[Any], Any] | None) – Optional override for value decoding.
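The deserializer overrides are plain one-argument callables. A minimal sketch, assuming raw keys and values arrive as bytes (or None for records without a key and for tombstone records), as they do in kafka-python; `decode_json_value` and `decode_utf8_key` are hypothetical helper names.

```python
import json
from typing import Any, Optional


def decode_json_value(raw: Optional[bytes]) -> Any:
    """Hypothetical value decoder: UTF-8 JSON bytes to Python objects.

    None (for example a tombstone record on a compacted topic) passes through unchanged.
    """
    if raw is None:
        return None
    return json.loads(raw.decode("utf-8"))


def decode_utf8_key(raw: Optional[bytes]) -> Optional[str]:
    """Hypothetical key decoder: records without a key keep None."""
    return None if raw is None else raw.decode("utf-8")


# Both match the Callable[[Any], Any] shape accepted by the constructor, e.g.
# KafkaConsumerClient(config=..., key_deserializer=decode_utf8_key,
#                     value_deserializer=decode_json_value)
```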
- poll(*, timeout_ms=None, max_records=None)
Poll records and return normalized KafkaMessage entries.
- Return type:
- class ractogateway.kafka.KafkaConsumerConfig(**data)
Bases: BaseModel
Configuration for KafkaConsumerClient.
- Parameters:
group_id (str | None) – Consumer group ID. None disables group coordination.
auto_offset_reset (Literal['earliest', 'latest', 'none']) – Initial offset policy when no committed offset is present.
enable_auto_commit (bool) – Whether Kafka should auto-commit offsets in the background.
max_poll_records (int) – Upper bound on records returned per poll call.
poll_timeout_ms (int) – Default poll timeout in milliseconds.
session_timeout_ms (int) – Group session timeout in milliseconds.
client_id (str) – Consumer client ID.
security_protocol (str) – Kafka transport protocol.
sasl_mechanism (str | None) – SASL mechanism when applicable.
sasl_plain_username (str | None) – Username for SASL auth.
sasl_plain_password (str | None) – Password for SASL auth.
extra (dict[str, Any]) – Additional kwargs forwarded to
kafka.KafkaConsumer.
- auto_offset_reset: Literal['earliest', 'latest', 'none']
- enable_auto_commit: bool
- max_poll_records: int
- poll_timeout_ms: int
- session_timeout_ms: int
- client_id: str
- security_protocol: str
- model_config: ClassVar[ConfigDict] = {}
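auto_offset_reset only matters when the consumer group has no usable committed offset for a partition. The sketch below restates Kafka's semantics for the three policies as a pure function, ignoring the out-of-range case (where the policy is also applied). `initial_offset` is a hypothetical name, and raising LookupError for "none" stands in for whatever client-specific exception a real consumer raises.

```python
from typing import Literal, Optional

ResetPolicy = Literal["earliest", "latest", "none"]


def initial_offset(committed: Optional[int], beginning: int, end: int,
                   policy: ResetPolicy) -> int:
    """Hypothetical helper: starting offset for one partition.

    committed: the group's committed offset, or None when nothing is committed.
    beginning / end: first retained offset and log-end offset of the partition.
    """
    if committed is not None:
        return committed  # a valid committed offset takes precedence over the policy
    if policy == "earliest":
        return beginning  # replay everything the partition still retains
    if policy == "latest":
        return end  # skip history; read only records produced from now on
    # policy == "none": refuse to guess a starting point
    raise LookupError("no committed offset and auto_offset_reset='none'")
```

"earliest" suits new groups that must process the existing backlog; "latest" suits groups that only care about new events.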
- class ractogateway.kafka.KafkaMessage(**data)
Bases: BaseModel
Normalized Kafka record returned by KafkaConsumerClient.
- topic: str
- partition: int
- offset: int
- value: Any
- model_config: ClassVar[ConfigDict] = {}
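Each KafkaMessage carries topic, partition, and offset. By Kafka convention a committed offset names the next record to read, so after processing a batch the position to commit for each partition is the highest processed offset plus one. The sketch uses `Msg`, a hypothetical NamedTuple stand-in with KafkaMessage's fields, and a hypothetical `offsets_to_commit` helper; how ractogateway commits internally is not documented here.

```python
from typing import Any, Dict, Iterable, NamedTuple, Tuple


class Msg(NamedTuple):
    """Hypothetical stand-in with the same fields as KafkaMessage."""

    topic: str
    partition: int
    offset: int
    value: Any


def offsets_to_commit(batch: Iterable[Msg]) -> Dict[Tuple[str, int], int]:
    """Hypothetical helper: map (topic, partition) to the next offset to read."""
    positions: Dict[Tuple[str, int], int] = {}
    for msg in batch:
        key = (msg.topic, msg.partition)
        # Offsets only grow within a partition; keep the furthest position seen.
        positions[key] = max(positions.get(key, 0), msg.offset + 1)
    return positions
```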
- class ractogateway.kafka.KafkaProduceResult(**data)
Bases: BaseModel
Broker acknowledgement metadata returned by publish operations.
- topic: str
- partition: int
- offset: int
- model_config: ClassVar[ConfigDict] = {}
- class ractogateway.kafka.KafkaProducerClient(*, config=None, producer=None, key_serializer=None, value_serializer=None)
Bases: object
Typed facade over kafka.KafkaProducer.
- Parameters:
config (KafkaProducerConfig | None) – Producer connection and reliability settings. Defaults are applied when omitted.
producer (Any | None) – Pre-built Kafka producer object. Useful for dependency injection in tests or when your runtime manages the producer lifecycle externally.
key_serializer (Callable[[Any], bytes] | None) – Optional override for key serialization.
value_serializer (Callable[[Any], bytes] | None) – Optional override for value serialization.
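The serializer overrides turn Python objects into bytes. A minimal sketch of a compact JSON value serializer and a text key serializer; `encode_json_value` and `encode_utf8_key` are hypothetical names, and the key serializer guards against None on the assumption that absent keys may be passed through to it.

```python
import json
from typing import Any, Optional


def encode_json_value(value: Any) -> bytes:
    """Hypothetical value serializer: compact UTF-8 JSON without extra whitespace."""
    return json.dumps(value, separators=(",", ":"), ensure_ascii=False).encode("utf-8")


def encode_utf8_key(key: Optional[str]) -> Optional[bytes]:
    """Hypothetical key serializer: None means the record is sent without a key."""
    return None if key is None else key.encode("utf-8")


# Usage (shape only): KafkaProducerClient(config=..., key_serializer=encode_utf8_key,
#                                         value_serializer=encode_json_value)
```

Kafka's default partitioner sends records with the same key to the same partition, so a stable key such as an agent name keeps that agent's events in order.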
- publish(topic, value, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)
Publish one event and optionally wait for broker acknowledgement.
- Return type:
- publish_json(topic, payload, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)
Alias for publish() when the value is a JSON payload.
- Return type:
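The wait and timeout_s arguments follow a common producer pattern: sending yields a future, and waiting means blocking on that future for at most timeout_s seconds. The sketch models the pattern with concurrent.futures; `Ack`, `fake_send`, and `publish_sketch` are hypothetical, returning None when wait=False is a choice made only for this sketch, and none of it is ractogateway's implementation.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import NamedTuple, Optional


class Ack(NamedTuple):
    """Hypothetical stand-in for KafkaProduceResult."""

    topic: str
    partition: int
    offset: int


def fake_send(topic: str, value: bytes) -> Ack:
    # Hypothetical broker round-trip that always lands on partition 0, offset 42.
    return Ack(topic=topic, partition=0, offset=42)


def publish_sketch(pool: ThreadPoolExecutor, topic: str, value: bytes,
                   wait: bool = True, timeout_s: float = 10.0) -> Optional[Ack]:
    future: Future = pool.submit(fake_send, topic, value)
    if not wait:
        return None  # fire-and-forget: the caller never sees partition/offset
    # Blocks until the send completes; raises concurrent.futures.TimeoutError
    # if no result arrives within timeout_s seconds.
    return future.result(timeout=timeout_s)
```

Waiting costs one round-trip per call but surfaces delivery failures immediately; not waiting maximizes throughput at the cost of learning about failures later, if at all.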
- class ractogateway.kafka.KafkaProducerConfig(**data)
Bases: BaseModel
Configuration for KafkaProducerClient.
- Parameters:
bootstrap_servers (str | list[str]) – Kafka broker list (single host string or multiple hosts).
client_id (str) – Producer client ID reported to the broker.
acks (Literal['all', '0', '1'] | int) – Acknowledgement policy; "all" is safest for durability.
linger_ms (int) – Milliseconds to wait for more records before sending a batch; a small delay improves throughput.
compression_type (str | None) – Optional compression algorithm (e.g. "gzip", "snappy").
retries (int) – Retry attempts for transient broker/network failures.
request_timeout_ms (int) – Broker request timeout in milliseconds.
security_protocol (str) – Kafka transport protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).
sasl_mechanism (str | None) – SASL mechanism when using SASL security protocols.
sasl_plain_username (str | None) – Username for SASL PLAIN/SCRAM.
sasl_plain_password (str | None) – Password for SASL PLAIN/SCRAM.
extra (dict[str, Any]) – Additional kwargs forwarded directly to kafka.KafkaProducer.
- client_id: str
- linger_ms: int
- retries: int
- request_timeout_ms: int
- security_protocol: str
- model_config: ClassVar[ConfigDict] = {}
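For a durability-first producer behind a SASL_SSL listener, the documented fields might be combined as below. The broker addresses, client ID, SCRAM mechanism, and the KAFKA_USERNAME/KAFKA_PASSWORD environment variable names are placeholders. max_in_flight_requests_per_connection is a kafka-python KafkaProducer argument, passed through extra here so that retried batches cannot be reordered.

```python
import os

# Placeholder values; intended use: KafkaProducerConfig(**producer_settings)
producer_settings = {
    "bootstrap_servers": ["kafka-1.internal:9093", "kafka-2.internal:9093"],
    "client_id": "ractogateway-audit",
    "acks": "all",  # leader waits for the full set of in-sync replicas
    "retries": 5,  # retry transient broker/network failures
    "linger_ms": 5,  # brief batching window before each send
    "compression_type": "gzip",
    "request_timeout_ms": 30000,
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-512",
    # Read credentials from the environment rather than hard-coding them.
    "sasl_plain_username": os.environ.get("KAFKA_USERNAME", ""),
    "sasl_plain_password": os.environ.get("KAFKA_PASSWORD", ""),
    # Forwarded verbatim to kafka.KafkaProducer.
    "extra": {"max_in_flight_requests_per_connection": 1},
}
```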
- class ractogateway.kafka.KafkaPublishRequest(**data)
Bases: BaseModel
Explicit publish request used by stream routing handlers.
- topic: str
- value: Any
- model_config: ClassVar[ConfigDict] = {}
- class ractogateway.kafka.KafkaStreamConfig(**data)
Bases: BaseModel
Batch processing configuration for KafkaStreamProcessor.
- Parameters:
batch_size (int) – Maximum messages to process per handler invocation.
max_wait_ms (int) – Maximum wall-clock time, in milliseconds, to spend collecting one batch.
poll_timeout_ms (int) – Per-poll timeout, in milliseconds, while collecting a batch.
commit_on_success (bool) – Commit offsets after a successful handler + publish cycle.
wait_for_publish (bool) – If True, block for broker acknowledgement on each produced output.
- batch_size: int
- max_wait_ms: int
- poll_timeout_ms: int
- commit_on_success: bool
- wait_for_publish: bool
- model_config: ClassVar[ConfigDict] = {}
- class ractogateway.kafka.KafkaStreamProcessor(*, consumer, producer=None, config=None)
Bases: object
Real-time stream loop helper for high-throughput event processing.
- Parameters:
consumer (KafkaConsumerClient) – Source consumer used to read input events.
producer (KafkaProducerClient | None) – Optional producer used to publish handler outputs.
config (KafkaStreamConfig | None) – Batch and commit controls. Defaults are applied when omitted.
- collect_batch()
Collect up to batch_size messages, bounded by max_wait_ms.
- Return type:
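A batch closes when batch_size messages have arrived or max_wait_ms has elapsed, whichever comes first, with poll_timeout_ms bounding each individual poll. A minimal sketch of such a loop under those assumptions; `collect_batch_sketch` is hypothetical, and `poll` is a hypothetical callable taking (max_records, timeout_ms), loosely mirroring the keyword arguments of KafkaConsumerClient.poll. The real method may differ in detail.

```python
import time
from typing import Any, Callable, List

PollFn = Callable[[int, int], List[Any]]  # hypothetical: (max_records, timeout_ms) -> records


def collect_batch_sketch(poll: PollFn, batch_size: int, max_wait_ms: int,
                         poll_timeout_ms: int) -> List[Any]:
    batch: List[Any] = []
    deadline = time.monotonic() + max_wait_ms / 1000.0
    while len(batch) < batch_size:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            break  # wall-clock budget spent; return a partial (possibly empty) batch
        # Never request more records than still fit, nor wait past the deadline.
        records = poll(batch_size - len(batch), min(poll_timeout_ms, remaining_ms))
        batch.extend(records)
    return batch
```

Bounding the batch by time as well as size keeps latency predictable on quiet topics, where a size-only rule could wait indefinitely.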
- process_once(handler, *, output_topic=None)
Run one batch cycle and return the number of consumed messages.
- Return type: int
Handler return options:
- None: consume-only; no output is published.
- Sequence[KafkaPublishRequest]: explicit per-message publish plans.
- Sequence[Any]: raw values published to output_topic.
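The three handler return shapes amount to a small dispatch rule. The sketch restates it with stand-ins: `PublishRequest` mirrors KafkaPublishRequest's topic/value fields, `publish` is any callable taking (topic, value), and `route_outputs` is a hypothetical helper. The reference does not say how the real processor treats mixed sequences or a missing output_topic, so this sketch routes item by item and raises ValueError when a raw value has nowhere to go.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional, Sequence


@dataclass(frozen=True)
class PublishRequest:
    """Hypothetical stand-in for KafkaPublishRequest."""

    topic: str
    value: Any


def route_outputs(result: Optional[Sequence[Any]], output_topic: Optional[str],
                  publish: Callable[[str, Any], None]) -> int:
    """Hypothetical helper: publish a handler's return value; return records sent."""
    if result is None:
        return 0  # consume-only handler
    sent = 0
    for item in result:
        if isinstance(item, PublishRequest):
            publish(item.topic, item.value)  # explicit per-message destination
        elif output_topic is not None:
            publish(output_topic, item)  # raw value goes to the default topic
        else:
            raise ValueError("raw handler outputs need an output_topic")
        sent += 1
    return sent
```

Returning explicit publish requests lets one handler fan results out to several topics (for example, alerts versus summaries), while raw values keep simple one-in, one-out transforms short.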