ractogateway.batch.anthropic_batch

Anthropic Message Batches API processor.

Submits large sets of Claude messages through Anthropic’s asynchronous Message Batches API, which processes them at roughly 50% of the standard API cost.

Workflow:

create batch job  →  poll until ended_at is set  →  stream results  →  parse into BatchResult list

Synchronous and asynchronous variants are provided for every operation.

Usage:

from ractogateway import anthropic_developer_kit as claude
from ractogateway.prompts.engine import RactoPrompt

prompt = RactoPrompt(role="assistant", aim="answer briefly",
                     constraints="", tone="", output_format="text")

processor = claude.AnthropicBatchProcessor(
    model="claude-haiku-4-5-20251001",
    default_prompt=prompt,
)

results = processor.submit_and_wait([
    claude.BatchItem(custom_id="q1", user_message="What is 2+2?"),
    claude.BatchItem(custom_id="q2", user_message="Capital of France?"),
])

for r in results:
    print(r.custom_id, r.response.content if r.ok else r.error)
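The loop above reads custom_id, ok, response.content, and error from each result. A minimal sketch of splitting a result list into successes and failures on that basis follows. FakeResponse and FakeResult are hypothetical stand-ins that mimic only those four attributes; the real BatchResult may carry more fields, and the string error type is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeResponse:
    """Hypothetical stand-in exposing only the .content attribute."""
    content: str


@dataclass
class FakeResult:
    """Hypothetical stand-in for BatchResult (custom_id, ok, response, error)."""
    custom_id: str
    ok: bool
    response: Optional[FakeResponse] = None
    error: Optional[str] = None  # assumption: errors are reported as text


def split_results(results):
    """Return (answers, failures), both keyed by custom_id."""
    answers = {r.custom_id: r.response.content for r in results if r.ok}
    failures = {r.custom_id: r.error for r in results if not r.ok}
    return answers, failures


demo = [
    FakeResult("q1", True, FakeResponse("4")),
    FakeResult("q2", False, error="rate limited"),
]
answers, failures = split_results(demo)
print(answers)
print(failures)
```

Keying both dictionaries by custom_id makes it straightforward to resubmit only the failed items in a follow-up batch.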
class ractogateway.batch.anthropic_batch.AnthropicBatchProcessor(model='claude-haiku-4-5-20251001', *, api_key=None, default_prompt=None)

Bases: object

Submit thousands of Claude requests to Anthropic’s Message Batches API at roughly 50% of the standard API cost.

Parameters:
  • model (str) – Claude model for all items (e.g. "claude-haiku-4-5-20251001").

  • api_key (str | None) – Anthropic API key. Falls back to the ANTHROPIC_API_KEY environment variable.

  • default_prompt (RactoPrompt | None) – RACTO prompt used as the system message for every batch item.

submit_batch / asubmit_batch:

Create a batch job → returns BatchJobInfo.

poll_status / apoll_status:

Fetch current job state → returns updated BatchJobInfo.

get_results / aget_results:

Stream and parse completed job results → list[BatchResult].

submit_and_wait / asubmit_and_wait:

Convenience: submit + poll until done + return results.
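The three lower-level methods can also be chained by hand, which is roughly what the convenience method is described as doing. The sketch below uses a hypothetical FakeProcessor so that it runs without network access. It assumes the returned job object exposes job_id and ended_at attributes; the real BatchJobInfo fields may differ.

```python
import time
from types import SimpleNamespace


class FakeProcessor:
    """Hypothetical stand-in that reports completion after a few polls."""

    def __init__(self, polls_needed=3):
        self._remaining = polls_needed

    def submit_batch(self, items):
        # Assumption: the job info carries job_id and ended_at.
        return SimpleNamespace(job_id="job-1", ended_at=None)

    def poll_status(self, job_id):
        self._remaining -= 1
        ended = "done" if self._remaining <= 0 else None
        return SimpleNamespace(job_id=job_id, ended_at=ended)

    def get_results(self, job_id):
        return ["result-for-" + job_id]


def run_manually(processor, items, poll_interval_s=0.0):
    """Submit, poll until ended_at is set, then fetch the results."""
    job = processor.submit_batch(items)
    while job.ended_at is None:
        time.sleep(poll_interval_s)
        job = processor.poll_status(job.job_id)
    return processor.get_results(job.job_id)


results = run_manually(FakeProcessor(), ["q1", "q2"])
print(results)
```

Splitting the steps this way lets a caller persist the job identifier between submission and retrieval, for example across process restarts.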

provider: str = 'anthropic'
submit_batch(items, *, prompt=None)

Create an Anthropic Message Batch job.

Returns immediately with a BatchJobInfo (status = IN_PROGRESS).

Return type:

BatchJobInfo

poll_status(job_id)

Fetch the current status of batch job job_id.

Return type:

BatchJobInfo

get_results(job_id)

Stream and parse results for a completed batch job.

Raises:

RuntimeError – If the job is not yet completed.

Return type:

list[BatchResult]

submit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)

Submit a batch and block until it completes, then return results.

Parameters:
  • poll_interval_s (float) – Seconds between status-poll API calls. Default 60.0.

  • max_wait_s (float) – Maximum total wait, in seconds. Default 86400.0 (24 h).

Return type:

list[BatchResult]
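With the defaults, a job that takes the full 24 hours is polled roughly 86400 / 60 = 1440 times. The sketch below shows one way a poll loop with an interval and a deadline could be written. It is not the library's implementation: wait_until, the injectable sleep and clock arguments, and raising TimeoutError on expiry are all assumptions made for illustration, since the documentation above does not state what happens when max_wait_s is exceeded.

```python
import time


def wait_until(is_done, poll_interval_s=60.0, max_wait_s=86400.0,
               sleep=time.sleep, clock=time.monotonic):
    """Call is_done() every poll_interval_s seconds; return the poll count.

    Raises TimeoutError when the next poll would fall after the deadline.
    """
    deadline = clock() + max_wait_s
    polls = 0
    while True:
        polls += 1
        if is_done():
            return polls
        if clock() + poll_interval_s > deadline:
            raise TimeoutError(f"not done after {polls} polls")
        sleep(poll_interval_s)


class FakeClock:
    """Deterministic clock so the example finishes instantly."""

    def __init__(self):
        self.now = 0.0

    def time(self):
        return self.now

    def sleep(self, seconds):
        self.now += seconds


fake = FakeClock()
states = iter([False, False, True])  # done on the third poll
polls = wait_until(lambda: next(states), sleep=fake.sleep, clock=fake.time)
print(polls, fake.now)
```

Injecting the clock and sleep functions keeps the loop testable without real waiting; in production code the defaults would be used.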

async asubmit_batch(items, *, prompt=None)

Async variant of submit_batch().

Return type:

BatchJobInfo

async apoll_status(job_id)

Async variant of poll_status().

Return type:

BatchJobInfo

async aget_results(job_id)

Async variant of get_results().

Return type:

list[BatchResult]

async asubmit_and_wait(items, *, prompt=None, poll_interval_s=60.0, max_wait_s=86400.0)

Async variant of submit_and_wait().

Return type:

list[BatchResult]
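One reason to prefer the async variants is that several batches can be awaited concurrently from a single event loop. The sketch below illustrates the pattern with asyncio.gather. FakeAsyncProcessor is a hypothetical stand-in whose only link to the real class is the asubmit_and_wait method name and keyword arguments; it returns plain strings rather than BatchResult objects.

```python
import asyncio


class FakeAsyncProcessor:
    """Hypothetical stand-in; only the method name mirrors the documented API."""

    async def asubmit_and_wait(self, items, *, prompt=None,
                               poll_interval_s=60.0, max_wait_s=86400.0):
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return [f"answer:{item}" for item in items]


async def run_batches(processor, batches):
    """Await several batches concurrently; results keep the input order."""
    tasks = [processor.asubmit_and_wait(batch, poll_interval_s=0.0)
             for batch in batches]
    return await asyncio.gather(*tasks)


all_results = asyncio.run(
    run_batches(FakeAsyncProcessor(), [["q1", "q2"], ["q3"]])
)
print(all_results)
```

asyncio.gather returns results in the order the awaitables were passed, so each output list lines up with its input batch.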