ractogateway.celery

Celery task-queue integration for RactoGateway.

Three production patterns exposed through a single RactoCeleryWorker:

  • Never-Fail generation: generate() enqueues an LLM call that retries automatically with exponential backoff on transient failures (timeouts, 5xx API errors).

  • Background document ingestion: ingest_document() offloads the full chunk → embed → store RAG pipeline to a worker node so your web server can return a task ID immediately.

  • Parallel batch inference: parallel_batch() fans a list of items out across the worker pool using Celery group().

Quick start:

pip install ractogateway[celery]

# tasks.py  ← imported by BOTH client and worker
from celery import Celery
from ractogateway import openai_developer_kit as gpt
from ractogateway.celery import RactoCeleryWorker, RetryConfig

celery_app = Celery(
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
# my_prompt: the prompt object you built for this kit (definition not shown)
kit = gpt.Chat(model="gpt-4o", default_prompt=my_prompt)

worker = RactoCeleryWorker(
    celery_app,
    kit=kit,
    retry_config=RetryConfig(max_retries=3, initial_delay_s=2.0),
)

# Enqueue with automatic retry, then block until the result arrives:
handle = worker.generate("Summarise the quarterly report: …")
result = worker.wait(handle.id, timeout_s=60.0)
print(result.result["content"])

# Start your Celery workers:
#   celery -A tasks.celery_app worker --loglevel=info

class ractogateway.celery.RactoCeleryWorker(app, *, kit, rag=None, retry_config=None)

Bases: object

Celery-backed task queue wrapper for RactoGateway developer kits.

Parameters:
  • app (Any) – A pre-configured celery.Celery instance with broker and backend already set. You create and configure it yourself so you retain full control over serializers, routing, concurrency, etc.

  • kit (Any) – Any RactoGateway developer kit — OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit. The kit’s default_prompt is used by generation tasks (prompts are not serialisable over the broker).

  • rag (Any | None) – Optional RactoRAG instance. Required only when calling ingest_document().

  • retry_config (RetryConfig | None) – Exponential-backoff configuration. Defaults are applied when None.

generate(user_message, *, temperature=0.0, max_tokens=4096, history=None, extra=None)

Enqueue an LLM generation task and return immediately.

The task automatically retries with exponential backoff on transient errors (RactoGatewayTimeoutError and RactoGatewayAPIError).

Parameters:
  • user_message (str) – The prompt text for this generation.

  • temperature (float) – Sampling temperature passed to the LLM.

  • max_tokens (int) – Maximum completion tokens.

  • history (list[dict[str, str]] | None) – Previous conversation turns as [{"role": …, "content": …}, …]. Use this to continue a multi-turn dialogue asynchronously.

  • extra (dict[str, Any] | None) – Provider-specific pass-through parameters (top_p, seed, …).

Return type:

Any

Returns:

celery.result.AsyncResult – Read its .id attribute for the task UUID, then pass that ID to wait() or get_result() to retrieve the outcome.

Note

The prompt is not an argument because Pydantic models containing type[BaseModel] fields cannot be serialised over the message broker. Set your prompt on the kit’s default_prompt at construction time.
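Celery serialises task arguments with JSON by default, which is why plain strings, lists, and dicts such as history and extra can travel over the broker while a class object (such as a type[BaseModel] output schema) cannot. The sketch below checks this with the standard json module. It assumes your Celery app keeps the default JSON task serializer, and is_broker_safe is a hypothetical helper, not part of RactoGateway.

```python
import json
from typing import Any


def is_broker_safe(payload: Any) -> bool:
    """Return True if payload survives a JSON round trip unchanged.

    Hypothetical helper: assumes the Celery app uses the default
    JSON task serializer.
    """
    try:
        return json.loads(json.dumps(payload)) == payload
    except (TypeError, ValueError):
        # json.dumps raises TypeError for objects it cannot encode,
        # such as classes, and ValueError for circular references.
        return False


history = [
    {"role": "user", "content": "Summarise the quarterly report."},
    {"role": "assistant", "content": "Revenue rose in every region."},
]
extra = {"top_p": 0.9, "seed": 42}

print(is_broker_safe(history))           # plain dicts of strings → True
print(is_broker_safe(extra))             # numbers and strings → True
print(is_broker_safe({"schema": dict}))  # a class object → False
```

The round-trip comparison also flags tuples, which JSON turns into lists, so build history as a list of dicts rather than a list of tuples.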

ingest_document(path, **metadata)

Enqueue a background RAG document-ingestion task.

The full read → chunk → process → embed → store pipeline runs in a Celery worker. Your web request returns immediately with an AsyncResult whose .id you can poll later.

Parameters:
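  • path (str) – Absolute or relative path to the file to ingest. The string is passed as-is to ingest() inside the worker process, so it must point to a file the worker can read; a relative path resolves against the worker’s working directory, not the caller’s.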

  • **metadata (Any) – Extra key-value metadata merged into every chunk’s ChunkMetadata.extra.

Return type:

Any

Returns:

celery.result.AsyncResult

Raises:

RuntimeError – If rag was not provided to RactoCeleryWorker.

parallel_batch(items, *, temperature=0.0, max_tokens=4096)

Fan a list of items out to the worker pool in parallel.

Uses Celery group() so all tasks are submitted at once and run concurrently across available workers.

Parameters:
  • items (list[BatchItem]) – A list of BatchItem objects. Each item’s user_message becomes one generation task.

  • temperature (float) – Shared sampling temperature for all items.

  • max_tokens (int) – Shared max-tokens limit for all items.

Return type:

Any

Returns:

celery.result.GroupResult – Call wait_parallel() to block until all tasks finish, or iterate .results for individual AsyncResult objects.
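To make the fan-out/fan-in shape of parallel_batch() concrete without a broker, the sketch below uses the standard library’s concurrent.futures as an in-process stand-in for Celery group(): every item is submitted before any result is awaited, and results are collected in submission order, mirroring what wait_parallel() returns. fake_generate is a hypothetical placeholder for the LLM call; this is an analogy, not how RactoGateway dispatches work.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_generate(user_message: str) -> str:
    """Hypothetical stand-in for one generation task."""
    # Longer messages "take" longer, so completion order differs from submission order.
    time.sleep(0.01 * len(user_message))
    return user_message.upper()


def fan_out(messages: list[str]) -> list[str]:
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Fan out: submit every task up front, as group() does.
        futures = [pool.submit(fake_generate, m) for m in messages]
        # Fan in: read results in submission order, not completion order.
        return [f.result() for f in futures]


print(fan_out(["long message here", "hi", "mid text"]))
# → ['LONG MESSAGE HERE', 'HI', 'MID TEXT']
```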

get_result(task_id)

Return the current state of a task without blocking.

Parameters:

task_id (str) – The UUID returned by generate(), ingest_document(), or the .id attribute of an AsyncResult.

Return type:

TaskResult

Returns:

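TaskResult – .status is PENDING if the task has not started yet. Celery also reports PENDING for task IDs it does not know, so a mistyped ID looks like a task that is still queued.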
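Because get_result() never blocks, a caller that cannot afford to park a thread in wait() can poll it on its own schedule. A minimal sketch, assuming only that status values match the lowercase TaskStatus values documented on this page; poll_until_done and the fetch_status callable are hypothetical, and in real code fetch_status would wrap something like lambda: worker.get_result(task_id).status.

```python
import time
from typing import Callable

# Terminal states, using the lowercase TaskStatus values.
TERMINAL_STATES = ("success", "failure", "revoked")


def poll_until_done(
    fetch_status: Callable[[], str],
    *,
    interval_s: float = 0.5,
    timeout_s: float = 30.0,
) -> str:
    """Call fetch_status() until it reports a terminal state or time runs out."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = fetch_status()
        if status in TERMINAL_STATES:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task still {status!r} after {timeout_s} s")
        time.sleep(interval_s)


# Simulated task that is pending, then started, then succeeds.
states = iter(["pending", "started", "success"])
print(poll_until_done(lambda: next(states), interval_s=0.01))  # → success
```

Since an unknown task ID stays PENDING forever, the timeout is what stops a loop over a mistyped ID.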

wait(task_id, *, timeout_s=None)

Block until a task completes (or times out) and return its result.

Parameters:
  • task_id (str) – The task UUID from generate() or ingest_document().

  • timeout_s (float | None) – Maximum seconds to wait. None = wait indefinitely.

Return type:

TaskResult

Returns:

TaskResult – result.ok is True on success; result.error is set on failure or timeout.

wait_parallel(group_result, *, timeout_s=None)

Block until all tasks from parallel_batch() complete.

Parameters:
  • group_result (Any) – The celery.result.GroupResult returned by parallel_batch().

  • timeout_s (float | None) – Maximum seconds to wait for the entire group. None = wait indefinitely.

Return type:

list[TaskResult]

Returns:

list[TaskResult] – One TaskResult per item, in submission order. Inspect each .ok / .error individually.
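Because wait_parallel() reports each item’s outcome separately, callers usually partition the list before acting on it. A minimal sketch relying only on the ok, error, and task_id attributes documented for TaskResult; split_results is a hypothetical helper, and SimpleNamespace objects stand in for real TaskResult instances so the snippet runs without a broker.

```python
from types import SimpleNamespace
from typing import Any


def split_results(results: list[Any]) -> tuple[list[Any], list[Any]]:
    """Partition TaskResult-like objects into (succeeded, failed), keeping order."""
    succeeded = [r for r in results if r.ok]
    failed = [r for r in results if not r.ok]
    return succeeded, failed


# Stand-ins exposing the same .ok / .error / .task_id attributes as TaskResult.
results = [
    SimpleNamespace(task_id="a1", ok=True, error=None),
    SimpleNamespace(task_id="b2", ok=False, error="timed out"),
    SimpleNamespace(task_id="c3", ok=True, error=None),
]
done, failed = split_results(results)
print([r.task_id for r in done])               # → ['a1', 'c3']
print([(r.task_id, r.error) for r in failed])  # → [('b2', 'timed out')]
```

Keeping submission order in both lists lets you map failures back to the BatchItem that produced them by index in the original results list.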

class ractogateway.celery.RetryConfig(**data)

Bases: BaseModel

Exponential-backoff retry policy for RactoCeleryWorker tasks.

The backoff delay for attempt n (0-based) is:

delay = min(initial_delay_s * backoff_factor ** n, max_delay_s)

With the defaults this gives 2 s → 4 s → 8 s, after which retries stop because max_retries=3.
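The formula can be checked with a few lines of Python. The sketch assumes the values implied by the example above (initial_delay_s=2.0, backoff_factor=2.0, max_retries=3); max_delay_s=60.0 is a hypothetical value chosen for illustration, since this page does not state the real default.

```python
def backoff_schedule(
    max_retries: int,
    initial_delay_s: float,
    backoff_factor: float,
    max_delay_s: float,
) -> list[float]:
    """Countdown before each retry n = 0 .. max_retries - 1, per the RetryConfig formula."""
    return [
        min(initial_delay_s * backoff_factor ** n, max_delay_s)
        for n in range(max_retries)
    ]


# Values matching the 2 s → 4 s → 8 s example; max_delay_s=60.0 is hypothetical.
print(backoff_schedule(3, 2.0, 2.0, 60.0))  # → [2.0, 4.0, 8.0]

# With more retries the cap takes over: the sixth delay is 60 instead of 64.
print(backoff_schedule(6, 2.0, 2.0, 60.0))  # → [2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

With max_retries=0 the schedule is empty, matching the note that 0 disables retries entirely.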

Parameters:
  • max_retries (int) – Maximum number of retry attempts after the first failure. 0 disables retries entirely.

  • initial_delay_s (float) – Countdown (seconds) before the first retry.

  • backoff_factor (float) – Multiplier applied on each successive retry. Must be > 1.

  • max_delay_s (float) – Upper bound on the countdown (seconds). Prevents extremely long waits after many retries.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

max_retries: int
initial_delay_s: float
backoff_factor: float
max_delay_s: float
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.celery.TaskResult(**data)

Bases: BaseModel

Unified result returned by wait() and get_result().

Parameters:
  • task_id (str) – The Celery task UUID.

  • status (TaskStatus) – Current lifecycle state of the task.

  • result (Any | None) –

    Deserialised task output on success:

    • For generate(): a dict matching LLMResponse.model_dump().

    • For ingest_document(): a list of Chunk.model_dump() dicts.

  • error (str | None) – Exception message string on failure; None on success.

task_id: str
status: TaskStatus
result: Any | None
error: str | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

property ok: bool

True when the task succeeded and produced a result.

class ractogateway.celery.TaskStatus(*values)

Bases: str, Enum

Lifecycle state of a Celery task.

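Each member corresponds to one of Celery’s native task states, so you can check a task’s state without importing Celery’s own constants. Note that the values are lowercase ('success'), whereas Celery’s constants are uppercase strings ('SUCCESS'), so compare against TaskStatus members rather than raw Celery state strings.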

PENDING = 'pending'
STARTED = 'started'
SUCCESS = 'success'
FAILURE = 'failure'
RETRY = 'retry'
REVOKED = 'revoked'
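Celery’s own state constants (celery.states.SUCCESS and the like) are uppercase strings such as 'SUCCESS', while TaskStatus uses lowercase values. The sketch below re-declares an enum with the same members as TaskStatus so it runs standalone, and adds a hypothetical from_celery_state() helper showing one way to translate between the two; RactoGateway’s internal conversion may differ.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Local copy of the documented members, for a standalone demo."""
    PENDING = "pending"
    STARTED = "started"
    SUCCESS = "success"
    FAILURE = "failure"
    RETRY = "retry"
    REVOKED = "revoked"


def from_celery_state(state: str) -> TaskStatus:
    """Hypothetical helper: map a Celery state string like 'SUCCESS' to TaskStatus."""
    # Enum lookup by value; raises ValueError for states not listed above.
    return TaskStatus(state.lower())


status = from_celery_state("SUCCESS")
print(status is TaskStatus.SUCCESS)  # → True
print(status == "success")           # str mixin compares by value → True
print(status == "SUCCESS")           # case matters → False
```

Celery defines a few states outside this enum (for example RECEIVED), so a real mapping needs a policy for those; the sketch simply lets the ValueError propagate.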