ractogateway.kafka

Kafka streaming integration for RactoGateway.

Four production patterns are exposed through typed wrappers:

  • KafkaProducerClient for durable event publishing.

  • KafkaConsumerClient for typed polling.

  • KafkaStreamProcessor for batched consume/transform/publish loops.

  • KafkaAuditLogger for immutable prompt/response audit trails.

Quick start:

pip install "ractogateway[kafka]"

from ractogateway.kafka import (
    KafkaProducerClient,
    KafkaConsumerClient,
    KafkaProducerConfig,
    KafkaConsumerConfig,
    KafkaStreamProcessor,
    KafkaAuditLogger,
)

producer = KafkaProducerClient(
    config=KafkaProducerConfig(bootstrap_servers="localhost:9092")
)
producer.publish("agent.events", {"agent": "research", "status": "ready"})

consumer = KafkaConsumerClient(
    config=KafkaConsumerConfig(
        topics=["agent.events"],
        bootstrap_servers="localhost:9092",
        group_id="writer-agent",
    )
)
messages = consumer.poll(timeout_ms=1000)

audit = KafkaAuditLogger(producer, topic="ai.audit")
audit.log(
    user_id="u-42",
    model="gpt-4o",
    prompt="Summarize incident report",
    response="Summary ...",
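)

# Flush buffered (fire-and-forget) audit records, then release network resources.
producer.flush()
consumer.close()
producer.close()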

class ractogateway.kafka.KafkaAuditEvent(**data)

Bases: BaseModel

Immutable audit envelope for prompt/response traces.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

user_id: str
model: str
prompt: str
response: str
request_id: str | None
metadata: dict[str, Any]
timestamp_utc: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka.KafkaAuditLogger(producer, *, topic='ractogateway.audit')

Bases: object

Audit logger for regulated AI workloads. By default, log() publishes without waiting for broker acknowledgement.

Writes immutable prompt/response records to a dedicated Kafka topic, so acknowledged audit traces survive web-process crashes and can be replayed later.

property topic: str
log(*, user_id, model, prompt, response, request_id=None, metadata=None, wait=False)

Publish one immutable audit record.

Parameters:

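wait (bool) – If True, block until Kafka acknowledges the write. For high-throughput request paths, False is usually preferred, but records still buffered in the producer can be lost if the process crashes before they are sent.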

Return type:

KafkaAuditEvent

class ractogateway.kafka.KafkaConsumerClient(*, config, consumer=None, key_deserializer=None, value_deserializer=None)

Bases: object

Typed facade over kafka.KafkaConsumer.

Parameters:
  • config (KafkaConsumerConfig) – Consumer connection and polling options.

  • consumer (Any | None) – Pre-built consumer object; useful for tests and dependency injection.

  • key_deserializer (Callable[[Any], Any] | None) – Optional override for key decoding.

  • value_deserializer (Callable[[Any], Any] | None) – Optional override for value decoding.

poll(*, timeout_ms=None, max_records=None)

Poll records and return normalized KafkaMessage entries.

Return type:

list[KafkaMessage]

commit()

Commit currently consumed offsets.

Return type:

None

close()

Close the underlying consumer and release its resources.

Return type:

None

class ractogateway.kafka.KafkaConsumerConfig(**data)

Bases: BaseModel

Configuration for KafkaConsumerClient.

Parameters:
  • topics (list[str]) – Topics to subscribe to on startup.

  • bootstrap_servers (str | list[str]) – Kafka broker list.

  • group_id (str | None) – Consumer group ID. None disables group coordination.

  • auto_offset_reset (Literal['earliest', 'latest', 'none']) – Initial offset policy when no committed offset is present.

  • enable_auto_commit (bool) – Whether Kafka should auto-commit offsets in the background.

  • max_poll_records (int) – Upper bound on records returned per poll call.

  • poll_timeout_ms (int) – Default poll timeout in milliseconds.

  • session_timeout_ms (int) – Consumer group session timeout in milliseconds.

  • client_id (str) – Consumer client ID.

  • security_protocol (str) – Kafka transport protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).

  • sasl_mechanism (str | None) – SASL mechanism when applicable.

  • sasl_plain_username (str | None) – Username for SASL auth.

  • sasl_plain_password (str | None) – Password for SASL auth.

  • extra (dict[str, Any]) – Additional kwargs forwarded to kafka.KafkaConsumer.
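
Each config field is a keyword argument, so deployment settings can be assembled as a dict and validated with KafkaConsumerConfig(**settings). The sketch below is minimal and assumes hypothetical environment variable names. It keeps the SASL password out of source code.

```python
import os
from typing import Any, Mapping


def consumer_settings_from_env(env: Mapping[str, str] = os.environ) -> dict[str, Any]:
    """Build KafkaConsumerConfig keyword arguments from (hypothetical) env vars."""
    settings: dict[str, Any] = {
        "topics": env.get("KAFKA_TOPICS", "agent.events").split(","),
        "bootstrap_servers": env.get("KAFKA_BOOTSTRAP", "localhost:9092").split(","),
        "group_id": env.get("KAFKA_GROUP_ID", "writer-agent"),
        "enable_auto_commit": False,  # commit explicitly after processing
        "auto_offset_reset": "earliest",
    }
    # Only switch to SASL over TLS when credentials are provided.
    if "KAFKA_SASL_USERNAME" in env:
        settings.update(
            security_protocol="SASL_SSL",
            sasl_mechanism=env.get("KAFKA_SASL_MECHANISM", "SCRAM-SHA-512"),
            sasl_plain_username=env["KAFKA_SASL_USERNAME"],
            sasl_plain_password=env["KAFKA_SASL_PASSWORD"],
        )
    return settings
```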

topics: list[str]
bootstrap_servers: str | list[str]
group_id: str | None
auto_offset_reset: Literal['earliest', 'latest', 'none']
enable_auto_commit: bool
max_poll_records: int
poll_timeout_ms: int
session_timeout_ms: int
client_id: str
security_protocol: str
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka.KafkaMessage(**data)

Bases: BaseModel

Normalized Kafka record returned by KafkaConsumerClient.

topic: str
partition: int
offset: int
timestamp_ms: int | None
key: str | bytes | None
value: Any
headers: dict[str, bytes | None]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka.KafkaProduceResult(**data)

Bases: BaseModel

Broker acknowledgement metadata returned by publish operations.

topic: str
partition: int
offset: int
timestamp_ms: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka.KafkaProducerClient(*, config=None, producer=None, key_serializer=None, value_serializer=None)

Bases: object

Typed facade over kafka.KafkaProducer.

Parameters:
  • config (KafkaProducerConfig | None) – Producer connection and reliability settings. Defaults are applied when omitted.

  • producer (Any | None) – Pre-built Kafka producer object. Useful for dependency injection in tests or when your runtime provides the producer lifecycle externally.

  • key_serializer (Callable[[Any], bytes] | None) – Optional override for key serialization.

  • value_serializer (Callable[[Any], bytes] | None) – Optional override for value serialization.

publish(topic, value, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)

Publish one event and optionally wait for broker acknowledgement.

Return type:

KafkaProduceResult | None

publish_json(topic, payload, *, key=None, headers=None, partition=None, timestamp_ms=None, wait=True, timeout_s=10.0)

Alias for publish() when value is a JSON payload.

Return type:

KafkaProduceResult | None

flush(*, timeout_s=None)

Force all buffered events to be sent to brokers.

Return type:

None

close(*, timeout_s=5.0)

Close the underlying producer and release its network resources.

Return type:

None

class ractogateway.kafka.KafkaProducerConfig(**data)

Bases: BaseModel

Configuration for KafkaProducerClient.

Parameters:
  • bootstrap_servers (str | list[str]) – Kafka broker list (single host string or multiple hosts).

  • client_id (str) – Producer client ID reported to the broker.

  • acks (Literal['all', '0', '1'] | int) – Acknowledgement policy; "all" is safest for durability.

  • linger_ms (int) – Batching delay in milliseconds; a small delay lets the producer group records into larger batches, improving throughput at some latency cost.

  • compression_type (str | None) – Optional compression algorithm (e.g. "gzip", "snappy").

  • retries (int) – Retry attempts for transient broker/network failures.

  • request_timeout_ms (int) – Broker request timeout in milliseconds.

  • security_protocol (str) – Kafka transport protocol (PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL).

  • sasl_mechanism (str | None) – SASL mechanism when using SASL security protocols.

  • sasl_plain_username (str | None) – Username for SASL PLAIN/SCRAM.

  • sasl_plain_password (str | None) – Password for SASL PLAIN/SCRAM.

  • extra (dict[str, Any]) – Additional kwargs forwarded directly to kafka.KafkaProducer.
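
acks, retries, linger_ms and compression_type trade durability against throughput. The sketch below defines two illustrative profiles as keyword arguments for KafkaProducerConfig(**settings). The profile names and numeric values are assumptions to tune, not recommendations from the library. max_in_flight_requests_per_connection is a kafka.KafkaProducer option forwarded through extra. Setting it to 1 prevents reordering when retries occur.

```python
from typing import Any


def producer_settings(profile: str, bootstrap: str = "localhost:9092") -> dict[str, Any]:
    """Return KafkaProducerConfig keyword arguments for a named (hypothetical) profile."""
    base: dict[str, Any] = {"bootstrap_servers": bootstrap}
    if profile == "durable":
        return {
            **base,
            "acks": "all",  # wait for all in-sync replicas
            "retries": 5,
            # Forwarded to kafka.KafkaProducer; keeps ordering intact across retries.
            "extra": {"max_in_flight_requests_per_connection": 1},
        }
    if profile == "throughput":
        return {
            **base,
            "acks": 1,  # leader-only acknowledgement
            "linger_ms": 20,  # wait briefly so records batch together
            "compression_type": "gzip",
        }
    raise ValueError(f"unknown profile: {profile!r}")
```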

bootstrap_servers: str | list[str]
client_id: str
acks: Literal['all', '0', '1'] | int
linger_ms: int
compression_type: str | None
retries: int
request_timeout_ms: int
security_protocol: str
sasl_mechanism: str | None
sasl_plain_username: str | None
sasl_plain_password: str | None
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka.KafkaPublishRequest(**data)

Bases: BaseModel

Explicit publish request used by stream routing handlers.

topic: str
value: Any
key: str | bytes | None
headers: dict[str, str | bytes | None] | None
partition: int | None
timestamp_ms: int | None
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka.KafkaStreamConfig(**data)

Bases: BaseModel

Batch processing configuration for KafkaStreamProcessor.

Parameters:
  • batch_size (int) – Maximum number of messages processed per handler invocation.

  • max_wait_ms (int) – Maximum wall-clock time, in milliseconds, spent collecting one batch.

  • poll_timeout_ms (int) – Per-poll timeout, in milliseconds, while collecting a batch.

  • commit_on_success (bool) – Commit offsets after each successful handler-and-publish cycle.

  • wait_for_publish (bool) – If True, block for broker acknowledgement on each produced output.
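
batch_size, max_wait_ms and poll_timeout_ms work together, and their interplay can be sketched as a deadline loop. This illustrates the documented bounds and is not the library's code. poll is any callable taking (timeout_ms, max_records), for example lambda t, m: consumer.poll(timeout_ms=t, max_records=m).

```python
import time
from typing import Any, Callable


def collect_batch(
    poll: Callable[[int, int], list[Any]],
    *,
    batch_size: int,
    max_wait_ms: int,
    poll_timeout_ms: int,
) -> list[Any]:
    """Gather up to batch_size records, stopping once max_wait_ms has elapsed."""
    batch: list[Any] = []
    deadline = time.monotonic() + max_wait_ms / 1000.0
    while len(batch) < batch_size:
        remaining_ms = int((deadline - time.monotonic()) * 1000)
        if remaining_ms <= 0:
            break
        # Never poll longer than the time left, nor for more records than still fit.
        records = poll(min(poll_timeout_ms, remaining_ms), batch_size - len(batch))
        batch.extend(records)
    return batch[:batch_size]
```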

batch_size: int
max_wait_ms: int
poll_timeout_ms: int
commit_on_success: bool
wait_for_publish: bool
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.kafka.KafkaStreamProcessor(*, consumer, producer=None, config=None)

Bases: object

Real-time stream loop helper for high-throughput event processing.

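Parameters:
  • consumer – Consumer client that supplies the input records.

  • producer – Optional producer client used to publish handler output.

  • config – Batch processing settings (see KafkaStreamConfig).

collect_batch()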

Collect up to batch_size messages, waiting at most max_wait_ms.

Return type:

list[KafkaMessage]

process_once(handler, *, output_topic=None)

Run one batch cycle and return the number of consumed messages.

Return type:

int

Handler return options

  • None: consume-only, no output publish.

  • Sequence[KafkaPublishRequest]: explicit per-message publish plans.

  • Sequence[Any]: publish raw values to output_topic.

run(handler, *, output_topic=None, iterations=None)

Run repeated batch cycles and return the total number of consumed messages.

Return type:

int