feat(rag): paragraph-aware chunker (chunk_text)
Browse files- src/rag/__init__.py +0 -0
- src/rag/chunker.py +39 -0
- tests/rag/__init__.py +0 -0
- tests/rag/test_chunker.py +40 -0
src/rag/__init__.py
ADDED
|
File without changes
|
src/rag/chunker.py
ADDED
|
@@ -0,0 +1,39 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""Paragraph-aware recursive character splitter for RAG ingestion.
|
| 2 |
+
|
| 3 |
+
Public entry: `chunk_text(text, max_chars, overlap)`. Splits on the first
|
| 4 |
+
of [paragraph break, sentence end, newline, space] that fits inside the
|
| 5 |
+
window. Empty / whitespace-only inputs return [].
|
| 6 |
+
"""
|
| 7 |
+
from __future__ import annotations
|
| 8 |
+
|
| 9 |
+
|
| 10 |
+
# Boundary candidates in priority order: paragraph break, sentence end,
# line break, then a single space.
_SEPARATORS: tuple[str, ...] = ("\n\n", ". ", "\n", " ")


def chunk_text(text: str, max_chars: int = 600, overlap: int = 80) -> list[str]:
    """Split `text` into chunks of at most `max_chars`, with `overlap` carry-over.

    The text is stripped first. Each window of `max_chars` characters is cut
    at the last occurrence of the highest-priority separator in `_SEPARATORS`
    that still moves the window forward; if none qualifies, the window is cut
    hard at `max_chars`. The next window starts `overlap` characters before
    the previous cut. Chunks are stripped, and empty ones are dropped.

    Args:
        text: The text to split.
        max_chars: Maximum length of each chunk; must be at least 1.
        overlap: Number of characters re-read from the end of the previous
            window. Values outside ``[0, max_chars - 1]`` are clamped into
            that range.

    Returns:
        The chunks in document order. Empty or whitespace-only input
        returns ``[]``; input that already fits returns a single chunk.

    Raises:
        ValueError: If `max_chars` is less than 1.
    """
    if max_chars < 1:
        raise ValueError(f"max_chars must be >= 1, got {max_chars}")

    text = text.strip()
    if not text:
        return []
    if len(text) <= max_chars:
        return [text]

    # A negative overlap would skip text between chunks, and an overlap of
    # max_chars or more could never advance by more than one character, so
    # clamp it into the range where every step makes real progress.
    overlap = max(0, min(overlap, max_chars - 1))

    chunks: list[str] = []
    n = len(text)
    start = 0
    while start < n:
        end = min(start + max_chars, n)
        if end < n:
            # Try to land on a clean boundary inside [start, end).
            for sep in _SEPARATORS:
                last = text.rfind(sep, start, end)
                cut = last + len(sep)
                # Only accept a boundary that lies beyond the overlap region.
                # Otherwise the rewind below would put the next window in
                # front of the same separator again, and the loop would
                # crawl forward one character at a time emitting
                # near-duplicate fragments.
                if last > start and cut - overlap > start:
                    end = cut
                    break
        chunk = text[start:end].strip()
        if chunk:
            chunks.append(chunk)
        if end >= n:
            break
        # Guaranteed to be > start: either the accepted boundary satisfied
        # cut - overlap > start, or end == start + max_chars > start + overlap.
        start = end - overlap
    return chunks
|
tests/rag/__init__.py
ADDED
|
File without changes
|
tests/rag/test_chunker.py
ADDED
|
@@ -0,0 +1,40 @@
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
| 1 |
+
"""Tests for src.rag.chunker — paragraph-aware character splitter."""
|
| 2 |
+
from __future__ import annotations
|
| 3 |
+
|
| 4 |
+
import pytest
|
| 5 |
+
|
| 6 |
+
from src.rag.chunker import chunk_text
|
| 7 |
+
|
| 8 |
+
|
| 9 |
+
class TestChunkText:
    """Behavioral tests for `chunk_text`."""

    def test_short_text_returns_single_chunk(self) -> None:
        """Text that already fits in one window is returned unchanged."""
        out = chunk_text("hello world", max_chars=100, overlap=10)
        assert out == ["hello world"]

    def test_empty_text_returns_empty_list(self) -> None:
        """Empty and whitespace-only inputs produce no chunks."""
        assert chunk_text("", max_chars=100, overlap=10) == []
        assert chunk_text(" \n\n ", max_chars=100, overlap=10) == []

    def test_long_text_splits_into_multiple_chunks(self) -> None:
        """Text longer than the window is split, and no chunk exceeds it."""
        text = "a" * 250
        out = chunk_text(text, max_chars=100, overlap=10)
        assert len(out) >= 3
        # every chunk respects max_chars
        for c in out:
            assert len(c) <= 100

    def test_overlap_between_chunks(self) -> None:
        """Each chunk starts with the last `overlap` characters of the previous one."""
        # Non-repeating text: with a periodic fixture any tail would also be
        # found in the next chunk by coincidence, even with zero overlap.
        text = "".join(f"{i:04d}" for i in range(75))  # 300 chars, no natural break
        out = chunk_text(text, max_chars=100, overlap=20)
        assert len(out) > 1
        # consecutive chunks share exactly the overlap region
        for prev, nxt in zip(out, out[1:]):
            assert prev[-20:] == nxt[:20]

    def test_paragraph_boundary_preferred(self) -> None:
        """A paragraph break inside the window wins over a mid-word cut."""
        # First paragraph fits, second doesn't — split at \n\n
        para_a = "First paragraph content."
        para_b = "Second paragraph content " * 10
        text = f"{para_a}\n\n{para_b}"
        out = chunk_text(text, max_chars=100, overlap=10)
        assert len(out) > 1
        # first chunk ends exactly at the paragraph boundary, not mid-word
        assert out[0] == para_a
|