Celery

RactoCeleryWorker integrates RactoGateway with Celery for durable background LLM tasks, automatic retries, and parallel batch inference.

Installation

pip install ractogateway[celery]

Setup

Define your worker in a shared tasks.py module imported by both client and worker processes:

# tasks.py
from celery import Celery
from ractogateway import openai_developer_kit as gpt
from ractogateway.celery import RactoCeleryWorker, RetryConfig
from ractogateway.prompts.engine import RactoPrompt

celery_app = Celery(
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

my_prompt = RactoPrompt(
    system="You are a helpful assistant.",
    output_format="Return a concise answer.",
)

kit = gpt.OpenAIDeveloperKit(model="gpt-4o", default_prompt=my_prompt)

worker = RactoCeleryWorker(
    celery_app,
    kit=kit,
    retry_config=RetryConfig(max_retries=3, initial_delay_s=2.0),
)

Never-Fail Generation

Enqueue an LLM call that retries automatically with exponential backoff:

handle = worker.generate("Summarise the quarterly report: …")
result = worker.wait(handle.id, timeout_s=60.0)
print(result.result["content"])

Background Document Ingestion

Offload the full RAG chunk embed store pipeline to a worker node:

handle = worker.ingest_document("/path/to/report.pdf")
worker.wait(handle.id, timeout_s=120.0)

Parallel Batch Inference

Fan a list of items across the worker pool using Celery group():

items = ["Summarise doc 1", "Summarise doc 2", "Summarise doc 3"]
results = worker.parallel_batch(items)
for r in results:
    print(r.result["content"])

Starting Workers

celery -A tasks.celery_app worker --loglevel=info