ractogateway.batch

Batch-processing subsystem for RactoGateway.

Submit thousands of non-urgent LLM requests through provider Batch APIs at ~50 % lower cost than synchronous calls. Supported providers:

  • OpenAI: OpenAIBatchProcessor (uses Files API + Batch API)

  • Anthropic: AnthropicBatchProcessor (uses Message Batches API)

Quick start (OpenAI):

from ractogateway import openai_developer_kit as gpt

# my_prompt is a RactoPrompt defined elsewhere; it is used as the system message for every item.
processor = gpt.OpenAIBatchProcessor(model="gpt-4o-mini", default_prompt=my_prompt)
results = processor.submit_and_wait([
    gpt.BatchItem(custom_id="q1", user_message="Summarise this text: …"),
    gpt.BatchItem(custom_id="q2", user_message="Classify the sentiment: …"),
])

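for r in results:
    if r.ok:
        print(r.custom_id, r.response.content)
    else:
        # Failed items are still returned; error holds the failure message.
        print(r.custom_id, "failed:", r.error)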

Quick start (Anthropic):

from ractogateway import anthropic_developer_kit as claude

processor = claude.AnthropicBatchProcessor(
    model="claude-haiku-4-5-20251001", default_prompt=my_prompt
)
# items is a list of BatchItem objects, built as in the OpenAI quick start.
results = processor.submit_and_wait(items)

class ractogateway.batch.AnthropicBatchProcessor(model='claude-haiku-4-5-20251001', *, api_key=None, default_prompt=None)[source]

Bases: object

Submit thousands of Claude requests to Anthropic’s Message Batches API at ~50 % of standard API cost.

Parameters:
  • model (str) – Claude model for all items (e.g. "claude-haiku-4-5-20251001").

  • api_key (str | None) – Anthropic API key. Falls back to ANTHROPIC_API_KEY env var.

  • default_prompt (RactoPrompt | None) – RACTO prompt used as the system message for every batch item.
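The api_key fallback described above can be pictured with the minimal sketch below. It is not the library's code: resolve_api_key is a hypothetical helper, and both the precedence (explicit argument first) and the ValueError raised when nothing is found are assumptions.

```python
import os


def resolve_api_key(explicit, env_var="ANTHROPIC_API_KEY"):
    """Hypothetical helper: an explicit key wins, else read the env var."""
    if explicit:
        return explicit
    key = os.environ.get(env_var)
    if not key:
        # Assumption: the real library may fail differently here.
        raise ValueError(f"No API key given and {env_var} is not set")
    return key
```

The same pattern would apply to OPENAI_API_KEY by passing a different env_var.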

submit_batch / asubmit_batch:

Create a batch job → returns BatchJobInfo.

poll_status / apoll_status:

Fetch current job state → returns updated BatchJobInfo.

get_results / aget_results:

Stream and parse completed job results → list[BatchResult].

submit_and_wait / asubmit_and_wait:

Convenience: submit + poll until done + return results.

provider: str = 'anthropic'

submit_batch(items, *, prompt=None)[source]

Create an Anthropic Message Batch job.

Returns immediately with a BatchJobInfo (status = IN_PROGRESS).

Return type:

BatchJobInfo
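To make the submission step more concrete, the sketch below shows the shape of the request entries that Anthropic's Message Batches API accepts: a custom_id plus a params object holding a normal Messages request. build_anthropic_requests is a hypothetical helper working on plain dicts with the BatchItem fields; how the processor actually builds its payload, and whether extra is merged into params, are assumptions.

```python
def build_anthropic_requests(items, model, system_prompt=None):
    """Turn BatchItem-like dicts into Message Batches request entries."""
    requests = []
    for item in items:
        params = {
            "model": model,
            "max_tokens": item.get("max_tokens", 4096),
            "temperature": item.get("temperature", 0.0),
            "messages": [{"role": "user", "content": item["user_message"]}],
        }
        if system_prompt is not None:
            params["system"] = system_prompt
        # Assumption: provider-specific pass-through kwargs land in params.
        params.update(item.get("extra", {}))
        requests.append({"custom_id": item["custom_id"], "params": params})
    return requests


requests = build_anthropic_requests(
    [{"custom_id": "q1", "user_message": "Summarise this text: …"}],
    model="claude-haiku-4-5-20251001",
    system_prompt="You are a concise assistant.",
)
print(requests[0]["custom_id"])  # q1
```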

poll_status(job_id)[source]

Fetch the current status of batch job job_id.

Return type:

BatchJobInfo

get_results(job_id)[source]

Stream and parse results for a completed batch job.

Raises:

RuntimeError – If the job is not yet completed.

Return type:

list[BatchResult]
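Because get_results() raises RuntimeError for a job that has not completed, callers that poll manually may want to check the status first. The sketch below shows one way to do that. results_if_ready and FakeProcessor are hypothetical names used only for illustration; the only library behaviour assumed is the poll_status() / get_results() contract documented above.

```python
from types import SimpleNamespace


def results_if_ready(processor, job_id):
    """Return results when the job is completed, otherwise None."""
    info = processor.poll_status(job_id)
    # BatchStatus is a str-based enum; fall back to the raw string if needed.
    status = getattr(info.status, "value", info.status)
    if status != "completed":
        return None
    return processor.get_results(job_id)


class FakeProcessor:
    """Stand-in that reports a fixed status; not part of ractogateway."""

    def __init__(self, status):
        self._status = status

    def poll_status(self, job_id):
        return SimpleNamespace(job_id=job_id, status=self._status)

    def get_results(self, job_id):
        if self._status != "completed":
            raise RuntimeError("job is not yet completed")
        return ["result-for-" + job_id]


print(results_if_ready(FakeProcessor("in_progress"), "j1"))  # None
print(results_if_ready(FakeProcessor("completed"), "j1"))    # ['result-for-j1']
```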

submit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)[source]

Submit a batch and block until it completes, then return results.

Parameters:
  • poll_interval_s (float) – Seconds between status-poll API calls. Default 60.0.

  • max_wait_s (float) – Maximum total seconds to wait. Default 86400.0 (24 h).

Return type:

list[BatchResult]
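As a rough guide to how the two timing parameters interact, the arithmetic below estimates an upper bound on the number of status-poll calls made during one wait. It assumes one status call per interval and nothing else; max_poll_calls is a hypothetical helper, not a library function.

```python
import math


def max_poll_calls(poll_interval_s=60.0, max_wait_s=86400.0):
    """Approximate upper bound on status polls within max_wait_s."""
    return math.ceil(max_wait_s / poll_interval_s)


print(max_poll_calls())                       # 1440
print(max_poll_calls(poll_interval_s=300.0))  # 288
```

With the defaults, a job that runs for the full 24 hours costs about 1440 status requests; raising poll_interval_s to five minutes cuts that to 288.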

async asubmit_batch(items, *, prompt=None)[source]

Async variant of submit_batch().

Return type:

BatchJobInfo

async apoll_status(job_id)[source]

Async variant of poll_status().

Return type:

BatchJobInfo

async aget_results(job_id)[source]

Async variant of get_results().

Return type:

list[BatchResult]

async asubmit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)[source]

Async variant of submit_and_wait().

Return type:

list[BatchResult]
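One reason to prefer the async variants is that several batches can be awaited concurrently. The sketch below illustrates the pattern with asyncio.gather. FakeAsyncProcessor is a stand-in written for this example so the snippet runs without network access; with the real classes, the two arguments to run_both would be processor instances.

```python
import asyncio


class FakeAsyncProcessor:
    """Stand-in exposing asubmit_and_wait; not part of ractogateway."""

    def __init__(self, provider):
        self.provider = provider

    async def asubmit_and_wait(self, items, *, poll_interval_s=60.0, max_wait_s=86400.0):
        await asyncio.sleep(0)  # yield control, as a real poll loop would
        return [f"{self.provider}:{item}" for item in items]


async def run_both(first, second, items):
    """Wait on two processors concurrently and return both result lists."""
    return await asyncio.gather(
        first.asubmit_and_wait(items),
        second.asubmit_and_wait(items),
    )


first_results, second_results = asyncio.run(
    run_both(FakeAsyncProcessor("anthropic"), FakeAsyncProcessor("openai"), ["q1", "q2"])
)
print(first_results)   # ['anthropic:q1', 'anthropic:q2']
print(second_results)  # ['openai:q1', 'openai:q2']
```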

class ractogateway.batch.BatchItem(**data)[source]

Bases: BaseModel

A single request within a batch job.

Parameters:
  • custom_id (str) – User-supplied identifier used to correlate results. Must be unique within a batch.

  • user_message (str) – The end-user’s query string (equivalent to ChatConfig.user_message).

  • temperature (float) – Sampling temperature. Defaults to 0.0.

  • max_tokens (int) – Maximum tokens for the completion. Defaults to 4096.

  • extra (dict[str, Any]) – Provider-specific pass-through kwargs.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

custom_id: str
user_message: str
temperature: float
max_tokens: int
extra: dict[str, Any]
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
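Since custom_id must be unique within a batch, it can help to generate the identifiers mechanically and check for duplicates before submitting. The sketch below builds keyword-argument dicts for the documented BatchItem fields without importing the library. build_item_kwargs and find_duplicate_ids are hypothetical helpers, and the q1, q2, ... naming scheme is only an example.

```python
from collections import Counter


def build_item_kwargs(texts, prefix="q", temperature=0.0, max_tokens=4096):
    """Return one kwargs dict per text, with custom_ids q1, q2, ..."""
    return [
        {
            "custom_id": f"{prefix}{index}",
            "user_message": text,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        for index, text in enumerate(texts, start=1)
    ]


def find_duplicate_ids(custom_ids):
    """Return the custom_id values that occur more than once, sorted."""
    counts = Counter(custom_ids)
    return sorted(cid for cid, n in counts.items() if n > 1)


kwargs_list = build_item_kwargs(["Summarise this text: …", "Classify the sentiment: …"])
print([kw["custom_id"] for kw in kwargs_list])  # ['q1', 'q2']
print(find_duplicate_ids(["q1", "q2", "q1"]))   # ['q1']
```

Each dict can then be expanded into a model instance, for example BatchItem(**kw) for every kw in kwargs_list.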

class ractogateway.batch.BatchJobInfo(**data)[source]

Bases: BaseModel

Metadata about a submitted batch job.

Returned by submit_batch() and poll_status().

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

job_id: str
provider: str
status: BatchStatus
created_at: float
request_count: int
raw: Any
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.
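Batch jobs can outlive the process that submitted them, so it may be useful to persist the job metadata and resume polling later. The sketch below serialises the documented BatchJobInfo fields to JSON. job_info_to_json is a hypothetical helper, the SimpleNamespace object is a stand-in for a real BatchJobInfo, and the job id and timestamp are made up. The raw field is skipped on the assumption that a provider object may not be JSON-serialisable.

```python
import json
from types import SimpleNamespace


def job_info_to_json(info):
    """Serialise the plain fields of a BatchJobInfo-like object."""
    # BatchStatus is a str-based enum; fall back to the raw string if needed.
    status = getattr(info.status, "value", info.status)
    return json.dumps(
        {
            "job_id": info.job_id,
            "provider": info.provider,
            "status": status,
            "created_at": info.created_at,
            "request_count": info.request_count,
        }
    )


info = SimpleNamespace(
    job_id="batch_123",  # made-up id for illustration
    provider="openai",
    status="in_progress",
    created_at=1700000000.0,
    request_count=2,
    raw=object(),
)
saved = job_info_to_json(info)
print(json.loads(saved)["job_id"])  # batch_123
```

The stored job_id is all that poll_status() and get_results() need to pick the job up again.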

class ractogateway.batch.BatchResult(**data)[source]

Bases: BaseModel

The outcome of a single BatchItem.

Every item in the batch yields a BatchResult in the list returned by get_results(), whether it succeeded or not; check error (or the ok property) to detect failures.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

custom_id: str
response: LLMResponse | None
error: str | None
raw: Any
model_config: ClassVar[ConfigDict] = {'arbitrary_types_allowed': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

property ok: bool

True when the request succeeded (no error, response present).
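A common follow-up to get_results() is to separate successes from failures so that the failed custom_ids can be inspected or resubmitted. The sketch below does this using only the documented custom_id, response, error, and ok attributes. split_results is a hypothetical helper, and the SimpleNamespace objects stand in for real BatchResult instances.

```python
from types import SimpleNamespace


def split_results(results):
    """Partition results into {custom_id: response} and {custom_id: error}."""
    succeeded = {r.custom_id: r.response for r in results if r.ok}
    failed = {r.custom_id: r.error for r in results if not r.ok}
    return succeeded, failed


results = [
    SimpleNamespace(custom_id="q1", ok=True, response="summary text", error=None),
    SimpleNamespace(custom_id="q2", ok=False, response=None, error="rate limited"),
]
succeeded, failed = split_results(results)
print(sorted(succeeded))  # ['q1']
print(failed)             # {'q2': 'rate limited'}
```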

class ractogateway.batch.BatchStatus(*values)[source]

Bases: str, Enum

Processing state of a batch job.

Maps to the union of OpenAI and Anthropic batch status strings.

PENDING = 'pending'
IN_PROGRESS = 'in_progress'
FINALIZING = 'finalizing'
COMPLETED = 'completed'
FAILED = 'failed'
EXPIRED = 'expired'
CANCELLING = 'cancelling'
CANCELLED = 'cancelled'
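When writing a custom poll loop, the statuses above need to be divided into those that can still change and those that are final. The sketch below uses a local mirror of the enum (named Status so it is not mistaken for the library class) with the same string values. Which members count as terminal is an assumption made for this example, not something the library documents.

```python
from enum import Enum


class Status(str, Enum):
    """Local mirror of the documented BatchStatus values."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    FINALIZING = "finalizing"
    COMPLETED = "completed"
    FAILED = "failed"
    EXPIRED = "expired"
    CANCELLING = "cancelling"
    CANCELLED = "cancelled"


# Assumption: these four states never transition further.
TERMINAL = frozenset(
    {Status.COMPLETED, Status.FAILED, Status.EXPIRED, Status.CANCELLED}
)


def is_terminal(status):
    """Accept either an enum member or its string value."""
    return Status(status) in TERMINAL


print(is_terminal("in_progress"))   # False
print(is_terminal(Status.EXPIRED))  # True
```
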
class ractogateway.batch.OpenAIBatchProcessor(model='gpt-4o-mini', *, api_key=None, base_url=None, default_prompt=None)[source]

Bases: object

Submit thousands of chat-completion requests to OpenAI’s Batch API at ~50 % of standard API cost.

Parameters:
  • model (str) – Chat model to use for all items in a batch (e.g. "gpt-4o-mini").

  • api_key (str | None) – OpenAI API key. Falls back to OPENAI_API_KEY env var.

  • base_url (str | None) – Custom base URL (Azure OpenAI / proxy).

  • default_prompt (RactoPrompt | None) – RACTO prompt used as the system message for every batch item.

submit_batch / asubmit_batch:

Upload JSONL and create batch job → returns BatchJobInfo.

poll_status / apoll_status:

Fetch current job state → returns updated BatchJobInfo.

get_results / aget_results:

Download and parse completed job results → list[BatchResult].

submit_and_wait / asubmit_and_wait:

Convenience: submit + poll until done + return results.

provider: str = 'openai'

submit_batch(items, *, prompt=None, completion_window='24h')[source]

Upload items as a JSONL file and create an OpenAI batch job.

Returns immediately with a BatchJobInfo (status = IN_PROGRESS).

Return type:

BatchJobInfo
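For readers unfamiliar with the upload format, the sketch below produces the kind of JSONL that OpenAI's Batch API expects for chat completions: one JSON object per line with custom_id, method, url, and body. items_to_jsonl is a hypothetical helper operating on plain dicts with the BatchItem fields; how the processor maps its prompt and item fields into body is an assumption.

```python
import json


def items_to_jsonl(items, model, system_prompt=None):
    """Render BatchItem-like dicts as Batch API input lines."""
    lines = []
    for item in items:
        messages = []
        if system_prompt is not None:
            messages.append({"role": "system", "content": system_prompt})
        messages.append({"role": "user", "content": item["user_message"]})
        body = {
            "model": model,
            "messages": messages,
            "temperature": item.get("temperature", 0.0),
            "max_tokens": item.get("max_tokens", 4096),
        }
        lines.append(
            json.dumps(
                {
                    "custom_id": item["custom_id"],
                    "method": "POST",
                    "url": "/v1/chat/completions",
                    "body": body,
                }
            )
        )
    return "\n".join(lines) + "\n"


jsonl = items_to_jsonl(
    [
        {"custom_id": "q1", "user_message": "Summarise this text: …"},
        {"custom_id": "q2", "user_message": "Classify the sentiment: …"},
    ],
    model="gpt-4o-mini",
    system_prompt="You are a concise assistant.",
)
print(len(jsonl.splitlines()))  # 2
```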

poll_status(job_id)[source]

Fetch the current status of batch job job_id.

Return type:

BatchJobInfo

get_results(job_id)[source]

Download and parse results for a completed batch job.

Raises:

RuntimeError – If the job is not yet completed.

Return type:

list[BatchResult]

submit_and_wait(items, *, prompt=None, completion_window='24h', poll_interval_s=60.0, max_wait_s=86400.0)[source]

Submit a batch and block until it completes, then return results.

Parameters:
  • poll_interval_s (float) – Seconds between status-poll API calls. Default 60.0.

  • max_wait_s (float) – Maximum total seconds to wait. Default 86400 (24 h).

Raises:
  • TimeoutError – If the batch does not complete within max_wait_s.

  • RuntimeError – If the batch job fails or is cancelled.

Return type:

list[BatchResult]
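The behaviour described above (submit, poll at a fixed interval, raise on timeout or failure) can be sketched as a plain loop over the lower-level methods. This is an illustration under stated assumptions, not the library's implementation: submit_and_wait_sketch and FakeProcessor are hypothetical, and treating expired the same as failed or cancelled is a guess.

```python
import time
from types import SimpleNamespace


def submit_and_wait_sketch(processor, items, poll_interval_s=60.0, max_wait_s=86400.0):
    """Submit, then poll until the job completes, fails, or times out."""
    info = processor.submit_batch(items)
    deadline = time.monotonic() + max_wait_s
    while True:
        info = processor.poll_status(info.job_id)
        status = getattr(info.status, "value", info.status)
        if status == "completed":
            return processor.get_results(info.job_id)
        if status in ("failed", "expired", "cancelled"):
            raise RuntimeError(f"batch {info.job_id} ended with status {status}")
        if time.monotonic() >= deadline:
            raise TimeoutError(f"batch {info.job_id} not done after {max_wait_s} s")
        time.sleep(poll_interval_s)


class FakeProcessor:
    """Stand-in that replays a list of statuses; not part of ractogateway."""

    def __init__(self, statuses):
        self._statuses = list(statuses)
        self._items = []

    def submit_batch(self, items):
        self._items = list(items)
        return SimpleNamespace(job_id="job-1", status="in_progress")

    def poll_status(self, job_id):
        # Consume statuses one by one, then keep repeating the last one.
        status = self._statuses.pop(0) if len(self._statuses) > 1 else self._statuses[0]
        return SimpleNamespace(job_id=job_id, status=status)

    def get_results(self, job_id):
        return [f"done:{item}" for item in self._items]


done = submit_and_wait_sketch(
    FakeProcessor(["in_progress", "completed"]), ["q1", "q2"], poll_interval_s=0.0
)
print(done)  # ['done:q1', 'done:q2']
```

Using time.monotonic() for the deadline keeps the timeout unaffected by wall-clock adjustments during a long wait.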

async asubmit_batch(items, *, prompt=None, completion_window='24h')[source]

Async variant of submit_batch().

Return type:

BatchJobInfo

async apoll_status(job_id)[source]

Async variant of poll_status().

Return type:

BatchJobInfo

async aget_results(job_id)[source]

Async variant of get_results().

Return type:

list[BatchResult]

async asubmit_and_wait(items, *, prompt=None, completion_window='24h', poll_interval_s=60.0, max_wait_s=86400.0)[source]

Async variant of submit_and_wait().

Return type:

list[BatchResult]