Celery

Models

Pydantic models for the Celery task-queue subsystem.

class ractogateway.celery._models.TaskStatus(*values)[source]

Bases: str, Enum

Lifecycle state of a Celery task.

Maps directly to Celery’s native task states so you can compare them without importing Celery’s own constants.

PENDING = 'pending'
STARTED = 'started'
SUCCESS = 'success'
FAILURE = 'failure'
RETRY = 'retry'
REVOKED = 'revoked'
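
Because TaskStatus subclasses str, its members compare equal to plain strings. The sketch below re-declares the enum locally (a stand-in, not an import from ractogateway) and shows one possible way to map Celery's upper-case state names, such as "SUCCESS", onto the lower-case values listed above. The helper name from_celery_state is hypothetical and is not part of the documented API.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Local stand-in mirroring the documented values."""

    PENDING = "pending"
    STARTED = "started"
    SUCCESS = "success"
    FAILURE = "failure"
    RETRY = "retry"
    REVOKED = "revoked"


def from_celery_state(state: str) -> TaskStatus:
    """Hypothetical helper: map a Celery state name (e.g. "SUCCESS") to TaskStatus."""
    return TaskStatus(state.lower())


# str-based members compare equal to their plain-string values.
assert TaskStatus.SUCCESS == "success"
assert from_celery_state("RETRY") is TaskStatus.RETRY
```

An unknown state name raises ValueError, which makes unexpected broker states visible instead of silently passing them through.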

class ractogateway.celery._models.TaskResult(**data)[source]

Bases: BaseModel

Unified result returned by wait() and get_result().

Parameters:
  • task_id (str) – The Celery task UUID.

  • status (TaskStatus) – Current TaskStatus.

  • result (Any | None) –

    Deserialised task output on success:

    • For generate(): a dict matching LLMResponse.model_dump().

    • For ingest_document(): a list of Chunk.model_dump() dicts.

  • error (str | None) – Exception message string on failure; None on success.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

task_id: str
status: TaskStatus
result: Any | None
error: str | None
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

property ok: bool

True when the task succeeded and produced a result.
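
A minimal sketch of how the ok property could be read, assuming "succeeded and produced a result" means status is "success" and result is not None. TaskResultSketch is a hypothetical dataclass stand-in that only mirrors the documented fields; the real TaskResult is a Pydantic model and may differ.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class TaskResultSketch:
    """Stand-in for TaskResult using a dataclass instead of Pydantic."""

    task_id: str
    status: str  # one of the TaskStatus values, e.g. "success"
    result: Optional[Any] = None
    error: Optional[str] = None

    @property
    def ok(self) -> bool:
        # Assumed reading of "succeeded and produced a result".
        return self.status == "success" and self.result is not None


done = TaskResultSketch("abc", "success", result={"content": "hi"})
failed = TaskResultSketch("def", "failure", error="timeout")
```

Checking ok before touching result avoids indexing into None when a task failed or timed out.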

class ractogateway.celery._models.RetryConfig(**data)[source]

Bases: BaseModel

Exponential-backoff retry policy for RactoCeleryWorker tasks.

The backoff delay for attempt n (0-based) is:

delay = min(initial_delay_s * backoff_factor ** n, max_delay_s)

With the defaults this gives: 2 s → 4 s → 8 s (then stops at max_retries=3).
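
The formula can be checked with a few lines of Python. This sketch assumes initial_delay_s=2.0 and backoff_factor=2.0, which is what the 2 s → 4 s → 8 s sequence implies; the max_delay_s value of 60.0 is an assumption, since the default is not stated here. The function name backoff_delays is illustrative only.

```python
def backoff_delays(max_retries, initial_delay_s, backoff_factor, max_delay_s):
    """Return the countdown (seconds) before each retry attempt n = 0..max_retries-1."""
    return [
        min(initial_delay_s * backoff_factor ** n, max_delay_s)
        for n in range(max_retries)
    ]


# Assumed defaults: 2 s initial delay, factor 2, cap of 60 s (the cap is a guess).
default_schedule = backoff_delays(3, 2.0, 2.0, 60.0)

# A low cap flattens the tail of the schedule.
capped_schedule = backoff_delays(3, 2.0, 2.0, 5.0)
```

With a cap of 5 s the third delay is clamped from 8 s down to 5 s, which is the role max_delay_s plays after many retries.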

Parameters:
  • max_retries (int) – Maximum number of retry attempts after the first failure. 0 disables retries entirely.

  • initial_delay_s (float) – Countdown (seconds) before the first retry.

  • backoff_factor (float) – Multiplier applied on each successive retry. Must be > 1.

  • max_delay_s (float) – Upper bound on the countdown (seconds). Prevents extremely long waits after many retries.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

max_retries: int
initial_delay_s: float
backoff_factor: float
max_delay_s: float
model_config: ClassVar[ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to pydantic.config.ConfigDict.

Worker

RactoCeleryWorker — Celery-backed async task queue for RactoGateway.

Three production patterns in one class:

  1. Never-Fail generation: generate() enqueues an LLM call that automatically retries with exponential backoff on transient failures (timeouts, 5xx API errors). Auth errors and bad inputs are not retried.

  2. Background document ingestion: ingest_document() offloads the full chunk → embed → store pipeline to a worker node so your web server returns immediately with a task ID.

  3. Parallel batch inference: parallel_batch() fans a list of items out to the worker pool using Celery group(), allowing N workers to process them concurrently.

How Celery serialization works here

Celery workers run in separate processes — live Python objects (kit, rag) cannot be sent over the message broker. Instead:

  • Task arguments are JSON-primitive: str, float, int, dict, list. The task reconstructs ChatConfig / Message objects inside the worker.

  • The kit and rag objects are captured by closure. For this to work in a distributed worker fleet, the same module that instantiates RactoCeleryWorker must be imported by the worker process — identical to the standard Flask-Celery / Django-Celery pattern.
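
The first point above can be illustrated with a stdlib-only sketch: the client flattens rich objects to JSON primitives, and the worker rebuilds them on the other side. The Message dataclass and the helper names to_task_args and from_task_args are hypothetical stand-ins, not the library's own types or functions.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Message:
    """Hypothetical stand-in for a chat message object."""

    role: str
    content: str


def to_task_args(user_message, history, temperature):
    """Client side: flatten everything to JSON primitives."""
    return {
        "user_message": user_message,
        "history": [asdict(m) for m in history],
        "temperature": temperature,
    }


def from_task_args(payload):
    """Worker side: rebuild rich objects from the primitives."""
    history = [Message(**m) for m in payload["history"]]
    return payload["user_message"], history, payload["temperature"]


history = [Message("user", "Hi"), Message("assistant", "Hello!")]
wire = json.dumps(to_task_args("Summarise this", history, 0.0))  # what the broker carries
user_message, rebuilt, temperature = from_task_args(json.loads(wire))
```

Anything that cannot survive the json.dumps / json.loads round trip (open clients, prompt classes, vector stores) has to live in the worker process already, which is why kit and rag are captured by closure instead.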

Example

# tasks.py  ← imported by BOTH client and worker
from celery import Celery
from ractogateway import openai_developer_kit as gpt
from ractogateway.celery import RactoCeleryWorker, RetryConfig

celery_app = Celery(
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)
# my_prompt is your prompt object, defined elsewhere in this module.
kit = gpt.Chat(model="gpt-4o", default_prompt=my_prompt)

worker = RactoCeleryWorker(
    celery_app,
    kit=kit,
    retry_config=RetryConfig(max_retries=3),
)

# Start workers:
#   celery -A tasks.celery_app worker --loglevel=info

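# In your request handler:
handle = worker.generate("Summarise this report: …")
result = worker.wait(handle.id, timeout_s=60.0)
if result.ok:
    print(result.result["content"])
else:
    # result.error holds the exception message on failure or timeout.
    print(f"Task failed: {result.error}")
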
class ractogateway.celery.worker.RactoCeleryWorker(app, *, kit, rag=None, retry_config=None)[source]

Bases: object

Celery-backed task queue wrapper for RactoGateway developer kits.

Parameters:
  • app (Any) – A pre-configured celery.Celery instance with broker and backend already set. You create and configure it yourself so you retain full control over serializers, routing, concurrency, etc.

  • kit (Any) – Any RactoGateway developer kit — OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit. The kit’s default_prompt is used by generation tasks (prompts are not serialisable over the broker).

  • rag (Any | None) – Optional RactoRAG instance. Required only when calling ingest_document().

  • retry_config (RetryConfig | None) – Exponential-backoff configuration. Defaults are applied when None.

generate(user_message, *, temperature=0.0, max_tokens=4096, history=None, extra=None)[source]

Enqueue an LLM generation task and return immediately.

The task automatically retries with exponential backoff on transient errors (RactoGatewayTimeoutError and RactoGatewayAPIError).

Parameters:
  • user_message (str) – The prompt text for this generation.

  • temperature (float) – Sampling temperature passed to the LLM.

  • max_tokens (int) – Maximum completion tokens.

  • history (list[dict[str, str]] | None) – Previous conversation turns as [{"role": …, "content": …}, …]. Use this to continue a multi-turn dialogue asynchronously.

  • extra (dict[str, Any] | None) – Provider-specific pass-through parameters (top_p, seed, …).

Return type:

Any

Returns:

celery.result.AsyncResult – Read .id for the task UUID. Use wait() or get_result() to retrieve the outcome.

Note

The prompt is not an argument because Pydantic models containing type[BaseModel] fields cannot be serialised over the message broker. Set your prompt on the kit’s default_prompt at construction time.
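
The retry behaviour described for generate() can be sketched without Celery. In this sketch TransientError stands in for RactoGatewayTimeoutError and RactoGatewayAPIError, AuthError is a hypothetical non-retryable error, and the default values (including max_delay_s=60.0) are assumptions. The real worker presumably relies on Celery's own retry machinery, so treat this as an illustration of the policy only.

```python
import time


class TransientError(Exception):
    """Stand-in for timeout / 5xx API errors (retried)."""


class AuthError(Exception):
    """Hypothetical stand-in for auth failures (not retried)."""


def run_with_retries(fn, *, max_retries=3, initial_delay_s=2.0,
                     backoff_factor=2.0, max_delay_s=60.0, sleep=time.sleep):
    """Call fn(); on TransientError wait with exponential backoff and retry."""
    attempt = 0
    while True:
        try:
            return fn()
        except TransientError:
            if attempt >= max_retries:
                raise  # retries exhausted: surface the last error
            sleep(min(initial_delay_s * backoff_factor ** attempt, max_delay_s))
            attempt += 1
        # Any other exception (e.g. AuthError) propagates immediately.


delays = []
calls = {"n": 0}


def flaky():
    """Fails twice with a transient error, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError("timeout")
    return "ok"


# Record the delays instead of sleeping so the demo runs instantly.
outcome = run_with_retries(flaky, sleep=delays.append)
```

Passing sleep as a parameter keeps the sketch testable: the demo records the delays rather than actually waiting.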

ingest_document(path, **metadata)[source]

Enqueue a background RAG document-ingestion task.

The full read → chunk → process → embed → store pipeline runs in a Celery worker. Your web request returns immediately with an AsyncResult whose .id you can poll later.

Parameters:
  • path (str) – Absolute or relative path to the file to ingest. The string is passed as-is to ingest().

  • **metadata (Any) – Extra key-value metadata merged into every chunk’s ChunkMetadata.extra.

Return type:

Any

Returns:

celery.result.AsyncResult

Raises:

RuntimeError – If rag was not provided to RactoCeleryWorker.
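
How **metadata ends up on every chunk can be pictured with a toy example. The fixed-size chunker and the dict layout below are purely hypothetical; the real pipeline uses RactoRAG's own chunking and its ChunkMetadata model, neither of which is reproduced here.

```python
def chunk_text(text, size):
    """Toy chunker: split text into pieces of at most `size` characters."""
    return [text[i:i + size] for i in range(0, len(text), size)]


def build_chunks(text, size, **metadata):
    """Attach a copy of the caller's metadata to every chunk's 'extra' dict."""
    return [
        {"text": piece, "extra": dict(metadata)}
        for piece in chunk_text(text, size)
    ]


chunks = build_chunks("abcdefghij", 4, source="report.pdf", team="finance")
```

Each chunk gets its own copy of the metadata, so later changes to one chunk's extra dict do not leak into the others.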

parallel_batch(items, *, temperature=0.0, max_tokens=4096)[source]

Fan a list of items out to the worker pool in parallel.

Uses Celery group() so all tasks are submitted at once and run concurrently across available workers.

Parameters:
  • items (list[BatchItem]) – A list of BatchItem objects. Each item’s user_message becomes one generation task.

  • temperature (float) – Shared sampling temperature for all items.

  • max_tokens (int) – Shared max-tokens limit for all items.

Return type:

Any

Returns:

celery.result.GroupResult – Call wait_parallel() to block until all tasks finish, or iterate .results for individual AsyncResult objects.

get_result(task_id)[source]

Return the current state of a task without blocking.

Parameters:

task_id (str) – The UUID returned by generate(), ingest_document(), or the .id attribute of an AsyncResult.

Return type:

TaskResult

Returns:

TaskResult – status will be PENDING if the task has not started yet.

wait(task_id, *, timeout_s=None)[source]

Block until a task completes (or times out) and return its result.

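Parameters:
  • task_id (str) – The UUID of the task to wait for.

  • timeout_s (float | None) – Maximum seconds to wait. None = wait indefinitely.

Return type: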

TaskResult

Returns:

TaskResult – result.ok is True on success; result.error is set on failure or timeout.
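
The difference between get_result() (non-blocking) and wait() (blocking with an optional timeout) can be sketched as a polling loop over a non-blocking status check. This is only an illustration under assumed names: wait_for, the poll callable, and the dict layout are hypothetical, and the library may instead delegate to Celery's AsyncResult.get(timeout=...).

```python
import time

TERMINAL = ("success", "failure", "revoked")


def wait_for(poll, *, timeout_s=None, interval_s=0.01):
    """Call poll() until it reports a terminal status or the timeout elapses."""
    deadline = None if timeout_s is None else time.monotonic() + timeout_s
    while True:
        state = poll()
        if state["status"] in TERMINAL:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            # Report the timeout as an error instead of raising.
            return {"status": state["status"], "result": None, "error": "timed out"}
        time.sleep(interval_s)


# Fake task that is pending, then started, then done.
states = iter([
    {"status": "pending"},
    {"status": "started"},
    {"status": "success", "result": 42, "error": None},
])
final = wait_for(lambda: next(states), timeout_s=1.0)
```

Returning an error field on timeout, rather than raising, matches the documented contract that result.error is set on failure or timeout.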

wait_parallel(group_result, *, timeout_s=None)[source]

Block until all tasks from parallel_batch() complete.

Parameters:
  • group_result (Any) – The celery.result.GroupResult returned by parallel_batch().

  • timeout_s (float | None) – Maximum seconds to wait for the entire group. None = wait indefinitely.

Return type:

list[TaskResult]

Returns:

list[TaskResult] – One TaskResult per item, in submission order. Inspect each .ok / .error individually.
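
The fan-out and ordered collection performed by parallel_batch() and wait_parallel() can be mimicked locally with a thread pool. This is an analogy only: ThreadPoolExecutor stands in for the Celery worker pool, and run_batch, shout, and the result dicts are hypothetical names, not the library's API.

```python
from concurrent.futures import ThreadPoolExecutor


def run_batch(items, task, max_workers=4):
    """Run task(item) concurrently; return one outcome per item, in input order."""

    def safe(item):
        # Capture per-item failures so one bad item does not abort the batch.
        try:
            return {"ok": True, "result": task(item), "error": None}
        except Exception as exc:
            return {"ok": False, "result": None, "error": str(exc)}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(safe, items))  # map() preserves submission order


def shout(message):
    """Toy task standing in for an LLM call."""
    if not message:
        raise ValueError("empty message")
    return message.upper()


results = run_batch(["a", "", "c"], shout)
```

As with wait_parallel(), each outcome is inspected individually: the second item fails while the first and third succeed, and their positions match the input list.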