Redis
Models
Pydantic configuration models for the Redis infrastructure layer.
- class ractogateway.redis._models.RateLimitConfig(**data)[source]
Bases: BaseModel
Configuration for RedisRateLimiter.
- Parameters:
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
- class ractogateway.redis._models.ChatMemoryConfig(**data)[source]
Bases: BaseModel
Configuration for RedisChatMemory.
- Parameters:
max_turns (int) – Maximum number of conversation turns to retain. Each turn consists of one user message and one assistant message, so up to max_turns * 2 raw messages are stored per conversation.
ttl_seconds (float | None) – Optional TTL. Every append() call refreshes the expiry on the underlying Redis list. None disables expiry.
key_prefix (str) – Redis key namespace. Each conversation is stored at "{key_prefix}:{conversation_id}".
Create a new model by parsing and validating input data from keyword arguments.
Raises ValidationError (pydantic_core.ValidationError) if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- model_config: ClassVar[ConfigDict] = {}
Configuration for the model; should be a dictionary conforming to ConfigDict (pydantic.config.ConfigDict).
Exact Cache
Redis-backed exact-match cache — distributed drop-in for ExactMatchCache.
Stores LLM responses in Redis so the cache is shared across every process and
server in a fleet. The public API is identical to that of
ExactMatchCache, so you can substitute
RedisExactCache wherever ExactMatchCache is accepted (including all
developer-kit exact_cache= parameters) without changing any other code.
Thread-safety
Stats counters are guarded by threading.Lock (same pattern as the in-process
cache). The Redis operations themselves are atomic at the command level; no
additional locking is required across processes.
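The lock-guarded counter pattern mentioned above can be sketched as follows. This is a minimal illustration, not the library's code: StatsSnapshot and LockedCounters are hypothetical names, and the real CacheStats may carry different fields.

```python
import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class StatsSnapshot:
    """Hypothetical stand-in for the library's CacheStats."""
    hits: int
    misses: int


class LockedCounters:
    """Hit/miss counters that are safe to update from several threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._hits = 0
        self._misses = 0

    def record(self, hit: bool) -> None:
        # The read-modify-write on each counter happens under the lock.
        with self._lock:
            if hit:
                self._hits += 1
            else:
                self._misses += 1

    def snapshot(self) -> StatsSnapshot:
        # Read both counters under the lock so the pair is consistent.
        with self._lock:
            return StatsSnapshot(self._hits, self._misses)
```

The lock only protects the per-process counters; the cached values themselves live in Redis and need no such guard.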
Cache key
"{key_prefix}:{sha256_hex}" where the SHA-256 digest is computed from
(user_message, system_prompt, model, temperature, max_tokens) — identical
hashing logic to _make_key().
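A rough sketch of how such a key could be derived is shown below. The function name make_cache_key and the JSON serialization step are assumptions made for illustration; the library's _make_key() may serialize the fields differently, so keys produced here are not guaranteed to match it.

```python
import hashlib
import json


def make_cache_key(user_message, system_prompt, model, temperature, max_tokens,
                   key_prefix="ractogateway:exact"):
    """Build "{key_prefix}:{sha256_hex}" from the five request fields."""
    # Assumption: the fields are serialized as a JSON array before hashing.
    payload = json.dumps(
        [user_message, system_prompt, model, temperature, max_tokens],
        ensure_ascii=False,
    )
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{key_prefix}:{digest}"


key_a = make_cache_key("Hi", "Be brief.", "gpt-4o", 0.0, 256)
key_b = make_cache_key("Hi", "Be brief.", "gpt-4o", 0.7, 256)
```

Because every field feeds the digest, changing only the temperature (as in key_b) yields a different key, which is what makes the cache an exact-match one.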
Example:
from ractogateway.redis import RedisExactCache
from ractogateway import openai_developer_kit as gpt

cache = RedisExactCache(
    url="redis://localhost:6379/0",
    ttl_seconds=3600,
)
kit = gpt.OpenAIDeveloperKit(model="gpt-4o", exact_cache=cache)
- class ractogateway.redis.exact_cache.RedisExactCache(*, url='redis://localhost:6379/0', client=None, ttl_seconds=None, key_prefix='ractogateway:exact')[source]
Bases: object
Distributed exact-match LRU cache backed by Redis.
- Parameters:
url (str) – Redis connection URL (e.g. "redis://localhost:6379/0"). Ignored when client is provided.
client (Any | None) – Pre-built redis.Redis (or compatible) client. Useful when you manage the connection pool yourself or use a mock in tests.
ttl_seconds (float | None) – Optional TTL for each entry. Passed directly to Redis SET EX. None means entries never expire (Redis default).
key_prefix (str) – Namespace for all Redis keys managed by this instance.
- get(user_message, system_prompt, model, temperature, max_tokens)[source]
Return a cached response or None on a miss.
O(1) Redis GET.
- Return type:
- put(user_message, system_prompt, model, temperature, max_tokens, response)[source]
Store a response in Redis.
O(1) Redis SET [EX ttl].
- Return type:
- invalidate(user_message, system_prompt, model, temperature, max_tokens)[source]
Remove a specific entry. Returns True if it was present.
- Return type:
- clear()[source]
Delete all entries matching this instance’s key prefix.
Uses SCAN to iterate safely (no KEYS * in production). Also resets in-memory stats counters.
- Return type:
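The SCAN-based deletion described for clear() might look roughly like the sketch below. The helper delete_by_prefix and the FakeRedis stand-in are hypothetical; the only client calls assumed are scan_iter(match=..., count=...) and delete(*names), both of which redis-py provides. The library's actual implementation may batch differently.

```python
import fnmatch


def delete_by_prefix(client, key_prefix, batch_size=500):
    """Delete every key under key_prefix; return the number of keys removed."""
    removed = 0
    batch = []
    # scan_iter walks the keyspace incrementally with SCAN instead of a blocking KEYS.
    for key in client.scan_iter(match=f"{key_prefix}:*", count=batch_size):
        batch.append(key)
        if len(batch) >= batch_size:
            removed += client.delete(*batch)
            batch.clear()
    if batch:
        removed += client.delete(*batch)
    return removed


class FakeRedis:
    """Tiny in-memory stand-in so the sketch runs without a server."""

    def __init__(self, keys):
        self.store = {key: b"cached" for key in keys}

    def scan_iter(self, match=None, count=None):
        # Snapshot the matching keys so deleting during iteration is safe.
        return iter([k for k in self.store if fnmatch.fnmatchcase(k, match or "*")])

    def delete(self, *names):
        return sum(1 for name in names if self.store.pop(name, None) is not None)


fake = FakeRedis(["ractogateway:exact:a", "ractogateway:exact:b", "other:c"])
removed = delete_by_prefix(fake, "ractogateway:exact", batch_size=1)
```

Keys outside the prefix ("other:c" here) are left alone, which is why a dedicated key_prefix per cache instance matters.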
- property stats: CacheStats
Return a snapshot of hit/miss counters plus current Redis key count.
Rate Limiter
Distributed token-bucket rate limiter backed by Redis.
Uses a 1-minute window keyed by the Unix minute (a fixed window per calendar minute rather than a continuously sliding one): each request increments a counter stored at
"{key_prefix}:{user_id}:{unix_minute}". The key expires automatically after
60 seconds, so the window always reflects the current minute.
Why not a strict token-bucket?
A true token-bucket requires compare-and-swap semantics (Lua script) to be atomic. The per-minute counter approach (INCRBY + EXPIRE in a pipeline) is atomic enough for rate-limiting purposes: it has a small race window at the boundary of two minute-windows, which is an acceptable trade-off for most workloads and a common one in counter-based API rate limiters.
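The per-minute counter described above can be illustrated with an in-memory stand-in. This is a sketch of the bookkeeping only, under stated assumptions: InMemoryWindowLimiter and window_key are hypothetical names, the "ratelimit" prefix is made up, and a dict replaces the Redis INCRBY + EXPIRE calls, so nothing here is atomic across processes.

```python
import time


def window_key(key_prefix, user_id, now=None):
    """Build the "{key_prefix}:{user_id}:{unix_minute}" counter key."""
    now = time.time() if now is None else now
    unix_minute = int(now // 60)
    return f"{key_prefix}:{user_id}:{unix_minute}"


class InMemoryWindowLimiter:
    """Dict-backed stand-in for the Redis counter, for illustration only."""

    def __init__(self, max_tokens_per_minute, key_prefix="ratelimit"):
        self.max_tokens = max_tokens_per_minute
        self.key_prefix = key_prefix  # hypothetical default prefix
        self._counters = {}

    def check_and_consume(self, user_id, tokens=1, now=None):
        key = window_key(self.key_prefix, user_id, now)
        used = self._counters.get(key, 0)
        if used + tokens > self.max_tokens:
            return False  # over budget: leave the counter untouched
        self._counters[key] = used + tokens
        return True


limiter = InMemoryWindowLimiter(max_tokens_per_minute=10)
```

In Redis the old counters disappear through key expiry; this stand-in never evicts them, which is fine for a demonstration but not for long-running use.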
Example:
from ractogateway.redis import RedisRateLimiter, RateLimitConfig

limiter = RedisRateLimiter(
    url="redis://localhost:6379/0",
    config=RateLimitConfig(max_tokens_per_minute=5_000),
)
# In your request handler (before calling the LLM):
if not limiter.check_and_consume(user_id="user_42", tokens=estimated_tokens):
    raise RuntimeError("Rate limit exceeded; try again in a minute.")
- class ractogateway.redis.rate_limiter.RedisRateLimiter(*, url='redis://localhost:6379/0', client=None, config=None)[source]
Bases: object
Fleet-wide token-budget rate limiter backed by a shared Redis instance.
- Parameters:
url (str) – Redis connection URL. Ignored when client is provided.
client (Any | None) – Pre-built redis.Redis client. Useful for connection-pool sharing or unit-test mocking.
config (RateLimitConfig | None) – RateLimitConfig controlling the token budget and Redis key namespace. Defaults are applied when None.
- check_and_consume(user_id, tokens=1)[source]
Attempt to consume tokens from user_id’s budget.
Returns True if the request is within budget (tokens are consumed), or False if the rate limit would be exceeded (no tokens consumed).
The check-and-increment is done in a single Redis pipeline, making it safe against concurrent requests from the same user.
Chat Memory
Sliding-window conversation memory stored in Redis.
Each conversation is kept as a Redis List of JSON-encoded {"role", "content"}
message dicts. The list is capped at max_turns * 2 entries (one user + one
assistant message per turn) using LTRIM after every append().
Why Redis Lists?
Redis Lists support O(1) push and O(n) range reads — perfect for maintaining a bounded conversation history that must be accessible to multiple server replicas. Unlike in-memory approaches, the history survives rolling deployments and can be shared between a web server and a background worker.
Compatibility with ChatConfig.history
get_history() returns list[dict[str, str]] with "role" and
"content" keys. This is the exact format used by all three provider
adapters under the hood. To use it with ChatConfig(history=...), wrap each dict
returned by memory.get_history(conv_id) in
your Message model first.
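The wrapping step could look something like the sketch below. The Message dataclass here is a hypothetical stand-in for whatever Message model your application uses, and to_messages is an illustrative helper, not part of the library.

```python
from dataclasses import dataclass


@dataclass
class Message:
    """Hypothetical stand-in for your application's Message model."""
    role: str
    content: str


def to_messages(history):
    """Convert get_history()-style dicts into Message objects."""
    return [Message(role=m["role"], content=m["content"]) for m in history]


# Same shape as the list[dict[str, str]] that get_history() returns.
history = [
    {"role": "user", "content": "What is the capital of France?"},
    {"role": "assistant", "content": "Paris."},
]
messages = to_messages(history)
```

The resulting list is what you would then hand to ChatConfig(history=...), assuming your Message model accepts role and content in this form.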
Example:
from ractogateway.redis import RedisChatMemory, ChatMemoryConfig

memory = RedisChatMemory(
    url="redis://localhost:6379/0",
    config=ChatMemoryConfig(max_turns=20, ttl_seconds=1800),
)
# Store messages as the conversation progresses:
memory.append("conv_abc", "user", "What is the capital of France?")
memory.append("conv_abc", "assistant", "Paris.")
# Retrieve history to pass back into the kit:
history = memory.get_history("conv_abc")
# → [{"role": "user", "content": "What is the capital of France?"},
#    {"role": "assistant", "content": "Paris."}]
- class ractogateway.redis.chat_memory.RedisChatMemory(*, url='redis://localhost:6379/0', client=None, config=None)[source]
Bases: object
Shared, bounded conversation history backed by Redis.
- Parameters:
url (str) – Redis connection URL. Ignored when client is provided.
config (ChatMemoryConfig | None) – ChatMemoryConfig controlling turn limit, TTL, and key namespace. Defaults are applied when None.
- append(conversation_id, role, content)[source]
Append a message to the conversation history.
After appending, the list is trimmed to the last config.max_turns * 2 messages (oldest dropped first). If a TTL is configured, it is refreshed on every append so the window slides with activity.
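The append-then-trim behaviour can be sketched with a plain Python list standing in for the Redis List. InMemoryChatMemory and the "chat" prefix are hypothetical, and the push direction (an RPUSH followed by LTRIM keeping the tail) is an assumption; TTL refresh is omitted.

```python
import json


class InMemoryChatMemory:
    """List-backed stand-in that mimics a push followed by LTRIM."""

    def __init__(self, max_turns=20, key_prefix="chat"):
        if max_turns < 1:
            raise ValueError("max_turns must be at least 1")
        self.max_messages = max_turns * 2  # one user + one assistant per turn
        self.key_prefix = key_prefix  # hypothetical default prefix
        self._lists = {}

    def append(self, conversation_id, role, content):
        key = f"{self.key_prefix}:{conversation_id}"
        items = self._lists.setdefault(key, [])
        # Assumed equivalent of RPUSH: newest message goes at the tail.
        items.append(json.dumps({"role": role, "content": content}))
        # Equivalent of LTRIM key -max_messages -1: drop the oldest entries.
        del items[:-self.max_messages]

    def get_history(self, conversation_id):
        key = f"{self.key_prefix}:{conversation_id}"
        return [json.loads(raw) for raw in self._lists.get(key, [])]
```

With max_turns=1, a third append pushes out the first message, leaving only the two most recent entries.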