Batch Processing
Models
Shared data models for the batch-processing subsystem.
- class ractogateway.batch._models.BatchStatus(*values)[source]
-
Processing state of a batch job.
Maps to the union of OpenAI and Anthropic batch status strings.
- PENDING = 'pending'
- IN_PROGRESS = 'in_progress'
- FINALIZING = 'finalizing'
- COMPLETED = 'completed'
- FAILED = 'failed'
- EXPIRED = 'expired'
- CANCELLING = 'cancelling'
- CANCELLED = 'cancelled'
- class ractogateway.batch._models.BatchItem(**data)[source]
Bases:
BaseModelA single request within a batch job.
- Parameters:
custom_id (str) – User-supplied identifier used to correlate results. Must be unique within a batch.
user_message (str) – The end-user’s query string (equivalent to
ChatConfig.user_message).temperature (float) – Sampling temperature. Defaults to
0.0.max_tokens (int) – Maximum tokens for the completion. Defaults to
4096.extra (dict[str, Any]) – Provider-specific pass-through kwargs.
Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.batch._models.BatchJobInfo(**data)[source]
Bases:
BaseModelMetadata about a submitted batch job.
Returned by
submit_batch()andpoll_status().Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- status: BatchStatus
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- class ractogateway.batch._models.BatchResult(**data)[source]
Bases:
BaseModelThe outcome of a single
BatchItem.A result is always present in the
resultslist returned byget_results(); checkerrorto detect failures.Create a new model by parsing and validating input data from keyword arguments.
Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- response: LLMResponse | None
- model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
OpenAI Batch Processor
OpenAI Batch API processor.
Submits large sets of chat-completion requests to OpenAI’s asynchronous Batch API, which processes them within 24 hours at ~50 % cost compared to the synchronous Chat Completions API.
Workflow:
upload JSONL file → create batch job → poll status
→ download results → parse into BatchResult list
Both synchronous and async variants are provided for every operation.
Usage:
from ractogateway import openai_developer_kit as gpt
from ractogateway.prompts.engine import RactoPrompt
prompt = RactoPrompt(role="assistant", aim="answer briefly",
constraints="", tone="", output_format="text")
processor = gpt.OpenAIBatchProcessor(model="gpt-4o-mini", default_prompt=prompt)
results = processor.submit_and_wait([
gpt.BatchItem(custom_id="q1", user_message="What is 2+2?"),
gpt.BatchItem(custom_id="q2", user_message="Capital of France?"),
])
for r in results:
print(r.custom_id, r.response.content if r.ok else r.error)
- class ractogateway.batch.openai_batch.OpenAIBatchProcessor(model='gpt-4o-mini', *, api_key=None, base_url=None, default_prompt=None)[source]
Bases:
objectSubmit thousands of chat-completion requests to OpenAI’s Batch API at ~50 % of standard API cost.
- Parameters:
model (
str) – Chat model to use for all items in a batch (e.g."gpt-4o-mini").api_key (
str|None) – OpenAI API key. Falls back toOPENAI_API_KEYenv var.base_url (
str|None) – Custom base URL (Azure OpenAI / proxy).default_prompt (
RactoPrompt|None) – RACTO prompt used as the system message for every batch item.
- submit_batch / asubmit_batch:
Upload JSONL and create batch job → returns
BatchJobInfo.
- poll_status / apoll_status:
Fetch current job state → returns updated
BatchJobInfo.
- get_results / aget_results:
Download and parse completed job results →
list[BatchResult].
- submit_and_wait / asubmit_and_wait:
Convenience: submit + poll until done + return results.
- submit_batch(items, *, prompt=None, completion_window='24h')[source]
Upload items as a JSONL file and create an OpenAI batch job.
Returns immediately with a
BatchJobInfo(status = IN_PROGRESS).- Return type:
- get_results(job_id)[source]
Download and parse results for a completed batch job.
- Raises:
RuntimeError – If the job is not yet completed.
- Return type:
- submit_and_wait(items, *, prompt=None, completion_window='24h', poll_interval_s=60.0, max_wait_s=86400.0)[source]
Submit a batch and block until it completes, then return results.
- Parameters:
- Raises:
TimeoutError – If the batch does not complete within max_wait_s.
RuntimeError – If the batch job fails or is cancelled.
- Return type:
- async asubmit_batch(items, *, prompt=None, completion_window='24h')[source]
Async variant of
submit_batch().- Return type:
- async apoll_status(job_id)[source]
Async variant of
poll_status().- Return type:
- async aget_results(job_id)[source]
Async variant of
get_results().- Return type:
- async asubmit_and_wait(items, *, prompt=None, completion_window='24h', poll_interval_s=60.0, max_wait_s=86400.0)[source]
Async variant of
submit_and_wait().- Return type:
Anthropic Batch Processor
Anthropic Message Batches API processor.
Submits large sets of Claude messages using Anthropic’s asynchronous Batch API, which processes them at ~50 % of standard API cost.
Workflow:
create batch job → poll until ended_at is set
→ stream results → parse into BatchResult list
Both synchronous and async variants are provided for every operation.
Usage:
from ractogateway import anthropic_developer_kit as claude
from ractogateway.prompts.engine import RactoPrompt
prompt = RactoPrompt(role="assistant", aim="answer briefly",
constraints="", tone="", output_format="text")
processor = claude.AnthropicBatchProcessor(
model="claude-haiku-4-5-20251001",
default_prompt=prompt,
)
results = processor.submit_and_wait([
claude.BatchItem(custom_id="q1", user_message="What is 2+2?"),
claude.BatchItem(custom_id="q2", user_message="Capital of France?"),
])
for r in results:
print(r.custom_id, r.response.content if r.ok else r.error)
- class ractogateway.batch.anthropic_batch.AnthropicBatchProcessor(model='claude-haiku-4-5-20251001', *, api_key=None, default_prompt=None)[source]
Bases:
objectSubmit thousands of Claude requests to Anthropic’s Message Batches API at ~50 % of standard API cost.
- Parameters:
model (
str) – Claude model for all items (e.g."claude-haiku-4-5-20251001").api_key (
str|None) – Anthropic API key. Falls back toANTHROPIC_API_KEYenv var.default_prompt (
RactoPrompt|None) – RACTO prompt used as thesystemmessage for every batch item.
- submit_batch / asubmit_batch:
Create a batch job → returns
BatchJobInfo.
- poll_status / apoll_status:
Fetch current job state → returns updated
BatchJobInfo.
- get_results / aget_results:
Stream and parse completed job results →
list[BatchResult].
- submit_and_wait / asubmit_and_wait:
Convenience: submit + poll until done + return results.
- submit_batch(items, *, prompt=None)[source]
Create an Anthropic Message Batch job.
Returns immediately with a
BatchJobInfo(status = IN_PROGRESS).- Return type:
- get_results(job_id)[source]
Stream and parse results for a completed batch job.
- Raises:
RuntimeError – If the job is not yet completed.
- Return type:
- submit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)[source]
Submit a batch and block until it completes, then return results.
- Parameters:
- Return type:
- async asubmit_batch(items, *, prompt=None)[source]
Async variant of
submit_batch().- Return type:
- async apoll_status(job_id)[source]
Async variant of
poll_status().- Return type:
- async aget_results(job_id)[source]
Async variant of
get_results().- Return type:
- async asubmit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)[source]
Async variant of
submit_and_wait().- Return type: