Kafka

The kafka module provides typed wrappers for durable event publishing, consuming, stream processing, and immutable audit trails.

Installation

pip install ractogateway[kafka]

Publishing Events

from ractogateway.kafka import KafkaProducerClient, KafkaProducerConfig

producer = KafkaProducerClient(
    config=KafkaProducerConfig(bootstrap_servers="localhost:9092")
)

producer.publish("agent.events", {"agent": "research", "status": "ready"})

Consuming Events

from ractogateway.kafka import KafkaConsumerClient, KafkaConsumerConfig

consumer = KafkaConsumerClient(
    config=KafkaConsumerConfig(
        topics=["agent.events"],
        bootstrap_servers="localhost:9092",
        group_id="writer-agent",
    )
)

messages = consumer.poll(timeout_ms=1000)
for msg in messages:
    print(msg.key, msg.value)

Stream Processing

KafkaStreamProcessor consumes from one topic, transforms each message, and publishes to another:

from ractogateway.kafka import KafkaStreamProcessor, KafkaStreamConfig

def transform(msg):
    # Call your LLM kit here, return enriched payload
    return {"summary": "...", "original": msg.value}

processor = KafkaStreamProcessor(
    config=KafkaStreamConfig(
        input_topic="raw.documents",
        output_topic="processed.summaries",
        bootstrap_servers="localhost:9092",
        group_id="summary-processor",
    ),
    transform_fn=transform,
)

processor.run(batch_size=10)

Audit Logging

KafkaAuditLogger writes immutable prompt/response records to a dedicated audit topic:

from ractogateway.kafka import KafkaAuditLogger

audit = KafkaAuditLogger(producer, topic="ai.audit")

audit.log(
    user_id="u-42",
    model="gpt-4o",
    prompt="Summarize incident report",
    response="Summary …",
)

Audit events are stored as KafkaAuditEvent Pydantic models with timestamps, making them easy to query and replay.