Redis

Models

Pydantic configuration models for the Redis infrastructure layer.

class ractogateway.redis._models.RateLimitConfig(**data)[source]

Bases: BaseModel

Configuration for RedisRateLimiter.

Parameters:
  • max_tokens_per_minute (int) – Maximum LLM tokens a single user_id may consume within one minute window. Windows are keyed by Unix minute, so the budget resets at each minute boundary.

  • key_prefix (str) – Redis key namespace. All rate-limit keys are stored under "{key_prefix}:{user_id}:{unix_minute}".

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

max_tokens_per_minute: int
key_prefix: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

class ractogateway.redis._models.ChatMemoryConfig(**data)[source]

Bases: BaseModel

Configuration for RedisChatMemory.

Parameters:
  • max_turns (int) – Maximum number of conversation turns to retain. Each turn consists of one user message and one assistant message, so up to max_turns * 2 raw messages are stored per conversation.

  • ttl_seconds (float | None) – Optional TTL. Every append() call refreshes the expiry on the underlying Redis list. None disables expiry.

  • key_prefix (str) – Redis key namespace. Each conversation is stored at "{key_prefix}:{conversation_id}".

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

max_turns: int
ttl_seconds: float | None
key_prefix: str
model_config: ClassVar[ConfigDict] = {}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

Exact Cache

Redis-backed exact-match cache — distributed drop-in for ExactMatchCache.

Stores LLM responses in Redis so the cache is shared across every process and server in a fleet. The public API is identical to that of ExactMatchCache, so you can substitute RedisExactCache wherever ExactMatchCache is accepted (including all developer-kit exact_cache= parameters) without changing any other code.

Thread-safety

Stats counters are guarded by threading.Lock (same pattern as the in-process cache). The Redis operations themselves are atomic at the command level; no additional locking is required across processes.

Cache key

"{key_prefix}:{sha256_hex}" where the SHA-256 digest is computed from (user_message, system_prompt, model, temperature, max_tokens) — identical hashing logic to _make_key().
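The key layout above can be illustrated with a minimal sketch. The function name make_cache_key and the JSON-array serialization are assumptions made for illustration; the library's _make_key() may combine the five fields differently, so keys produced here are not guaranteed to match real cache entries.

```python
import hashlib
import json


def make_cache_key(
    user_message: str,
    system_prompt: str,
    model: str,
    temperature: float,
    max_tokens: int,
    key_prefix: str = "ractogateway:exact",
) -> str:
    """Build "{key_prefix}:{sha256_hex}" from the five request fields."""
    # Assumption: the fields are serialized as a JSON array before hashing.
    # The real _make_key() may use a different serialization.
    payload = json.dumps(
        [user_message, system_prompt, model, temperature, max_tokens],
        ensure_ascii=False,
    ).encode("utf-8")
    return f"{key_prefix}:{hashlib.sha256(payload).hexdigest()}"


# Changing any one field (here: temperature) yields a different key.
key_a = make_cache_key("Hi", "You are helpful.", "gpt-4o", 0.0, 256)
key_b = make_cache_key("Hi", "You are helpful.", "gpt-4o", 0.7, 256)
```

Because every field feeds the digest, two requests share a cache entry only when all five values match exactly.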

Example:

from ractogateway.redis import RedisExactCache
from ractogateway import openai_developer_kit as gpt

cache = RedisExactCache(
    url="redis://localhost:6379/0",
    ttl_seconds=3600,
)
kit = gpt.OpenAIDeveloperKit(model="gpt-4o", exact_cache=cache)

class ractogateway.redis.exact_cache.RedisExactCache(*, url='redis://localhost:6379/0', client=None, ttl_seconds=None, key_prefix='ractogateway:exact')[source]

Bases: object

Distributed exact-match cache backed by Redis. Entries are removed by TTL expiry, invalidate(), or clear().

Parameters:
  • url (str) – Redis connection URL (e.g. "redis://localhost:6379/0"). Ignored when client is provided.

  • client (Any | None) – Pre-built redis.Redis (or compatible) client. Useful when you manage the connection pool yourself or use a mock in tests.

  • ttl_seconds (float | None) – Optional TTL for each entry. Passed directly to Redis SET EX. None means entries never expire (Redis default).

  • key_prefix (str) – Namespace for all Redis keys managed by this instance.

get(user_message, system_prompt, model, temperature, max_tokens)[source]

Return a cached response or None on a miss.

O(1) Redis GET.

Return type:

LLMResponse | None

put(user_message, system_prompt, model, temperature, max_tokens, response)[source]

Store a response in Redis.

O(1) Redis SET [EX ttl].

Return type:

None

invalidate(user_message, system_prompt, model, temperature, max_tokens)[source]

Remove a specific entry. Returns True if it was present.

Return type:

bool

clear()[source]

Delete all entries matching this instance’s key prefix.

Uses SCAN to iterate safely (no KEYS * in production). Also resets in-memory stats counters.

Return type:

None
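A SCAN-based prefix delete of the kind described for clear() might look like the sketch below. It is not the library's implementation: clear_prefix, batch_size, and the FakeRedis stand-in are hypothetical names introduced so the example runs without a server. The two client methods it relies on, scan_iter(match=..., count=...) and delete(*names), do exist on redis-py clients.

```python
import fnmatch
from typing import Iterator


class FakeRedis:
    """Tiny in-memory stand-in exposing the two client methods used below."""

    def __init__(self) -> None:
        self.store: dict[str, str] = {}

    def scan_iter(self, match: str = "*", count: int = 100) -> Iterator[str]:
        # Snapshot the keys so deleting while iterating is safe.
        for key in list(self.store):
            if fnmatch.fnmatchcase(key, match):
                yield key

    def delete(self, *names: str) -> int:
        # Like Redis DEL, return the number of keys actually removed.
        return sum(1 for name in names if self.store.pop(name, None) is not None)


def clear_prefix(client, key_prefix: str, batch_size: int = 500) -> int:
    """Delete every key under "{key_prefix}:" and return how many were removed."""
    removed = 0
    batch: list[str] = []
    # SCAN walks the keyspace incrementally instead of blocking like KEYS.
    for key in client.scan_iter(match=f"{key_prefix}:*", count=batch_size):
        batch.append(key)
        if len(batch) >= batch_size:
            removed += client.delete(*batch)
            batch.clear()
    if batch:
        removed += client.delete(*batch)
    return removed


fake = FakeRedis()
fake.store = {"ractogateway:exact:a": "1", "ractogateway:exact:b": "2", "other:c": "3"}
removed_count = clear_prefix(fake, "ractogateway:exact", batch_size=1)
```

Deleting in batches keeps each DEL call small, and keys outside the prefix (here "other:c") are left untouched.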

property stats: CacheStats

Return a snapshot of hit/miss counters plus current Redis key count.

Rate Limiter

Distributed token-budget rate limiter backed by Redis.

Uses a fixed 1-minute window: each request increments a counter stored at "{key_prefix}:{user_id}:{unix_minute}". Because the key embeds the Unix minute and expires automatically after 60 seconds, each counter covers exactly one minute and stale windows clean themselves up.

Why not a strict token-bucket?

A true token bucket requires compare-and-swap semantics (typically a Lua script) to be atomic. The fixed-window approach (INCRBY + EXPIRE in a pipeline) is simpler and atomic enough for rate-limiting purposes. Its cost is reduced precision at the boundary between two minute-windows, where a user can briefly consume up to twice the per-minute budget. This is a common and usually acceptable trade-off in API gateways.
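The per-minute counter described above can be sketched as follows. This is an illustration under stated assumptions, not the library's code: window_key, the "rl" prefix, the now parameter, and the DECRBY rollback on rejection are all hypothetical, and FakeRedis is an in-memory stand-in (without real expiry) so the example runs without a server. With a real client the INCRBY and EXPIRE calls would be sent together in a pipeline.

```python
import time


class FakeRedis:
    """In-memory stand-in for INCRBY / DECRBY / EXPIRE (no real expiry)."""

    def __init__(self) -> None:
        self.counters: dict[str, int] = {}
        self.ttls: dict[str, int] = {}

    def incrby(self, name: str, amount: int = 1) -> int:
        self.counters[name] = self.counters.get(name, 0) + amount
        return self.counters[name]

    def decrby(self, name: str, amount: int = 1) -> int:
        return self.incrby(name, -amount)

    def expire(self, name: str, seconds: int) -> bool:
        self.ttls[name] = seconds
        return name in self.counters


def window_key(key_prefix: str, user_id: str, now: float) -> str:
    """Build "{key_prefix}:{user_id}:{unix_minute}" for the given timestamp."""
    return f"{key_prefix}:{user_id}:{int(now // 60)}"


def check_and_consume(client, user_id, tokens, max_tokens_per_minute,
                      key_prefix="rl", now=None) -> bool:
    """Return True and keep the tokens counted if the request fits the budget."""
    now = time.time() if now is None else now
    key = window_key(key_prefix, user_id, now)
    used = client.incrby(key, tokens)  # count the request first
    client.expire(key, 60)             # let the window clean itself up
    if used > max_tokens_per_minute:
        # Assumption: roll back so a rejected request consumes nothing.
        client.decrby(key, tokens)
        return False
    return True


fake = FakeRedis()
first = check_and_consume(fake, "user_42", 60, 100, now=120.0)   # minute 2
second = check_and_consume(fake, "user_42", 60, 100, now=130.0)  # over budget
third = check_and_consume(fake, "user_42", 40, 100, now=179.0)   # fills minute 2
fourth = check_and_consume(fake, "user_42", 100, 100, now=180.0) # minute 3
```

The last two calls show the boundary effect: 140 tokens are accepted within one second of each other because they fall into different minute keys.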

Example:

from ractogateway.redis import RedisRateLimiter, RateLimitConfig

limiter = RedisRateLimiter(
    url="redis://localhost:6379/0",
    config=RateLimitConfig(max_tokens_per_minute=5_000),
)

# In your request handler (before calling the LLM):
if not limiter.check_and_consume(user_id="user_42", tokens=estimated_tokens):
    raise RuntimeError("Rate limit exceeded; try again in a minute.")

class ractogateway.redis.rate_limiter.RedisRateLimiter(*, url='redis://localhost:6379/0', client=None, config=None)[source]

Bases: object

Fleet-wide token-budget rate limiter backed by a shared Redis instance.

Parameters:
  • url (str) – Redis connection URL. Ignored when client is provided.

  • client (Any | None) – Pre-built redis.Redis client. Useful for connection-pool sharing or unit-test mocking.

  • config (RateLimitConfig | None) – RateLimitConfig controlling the token budget and Redis key namespace. Defaults are applied when None.

check_and_consume(user_id, tokens=1)[source]

Attempt to consume tokens from user_id’s budget.

Returns True if the request is within budget (tokens are consumed), or False if the rate limit would be exceeded (no tokens consumed).

The check-and-increment is done in a single Redis pipeline, making it safe against concurrent requests from the same user.

Parameters:
  • user_id (str) – Opaque identifier for the caller (e.g. API key, user UUID).

  • tokens (int) – Number of tokens to consume. Defaults to 1 for request-count limiting; pass the estimated LLM token count for cost-based limiting.

Return type:

bool

get_remaining(user_id)[source]

Return the remaining token budget for the current minute.

Returns max_tokens_per_minute if the user has not made any requests in the current window.

Return type:

int

reset(user_id)[source]

Delete all rate-limit keys for user_id (current and any stale windows).

Intended for admin / testing use. Uses SCAN to avoid blocking.

Return type:

None

Chat Memory

Sliding-window conversation memory stored in Redis.

Each conversation is kept as a Redis List of JSON-encoded {"role", "content"} message dicts. The list is capped at max_turns * 2 entries (one user + one assistant message per turn) using LTRIM after every append().
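The capped list can be sketched with the RPUSH, LTRIM, and LRANGE commands. The helper names, the "chat" prefix, and the FakeRedis stand-in are assumptions for illustration, and TTL refresh is omitted for brevity; the library's own append() may differ in detail.

```python
import json


class FakeRedis:
    """In-memory stand-in for the three list commands used below."""

    def __init__(self) -> None:
        self.lists: dict[str, list[str]] = {}

    @staticmethod
    def _slice(items: list[str], start: int, end: int) -> list[str]:
        # Redis indices are inclusive and may be negative (counted from the tail).
        n = len(items)
        lo = max(n + start, 0) if start < 0 else start
        hi = n + end if end < 0 else min(end, n - 1)
        return items[lo : hi + 1] if hi >= lo else []

    def rpush(self, name: str, *values: str) -> int:
        self.lists.setdefault(name, []).extend(values)
        return len(self.lists[name])

    def ltrim(self, name: str, start: int, end: int) -> bool:
        self.lists[name] = self._slice(self.lists.get(name, []), start, end)
        return True

    def lrange(self, name: str, start: int, end: int) -> list[str]:
        return self._slice(self.lists.get(name, []), start, end)


def append(client, conversation_id, role, content, max_turns=20, key_prefix="chat"):
    """Push one message, then keep only the newest max_turns * 2 entries."""
    key = f"{key_prefix}:{conversation_id}"
    client.rpush(key, json.dumps({"role": role, "content": content}))
    client.ltrim(key, -(max_turns * 2), -1)  # drop the oldest messages


def get_history(client, conversation_id, key_prefix="chat"):
    """Return the stored messages oldest-first as role/content dicts."""
    key = f"{key_prefix}:{conversation_id}"
    return [json.loads(raw) for raw in client.lrange(key, 0, -1)]


fake = FakeRedis()
for i in range(3):
    append(fake, "conv_abc", "user", f"question {i}", max_turns=1)
    append(fake, "conv_abc", "assistant", f"answer {i}", max_turns=1)
history = get_history(fake, "conv_abc")
```

With max_turns=1 only the final user/assistant pair survives, since LTRIM with negative indices keeps the tail of the list.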

Why Redis Lists?

Redis Lists support O(1) push and O(n) range reads — perfect for maintaining a bounded conversation history that must be accessible to multiple server replicas. Unlike in-memory approaches, the history survives rolling deployments and can be shared between a web server and a background worker.

Compatibility with ChatConfig.history

get_history() returns list[dict[str, str]] with "role" and "content" keys, the same shape all three provider adapters use under the hood. To feed it back into a kit, wrap each dict in your Message model and pass the resulting list as ChatConfig(history=...).
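The wrapping step might look like the sketch below. The Message dataclass here is a hypothetical stand-in with role and content fields, and to_messages is an illustrative helper; substitute the Message model your project actually uses, whose fields may differ.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Message:
    """Hypothetical stand-in for the project's Message model."""

    role: str
    content: str


def to_messages(history: list[dict[str, str]]) -> list[Message]:
    """Wrap each role/content dict in a Message, preserving order."""
    return [Message(role=m["role"], content=m["content"]) for m in history]


# Shape returned by get_history(): oldest message first.
raw_history = [
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "Paris."},
]
messages = to_messages(raw_history)
```

The resulting list keeps the oldest-first order, so it can be handed to ChatConfig(history=messages) unchanged.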

Example:

from ractogateway.redis import RedisChatMemory, ChatMemoryConfig

memory = RedisChatMemory(
    url="redis://localhost:6379/0",
    config=ChatMemoryConfig(max_turns=20, ttl_seconds=1800),
)

# Store messages as the conversation progresses:
memory.append("conv_abc", "user", "What is the capital of France?")
memory.append("conv_abc", "assistant", "Paris.")

# Retrieve history to pass back into the kit:
history = memory.get_history("conv_abc")
# → [{"role": "user", "content": "What is the capital of France?"},
#    {"role": "assistant", "content": "Paris."}]

class ractogateway.redis.chat_memory.RedisChatMemory(*, url='redis://localhost:6379/0', client=None, config=None)[source]

Bases: object

Shared, bounded conversation history backed by Redis.

Parameters:
  • url (str) – Redis connection URL. Ignored when client is provided.

  • client (Any | None) – Pre-built redis.Redis client.

  • config (ChatMemoryConfig | None) – ChatMemoryConfig controlling turn limit, TTL, and key namespace. Defaults are applied when None.

append(conversation_id, role, content)[source]

Append a message to the conversation history.

After appending, the list is trimmed to the last config.max_turns * 2 messages (oldest dropped first). If a TTL is configured, it is refreshed on every append so the window slides with activity.

Parameters:
  • conversation_id (str) – Opaque identifier for the conversation (e.g. session UUID).

  • role (str) – The message author: "user", "assistant", or "system".

  • content (str) – Text content of the message.

Return type:

None

get_history(conversation_id)[source]

Return all stored messages as a list of {"role", "content"} dicts.

The list is ordered oldest-first, matching the ChatConfig.history convention.

Returns an empty list when the conversation does not exist or has expired.

Return type:

list[dict[str, str]]

clear(conversation_id)[source]

Delete the conversation history from Redis.

Return type:

None

count(conversation_id)[source]

Return the number of messages stored for this conversation.

Returns 0 when the conversation does not exist.

Return type:

int