# RAG5_SearchStrategy_FastSTT/utils/document_processor.py
"""
๋ฌธ์„œ ์ฒ˜๋ฆฌ ์œ ํ‹ธ๋ฆฌํ‹ฐ ๋ชจ๋“ˆ
"""
import os
import re
import logging
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger("DocProcessor")
if not logger.hasHandlers():
handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class DocumentProcessor:
"""๋ฌธ์„œ ์ฒ˜๋ฆฌ ์œ ํ‹ธ๋ฆฌํ‹ฐ ํด๋ž˜์Šค"""
@staticmethod
def split_text(
text: str,
chunk_size: int = 512,
chunk_overlap: int = 50,
separator: str = "\n"
) -> List[str]:
"""
ํ…์ŠคํŠธ๋ฅผ ๋” ์ž‘์€ ์ฒญํฌ๋กœ ๋ถ„ํ• 
Args:
text: ๋ถ„ํ• ํ•  ํ…์ŠคํŠธ
chunk_size: ๊ฐ ์ฒญํฌ์˜ ์ตœ๋Œ€ ๋ฌธ์ž ์ˆ˜
chunk_overlap: ์ฒญํฌ ๊ฐ„ ์ค‘์ฒฉ๋˜๋Š” ๋ฌธ์ž ์ˆ˜
separator: ๋ถ„ํ•  ์‹œ ์‚ฌ์šฉํ•  ๊ตฌ๋ถ„์ž
Returns:
๋ถ„ํ• ๋œ ํ…์ŠคํŠธ ์ฒญํฌ ๋ชฉ๋ก
"""
if not text or chunk_size <= 0:
return []
# ๊ตฌ๋ถ„์ž๋กœ ๋ถ„ํ• 
parts = text.split(separator)
chunks = []
current_chunk = []
current_size = 0
for part in parts:
part_size = len(part)
if current_size + part_size + len(current_chunk) > chunk_size and current_chunk:
# ํ˜„์žฌ ์ฒญํฌ๊ฐ€ ์ตœ๋Œ€ ํฌ๊ธฐ๋ฅผ ์ดˆ๊ณผํ•˜๋ฉด ์ €์žฅ
chunks.append(separator.join(current_chunk))
# ์ค‘์ฒฉ์„ ์œ„ํ•ด ์ผ๋ถ€ ์ฒญํฌ ์œ ์ง€
overlap_tokens = []
overlap_size = 0
for token in reversed(current_chunk):
if overlap_size + len(token) <= chunk_overlap:
overlap_tokens.insert(0, token)
overlap_size += len(token) + 1 # separator ๊ธธ์ด ํฌํ•จ
else:
break
current_chunk = overlap_tokens
current_size = overlap_size - len(current_chunk) # separator ๊ธธ์ด ์ œ์™ธ
current_chunk.append(part)
current_size += part_size
        # Append the last chunk
if current_chunk:
chunks.append(separator.join(current_chunk))
return chunks
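
    # Illustrative behavior (hypothetical inputs): sizes count characters plus the
    # separators that join parts, and the tail of each chunk is carried over as overlap.
    #   DocumentProcessor.split_text("aaaa\nbbbb\ncccc\ndddd", chunk_size=14, chunk_overlap=5)
    #   # -> ["aaaa\nbbbb\ncccc", "cccc\ndddd"]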
@staticmethod
def clean_text(text: str, remove_urls: bool = True, remove_extra_whitespace: bool = True) -> str:
"""
ํ…์ŠคํŠธ ์ •์ œ
Args:
text: ์ •์ œํ•  ํ…์ŠคํŠธ
remove_urls: URL ์ œ๊ฑฐ ์—ฌ๋ถ€
remove_extra_whitespace: ์—ฌ๋ถ„์˜ ๊ณต๋ฐฑ ์ œ๊ฑฐ ์—ฌ๋ถ€
Returns:
์ •์ œ๋œ ํ…์ŠคํŠธ
"""
if not text:
return ""
        # Strip URLs
        if remove_urls:
            text = re.sub(r'https?://\S+|www\.\S+', '', text)
        # Strip HTML tags
        text = re.sub(r'<.*?>', '', text)
        # Collapse extra whitespace
if remove_extra_whitespace:
text = re.sub(r'\s+', ' ', text).strip()
return text
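
    # Illustrative (hypothetical input):
    #   DocumentProcessor.clean_text("See <b>this</b>   at https://example.com now")
    #   # -> "See this at now"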
@staticmethod
def text_to_documents(
text: str,
metadata: Optional[Dict[str, Any]] = None,
chunk_size: int = 512,
chunk_overlap: int = 50
) -> List[Dict[str, Any]]:
"""
ํ…์ŠคํŠธ๋ฅผ ๋ฌธ์„œ ๊ฐ์ฒด ๋ชฉ๋ก์œผ๋กœ ๋ณ€ํ™˜
Args:
text: ๋ณ€ํ™˜ํ•  ํ…์ŠคํŠธ
metadata: ๋ฌธ์„œ์— ์ถ”๊ฐ€ํ•  ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ
chunk_size: ๊ฐ ์ฒญํฌ์˜ ์ตœ๋Œ€ ๋ฌธ์ž ์ˆ˜
chunk_overlap: ์ฒญํฌ ๊ฐ„ ์ค‘์ฒฉ๋˜๋Š” ๋ฌธ์ž ์ˆ˜
Returns:
๋ฌธ์„œ ๊ฐ์ฒด ๋ชฉ๋ก
"""
if not text:
return []
        # Clean the text
clean = DocumentProcessor.clean_text(text)
        # Split into chunks
chunks = DocumentProcessor.split_text(
clean,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
# ๋ฌธ์„œ ๊ฐ์ฒด ์ƒ์„ฑ
documents = []
for i, chunk in enumerate(chunks):
doc = {
"text": chunk,
"index": i,
"chunk_count": len(chunks)
}
# ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ์ถ”๊ฐ€
if metadata:
doc.update(metadata)
documents.append(doc)
return documents
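
    # Illustrative (hypothetical metadata): each chunk becomes a dict carrying its
    # text, position, total chunk count, and any caller-supplied metadata, e.g.
    #   DocumentProcessor.text_to_documents("hello", metadata={"source": "notes.txt"})
    #   # -> [{"text": "hello", "index": 0, "chunk_count": 1, "source": "notes.txt"}]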
@staticmethod
def load_documents_from_directory(
directory: str,
        extensions: Tuple[str, ...] = (".txt", ".md", ".csv"),
recursive: bool = True,
chunk_size: int = 512,
chunk_overlap: int = 50
) -> List[Dict[str, Any]]:
"""
๋””๋ ‰ํ† ๋ฆฌ์—์„œ ๋ฌธ์„œ ๋กœ๋“œ ๋ฐ ์ฒ˜๋ฆฌ
Args:
directory: ๋กœ๋“œํ•  ๋””๋ ‰ํ† ๋ฆฌ ๊ฒฝ๋กœ
extensions: ์ฒ˜๋ฆฌํ•  ํŒŒ์ผ ํ™•์žฅ์ž ๋ชฉ๋ก
recursive: ํ•˜์œ„ ๋””๋ ‰ํ† ๋ฆฌ ๊ฒ€์ƒ‰ ์—ฌ๋ถ€
chunk_size: ๊ฐ ์ฒญํฌ์˜ ์ตœ๋Œ€ ๋ฌธ์ž ์ˆ˜
chunk_overlap: ์ฒญํฌ ๊ฐ„ ์ค‘์ฒฉ๋˜๋Š” ๋ฌธ์ž ์ˆ˜
Returns:
๋ฌธ์„œ ๊ฐ์ฒด ๋ชฉ๋ก
"""
if not os.path.isdir(directory):
logger.error(f"๋””๋ ‰ํ† ๋ฆฌ๋ฅผ ์ฐพ์„ ์ˆ˜ ์—†์Šต๋‹ˆ๋‹ค: {directory}")
return []
documents = []
        for root, dirs, files in os.walk(directory):
            if not recursive:
                # Prune subdirectories in place so os.walk does not descend
                dirs[:] = []
for file in files:
_, ext = os.path.splitext(file)
if ext.lower() not in extensions:
continue
file_path = os.path.join(root, file)
rel_path = os.path.relpath(file_path, directory)
try:
logger.info(f"ํŒŒ์ผ ๋กœ๋“œ ์ค‘: {rel_path}")
# ๋จผ์ € UTF-8๋กœ ์‹œ๋„
try:
with open(file_path, 'r', encoding='utf-8') as f:
content = f.read()
except UnicodeDecodeError:
                        # If UTF-8 fails, fall back to CP949 (the default Korean Windows encoding)
                        logger.info(f"UTF-8 decoding failed, retrying with CP949: {rel_path}")
with open(file_path, 'r', encoding='cp949') as f:
content = f.read()
# ๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ์ƒ์„ฑ
metadata = {
"source": rel_path,
"filename": file,
"filetype": ext.lower()[1:],
"filepath": file_path
}
                    # Process the document
file_docs = DocumentProcessor.text_to_documents(
content,
metadata=metadata,
chunk_size=chunk_size,
chunk_overlap=chunk_overlap
)
documents.extend(file_docs)
logger.info(f"{len(file_docs)}๊ฐœ ์ฒญํฌ ์ถ”์ถœ: {rel_path}")
except Exception as e:
logger.error(f"ํŒŒ์ผ '{rel_path}' ์ฒ˜๋ฆฌ ์ค‘ ์˜ค๋ฅ˜ ๋ฐœ์ƒ: {e}")
continue
logger.info(f"์ด {len(documents)}๊ฐœ ๋ฌธ์„œ ์ฒญํฌ๋ฅผ ๋กœ๋“œํ–ˆ์Šต๋‹ˆ๋‹ค.")
return documents
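
    # Illustrative (hypothetical path):
    #   docs = DocumentProcessor.load_documents_from_directory("data/corpus", recursive=False)
    #   # -> one dict per chunk, tagged with source/filename/filetype/filepath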
@staticmethod
def prepare_rag_context(results: List[Dict[str, Any]], field: str = "text") -> List[str]:
"""
๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ์—์„œ RAG์— ์‚ฌ์šฉํ•  ์ปจํ…์ŠคํŠธ ์ถ”์ถœ
Args:
results: ๊ฒ€์ƒ‰ ๊ฒฐ๊ณผ ๋ชฉ๋ก
field: ํ…์ŠคํŠธ ๋‚ด์šฉ์ด ์žˆ๋Š” ํ•„๋“œ ์ด๋ฆ„
Returns:
์ปจํ…์ŠคํŠธ ํ…์ŠคํŠธ ๋ชฉ๋ก
"""
context = []
for result in results:
if field in result:
context.append(result[field])
return context
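

if __name__ == "__main__":
    # Minimal usage sketch (illustrative only; the sample text and metadata below
    # are hypothetical, not part of the original module).
    sample = (
        "RAG systems retrieve documents.\n"
        "They ground model answers.\n"
        "See https://example.com for more."
    )
    cleaned = DocumentProcessor.clean_text(sample)
    logger.info(f"Cleaned text: {cleaned}")
    docs = DocumentProcessor.text_to_documents(
        sample,
        metadata={"source": "inline-sample"},
        chunk_size=64,
        chunk_overlap=16,
    )
    logger.info(f"Built {len(docs)} document chunk(s) from the sample text")
    # Treat the chunks as if they were search results and extract RAG context
    for passage in DocumentProcessor.prepare_rag_context(docs):
        logger.info(f"Context passage: {passage}")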