Yamang02's picture
Upload folder using huggingface_hub
e75e78f verified
raw
history blame
10.4 kB
"""
Recursive Splitter Adapter - Secondary Adapter
Implementation that performs recursive text splitting.
"""
import re
import logging
from typing import List, Dict, Any, Optional
from ....core.ports.text_splitter_port import TextSplitterPort
from ....core.domain.models import Document, DocumentChunk
logger = logging.getLogger(__name__)
class RecursiveSplitterAdapter(TextSplitterPort):
    """Recursive text splitter.

    Text is cut on the coarsest separator that occurs in it.  Any piece that
    is still longer than the chunk size is cut again using only the *finer*
    separators that follow, ending with hard character slices.  Neighbouring
    pieces are packed greedily into chunks of at most ``chunk_size``
    characters.  Overlap is applied once, after packing: every chunk except
    the first is prefixed with the tail of its predecessor, so a returned
    chunk can be up to ``chunk_size + overlap`` characters long.
    """

    # Chunks whose stripped text is shorter than this are dropped by split_text.
    MIN_CHUNK_LENGTH = 10

    def __init__(self, default_chunk_size: int = 500, overlap: int = 50):
        """Store the defaults used when a call supplies no ``chunk_config``.

        Args:
            default_chunk_size: Fallback for the ``'chunk_size'`` config key.
            overlap: Fallback for the ``'overlap'`` config key.
        """
        self.default_chunk_size = default_chunk_size
        self.default_overlap = overlap
        # Split priority, coarsest unit first.
        self.separators = [
            "\n\n\n",  # several blank lines
            "\n\n",  # paragraph break
            "\n",  # line break
            ". ",  # end of sentence
            "! ",  # exclamation mark
            "? ",  # question mark
            "; ",  # semicolon
            ", ",  # comma
            " ",  # whitespace
            "",  # single characters (last resort)
        ]

    async def split_documents(
        self,
        documents: List[Document],
        chunk_config: Optional[Dict[str, Any]] = None
    ) -> List[DocumentChunk]:
        """Split every document and return all chunks in one flat list.

        A document whose processing raises is logged and skipped; the
        remaining documents are still processed.

        Args:
            documents: Documents to split.
            chunk_config: Optional overrides, forwarded to ``split_text``.

        Returns:
            The chunks of all documents, in document order.
        """
        all_chunks = []
        for document in documents:
            try:
                chunks = await self.split_text(
                    text=document.content,
                    document_id=document.id,
                    chunk_config=chunk_config,
                    metadata={
                        'document_type': document.document_type.value,
                        'title': document.title,
                        'priority_score': document.priority_score,
                        'technologies': document.metadata.get('technologies', []),
                        'source': document.source,
                        # Spread last: keys present in document.metadata
                        # take precedence over the entries above.
                        **document.metadata
                    }
                )
                all_chunks.extend(chunks)
            except Exception as e:
                logger.error(f"Failed to split document {document.id}: {e}")
                continue
        logger.info(f"Split {len(documents)} documents into {len(all_chunks)} chunks")
        return all_chunks

    async def split_text(
        self,
        text: str,
        document_id: str,
        chunk_config: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[DocumentChunk]:
        """Recursively split ``text`` into ``DocumentChunk`` objects.

        Args:
            text: Text to split.
            document_id: Identifier used for ``document_id`` and as the
                prefix of every chunk id.
            chunk_config: Optional dict; the keys ``'chunk_size'`` and
                ``'overlap'`` override the instance defaults.
            metadata: Optional base metadata copied into every chunk; the
                chunk-specific keys added here win on conflict.

        Returns:
            The chunks, or an empty list when splitting fails (the error is
            logged, not raised).  Chunks shorter than ``MIN_CHUNK_LENGTH``
            after stripping are dropped; ``chunk_index`` is the position in
            the raw split sequence, so indices can have gaps.
        """
        try:
            # Resolve configuration parameters.
            config = chunk_config or {}
            chunk_size = config.get('chunk_size', self.default_chunk_size)
            overlap = config.get('overlap', self.default_overlap)
            chunks_text = self._recursive_split(text, chunk_size, overlap)
            # Convert the raw strings into DocumentChunk objects.
            chunks = []
            for i, chunk_text in enumerate(chunks_text):
                content = chunk_text.strip()
                if len(content) < self.MIN_CHUNK_LENGTH:  # skip chunks that are too short
                    continue
                chunk_metadata = {
                    **(metadata or {}),
                    'chunk_type': 'recursive',
                    'chunk_method': 'recursive_character',
                    # Measured on the stripped text, i.e. on what is stored.
                    'content_length': len(content)
                }
                chunk = DocumentChunk(
                    id=f"{document_id}_chunk_{i}",
                    content=content,
                    document_id=document_id,
                    chunk_index=i,
                    metadata=chunk_metadata
                )
                chunks.append(chunk)
            return chunks
        except Exception as e:
            logger.error(f"Failed to split text for document {document_id}: {e}")
            return []

    def _recursive_split(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Split ``text`` into chunk strings and apply overlap once.

        Args:
            text: Text to split.
            chunk_size: Maximum length of a chunk before overlap is added.
            overlap: Number of trailing characters of the previous chunk to
                prepend to each following chunk; values <= 0 disable it.

        Returns:
            Non-blank chunk strings in text order.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")
        chunks = self._split_from(text, chunk_size, 0)
        # Overlap is added here only, never inside the recursion, so nested
        # chunks do not receive the same prefix twice.
        if overlap > 0:
            chunks = self._apply_overlap(chunks, overlap)
        return chunks

    def _split_from(self, text: str, chunk_size: int, separator_index: int) -> List[str]:
        """Split without overlap, using ``self.separators[separator_index:]``.

        Recursion into an oversized piece always continues with the
        separators *after* the one that produced it.  Restarting from the
        first separator would split a piece such as ``"...\\n\\n"`` into
        itself again and never terminate.
        """
        # Text that already fits is returned unchanged.
        if len(text) <= chunk_size:
            return [text] if text.strip() else []
        for index in range(separator_index, len(self.separators)):
            separator = self.separators[index]
            # The empty separator means "character level"; _force_split
            # below covers that case with plain slices.
            if separator and separator in text:
                splits = self._split_keeping_separator(text, separator)
                return self._merge_splits(
                    splits, chunk_size, 0, separator_index=index + 1
                )
        # No usable separator left: cut at fixed offsets.
        return self._force_split(text, chunk_size, 0)

    @staticmethod
    def _split_keeping_separator(text: str, separator: str) -> List[str]:
        """Split ``text`` on ``separator``, leaving it at the end of each piece.

        An empty separator yields the individual characters.
        """
        if separator == "":
            return list(text)
        parts = text.split(separator)
        # Every piece but the last was followed by the separator.
        return [part + separator for part in parts[:-1]] + parts[-1:]

    def _split_with_separator(
        self,
        text: str,
        separator: str,
        chunk_size: int,
        overlap: int
    ) -> List[str]:
        """Split ``text`` on one specific separator and pack the pieces.

        Oversized pieces are split further with the separators that follow
        ``separator`` in ``self.separators`` (with the full list when
        ``separator`` is not one of them).
        """
        splits = self._split_keeping_separator(text, separator)
        try:
            next_index = self.separators.index(separator) + 1
        except ValueError:
            next_index = 0
        return self._merge_splits(
            splits, chunk_size, overlap, separator_index=next_index
        )

    def _merge_splits(
        self,
        splits: List[str],
        chunk_size: int,
        overlap: int,
        separator_index: int = 0
    ) -> List[str]:
        """Greedily pack pieces into chunks of at most ``chunk_size`` chars.

        Args:
            splits: Pieces in text order.
            chunk_size: Maximum length of a packed chunk.
            overlap: Overlap to apply to the packed result; <= 0 for none.
            separator_index: First index in ``self.separators`` that may be
                used to break up a piece longer than ``chunk_size``.

        Returns:
            Non-blank chunks, with overlap applied when requested.
        """
        chunks: List[str] = []
        pending: List[str] = []
        pending_length = 0
        for split in splits:
            # Keep filling the current chunk while the piece still fits.
            if pending_length + len(split) <= chunk_size:
                pending.append(split)
                pending_length += len(split)
                continue
            # The current chunk is full: flush it.
            if pending:
                chunks.append("".join(pending))
            if len(split) > chunk_size:
                # The piece alone is too large: break it up with finer separators.
                chunks.extend(self._split_from(split, chunk_size, separator_index))
                pending, pending_length = [], 0
            else:
                pending, pending_length = [split], len(split)
        # Flush the last chunk.
        if pending:
            chunks.append("".join(pending))
        # Drop blank chunks before overlapping so that no chunk ends up
        # consisting solely of a copied tail.
        chunks = [chunk for chunk in chunks if chunk.strip()]
        if overlap > 0:
            chunks = self._apply_overlap(chunks, overlap)
        return chunks

    def _apply_overlap(self, chunks: List[str], overlap: int) -> List[str]:
        """Prefix every chunk but the first with the tail of its predecessor.

        The tail is taken from the original (un-prefixed) predecessor and is
        at most ``overlap`` characters long; a shorter predecessor is copied
        whole.  A non-positive ``overlap`` leaves the chunks unchanged.
        """
        if overlap <= 0 or len(chunks) <= 1:
            return chunks
        overlapped_chunks = [chunks[0]]
        for previous, current in zip(chunks, chunks[1:]):
            # A negative slice start is clamped, so this also handles a
            # predecessor shorter than ``overlap``.
            overlapped_chunks.append(previous[-overlap:] + current)
        return overlapped_chunks

    def _force_split(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Cut ``text`` into fixed-size windows (last resort).

        Consecutive windows start ``chunk_size - overlap`` characters apart,
        but always at least one character, so the loop terminates even when
        ``overlap >= chunk_size``.  Whitespace-only windows are skipped.

        Raises:
            ValueError: If ``chunk_size`` is smaller than 1.
        """
        if chunk_size < 1:
            raise ValueError(f"chunk_size must be at least 1, got {chunk_size}")
        stride = max(1, chunk_size - max(0, overlap))
        chunks = []
        start = 0
        while start < len(text):
            end = start + chunk_size
            window = text[start:end]
            if window.strip():
                chunks.append(window)
            # Once a window reaches the end, any further window would only
            # repeat text that is already covered.
            if end >= len(text):
                break
            start += stride
        return chunks

    def calculate_chunk_size(self, text: str, target_chunk_size: int = 500) -> int:
        """Pick a chunk size for ``text`` based on its separator density.

        Args:
            text: Text to inspect.
            target_chunk_size: Desired chunk size.

        Returns:
            ``len(text)`` when the text is no longer than the target.
            Otherwise the target scaled by 1.2 (capped at 800) for a density
            above 0.01, scaled by 0.8 (at least 200) for a density below
            0.005, and the unchanged target in between.
        """
        text_length = len(text)
        # Short text is used as-is.
        if text_length <= target_chunk_size:
            return text_length
        # Separator density: occurrences of the main separators per character.
        separator_density = 0
        for separator in self.separators[:4]:  # only the main separators
            if separator in text:
                separator_density += text.count(separator) / text_length
        # Adjust the chunk size according to the density.
        if separator_density > 0.01:  # many separators: larger chunks
            adjusted_size = min(800, int(target_chunk_size * 1.2))
        elif separator_density < 0.005:  # few separators: smaller chunks
            adjusted_size = max(200, int(target_chunk_size * 0.8))
        else:
            adjusted_size = target_chunk_size
        return adjusted_size

    def estimate_chunks_count(
        self,
        text: str,
        chunk_config: Optional[Dict[str, Any]] = None
    ) -> int:
        """Estimate how many chunks ``text`` would be split into.

        Args:
            text: Text to estimate for.
            chunk_config: Optional dict with ``'chunk_size'`` and
                ``'overlap'`` overriding the instance defaults.

        Returns:
            The estimated number of chunks, never less than 1.
        """
        config = chunk_config or {}
        chunk_size = config.get('chunk_size', self.default_chunk_size)
        overlap = config.get('overlap', self.default_overlap)
        optimal_size = self.calculate_chunk_size(text, chunk_size)
        # Net progress per chunk once overlap is taken into account.
        effective_chunk_size = optimal_size - overlap if overlap > 0 else optimal_size
        effective_chunk_size = max(100, effective_chunk_size)  # enforce a minimum
        # Ceiling division.
        estimated_count = max(1, (len(text) + effective_chunk_size - 1) // effective_chunk_size)
        return estimated_count

    def get_splitting_strategy(self) -> str:
        """Return the name of the splitting strategy."""
        return "recursive"

    def is_available(self) -> bool:
        """Report whether the splitter can be used; always ``True``."""
        return True