Celery
Models
Pydantic models for the Celery task-queue subsystem.
- class ractogateway.celery._models.TaskStatus(*values)
Lifecycle state of a Celery task.
Maps directly to Celery’s native task states so you can compare them without importing Celery’s own constants.
- PENDING = 'pending'
- STARTED = 'started'
- SUCCESS = 'success'
- FAILURE = 'failure'
- RETRY = 'retry'
- REVOKED = 'revoked'
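The enum values are lower-case, while Celery reports its native states as upper-case strings (for example "SUCCESS"). Converting between them is therefore a one-line lookup. The sketch below is a self-contained illustration that re-declares a local TaskStatus. It assumes the real class is a str-based Enum, which is not confirmed here. The helper from_celery_state and the TERMINAL set are hypothetical names, not part of ractogateway. Celery states that have no counterpart in this list (for example RECEIVED or REJECTED) make this sketch raise ValueError; how the library itself handles them is not documented here.

```python
from enum import Enum


class TaskStatus(str, Enum):
    """Local stand-in mirroring the documented members (str-based Enum is an assumption)."""

    PENDING = "pending"
    STARTED = "started"
    SUCCESS = "success"
    FAILURE = "failure"
    RETRY = "retry"
    REVOKED = "revoked"


def from_celery_state(state: str) -> TaskStatus:
    """Hypothetical helper: map a native Celery state string such as "SUCCESS"."""
    # Celery uses upper-case state names; the enum values are lower-case.
    # Unknown states (e.g. "RECEIVED") raise ValueError from the Enum lookup.
    return TaskStatus(state.lower())


# States after which a task never changes again (same set as Celery's READY_STATES).
TERMINAL = {TaskStatus.SUCCESS, TaskStatus.FAILURE, TaskStatus.REVOKED}
```

Because the sketch's enum subclasses str, a member also compares equal to its plain string value, so `from_celery_state("RETRY") == "retry"` holds.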
- class ractogateway.celery._models.TaskResult(**data)
Bases: BaseModel
Unified result returned by wait() and get_result().
- Parameters:
task_id (str) – The Celery task UUID.
status (TaskStatus) – Current TaskStatus.
result (Any | None) – Deserialised task output on success:
For generate(): a dict matching LLMResponse.model_dump().
For ingest_document(): a list of Chunk.model_dump() dicts.
error (str | None) – Exception message string on failure; None on success.
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- status: TaskStatus
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
- class ractogateway.celery._models.RetryConfig(**data)
Bases: BaseModel
Exponential-backoff retry policy for RactoCeleryWorker tasks.
The backoff delay for attempt n (0-based) is:
delay = min(initial_delay_s * backoff_factor ** n, max_delay_s)
With the defaults this gives 2 s → 4 s → 8 s, after which retrying stops because max_retries=3.
- Parameters:
max_retries (int) – Maximum number of retry attempts after the first failure. 0 disables retries entirely.
initial_delay_s (float) – Countdown (seconds) before the first retry.
backoff_factor (float) – Multiplier applied on each successive retry. Must be > 1.
max_delay_s (float) – Upper bound on the countdown (seconds). Prevents extremely long waits after many retries.
- model_config: ClassVar[ConfigDict] = {}
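A minimal sketch of the RetryConfig delay formula, handy for checking a policy before deploying it. The defaults initial_delay_s=2.0 and backoff_factor=2.0 are inferred from the worked 2 s → 4 s → 8 s example. The max_delay_s default of 60.0 is an assumption, because the real default is not stated. backoff_delays is a hypothetical helper, not a library function.

```python
def backoff_delays(
    max_retries: int = 3,
    initial_delay_s: float = 2.0,   # inferred from the 2 s -> 4 s -> 8 s example
    backoff_factor: float = 2.0,    # inferred from the same example
    max_delay_s: float = 60.0,      # ASSUMED value; the real default is not documented here
) -> list[float]:
    """Countdown in seconds before each retry attempt n = 0 .. max_retries - 1."""
    if backoff_factor <= 1:
        raise ValueError("backoff_factor must be > 1")
    # delay = min(initial_delay_s * backoff_factor ** n, max_delay_s)
    return [
        min(initial_delay_s * backoff_factor ** n, max_delay_s)
        for n in range(max_retries)
    ]


print(backoff_delays())                 # default policy
print(backoff_delays(max_delay_s=5.0))  # the cap clamps the third delay
```

Inside a bound Celery task, a delay like this is typically passed to Task.retry(exc=exc, countdown=delay). Whether RactoCeleryWorker uses exactly that call is not stated here.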
Worker
RactoCeleryWorker — Celery-backed async task queue for RactoGateway.
Three production patterns in one class:
- Never-Fail generation: generate() enqueues an LLM call that automatically retries with exponential backoff on transient failures (timeouts, 5xx API errors). Auth errors and bad inputs are not retried.
- Background document ingestion: ingest_document() offloads the full chunk → embed → store pipeline to a worker node, so your web server returns immediately with a task ID.
- Parallel batch inference: parallel_batch() fans a list of items out to the worker pool using Celery group(), allowing N workers to process them concurrently.
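The retry split described for generate() retries timeouts and 5xx errors and gives up on auth errors and bad inputs. It can be sketched as a small predicate. This is an illustration only: it classifies by HTTP status code, whereas the library retries on its own exception types (RactoGatewayTimeoutError, RactoGatewayAPIError). should_retry is a hypothetical name.

```python
from typing import Optional


def should_retry(status_code: Optional[int] = None, *, timed_out: bool = False) -> bool:
    """Hypothetical rule: is a failed LLM call transient (retry) or permanent (give up)?"""
    if timed_out:
        return True  # timeouts are transient
    if status_code is not None and 500 <= status_code <= 599:
        return True  # 5xx: server-side problem at the provider
    # Anything else, e.g. 401/403 (auth) or 400/422 (bad input), would fail again.
    return False
```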
How Celery serialization works here
Celery workers run in separate processes — live Python objects (kit, rag) cannot be sent over the message broker. Instead:
- Task arguments are JSON primitives: str, float, int, dict, list. The task reconstructs ChatConfig / Message objects inside the worker.
- The kit and rag objects are captured by closure. For this to work in a distributed worker fleet, the worker process must import the same module that instantiates RactoCeleryWorker, just as in the standard Flask-Celery / Django-Celery pattern.
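A minimal sketch of the round trip described above, using only the standard library. It mirrors Celery's default JSON serializer with json.dumps / json.loads. The payload keys, the Message dataclass, and the helpers build_payload and rebuild_messages are hypothetical stand-ins, not the library's internal ChatConfig / Message types.

```python
import json
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for a message object rebuilt inside the worker."""
    role: str
    content: str


def build_payload(user_message, temperature=0.0, max_tokens=4096, history=None):
    # Client side: only JSON primitives (str, float, int, dict, list) go on the broker.
    return {
        "user_message": user_message,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "history": history or [],
    }


def rebuild_messages(payload):
    # Worker side: turn the plain dicts back into rich objects.
    messages = [Message(**turn) for turn in payload["history"]]
    messages.append(Message(role="user", content=payload["user_message"]))
    return messages


wire = json.dumps(build_payload("Hi", history=[{"role": "user", "content": "Hello"}]))
messages = rebuild_messages(json.loads(wire))

# A live object cannot be encoded this way, which is why kit/rag are
# captured by closure in the worker module instead of being sent.
live_object_rejected = False
try:
    json.dumps({"kit": object()})
except TypeError:
    live_object_rejected = True
```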
Example
```python
# tasks.py  ← imported by BOTH client and worker
from celery import Celery
from ractogateway import openai_developer_kit as gpt
from ractogateway.celery import RactoCeleryWorker, RetryConfig

celery_app = Celery(
    "tasks",  # main name, matching the module passed to `celery -A`
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/0",
)

# my_prompt: your RactoGateway prompt object, defined elsewhere (not shown).
# It must live on the kit because prompts cannot travel over the broker.
kit = gpt.Chat(model="gpt-4o", default_prompt=my_prompt)

worker = RactoCeleryWorker(
    celery_app,
    kit=kit,
    retry_config=RetryConfig(max_retries=3),
)

# Start workers:
#   celery -A tasks.celery_app worker --loglevel=info

# In your request handler:
handle = worker.generate("Summarise this report: …")
result = worker.wait(handle.id, timeout_s=60.0)
if result.ok:
    print(result.result["content"])
else:
    print("Task failed:", result.error)
```
- class ractogateway.celery.worker.RactoCeleryWorker(app, *, kit, rag=None, retry_config=None)
Bases: object
Celery-backed task queue wrapper for RactoGateway developer kits.
- Parameters:
app (Any) – A pre-configured celery.Celery instance with broker and backend already set. You create and configure it yourself so you retain full control over serializers, routing, concurrency, etc.
kit (Any) – Any RactoGateway developer kit: OpenAIDeveloperKit, GoogleDeveloperKit, or AnthropicDeveloperKit. The kit's default_prompt is used by generation tasks (prompts are not serialisable over the broker).
rag (Any | None) – Optional RactoRAG instance. Required only when calling ingest_document().
retry_config (RetryConfig | None) – Exponential-backoff configuration. Defaults are applied when None.
- generate(user_message, *, temperature=0.0, max_tokens=4096, history=None, extra=None)
Enqueue an LLM generation task and return immediately.
The task automatically retries with exponential backoff on transient errors (RactoGatewayTimeoutError and RactoGatewayAPIError).
- Parameters:
user_message (str) – The user message text for this generation. The system prompt comes from the kit's default_prompt.
temperature (float) – Sampling temperature passed to the LLM.
max_tokens (int) – Maximum completion tokens.
history (list[dict[str, str]] | None) – Previous conversation turns as [{"role": …, "content": …}, …]. Use this to continue a multi-turn dialogue asynchronously.
extra (dict[str, Any] | None) – Provider-specific pass-through parameters (top_p, seed, …).
- Returns:
celery.result.AsyncResult – Its .id attribute is the task UUID. Use wait() or get_result() to retrieve the outcome.
Note
The prompt is not an argument because Pydantic models containing type[BaseModel] fields cannot be serialised over the message broker. Set your prompt on the kit's default_prompt at construction time.
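To continue a dialogue across tasks, append the previous user turn and the model's reply to history before the next generate() call. A minimal sketch, assuming the reply text is found at result.result["content"] as in the example above, and that the provider expects the role names "user" and "assistant". extend_history is a hypothetical helper.

```python
def extend_history(history, user_message, assistant_content):
    """Return a new history list with one completed user/assistant turn appended."""
    # Copy instead of mutating, so the caller's list stays unchanged.
    turns = list(history or [])
    turns.append({"role": "user", "content": user_message})
    turns.append({"role": "assistant", "content": assistant_content})
    return turns


# Hypothetical usage with the worker (not run here):
#   history = extend_history(history, question, result.result["content"])
#   handle = worker.generate("And what are the risks?", history=history)
```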
- ingest_document(path, **metadata)
Enqueue a background RAG document-ingestion task.
The full read → chunk → process → embed → store pipeline runs in a Celery worker. Your web request returns immediately with an AsyncResult whose .id you can poll later.
- Returns:
celery.result.AsyncResult
- Raises:
RuntimeError – If rag was not provided to RactoCeleryWorker.
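The toy pipeline below shows the order of the read → chunk → process → embed → store stages. It rests on loudly stated assumptions. Chunking is fixed-size by characters with overlap, "process" is whitespace stripping, the embedding is a deterministic hash rather than a model, and the store is a plain dict. chunk, embed, and ingest are hypothetical names, not RactoRAG functions.

```python
import hashlib
from pathlib import Path


def chunk(text: str, size: int = 20, overlap: int = 5) -> list[str]:
    """Split text into fixed-size character windows that overlap by `overlap` chars."""
    if not text:
        return []
    step = size - overlap
    return [text[i:i + size] for i in range(0, max(len(text) - overlap, 1), step)]


def embed(piece: str, dims: int = 8) -> list[float]:
    """Toy deterministic stand-in for an embedding model (hash bytes scaled to 0..1)."""
    digest = hashlib.sha256(piece.encode("utf-8")).digest()
    return [b / 255 for b in digest[:dims]]


def ingest(path: str, store: dict, **metadata) -> int:
    """read -> chunk -> process -> embed -> store; returns the number of chunks stored."""
    text = Path(path).read_text(encoding="utf-8")          # read
    pieces = [p.strip() for p in chunk(text)]              # chunk + process
    pieces = [p for p in pieces if p]                      # drop empty chunks
    for i, piece in enumerate(pieces):                     # embed + store
        store[f"{path}#{i}"] = {"text": piece, "vector": embed(piece), **metadata}
    return len(pieces)
```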
- parallel_batch(items, *, temperature=0.0, max_tokens=4096)
Fan a list of items out to the worker pool in parallel.
Uses Celery group() so all tasks are submitted at once and run concurrently across available workers.
- Returns:
celery.result.GroupResult – Call wait_parallel() to block until all tasks finish, or iterate .results for individual AsyncResult objects.
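Celery's group() submits every task at once, and its GroupResult reports results in submission order. The sketch below models that fan-out / fan-in shape with the standard library's ThreadPoolExecutor instead of Celery, a deliberate swap so it runs without a broker. fake_generate is a hypothetical stand-in for one LLM task.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_generate(item: str) -> dict:
    """Hypothetical per-item task; stands in for one LLM generation."""
    return {"content": item.upper()}


def fan_out(items: list[str], max_workers: int = 4) -> list[dict]:
    """Submit every item at once, then collect results in submission order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() schedules all calls up front; results come back in input order
        # regardless of which call finishes first.
        return list(pool.map(fake_generate, items))
```

Keeping results in input order lets callers zip inputs and outputs back together without tracking task IDs.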
- get_result(task_id)
Return the current state of a task without blocking.
- Parameters:
task_id (str) – The task UUID, i.e. the .id attribute of the AsyncResult returned by generate() or ingest_document().
- Returns:
TaskResult – status will be PENDING if the task has not started yet. Celery itself also reports PENDING for task IDs it does not recognise, so PENDING alone does not prove the task exists.
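A non-blocking client usually polls get_result() until the task reaches a terminal state. The loop below is a sketch that takes the status lookup as a callable, so it can be tested without a broker. With the real worker you might pass `lambda tid: worker.get_result(tid).status.value`, assuming TaskStatus is an Enum as its member listing suggests. poll and TERMINAL are hypothetical names. Treating success, failure, and revoked as terminal follows Celery's READY_STATES.

```python
import time
from typing import Callable

# Terminal states, mirroring Celery's READY_STATES (SUCCESS, FAILURE, REVOKED).
TERMINAL = {"success", "failure", "revoked"}


def poll(get_status: Callable[[str], str], task_id: str,
         interval_s: float = 0.5, timeout_s: float = 30.0) -> str:
    """Call get_status(task_id) until it returns a terminal status or the deadline passes."""
    deadline = time.monotonic() + timeout_s
    while True:
        status = get_status(task_id)
        if status in TERMINAL:
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"task {task_id} still {status!r} after {timeout_s}s")
        time.sleep(interval_s)
```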
- wait(task_id, *, timeout_s=None)
Block until a task completes (or times out) and return its result.
- Parameters:
task_id (str) – The task UUID, i.e. the .id attribute of the AsyncResult returned by generate() or ingest_document().
timeout_s (float | None) – Maximum seconds to wait. None = wait indefinitely.
- Returns:
TaskResult – result.ok is True on success; result.error is set on failure or timeout.
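According to the description above, wait() reports both failures and timeouts through result.error. The sketch below shows that pattern with the standard library and assumes nothing about the library's internals. A blocking call runs in a thread, and both its exception and a concurrent.futures timeout become an error string on a simplified Outcome record, a hypothetical stand-in for TaskResult. With Celery itself, the equivalent blocking call is AsyncResult.get(timeout=...), which raises celery.exceptions.TimeoutError when the deadline passes.

```python
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class Outcome:
    """Simplified, hypothetical stand-in for TaskResult."""
    ok: bool
    result: Any = None
    error: Optional[str] = None


def wait_like(fn: Callable[[], Any], timeout_s: Optional[float] = None) -> Outcome:
    """Run fn, waiting at most timeout_s; report failure and timeout as data, not exceptions."""
    pool = ThreadPoolExecutor(max_workers=1)
    future = pool.submit(fn)
    try:
        return Outcome(ok=True, result=future.result(timeout=timeout_s))
    except FutureTimeout:
        return Outcome(ok=False, error=f"timed out after {timeout_s}s")
    except Exception as exc:  # the call itself raised
        return Outcome(ok=False, error=str(exc))
    finally:
        # Don't block on a call that is still running after a timeout.
        pool.shutdown(wait=False)
```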
- wait_parallel(group_result, *, timeout_s=None)
Block until all tasks from parallel_batch() complete.
- Parameters:
group_result (Any) – The celery.result.GroupResult returned by parallel_batch().
timeout_s (float | None) – Maximum seconds to wait for the entire group. None = wait indefinitely.
- Returns:
list[TaskResult] – One TaskResult per item, in submission order. Inspect each .ok / .error individually.
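Because wait_parallel() returns one TaskResult per item in submission order, inputs and outcomes can be zipped back together. The helper below is a hypothetical sketch that separates successes from failures, for example so that only the failed items get resubmitted. Neither split_outcomes nor the simplified Outcome record is a library name.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class Outcome:
    """Simplified, hypothetical stand-in for TaskResult."""
    ok: bool
    result: Any = None
    error: Optional[str] = None


def split_outcomes(items, outcomes):
    """Pair each input with its outcome (same order) and separate successes from failures."""
    if len(items) != len(outcomes):
        raise ValueError("expected exactly one outcome per item")
    succeeded, failed = [], []
    for item, out in zip(items, outcomes):
        if out.ok:
            succeeded.append((item, out.result))
        else:
            failed.append((item, out.error))
    return succeeded, failed
```

The failed list keeps the original inputs, so a retry pass could be as simple as `parallel_batch([item for item, _ in failed])`.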