Source code for ractogateway.rag.readers.spreadsheet_reader

"""Spreadsheet reader — handles CSV (stdlib) and XLSX (openpyxl, lazy).

Install xlsx support with:  pip install ractogateway[rag-excel]
"""

from __future__ import annotations

import csv
import io
from pathlib import Path
from typing import Any


def _require_openpyxl() -> Any:
    try:
        import openpyxl
    except ImportError as exc:
        raise ImportError(
            "Reading Excel (.xlsx/.xls) files requires the 'openpyxl' package. "
            "Install it with:  pip install ractogateway[rag-excel]"
        ) from exc
    return openpyxl


from ractogateway.rag._models.document import Document
from ractogateway.rag.readers.base import BaseReader


[docs] class SpreadsheetReader(BaseReader): """Read CSV and Excel spreadsheets into plain text. Each row is rendered as a tab-separated line; an optional header row is prepended. Multiple sheets in an XLSX workbook are separated by a ``--- Sheet: <name> ---`` divider. Parameters ---------- max_rows: Maximum number of rows to read per sheet (``None`` = all). include_header: Whether to repeat the header row at the start of each sheet section. """ def __init__( self, max_rows: int | None = None, include_header: bool = True, ) -> None: self._max_rows = max_rows self._include_header = include_header @property def supported_extensions(self) -> frozenset[str]: return frozenset({".csv", ".tsv", ".xlsx", ".xls"})
[docs] def read(self, path: Path) -> Document: ext = path.suffix.lower() if ext in {".csv", ".tsv"}: return self._read_csv(path) return self._read_xlsx(path)
# ------------------------------------------------------------------ # CSV # ------------------------------------------------------------------ def _read_csv(self, path: Path) -> Document: try: raw = path.read_text(encoding="utf-8") except UnicodeDecodeError: raw = path.read_text(encoding="latin-1") dialect = csv.Sniffer().sniff(raw[:4096], delimiters=",\t;|") reader = csv.reader(io.StringIO(raw), dialect) rows = list(reader) if self._max_rows is not None: rows = rows[: self._max_rows + 1] # +1 for header lines = ["\t".join(row) for row in rows] return Document( content="\n".join(lines), source=str(path.resolve()), metadata={ "extension": path.suffix.lower(), "filename": path.name, "size_bytes": path.stat().st_size, "row_count": len(rows), "col_count": max((len(r) for r in rows), default=0), }, ) # ------------------------------------------------------------------ # XLSX # ------------------------------------------------------------------ def _read_xlsx(self, path: Path) -> Document: openpyxl = _require_openpyxl() wb = openpyxl.load_workbook(str(path), read_only=True, data_only=True) sections: list[str] = [] total_rows = 0 for sheet_name in wb.sheetnames: ws = wb[sheet_name] rows_text: list[str] = [] for i, row in enumerate(ws.iter_rows(values_only=True)): if self._max_rows is not None and i >= self._max_rows: break cells = [str(c) if c is not None else "" for c in row] rows_text.append("\t".join(cells)) total_rows += 1 if rows_text: sections.append(f"--- Sheet: {sheet_name} ---\n" + "\n".join(rows_text)) wb.close() return Document( content="\n\n".join(sections), source=str(path.resolve()), metadata={ "extension": path.suffix.lower(), "filename": path.name, "size_bytes": path.stat().st_size, "sheet_count": len(wb.sheetnames), "total_rows": total_rows, }, )