Spaces:
Sleeping
Sleeping
| """ | |
| Recursive Splitter Adapter - Secondary Adapter | |
| Implementation that performs recursive text splitting. | |
| """ | |
| import re | |
| import logging | |
| from typing import List, Dict, Any, Optional | |
| from ....core.ports.text_splitter_port import TextSplitterPort | |
| from ....core.domain.models import Document, DocumentChunk | |
| logger = logging.getLogger(__name__) | |
class RecursiveSplitterAdapter(TextSplitterPort):
    """Recursive text splitter implementing ``TextSplitterPort``.

    Text is split on the coarsest separator it contains. Pieces that are
    still longer than the chunk size are split again with finer separators,
    and adjacent small pieces are merged back together up to the chunk size.
    When an overlap is configured, every chunk except the first is prefixed
    with the tail of the preceding chunk, so a final chunk may be up to
    ``chunk_size + overlap`` characters long.
    """

    # Chunks whose stripped content is shorter than this are discarded.
    MIN_CHUNK_LENGTH = 10

    def __init__(self, default_chunk_size: int = 500, overlap: int = 50):
        """Store the default chunking parameters and the separator priority.

        Args:
            default_chunk_size: Chunk size (in characters) used when the
                per-call config has no ``chunk_size`` entry.
            overlap: Overlap (in characters) used when the per-call config
                has no ``overlap`` entry.
        """
        self.default_chunk_size = default_chunk_size
        self.default_overlap = overlap

        # Split priority: coarse units first, fine units last.
        self.separators = [
            "\n\n\n",  # multiple blank lines
            "\n\n",    # paragraph break
            "\n",      # line break
            ". ",      # end of sentence
            "! ",      # exclamation mark
            "? ",      # question mark
            "; ",      # semicolon
            ", ",      # comma
            " ",       # whitespace
            "",        # single characters (last resort)
        ]

    async def split_documents(
        self,
        documents: List[Document],
        chunk_config: Optional[Dict[str, Any]] = None
    ) -> List[DocumentChunk]:
        """Split every document into chunks and return them as one flat list.

        A document that fails to split is logged and skipped so that one bad
        document does not abort the whole batch.

        Args:
            documents: Documents to split.
            chunk_config: Optional overrides (``chunk_size``, ``overlap``)
                forwarded unchanged to :meth:`split_text`.

        Returns:
            The chunks of all documents, in document order.
        """
        all_chunks: List[DocumentChunk] = []

        for document in documents:
            try:
                chunks = await self.split_text(
                    text=document.content,
                    document_id=document.id,
                    chunk_config=chunk_config,
                    metadata={
                        'document_type': document.document_type.value,
                        'title': document.title,
                        'priority_score': document.priority_score,
                        'technologies': document.metadata.get('technologies', []),
                        'source': document.source,
                        # Spread last: keys present in the document's own
                        # metadata take precedence over the fields above.
                        **document.metadata
                    }
                )
                all_chunks.extend(chunks)
            except Exception as e:
                logger.error("Failed to split document %s: %s", document.id, e)
                continue

        logger.info(
            "Split %d documents into %d chunks", len(documents), len(all_chunks)
        )
        return all_chunks

    async def split_text(
        self,
        text: str,
        document_id: str,
        chunk_config: Optional[Dict[str, Any]] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> List[DocumentChunk]:
        """Recursively split ``text`` and wrap the pieces as ``DocumentChunk``s.

        Args:
            text: Text to split.
            document_id: Identifier of the owning document; used for the
                chunk ids (``"<document_id>_chunk_<index>"``).
            chunk_config: Optional overrides with keys ``chunk_size`` and
                ``overlap``; missing keys fall back to the instance defaults.
            metadata: Base metadata copied into every chunk.

        Returns:
            The resulting chunks. Pieces whose stripped content is shorter
            than ``MIN_CHUNK_LENGTH`` are dropped. ``chunk_index`` is the
            position of the piece before that filtering, so indices may have
            gaps. On any error an empty list is returned and the error is
            logged.
        """
        # Nothing to split; avoid logging an error for empty/None content.
        if not text:
            return []

        try:
            config = chunk_config or {}
            chunk_size = config.get('chunk_size', self.default_chunk_size)
            overlap = config.get('overlap', self.default_overlap)

            pieces = self._recursive_split(text, chunk_size, overlap)

            base_metadata = metadata or {}
            chunks = []
            for index, piece in enumerate(pieces):
                content = piece.strip()
                if len(content) < self.MIN_CHUNK_LENGTH:  # skip tiny chunks
                    continue

                chunk_metadata = {
                    **base_metadata,
                    'chunk_type': 'recursive',
                    'chunk_method': 'recursive_character',
                    # Length of the content actually stored on the chunk.
                    'content_length': len(content)
                }
                chunks.append(
                    DocumentChunk(
                        id=f"{document_id}_chunk_{index}",
                        content=content,
                        document_id=document_id,
                        chunk_index=index,
                        metadata=chunk_metadata
                    )
                )
            return chunks

        except Exception as e:
            logger.error(
                "Failed to split text for document %s: %s", document_id, e
            )
            return []

    def _recursive_split(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Split ``text`` using the first separator that yields chunks.

        Args:
            text: Text to split.
            chunk_size: Maximum chunk length before overlap is added.
            overlap: Number of characters of the previous chunk to prepend
                to each following chunk.

        Returns:
            Non-blank chunks in document order.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError(
                f"chunk_size must be a positive integer, got {chunk_size}"
            )

        # Text that already fits is returned as-is.
        if len(text) <= chunk_size:
            return [text] if text.strip() else []

        # Try separators from coarse to fine. The empty separator is
        # contained in every string, so it always gets a turn.
        for separator in self.separators:
            if separator not in text:
                continue
            chunks = self._split_with_separator(text, separator, chunk_size, overlap)
            if chunks:
                return chunks

        # Every separator failed (e.g. whitespace-only text): hard split.
        return self._force_split(text, chunk_size, overlap)

    def _split_with_separator(
        self,
        text: str,
        separator: str,
        chunk_size: int,
        overlap: int
    ) -> List[str]:
        """Split ``text`` on one separator and merge the pieces into chunks.

        The separator is kept at the end of every piece except the last, so
        concatenating the pieces reproduces ``text``.

        Returns:
            The merged chunks, or an empty list when the separator cannot
            make progress on an oversized text (some piece is as long as the
            text itself). The caller then falls through to a finer separator;
            without this check such a piece would be re-split into itself
            forever.
        """
        if separator == "":
            # Character-level split. Slicing fixed windows gives the same
            # result as merging single characters, without the per-character
            # cost.
            if chunk_size <= 0:
                raise ValueError(
                    f"chunk_size must be a positive integer, got {chunk_size}"
                )
            pieces = [
                text[start:start + chunk_size]
                for start in range(0, len(text), chunk_size)
            ]
        else:
            parts = text.split(separator)
            # Re-attach the separator to every piece but the last.
            pieces = [part + separator for part in parts[:-1]] + [parts[-1]]

        if len(text) > chunk_size and any(len(piece) >= len(text) for piece in pieces):
            return []

        return self._merge_splits(pieces, chunk_size, overlap)

    def _merge_splits(self, splits: List[str], chunk_size: int, overlap: int) -> List[str]:
        """Greedily merge consecutive pieces into chunks of at most ``chunk_size``.

        A single piece longer than ``chunk_size`` is split again recursively.
        Overlap is applied once, after merging; nested splits are requested
        without overlap so the overlap text is not duplicated.

        Returns:
            Non-blank chunks in document order.
        """
        chunks: List[str] = []
        pending: List[str] = []  # pieces of the chunk being built
        pending_len = 0

        for piece in splits:
            if pending_len + len(piece) <= chunk_size:
                pending.append(piece)
                pending_len += len(piece)
                continue

            # The piece does not fit: close the chunk built so far.
            if pending:
                chunks.append("".join(pending))
                pending = []
                pending_len = 0

            if len(piece) > chunk_size:
                # The piece itself is too large: split it further.
                chunks.extend(self._recursive_split(piece, chunk_size, 0))
            else:
                pending = [piece]
                pending_len = len(piece)

        if pending:
            chunks.append("".join(pending))

        # Drop blank chunks before overlapping so that a whitespace-only
        # chunk cannot survive merely because it received an overlap prefix.
        chunks = [chunk for chunk in chunks if chunk.strip()]

        if overlap > 0:
            chunks = self._apply_overlap(chunks, overlap)

        return chunks

    def _apply_overlap(self, chunks: List[str], overlap: int) -> List[str]:
        """Prefix each chunk (except the first) with the tail of its predecessor.

        The tail is taken from the original, un-prefixed previous chunk. When
        the previous chunk is not longer than ``overlap`` it is prepended in
        full.

        Returns:
            A new list with the overlap applied, or ``chunks`` unchanged when
            there is nothing to overlap.
        """
        if overlap <= 0 or len(chunks) <= 1:
            return chunks

        overlapped = [chunks[0]]
        for previous, current in zip(chunks, chunks[1:]):
            # A negative slice longer than the string yields the whole string.
            overlapped.append(previous[-overlap:] + current)
        return overlapped

    def _force_split(self, text: str, chunk_size: int, overlap: int) -> List[str]:
        """Cut ``text`` into fixed-size windows (last resort).

        Consecutive windows share ``overlap`` characters. An overlap outside
        ``0 < overlap < chunk_size`` is ignored, because a window that does
        not advance would never terminate.

        Returns:
            Non-blank windows in document order.

        Raises:
            ValueError: If ``chunk_size`` is not positive.
        """
        if chunk_size <= 0:
            raise ValueError(
                f"chunk_size must be a positive integer, got {chunk_size}"
            )

        step = chunk_size - overlap if 0 < overlap < chunk_size else chunk_size

        windows: List[str] = []
        for start in range(0, len(text), step):
            end = start + chunk_size
            window = text[start:end]
            if window.strip():
                windows.append(window)
            # Stop once the end of the text is covered; a further window
            # would only repeat the overlap of the previous one.
            if end >= len(text):
                break
        return windows

    def calculate_chunk_size(self, text: str, target_chunk_size: int = 500) -> int:
        """Suggest a chunk size based on how densely ``text`` is separated.

        Args:
            text: Text to inspect.
            target_chunk_size: Desired chunk size.

        Returns:
            ``len(text)`` when the text already fits the target; otherwise
            the target scaled up by 1.2 (capped at 800) for separator-dense
            text, scaled down by 0.8 (at least 200) for separator-sparse
            text, or the unchanged target in between.
        """
        text_length = len(text)

        # Short text: one chunk of its own length.
        if text_length <= target_chunk_size:
            return text_length

        # Occurrences per character of the four coarsest separators.
        separator_density = sum(
            text.count(separator) / text_length
            for separator in self.separators[:4]
            if separator in text
        )

        if separator_density > 0.01:
            # Many separators: allow larger chunks.
            return min(800, int(target_chunk_size * 1.2))
        if separator_density < 0.005:
            # Few separators: use smaller chunks.
            return max(200, int(target_chunk_size * 0.8))
        return target_chunk_size

    def estimate_chunks_count(
        self,
        text: str,
        chunk_config: Optional[Dict[str, Any]] = None
    ) -> int:
        """Estimate how many chunks ``text`` will produce.

        Args:
            text: Text to estimate for.
            chunk_config: Optional overrides with keys ``chunk_size`` and
                ``overlap``; missing keys fall back to the instance defaults.

        Returns:
            An estimate of at least 1, computed as the text length divided
            (rounding up) by the chunk size net of overlap, where the net
            size is never taken below 100.
        """
        config = chunk_config or {}
        chunk_size = config.get('chunk_size', self.default_chunk_size)
        overlap = config.get('overlap', self.default_overlap)

        optimal_size = self.calculate_chunk_size(text, chunk_size)

        # Each chunk advances by its size minus the overlap.
        effective_chunk_size = optimal_size - overlap if overlap > 0 else optimal_size
        effective_chunk_size = max(100, effective_chunk_size)  # enforce a floor

        # Ceiling division.
        return max(1, (len(text) + effective_chunk_size - 1) // effective_chunk_size)

    def get_splitting_strategy(self) -> str:
        """Return the name of the splitting strategy."""
        return "recursive"

    def is_available(self) -> bool:
        """Return whether the splitter can be used (always ``True``)."""
        return True