| from typing import Any, Dict, List |
| from difflib import SequenceMatcher |
| import json, os, time, re |
| import gradio as gr |
|
|
| |
# Directory containing this script; the data folders below live next to it.
current_dir = os.path.dirname(os.path.abspath(__file__))

# "converted" and "chunks" sub-directories, as plain string paths.
converted_path, chunks_path = (
    os.path.join(current_dir, folder_name)
    for folder_name in ("converted", "chunks")
)
|
|
def extract_sequence_from_id(chunk_id: str) -> int:
    """Extract the sequence number from a chunk ID.

    The sequence number is the run of digits in a trailing ``::C<digits>``
    suffix. IDs without that suffix yield ``0``.
    """
    suffix = re.search(r'::C(\d+)$', chunk_id)
    return int(suffix.group(1)) if suffix else 0
|
|
def load_document_chunks(doc_id: str) -> List[Dict]:
    """Load every chunk of one document, ordered by sequence number.

    Reads ``chunks_manifest.json`` from ``chunks_path``. Each manifest entry
    is expected to carry an ``"id"`` and a ``"path"``; the chunk JSON at that
    path is loaded for every entry belonging to ``doc_id``.

    Args:
        doc_id: Identifier of the document whose chunks are wanted.

    Returns:
        The loaded chunk dicts sorted by the ``::C<n>`` suffix of their
        ``"id"``. Empty when the manifest is missing, has no ``"chunks"``
        list, or nothing matches.

    Raises:
        KeyError: If a manifest entry lacks ``"id"``/``"path"`` or a loaded
            chunk lacks ``"id"``.
    """
    manifest_path = os.path.join(chunks_path, "chunks_manifest.json")
    if not os.path.exists(manifest_path):
        return []

    with open(manifest_path, "r", encoding="utf-8") as manifest_file:
        manifest = json.load(manifest_file)

    # Match on the "::" separator rather than a bare prefix so that "doc1"
    # does not also collect the chunks of "doc10".
    # NOTE(review): assumes chunk IDs look like "<doc_id>::...::C<n>", as the
    # sequence-number pattern suggests - confirm against the chunker.
    id_prefix = f"{doc_id}::"

    doc_chunks = []
    for chunk_info in manifest.get("chunks", []):
        chunk_id = chunk_info["id"]
        if chunk_id != doc_id and not chunk_id.startswith(id_prefix):
            continue

        # The path is used exactly as stored in the manifest; entries whose
        # file is missing are skipped rather than treated as an error.
        chunk_file_path = chunk_info["path"]
        if not os.path.exists(chunk_file_path):
            continue

        with open(chunk_file_path, "r", encoding="utf-8") as chunk_file:
            doc_chunks.append(json.load(chunk_file))

    doc_chunks.sort(key=lambda chunk: extract_sequence_from_id(chunk["id"]))
    return doc_chunks
|
|
def reconstruct_document(chunks: List[Dict]) -> str:
    """Rebuild a markdown-like document from ordered chunks.

    Each chunk may provide ``"path"`` (list of section titles from the root),
    ``"chunk_text"`` and ``"content_type"``. Whenever the path changes, a
    heading is emitted for every level from the first level that differs from
    the previous chunk's path down to the deepest one, so a chunk that enters
    a nested section directly still gets all of its headings.

    Args:
        chunks: Chunk dicts in document order.

    Returns:
        The reconstructed text, parts joined by newlines; ``""`` for no chunks.
    """
    if not chunks:
        return ""

    document_parts = []
    current_path = []

    for chunk in chunks:
        content_type = chunk.get("content_type", "text")
        # "or" also covers keys that are present but null in the chunk JSON.
        chunk_text = chunk.get("chunk_text") or ""
        path = chunk.get("path") or []

        if path != current_path:
            document_parts.extend(_headers_for_path_change(current_path, path))
            current_path = path

        if content_type == "table":
            # Tables are always emitted, padded with blank lines.
            document_parts.append(f"\n{chunk_text}\n")
        elif chunk_text.strip():
            document_parts.append(chunk_text)

    return "\n".join(document_parts)


def _headers_for_path_change(previous_path, new_path):
    """Return heading lines for the levels of new_path not shared with previous_path."""
    # Length of the common leading part of both paths.
    shared = 0
    for old_item, new_item in zip(previous_path, new_path):
        if old_item != new_item:
            break
        shared += 1

    headers = []
    for level, path_item in enumerate(new_path[shared:], start=shared + 1):
        # Empty items and the "ROOT"/"TABLE" markers never become headings.
        if path_item and path_item not in ["ROOT", "TABLE"]:
            # Heading depth follows the position in the path, capped at 6.
            header_marker = "#" * min(level, 6)
            headers.append(f"\n{header_marker} {path_item}\n")
    return headers
|
|
def find_text_positions_in_reconstructed_doc(text_to_find: str, reconstructed_doc: str) -> List[tuple]:
    """Find every occurrence of a text in the reconstructed document.

    Args:
        text_to_find: Exact substring to look for.
        reconstructed_doc: Document text to search.

    Returns:
        ``(start, end)`` index pairs in ascending order. Overlapping
        occurrences are all reported (``"aa"`` in ``"aaa"`` yields two).
        An empty ``text_to_find`` yields no positions.
    """
    # An empty needle would "match" at every index and produce only
    # zero-width spans, which no caller can use.
    if not text_to_find:
        return []

    positions = []
    needle_length = len(text_to_find)
    pos = reconstructed_doc.find(text_to_find)
    while pos != -1:
        positions.append((pos, pos + needle_length))
        # Advance by one character only, so overlapping matches are found.
        pos = reconstructed_doc.find(text_to_find, pos + 1)
    return positions
|
|
def highlight_text_in_reconstructed_doc(texts_to_highlight: List[str], reconstructed_doc: str, chunks: List[Dict] = None) -> str:
    """Wrap every occurrence of the given texts in a highlighting ``<span>``.

    All matches are located on the plain document first and the markup is
    inserted in a single pass afterwards. Later texts therefore cannot match
    inside markup inserted for earlier ones, and overlapping matches are
    merged into one span instead of corrupting each other.

    Args:
        texts_to_highlight: Exact substrings to highlight; blank entries are
            ignored.
        reconstructed_doc: Document text to highlight in.
        chunks: Optional chunk dicts. A text that is absent from the document
            but present in some chunk's ``"chunk_for_embedding"`` is appended
            to the end of the document and highlighted there.

    Returns:
        The document with highlight spans inserted. The input is returned
        unchanged when there is nothing to highlight.
    """
    if not texts_to_highlight:
        return reconstructed_doc

    plain_doc = reconstructed_doc
    spans = []

    # Longest first, so a text appended by the fallback below is still
    # visible to the shorter texts processed after it.
    for text in sorted(texts_to_highlight, key=len, reverse=True):
        if not text.strip():
            continue

        found = []
        pos = plain_doc.find(text)
        while pos != -1:
            found.append((pos, pos + len(text)))
            # Step by one so overlapping occurrences are collected as well;
            # they are merged before rendering.
            pos = plain_doc.find(text, pos + 1)

        if not found and chunks:
            in_embedding_text = any(
                text in (chunk.get('chunk_for_embedding') or '')
                for chunk in chunks
            )
            if in_embedding_text:
                plain_doc += f"\n\n{text}"
                found.append((len(plain_doc) - len(text), len(plain_doc)))

        spans.extend(found)

    if not spans:
        return plain_doc

    # Merge overlapping spans so each character is wrapped at most once.
    spans.sort()
    merged = [spans[0]]
    for start, end in spans[1:]:
        last_start, last_end = merged[-1]
        if start < last_end:
            merged[-1] = (last_start, max(last_end, end))
        else:
            merged.append((start, end))

    # NOTE(review): document text is inserted unescaped, exactly as before;
    # escaping would change how HTML-bearing chunks render - confirm intent.
    pieces = []
    cursor = 0
    for start, end in merged:
        pieces.append(plain_doc[cursor:start])
        pieces.append(
            '<span style="color:green; font-weight:bold; background-color:yellow;">'
            f'{plain_doc[start:end]}</span>'
        )
        cursor = end
    pieces.append(plain_doc[cursor:])
    return "".join(pieces)
|
|
def highlight_doc_with_chunks(doc_id: str, texts: List[str]) -> str:
    """Render one document as HTML with the given texts highlighted.

    The document is rebuilt from its stored chunks instead of the original
    markdown file. The result is a statistics box followed by the highlighted
    document inside a ``<pre>`` element. A warning string is returned instead
    when the document has no chunks or reconstructs to blank text.
    """
    chunks = load_document_chunks(doc_id)
    if not chunks:
        return f"⚠️ Không tìm thấy chunks cho document {doc_id}"

    reconstructed_doc = reconstruct_document(chunks)
    if not reconstructed_doc.strip():
        return f"⚠️ Document {doc_id} không có nội dung"

    highlighted_doc = highlight_text_in_reconstructed_doc(texts, reconstructed_doc, chunks)

    # Statistics only consider non-blank texts; a text counts as highlighted
    # when it occurs verbatim in the reconstructed document.
    non_blank_texts = [candidate for candidate in texts if candidate.strip()]
    total = len(non_blank_texts)
    highlighted_count = sum(
        1 for candidate in non_blank_texts if candidate in reconstructed_doc
    )
    if total > 0:
        success_rate = highlighted_count / total * 100
    else:
        success_rate = 0.0

    # Aggregate the size figures stored on the chunks (missing values count as 0).
    total_tokens = 0
    total_chars = 0
    for chunk in chunks:
        total_tokens += chunk.get('token_count', 0)
        total_chars += chunk.get('text_length_chars', 0)

    summary_lines = [
        "",
        "<div style='background-color: #f0f0f0; padding: 10px; margin: 10px 0; border-radius: 5px;'>",
        "<h3>Highlight Summary:</h3>",
        f"<p><strong>Document ID:</strong> {doc_id}</p>",
        f"<p><strong>Total chunks:</strong> {len(chunks)}</p>",
        f"<p><strong>Total tokens:</strong> {total_tokens:,}</p>",
        f"<p><strong>Total characters:</strong> {total_chars:,}</p>",
        f"<p><strong>Avg tokens per chunk:</strong> {total_tokens/len(chunks):.1f}</p>",
        f"<p><strong>Total texts to highlight:</strong> {total}</p>",
        f"<p><strong>Actually highlighted:</strong> {highlighted_count}</p>",
        f"<p><strong>Success rate:</strong> {success_rate:.1f}%</p>",
        "</div>",
        "",
    ]
    summary = "\n".join(summary_lines)

    return summary + f"<pre style='white-space: pre-wrap;'>{highlighted_doc}</pre>"
|
|
def process_json_file(json_file: str) -> List[str]:
    """Read a RAG result JSON file and return exactly five HTML outputs.

    The file must contain a list of items that each have a ``"doc_id"`` and a
    ``"text"``. Texts are grouped per document in order of first appearance,
    so the output order is stable between runs and, when more than five
    documents are present, the first five are the ones shown.

    Args:
        json_file: Path to the JSON file, or an object exposing the path as
            its ``name`` attribute (for example an uploaded-file wrapper).

    Returns:
        A list of five HTML strings, padded with ``""`` when fewer than five
        documents are present.
    """
    # Must equal the number of HTML outputs this function is wired to.
    max_outputs = 5

    file_path = getattr(json_file, "name", json_file)
    with open(file_path, "r", encoding="utf-8") as f:
        session_output = json.load(f)

    # One pass; dict insertion order keeps documents in first-seen order.
    texts_by_doc = {}
    for item in session_output:
        texts_by_doc.setdefault(item["doc_id"], []).append(item["text"])

    # Only render the documents that will actually be displayed.
    shown_docs = list(texts_by_doc.items())[:max_outputs]
    highlighted_texts = [
        highlight_doc_with_chunks(doc_id, texts) for doc_id, texts in shown_docs
    ]

    highlighted_texts.extend([""] * (max_outputs - len(highlighted_texts)))
    return highlighted_texts
|
|
| |
# Gradio UI: one JSON upload, a submit button and five HTML tabs, one per
# value returned by process_json_file.
with gr.Blocks(title="RAG Document Viewer") as demo:
    gr.Markdown("# RAG Document Viewer")
    gr.Markdown("Upload a JSON file containing RAG results to view highlighted documents")

    with gr.Row():
        # NOTE(review): accepted values for `type` differ between Gradio
        # releases - confirm "file" is valid for the installed version.
        json_file_input = gr.File(label="JSON File", type="file")
        submit_btn = gr.Button("Submit", variant="primary")

    with gr.Row():
        html_outputs = []
        for tab_number in range(1, 6):
            with gr.TabItem(f"Document {tab_number}"):
                html_outputs.append(gr.HTML())

    # Each of the five returned strings fills the HTML component of one tab.
    submit_btn.click(
        fn=process_json_file,
        inputs=[json_file_input],
        outputs=html_outputs,
    )


if __name__ == "__main__":
    demo.launch()
|
|