ractogateway.celery.worker

RactoCeleryWorker — Celery-backed async task queue for RactoGateway.

Three production patterns in one class:

  1. Never-Fail generation: generate() enqueues an LLM call that automatically retries with exponential backoff on transient failures (timeouts, 5xx API errors). Auth errors and bad inputs are not retried.

  2. Background document ingestion: ingest_document() offloads the full chunk → embed → store pipeline to a worker node, so your web server returns immediately with a task ID.

  3. Parallel batch inference: parallel_batch() fans a list of items out to the worker pool using Celery group(), so N workers can process them concurrently.

How Celery serialization works here

Celery workers run in separate processes — live Python objects (kit, rag) cannot be sent over the message broker. Instead:

  • Task arguments are restricted to JSON-compatible types: str, int, float, list, dict. The task reconstructs ChatConfig / Message objects inside the worker.

  • The kit and rag objects are captured by closure. For this to work in a distributed worker fleet, the same module that instantiates RactoCeleryWorker must be imported by the worker process — identical to the standard Flask-Celery / Django-Celery pattern.
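A minimal, stdlib-only sketch of that round trip, assuming a simplified Message dataclass (hypothetical; the real ChatConfig and Message classes live inside ractogateway and are not reproduced here). The client side flattens objects into JSON-compatible values, and the worker side rebuilds them:

```python
import json
from dataclasses import asdict, dataclass


# Hypothetical stand-in for a message object; not the library's class.
@dataclass
class Message:
    role: str
    content: str


def build_task_args(user_message: str, temperature: float,
                    history: list[Message]) -> str:
    """Client side: flatten live objects into JSON-compatible values."""
    payload = {
        "user_message": user_message,
        "temperature": temperature,
        "history": [asdict(m) for m in history],
    }
    # json.dumps fails fast if anything non-serialisable slipped in.
    return json.dumps(payload)


def rebuild_in_worker(raw: str) -> tuple[str, float, list[Message]]:
    """Worker side: reconstruct objects from the plain values."""
    payload = json.loads(raw)
    history = [Message(**turn) for turn in payload["history"]]
    return payload["user_message"], payload["temperature"], history


raw = build_task_args("Hi", 0.2, [Message("user", "Earlier question")])
msg, temp, hist = rebuild_in_worker(raw)
```

Celery's default serializer has been JSON since Celery 4.0, which is why anything crossing the broker has to survive a json.dumps / json.loads round trip like this one.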

Example

# tasks.py  ← imported by BOTH client and worker
from celery import Celery
from ractogateway import openai_developer_kit as gpt
from ractogateway.celery import RactoCeleryWorker, RetryConfig

celery_app = Celery(
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
kit = gpt.Chat(model="gpt-4o", default_prompt=my_prompt)  # my_prompt: your prompt object, defined elsewhere in this module

worker = RactoCeleryWorker(
    celery_app,
    kit=kit,
    retry_config=RetryConfig(max_retries=3),
)

# Start workers:
#   celery -A tasks.celery_app worker --loglevel=info

# In your request handler:
handle = worker.generate("Summarise this report: …")
result = worker.wait(handle.id, timeout_s=60.0)
if result.ok:
    print(result.result["content"])
else:
    print("Generation failed:", result.error)

class ractogateway.celery.worker.RactoCeleryWorker(app, *, kit, rag=None, retry_config=None)

Bases: object

Celery-backed task queue wrapper for RactoGateway developer kits.

Parameters:
  • app (Any) – A pre-configured celery.Celery instance with broker and backend already set. You create and configure it yourself so you retain full control over serializers, routing, concurrency, etc.

  • kit (Any) – Any RactoGateway developer kit — OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit. The kit’s default_prompt is used by generation tasks (prompts are not serialisable over the broker).

  • rag (Any | None) – Optional RactoRAG instance. Required only when calling ingest_document().

  • retry_config (RetryConfig | None) – Exponential-backoff configuration. Defaults are applied when None.

generate(user_message, *, temperature=0.0, max_tokens=4096, history=None, extra=None)

Enqueue an LLM generation task and return immediately.

The task automatically retries with exponential backoff on transient errors (RactoGatewayTimeoutError and RactoGatewayAPIError).
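The retry policy can be pictured with the following stdlib sketch. It is an assumption about the shape of the logic, not the library's code: the two exception classes are local stand-ins named after the ones above, AuthError is a hypothetical non-retryable error, and base_s / cap_s are illustrative defaults rather than RetryConfig fields.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")


# Local stand-ins named after the exceptions above; the real classes come from
# ractogateway. AuthError is a hypothetical example of a non-retryable error.
class RactoGatewayTimeoutError(Exception):
    pass


class RactoGatewayAPIError(Exception):
    pass


class AuthError(Exception):
    pass


RETRYABLE = (RactoGatewayTimeoutError, RactoGatewayAPIError)


def backoff_delay(attempt: int, base_s: float = 1.0, cap_s: float = 60.0,
                  jitter: bool = False) -> float:
    """Delay before retry `attempt` (0-based): base_s * 2**attempt, capped at cap_s."""
    delay = min(cap_s, base_s * 2 ** attempt)
    if jitter:
        # Full jitter: pick uniformly in [0, delay] so retries do not synchronise.
        delay = random.uniform(0.0, delay)
    return delay


def call_with_retries(fn: Callable[[], T], max_retries: int = 3,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Call fn(), retrying only transient errors; anything else propagates at once."""
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except RETRYABLE:
            if attempt == max_retries:
                raise  # retries exhausted: surface the last transient error
            sleep(backoff_delay(attempt))
    raise AssertionError("unreachable")
```

Celery also exposes this kind of policy declaratively through the autoretry_for, retry_backoff, retry_backoff_max and retry_jitter task options; whether RactoCeleryWorker uses those or calls self.retry() itself is not stated here.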

Parameters:
  • user_message (str) – The prompt text for this generation.

  • temperature (float) – Sampling temperature passed to the LLM.

  • max_tokens (int) – Maximum completion tokens.

  • history (list[dict[str, str]] | None) – Previous conversation turns as [{"role": …, "content": …}, …]. Use this to continue a multi-turn dialogue asynchronously.

  • extra (dict[str, Any] | None) – Provider-specific pass-through parameters (top_p, seed, …).
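A small sketch of how history might be threaded through successive generate() calls. next_history is a hypothetical helper, not part of RactoCeleryWorker; it only builds the list[dict[str, str]] shape described above and assumes OpenAI-style "user" / "assistant" role names. Because every value is a plain string, the list is safe to send over the broker.

```python
from typing import Optional

Turn = dict[str, str]  # {"role": ..., "content": ...}


def next_history(history: Optional[list[Turn]], user_message: str,
                 assistant_reply: str) -> list[Turn]:
    """Return a new history with one completed user/assistant exchange appended.

    Role names are an assumption (OpenAI style). The input list is copied,
    not mutated, so earlier histories stay valid.
    """
    turns = list(history or [])
    turns.append({"role": "user", "content": user_message})
    turns.append({"role": "assistant", "content": assistant_reply})
    return turns


# First turn finished; its reply came back as result.result["content"].
h1 = next_history(None, "What is Celery?", "A distributed task queue.")

# The follow-up would then be enqueued roughly like this (not executed here):
#   worker.generate("Which brokers does it support?", history=h1,
#                   extra={"top_p": 0.9, "seed": 42})
```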

Return type:

Any

Returns:

celery.result.AsyncResult – Read its .id attribute for the task UUID, then pass that ID to wait() or get_result() to retrieve the outcome.

Note

The prompt is not an argument because Pydantic models containing type[BaseModel] fields cannot be serialised over the message broker. Set your prompt on the kit’s default_prompt at construction time.
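The underlying constraint is easy to reproduce with the standard library: JSON has no representation for a class object, so json.dumps raises TypeError. In this sketch a dataclass stands in for a Pydantic model class, and prompt_like is a hypothetical dict, not the real prompt type.

```python
import json
from dataclasses import dataclass


@dataclass
class Invoice:  # stand-in for a Pydantic output model
    total: float


# A prompt-like payload that carries a *class* (type[Invoice]), not an instance.
prompt_like = {"instruction": "Extract the invoice total", "output_model": Invoice}

try:
    json.dumps(prompt_like)
    prompt_serialisable = True
except TypeError:  # "Object of type type is not JSON serializable"
    prompt_serialisable = False

# The arguments generate() actually sends are plain values and encode fine.
task_args = json.dumps({"user_message": "Total?", "temperature": 0.0})
```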

ingest_document(path, **metadata)

Enqueue a background RAG document-ingestion task.

The full read → chunk → process → embed → store pipeline runs in a Celery worker. Your web request returns immediately with an AsyncResult whose .id you can poll later.

Parameters:
  • path (str) – Absolute or relative path to the file to ingest. The string is passed as-is to ingest().

  • **metadata (Any) – Extra key-value metadata merged into every chunk’s ChunkMetadata.extra.

Return type:

Any

Returns:

celery.result.AsyncResult

Raises:

RuntimeError – If rag was not provided to RactoCeleryWorker.
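A toy, in-process sketch of the read → chunk → embed → store flow and of the **metadata merge described above. Everything here is hypothetical: the fixed-size chunker, the toy_embed function (a stand-in for a real embedding model) and the simplified ChunkMetadata with only source, index and extra fields. The read step is skipped by passing text directly.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ChunkMetadata:  # simplified stand-in for the real ChunkMetadata
    source: str
    index: int
    extra: dict[str, Any] = field(default_factory=dict)


def chunk_text(text: str, size: int = 20, overlap: int = 5) -> list[str]:
    """Fixed-size character windows that overlap by `overlap` characters."""
    if not text:
        return []
    if not 0 <= overlap < size:
        raise ValueError("need 0 <= overlap < size")
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]


def toy_embed(chunk: str) -> list[float]:
    """Placeholder 'embedding': length and vowel ratio. Not a real model."""
    vowels = sum(ch in "aeiou" for ch in chunk.lower())
    return [float(len(chunk)), vowels / max(len(chunk), 1)]


def ingest_sketch(source: str, text: str, store: list, **metadata: Any) -> int:
    """chunk -> embed -> store; **metadata is copied into every chunk's extra."""
    chunks = chunk_text(text)
    for i, chunk in enumerate(chunks):
        meta = ChunkMetadata(source=source, index=i, extra=dict(metadata))
        store.append((toy_embed(chunk), chunk, meta))
    return len(chunks)
```

Copying metadata per chunk (dict(metadata)) keeps each chunk's extra independent, so mutating one chunk's metadata later cannot leak into the others.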

parallel_batch(items, *, temperature=0.0, max_tokens=4096)

Fan a list of items out to the worker pool in parallel.

Uses Celery group() so all tasks are submitted at once and run concurrently across available workers.
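The fan-out / fan-in shape of group() can be imitated locally with concurrent.futures. This is only an analogy: a thread pool stands in for the Celery worker pool, and fake_generate is a hypothetical placeholder for one generation task. With Celery itself the equivalent is roughly group(some_task.s(msg) for msg in messages).apply_async(), where some_task is whatever task the worker registers.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_generate(user_message: str, temperature: float, max_tokens: int) -> dict:
    """Hypothetical stand-in for one generation task (no LLM involved)."""
    return {"content": user_message.upper()[:max_tokens]}


def parallel_batch_sketch(messages: list[str], *, temperature: float = 0.0,
                          max_tokens: int = 4096, workers: int = 4) -> list[dict]:
    """Submit every item first, then collect results in submission order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Fan-out: all tasks are queued before any result is awaited.
        futures = [pool.submit(fake_generate, m, temperature, max_tokens)
                   for m in messages]
        # Fan-in: results come back in the order the items were submitted,
        # regardless of which finished first.
        return [f.result() for f in futures]
```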

Parameters:
  • items (list[BatchItem]) – A list of BatchItem objects. Each item’s user_message becomes one generation task.

  • temperature (float) – Shared sampling temperature for all items.

  • max_tokens (int) – Shared max-tokens limit for all items.

Return type:

Any

Returns:

celery.result.GroupResult – Call wait_parallel() to block until all tasks finish, or iterate .results for individual AsyncResult objects.

get_result(task_id)

Return the current state of a task without blocking.

Parameters:

task_id (str) – The task UUID, i.e. the .id attribute of the AsyncResult returned by generate() or ingest_document().

Return type:

TaskResult

Returns:

TaskResult – status will be PENDING if the task has not started yet.
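Because get_result() does not block, it suits a poll loop or an HTTP status endpoint. The sketch below is hypothetical: TaskResult is a simplified stand-in (only the status, result and error fields mentioned on this page, plus an ok property), the state names are Celery's standard ones, and the interval and poll cap are arbitrary.

```python
import time
from dataclasses import dataclass
from typing import Any, Callable, Optional

# Celery's built-in terminal states; PENDING, STARTED and RETRY are not terminal.
TERMINAL_STATES = {"SUCCESS", "FAILURE", "REVOKED"}


@dataclass
class TaskResult:  # simplified stand-in, not the library's class
    status: str
    result: Optional[Any] = None
    error: Optional[str] = None

    @property
    def ok(self) -> bool:
        return self.status == "SUCCESS"


def poll_until_done(get_result: Callable[[str], TaskResult], task_id: str, *,
                    interval_s: float = 0.5, max_polls: int = 120,
                    sleep: Callable[[float], None] = time.sleep) -> TaskResult:
    """Call a non-blocking status check repeatedly until a terminal state."""
    if max_polls < 1:
        raise ValueError("max_polls must be >= 1")
    for poll in range(max_polls):
        snapshot = get_result(task_id)
        if snapshot.status in TERMINAL_STATES:
            break
        if poll < max_polls - 1:
            sleep(interval_s)
    return snapshot  # may still be non-terminal if max_polls ran out
```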

wait(task_id, *, timeout_s=None)

Block until a task completes (or times out) and return its result.

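Parameters:
  • task_id (str) – The task UUID (the .id attribute of the AsyncResult returned by generate() or ingest_document()).

  • timeout_s (float | None) – Maximum seconds to wait. None = wait indefinitely.
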
Return type:

TaskResult

Returns:

TaskResult – result.ok is True on success; result.error is set on failure or timeout.
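The success / failure / timeout contract can be sketched with a concurrent.futures.Future standing in for Celery's AsyncResult (whose get(timeout=...) raises celery.exceptions.TimeoutError when the wait expires). TaskResult here is a simplified, hypothetical stand-in with only ok, result and error.

```python
import concurrent.futures as cf
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TaskResult:  # simplified stand-in, not the library's class
    ok: bool
    result: Optional[Any] = None
    error: Optional[str] = None


def wait_sketch(future: cf.Future, *, timeout_s: Optional[float] = None) -> TaskResult:
    """Block on one task and fold success, failure and timeout into TaskResult."""
    try:
        value = future.result(timeout=timeout_s)
    except cf.TimeoutError:
        # Only the wait gave up; the task itself may still finish later.
        return TaskResult(ok=False, error=f"timed out after {timeout_s}s")
    except Exception as exc:  # the task raised
        return TaskResult(ok=False, error=repr(exc))
    return TaskResult(ok=True, result=value)
```

Returning a result object instead of raising keeps request handlers simple: they branch on .ok rather than wrapping every wait in try/except.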

wait_parallel(group_result, *, timeout_s=None)

Block until all tasks from parallel_batch() complete.

Parameters:
  • group_result (Any) – The celery.result.GroupResult returned by parallel_batch().

  • timeout_s (float | None) – Maximum seconds to wait for the entire group. None = wait indefinitely.

Return type:

list[TaskResult]

Returns:

list[TaskResult] – One TaskResult per item, in submission order. Inspect each .ok / .error individually.
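A sketch of the one-deadline-for-the-whole-group behaviour, using concurrent.futures.Future objects as stand-ins for the group's individual AsyncResults. The deadline is computed once, so a slow early item eats into the time left for later ones. Outcomes are plain dicts with hypothetical ok / result / error keys mirroring TaskResult; this is an illustration, not the library's implementation.

```python
import concurrent.futures as cf
import time
from typing import Any, Optional


def wait_all_sketch(futures: list[cf.Future], *,
                    timeout_s: Optional[float] = None) -> list[dict[str, Any]]:
    """One outcome per future, in submission order, under a single shared deadline."""
    deadline = None if timeout_s is None else time.monotonic() + timeout_s
    outcomes: list[dict[str, Any]] = []
    for fut in futures:
        # Time left for the whole group, not a fresh timeout per item.
        remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
        try:
            outcomes.append({"ok": True, "result": fut.result(timeout=remaining),
                             "error": None})
        except cf.TimeoutError:
            outcomes.append({"ok": False, "result": None,
                             "error": f"group timed out after {timeout_s}s"})
        except Exception as exc:  # this item's task raised
            outcomes.append({"ok": False, "result": None, "error": repr(exc)})
    return outcomes
```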