import os import json import re import logging import datetime from typing import Dict, List, Any, Union, Tuple # Cấu hình logging logger = logging.getLogger(__name__) class DataProcessor: def __init__(self, data_dir: str = "data"): self.data_dir = data_dir self.metadata = {} self.chunks = [] self.tables = [] self.figures = [] logger.info(f"Khởi tạo DataProcessor với data_dir: {data_dir}") if not os.path.exists(self.data_dir): logger.error(f"Thư mục data không tồn tại: {self.data_dir}") else: self._load_all_data() def _load_all_data(self): """Tải tất cả dữ liệu từ các thư mục con trong data""" logger.info(f"Đang tải dữ liệu từ thư mục: {self.data_dir}") # Quét qua tất cả thư mục trong data for item in os.listdir(self.data_dir): folder_path = os.path.join(self.data_dir, item) # Kiểm tra xem đây có phải là thư mục không if os.path.isdir(folder_path): metadata_file = os.path.join(folder_path, "metadata.json") # Nếu có file metadata.json if os.path.exists(metadata_file): try: # Tải metadata with open(metadata_file, 'r', encoding='utf-8') as f: content = f.read() if not content.strip(): logger.warning(f"File metadata trống: {metadata_file}") continue folder_metadata = json.loads(content) # Xác định ID của thư mục folder_id = None if "bai_info" in folder_metadata: folder_id = folder_metadata["bai_info"].get("id", item) elif "phuluc_info" in folder_metadata: folder_id = folder_metadata["phuluc_info"].get("id", item) else: folder_id = item # Lưu metadata vào từ điển self.metadata[folder_id] = folder_metadata # Tải tất cả chunks, tables và figures self._load_content_from_metadata(folder_path, folder_metadata) logger.info(f"Đã tải xong thư mục: {item}") except json.JSONDecodeError as e: logger.error(f"Lỗi đọc file JSON {metadata_file}: {e}") except Exception as e: logger.error(f"Lỗi khi tải metadata từ {metadata_file}: {e}") def _load_content_from_metadata(self, folder_path: str, folder_metadata: Dict[str, Any]): """Tải nội dung chunks, tables và figures từ metadata""" # Tải chunks for chunk_meta in folder_metadata.get("chunks", []): chunk_id = chunk_meta.get("id") chunk_path = os.path.join(folder_path, "chunks", f"{chunk_id}.md") chunk_data = chunk_meta.copy() # Sao chép metadata của chunk # Thêm nội dung từ file markdown nếu tồn tại if os.path.exists(chunk_path): with open(chunk_path, 'r', encoding='utf-8') as f: content = f.read() chunk_data["content"] = self._extract_content_from_markdown(content) else: # Nếu không tìm thấy file, tạo nội dung mẫu và ghi log ở debug level chunk_data["content"] = f"Nội dung cho {chunk_id} không tìm thấy." logger.debug(f"Không tìm thấy file chunk: {chunk_path}") self.chunks.append(chunk_data) # Tải tables for table_meta in folder_metadata.get("tables", []): table_id = table_meta.get("id") table_path = os.path.join(folder_path, "tables", f"{table_id}.md") table_data = table_meta.copy() # Thêm nội dung từ file markdown nếu tồn tại if os.path.exists(table_path): with open(table_path, 'r', encoding='utf-8') as f: content = f.read() table_data["content"] = self._extract_content_from_markdown(content) else: table_data["content"] = f"Bảng {table_id} không tìm thấy." logger.debug(f"Không tìm thấy file bảng: {table_path}") self.tables.append(table_data) # Tải figures for figure_meta in folder_metadata.get("figures", []): figure_id = figure_meta.get("id") figure_path = os.path.join(folder_path, "figures", f"{figure_id}.md") figure_data = figure_meta.copy() # Thêm nội dung từ file markdown nếu tồn tại content_loaded = False if os.path.exists(figure_path): with open(figure_path, 'r', encoding='utf-8') as f: content = f.read() figure_data["content"] = self._extract_content_from_markdown(content) content_loaded = True # Thêm đường dẫn đến file hình ảnh nếu có image_path = None image_extensions = ['.png', '.jpg', '.jpeg', '.gif', '.svg'] for ext in image_extensions: img_path = os.path.join(folder_path, "figures", f"{figure_id}{ext}") if os.path.exists(img_path): image_path = img_path break if image_path: figure_data["image_path"] = image_path # Tạo nội dung mặc định nếu không có file markdown if not content_loaded: figure_caption = figure_meta.get("title", f"Hình {figure_id}") figure_data["content"] = f"![{figure_caption}]({image_path})" elif not content_loaded: # Nếu không có cả file markdown và file hình figure_data["content"] = f"Hình {figure_id} không tìm thấy." logger.debug(f"Không tìm thấy file hình cho {figure_id}") self.figures.append(figure_data) # Tải data_files (trường hợp phụ lục) if "data_files" in folder_metadata: for data_file_meta in folder_metadata.get("data_files", []): data_id = data_file_meta.get("id") data_path = os.path.join(folder_path, "data", f"{data_id}.md") data_file = data_file_meta.copy() # Thêm nội dung từ file markdown nếu tồn tại if os.path.exists(data_path): with open(data_path, 'r', encoding='utf-8') as f: content = f.read() data_file["content"] = self._extract_content_from_markdown(content) # Xác định loại nội dung content_type = data_file.get("content_type", "table") # Thêm vào danh sách phù hợp dựa trên loại nội dung if content_type == "table": self.tables.append(data_file) elif content_type == "text": self.chunks.append(data_file) elif content_type == "figure": self.figures.append(data_file) logger.debug(f"Đã tải dữ liệu: {data_id}, loại: {content_type}") else: logger.debug(f"Không tìm thấy file dữ liệu: {data_path}") data_file["content"] = f"Dữ liệu {data_id} không tìm thấy." def _extract_content_from_markdown(self, md_content: str) -> str: """Trích xuất nội dung từ markdown, bỏ qua phần frontmatter""" # Tách frontmatter (nằm giữa "---") if md_content.startswith("---"): parts = md_content.split("---", 2) if len(parts) >= 3: return parts[2].strip() return md_content def get_all_items(self) -> Dict[str, List[Dict[str, Any]]]: """Trả về tất cả các items đã tải""" return { "chunks": self.chunks, "tables": self.tables, "figures": self.figures } def get_all_metadata(self) -> Dict[str, Any]: """Trả về tất cả metadata của các bài học và phụ lục""" return self.metadata def get_chunk_by_id(self, chunk_id: str) -> Union[Dict[str, Any], None]: """Tìm và trả về chunk theo ID""" for chunk in self.chunks: if chunk.get("id") == chunk_id: return chunk return None def get_table_by_id(self, table_id: str) -> Union[Dict[str, Any], None]: """Tìm và trả về bảng theo ID""" for table in self.tables: if table.get("id") == table_id: return table return None def get_figure_by_id(self, figure_id: str) -> Union[Dict[str, Any], None]: """Tìm và trả về hình theo ID""" for figure in self.figures: if figure.get("id") == figure_id: return figure return None def find_items_by_age(self, age: int) -> Dict[str, List[Dict[str, Any]]]: """Tìm các items (chunks, tables, figures) liên quan đến độ tuổi của người dùng""" relevant_chunks = [] relevant_tables = [] relevant_figures = [] # Lọc chunks for chunk in self.chunks: age_range = chunk.get("age_range", [0, 100]) if len(age_range) == 2 and age_range[0] <= age <= age_range[1]: relevant_chunks.append(chunk) # Lọc tables for table in self.tables: age_range = table.get("age_range", [0, 100]) if len(age_range) == 2 and age_range[0] <= age <= age_range[1]: relevant_tables.append(table) # Lọc figures for figure in self.figures: age_range = figure.get("age_range", [0, 100]) if len(age_range) == 2 and age_range[0] <= age <= age_range[1]: relevant_figures.append(figure) return { "chunks": relevant_chunks, "tables": relevant_tables, "figures": relevant_figures } def get_related_items(self, item_id: str) -> Dict[str, List[Dict[str, Any]]]: """Tìm các items liên quan đến một item cụ thể dựa vào related_chunks""" related_chunks = [] related_tables = [] related_figures = [] # Tìm item gốc source_item = None for item in self.chunks + self.tables + self.figures: if item.get("id") == item_id: source_item = item break if not source_item: return { "chunks": [], "tables": [], "figures": [] } # Lấy danh sách IDs của các items liên quan related_ids = source_item.get("related_chunks", []) # Tìm các items liên quan for related_id in related_ids: # Tìm trong chunks for chunk in self.chunks: if chunk.get("id") == related_id: related_chunks.append(chunk) break # Tìm trong tables for table in self.tables: if table.get("id") == related_id: related_tables.append(table) break # Tìm trong figures for figure in self.figures: if figure.get("id") == related_id: related_figures.append(figure) break return { "chunks": related_chunks, "tables": related_tables, "figures": related_figures } def preprocess_query(self, query: str) -> str: """Tiền xử lý câu truy vấn""" # Loại bỏ ký tự đặc biệt query = re.sub(r'[^\w\s\d]', ' ', query) # Loại bỏ khoảng trắng thừa query = re.sub(r'\s+', ' ', query).strip() return query def format_context_for_rag(self, items: List[Dict[str, Any]]) -> str: """Định dạng các items để đưa vào ngữ cảnh cho mô hình RAG""" formatted_contexts = [] for i, item in enumerate(items, 1): item_id = item.get("id", "") title = item.get("title", "") content = item.get("content", "") content_type = item.get("content_type", "text") # Nếu là bảng, thêm tiêu đề "Bảng:" if content_type == "table": title = f"Bảng: {title}" # Nếu là hình, thêm tiêu đề "Hình:" elif content_type == "figure": title = f"Hình: {title}" formatted_context = f"[{i}] {title}\n\n{content}\n\n" formatted_contexts.append(formatted_context) return "\n".join(formatted_contexts) def prepare_for_embedding(self) -> List[Dict[str, Any]]: """Chuẩn bị dữ liệu cho việc nhúng (embedding)""" all_items = [] # Thêm chunks for chunk in self.chunks: # Tìm chapter từ chunk ID chunk_id = chunk.get("id", "") chapter = "unknown" if chunk_id.startswith("bai1_"): chapter = "bai1" elif chunk_id.startswith("bai2_"): chapter = "bai2" elif chunk_id.startswith("bai3_"): chapter = "bai3" elif chunk_id.startswith("bai4_"): chapter = "bai4" elif "phuluc" in chunk_id.lower(): chapter = "phuluc" content = chunk.get("content", "") if chunk.get("title"): content = f"Tiêu đề: {chunk.get('title')}\n\nNội dung: {content}" # Xử lý age_range - convert list thành string và tách thành min/max age_range = chunk.get("age_range", [0, 100]) age_min = age_range[0] if len(age_range) > 0 else 0 age_max = age_range[1] if len(age_range) > 1 else 100 age_range_str = f"{age_min}-{age_max}" # Xử lý related_chunks - convert list thành string related_chunks = chunk.get("related_chunks", []) related_chunks_str = ",".join(related_chunks) if related_chunks else "" embedding_item = { "content": content, "metadata": { "chunk_id": chunk_id, "chapter": chapter, "title": chunk.get("title", ""), "content_type": chunk.get("content_type", "text"), "age_range": age_range_str, "age_min": age_min, "age_max": age_max, "summary": chunk.get("summary", ""), "pages": chunk.get("pages", ""), "related_chunks": related_chunks_str, "word_count": chunk.get("word_count", 0), "token_count": chunk.get("token_count", 0), "contains_table": chunk.get("contains_table", False), "contains_figure": chunk.get("contains_figure", False), "created_at": datetime.datetime.now().isoformat() }, "id": chunk_id } all_items.append(embedding_item) # Thêm tables for table in self.tables: # Tìm chapter từ table ID table_id = table.get("id", "") chapter = "unknown" if table_id.startswith("bai1_"): chapter = "bai1" elif table_id.startswith("bai2_"): chapter = "bai2" elif table_id.startswith("bai3_"): chapter = "bai3" elif table_id.startswith("bai4_"): chapter = "bai4" elif "phuluc" in table_id.lower(): chapter = "phuluc" content = table.get("content", "") if table.get("title"): content = f"Bảng: {table.get('title')}\n\nNội dung: {content}" # Xử lý age_range age_range = table.get("age_range", [0, 100]) age_min = age_range[0] if len(age_range) > 0 else 0 age_max = age_range[1] if len(age_range) > 1 else 100 age_range_str = f"{age_min}-{age_max}" # Xử lý related_chunks và table_columns related_chunks = table.get("related_chunks", []) related_chunks_str = ",".join(related_chunks) if related_chunks else "" table_columns = table.get("table_columns", []) table_columns_str = ",".join(table_columns) if table_columns else "" embedding_item = { "content": content, "metadata": { "chunk_id": table_id, "chapter": chapter, "title": table.get("title", ""), "content_type": "table", "age_range": age_range_str, "age_min": age_min, "age_max": age_max, "summary": table.get("summary", ""), "pages": table.get("pages", ""), "related_chunks": related_chunks_str, "table_columns": table_columns_str, "word_count": table.get("word_count", 0), "token_count": table.get("token_count", 0), "created_at": datetime.datetime.now().isoformat() }, "id": table_id } all_items.append(embedding_item) # Thêm figures for figure in self.figures: # Tìm chapter từ figure ID figure_id = figure.get("id", "") chapter = "unknown" if figure_id.startswith("bai1_"): chapter = "bai1" elif figure_id.startswith("bai2_"): chapter = "bai2" elif figure_id.startswith("bai3_"): chapter = "bai3" elif figure_id.startswith("bai4_"): chapter = "bai4" elif "phuluc" in figure_id.lower(): chapter = "phuluc" content = figure.get("content", "") if figure.get("title"): content = f"Hình: {figure.get('title')}\n\nMô tả: {content}" # Xử lý age_range age_range = figure.get("age_range", [0, 100]) age_min = age_range[0] if len(age_range) > 0 else 0 age_max = age_range[1] if len(age_range) > 1 else 100 age_range_str = f"{age_min}-{age_max}" # Xử lý related_chunks related_chunks = figure.get("related_chunks", []) related_chunks_str = ",".join(related_chunks) if related_chunks else "" embedding_item = { "content": content, "metadata": { "chunk_id": figure_id, "chapter": chapter, "title": figure.get("title", ""), "content_type": "figure", "age_range": age_range_str, "age_min": age_min, "age_max": age_max, "summary": figure.get("summary", ""), "pages": figure.get("pages", ""), "related_chunks": related_chunks_str, "image_path": figure.get("image_path", ""), "created_at": datetime.datetime.now().isoformat() }, "id": figure_id } all_items.append(embedding_item) return all_items def count_items_by_prefix(self, prefix: str) -> Dict[str, int]: """Đếm số lượng items theo tiền tố ID""" chunks_count = sum(1 for chunk in self.chunks if chunk.get("id", "").startswith(prefix)) tables_count = sum(1 for table in self.tables if table.get("id", "").startswith(prefix)) figures_count = sum(1 for figure in self.figures if figure.get("id", "").startswith(prefix)) return { "chunks": chunks_count, "tables": tables_count, "figures": figures_count, "total": chunks_count + tables_count + figures_count } def get_stats(self) -> Dict[str, Any]: """Lấy thống kê về dữ liệu đã tải""" stats = { "total_chunks": len(self.chunks), "total_tables": len(self.tables), "total_figures": len(self.figures), "total_items": len(self.chunks) + len(self.tables) + len(self.figures), "by_lesson": {}, "by_age": {} } # Thống kê theo bài for item in os.listdir(self.data_dir): if os.path.isdir(os.path.join(self.data_dir, item)): item_stats = self.count_items_by_prefix(f"{item}_") stats["by_lesson"][item] = item_stats # Thống kê theo độ tuổi age_ranges = {} for chunk in self.chunks + self.tables + self.figures: age_range = chunk.get("age_range", [0, 100]) if len(age_range) == 2: range_key = f"{age_range[0]}-{age_range[1]}" if range_key not in age_ranges: age_ranges[range_key] = 0 age_ranges[range_key] += 1 stats["by_age"] = age_ranges return stats