ractogateway.kafka.stream

Batched consume-process-publish utilities for real-time Kafka streams.

class ractogateway.kafka.stream.KafkaStreamProcessor(*, consumer, producer=None, config=None)

Bases: object

Real-time stream loop helper for high-throughput event processing.

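Parameters:

  • consumer: required, keyword-only.

  • producer: optional, keyword-only; defaults to None.

  • config: optional, keyword-only; defaults to None.
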
collect_batch()

Collect up to batch_size messages, waiting no longer than max_wait_ms milliseconds.

Return type:

list[KafkaMessage]
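
The size-and-time bound described above can be illustrated with a minimal sketch. It does not use the ractogateway API: collect_batch_sketch, fake_poll, and the in-memory queue are hypothetical stand-ins, and the real method may poll its consumer differently.

```python
import time
from collections import deque
from typing import Any, Callable, Optional


def collect_batch_sketch(
    poll: Callable[[float], Optional[Any]],
    batch_size: int,
    max_wait_ms: int,
) -> list[Any]:
    """Gather up to batch_size items, stopping once max_wait_ms has elapsed."""
    deadline = time.monotonic() + max_wait_ms / 1000.0
    batch: list[Any] = []
    while len(batch) < batch_size:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break  # time budget exhausted: return a partial (possibly empty) batch
        item = poll(remaining)
        if item is not None:
            batch.append(item)
    return batch


# Hypothetical in-memory source standing in for a Kafka consumer.
queue = deque(["a", "b", "c"])


def fake_poll(timeout_s: float) -> Optional[str]:
    """Return the next queued item, or None after a short wait if empty."""
    if queue:
        return queue.popleft()
    time.sleep(min(timeout_s, 0.005))
    return None


first = collect_batch_sketch(fake_poll, batch_size=2, max_wait_ms=50)
second = collect_batch_sketch(fake_poll, batch_size=2, max_wait_ms=50)
```

In this sketch the first call returns as soon as two items arrive, while the second call returns a partial batch only after the time budget runs out.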

process_once(handler, *, output_topic=None)

Run one batch cycle and return the number of consumed messages.

Return type:

int

Handler return options

  • None: consume-only, no output publish.

  • Sequence[KafkaPublishRequest]: explicit per-message publish plans.

  • Sequence[Any]: publish raw values to output_topic.
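
One way to picture the three handler return options is the sketch below. It is an illustration under assumptions, not the library's code: PublishRequest is a hypothetical stand-in for KafkaPublishRequest (whose real fields may differ), plan_publishes is an invented helper, and raising ValueError when raw values arrive without an output_topic is an assumed policy.

```python
from dataclasses import dataclass
from typing import Any, Optional, Sequence


@dataclass
class PublishRequest:
    """Hypothetical stand-in for KafkaPublishRequest; real fields may differ."""
    topic: str
    value: Any


def plan_publishes(
    result: Optional[Sequence[Any]],
    output_topic: Optional[str],
) -> list[PublishRequest]:
    """Turn a handler's return value into a list of publish plans."""
    if result is None:
        return []  # consume-only: nothing to publish
    plans: list[PublishRequest] = []
    for item in result:
        if isinstance(item, PublishRequest):
            plans.append(item)  # explicit plan: keep its own topic
        else:
            if output_topic is None:
                # Assumed policy: raw values have nowhere to go without a topic.
                raise ValueError("raw values need an output_topic")
            plans.append(PublishRequest(output_topic, item))
    return plans


consume_only = plan_publishes(None, "out")
explicit = plan_publishes([PublishRequest("alerts", {"id": 1})], None)
raw = plan_publishes([10, 20], "out")
```

Here raw values are wrapped with the shared output_topic, while explicit requests carry their own destination and pass through unchanged.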

run(handler, *, output_topic=None, iterations=None)

Run repeated batch cycles and return the total number of consumed messages.

Return type:

int
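
The accumulation that run describes can be sketched as a loop over single batch cycles. This is an assumption-laden illustration: run_sketch is a hypothetical helper, and treating iterations=None as "loop until interrupted" is a guess about the default, not documented behavior. With None, this sketch never returns.

```python
from typing import Callable, Optional


def run_sketch(process_once: Callable[[], int], iterations: Optional[int] = None) -> int:
    """Call process_once repeatedly and sum the consumed-message counts."""
    total = 0
    done = 0
    # Assumed semantics: None means no iteration limit.
    while iterations is None or done < iterations:
        total += process_once()
        done += 1
    return total


# Hypothetical batch cycles consuming 3, 0, and 2 messages.
counts = iter([3, 0, 2])
total = run_sketch(lambda: next(counts), iterations=3)
```

An empty cycle still counts as an iteration in this sketch, so a bounded run terminates even when the stream is idle.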