Source code for ractogateway.redis.chat_memory

"""Sliding-window conversation memory stored in Redis.

Each conversation is kept as a Redis List of JSON-encoded ``{"role", "content"}``
message dicts.  The list is capped at ``max_turns * 2`` entries (one user + one
assistant message per turn) using ``LTRIM`` after every ``append()``.

Why Redis Lists?
----------------
Redis Lists support O(1) push and O(n) range reads — perfect for maintaining a
bounded conversation history that must be accessible to multiple server replicas.
Unlike in-memory approaches, the history survives rolling deployments and can be
shared between a web server and a background worker.

Compatibility with ``ChatConfig.history``
-----------------------------------------
:meth:`get_history` returns ``list[dict[str, str]]`` with ``"role"`` and
``"content"`` keys.  This is the exact format used by all three provider
adapters under the hood.  You can pass the result directly to
``ChatConfig(history=memory.get_history(conv_id))`` after wrapping each dict in
your ``Message`` model.

Example::

    from ractogateway.redis import RedisChatMemory, ChatMemoryConfig

    memory = RedisChatMemory(
        url="redis://localhost:6379/0",
        config=ChatMemoryConfig(max_turns=20, ttl_seconds=1800),
    )

    # Store messages as the conversation progresses:
    memory.append("conv_abc", "user", "What is the capital of France?")
    memory.append("conv_abc", "assistant", "Paris.")

    # Retrieve history to pass back into the kit:
    history = memory.get_history("conv_abc")
    # → [{"role": "user", "content": "What is the capital of France?"},
    #    {"role": "assistant", "content": "Paris."}]
"""

from __future__ import annotations

import json
from typing import Any

from ractogateway.redis._models import ChatMemoryConfig


def _require_redis() -> Any:
    """Import the optional ``redis`` dependency on demand and return the module.

    The import is deferred to call time so that this module can be imported
    without ``redis`` installed; the dependency is only needed once a
    connection actually has to be built.

    Raises
    ------
    ImportError
        If ``redis`` cannot be imported.  The message carries the install
        hint and the original import failure is chained as the cause.
    """
    try:
        import redis
    except ImportError as missing:
        hint = (
            "The 'redis' package is required for RedisChatMemory. "
            "Install it with:  pip install ractogateway[redis]"
        )
        raise ImportError(hint) from missing
    else:
        return redis


class RedisChatMemory:
    """Shared, bounded conversation history backed by Redis.

    Every conversation is stored as one Redis List under the key
    ``"<config.key_prefix>:<conversation_id>"``.  Each list entry is a
    JSON-encoded ``{"role", "content"}`` dict, ordered oldest-first.

    Parameters
    ----------
    url:
        Redis connection URL.  Ignored when *client* is provided.
    client:
        Pre-built ``redis.Redis`` client.
    config:
        :class:`~ractogateway.redis.ChatMemoryConfig` controlling turn limit,
        TTL, and key namespace.  Defaults are applied when ``None``.
    """

    def __init__(
        self,
        *,
        url: str = "redis://localhost:6379/0",
        client: Any | None = None,
        config: ChatMemoryConfig | None = None,
    ) -> None:
        self._url = url
        self._provided_client = client
        self._config = config or ChatMemoryConfig()
        # Client built from ``url`` on first use and then reused, so that a
        # new client (and connection pool) is not created per operation.
        self._lazy_client: Any | None = None

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _client(self) -> Any:
        """Return the Redis client to use for the next command.

        A caller-supplied client always wins.  Otherwise a client is created
        from ``url`` (with ``decode_responses=True``) the first time it is
        needed and cached for all later calls.
        """
        if self._provided_client is not None:
            return self._provided_client
        if self._lazy_client is None:
            self._lazy_client = _require_redis().from_url(
                self._url, decode_responses=True
            )
        return self._lazy_client

    def _key(self, conversation_id: str) -> str:
        """Build the namespaced Redis key for *conversation_id*."""
        return f"{self._config.key_prefix}:{conversation_id}"

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def append(self, conversation_id: str, role: str, content: str) -> None:
        """Append a message to the conversation history.

        After appending, the list is trimmed to the last
        ``config.max_turns * 2`` messages (oldest dropped first).  If a TTL is
        configured, it is refreshed on every append so the window slides with
        activity.  Push, trim and expire are sent in one transactional
        pipeline.

        When ``config.max_turns * 2`` is zero or negative nothing can be
        retained, so the message is not stored and any existing history for
        the conversation is removed.

        Parameters
        ----------
        conversation_id:
            Opaque identifier for the conversation (e.g. session UUID).
        role:
            The message author: ``"user"``, ``"assistant"``, or ``"system"``.
        content:
            Text content of the message.
        """
        key = self._key(conversation_id)
        max_messages = self._config.max_turns * 2
        cli = self._client()

        if max_messages <= 0:
            # ``LTRIM key -0 -1`` is ``LTRIM key 0 -1`` and would keep the
            # whole list (and a negative cap would trim from the wrong end),
            # so a non-positive cap is handled explicitly as "keep nothing".
            cli.delete(key)
            return

        payload = json.dumps({"role": role, "content": content})
        with cli.pipeline(transaction=True) as pipe:
            pipe.rpush(key, payload)
            # Keep only the most-recent max_messages entries.
            pipe.ltrim(key, -max_messages, -1)
            if self._config.ttl_seconds is not None:
                pipe.expire(key, int(self._config.ttl_seconds))
            pipe.execute()

    def get_history(self, conversation_id: str) -> list[dict[str, str]]:
        """Return all stored messages as a list of ``{"role", "content"}`` dicts.

        The list is ordered oldest-first, matching the ``ChatConfig.history``
        convention.  Returns an empty list when the conversation does not exist
        or has expired.  Entries that are not valid JSON, or that decode to
        something other than a dict, are skipped.
        """
        key = self._key(conversation_id)
        raw_messages = self._client().lrange(key, 0, -1)
        result: list[dict[str, str]] = []
        for raw in raw_messages:
            try:
                msg = json.loads(raw)
            except (ValueError, TypeError):
                # Skip corrupted entries rather than crashing.  ValueError
                # covers both JSONDecodeError and UnicodeDecodeError (bytes
                # payloads from a client that does not decode responses).
                continue
            if isinstance(msg, dict):
                result.append(msg)
        return result

    def clear(self, conversation_id: str) -> None:
        """Delete the conversation history from Redis."""
        self._client().delete(self._key(conversation_id))

    def count(self, conversation_id: str) -> int:
        """Return the number of messages stored for this conversation.

        Returns ``0`` when the conversation does not exist.
        """
        return int(self._client().llen(self._key(conversation_id)))

    def __repr__(self) -> str:  # pragma: no cover
        cfg = self._config
        return (
            f"RedisChatMemory(max_turns={cfg.max_turns}, "
            f"ttl={cfg.ttl_seconds}s, "
            f"prefix={cfg.key_prefix!r})"
        )