Course_Project / app.py
fruitpicker01's picture
Update app.py
0dbbb57 verified
import os
import json
import pickle
import tempfile
from pathlib import Path
from typing import Optional, Dict, Any, List, Tuple
import traceback
import re
from datetime import datetime
try:
from pydantic import BaseModel, Field
HAS_PYDANTIC = True
except ImportError:
HAS_PYDANTIC = False
print("⚠️ Pydantic не установлен, структурированный вывод недоступен")
try:
import numpy as np
import faiss
HAS_FAISS = True
except ImportError:
HAS_FAISS = False
print("⚠️ FAISS не установлен, будет использован поиск по ключевым словам")
try:
import gradio as gr
HAS_GRADIO = True
except ImportError:
HAS_GRADIO = False
print("⚠️ Gradio не установлен")
from openai import OpenAI
# Pydantic модели для структурированного вывода
if HAS_PYDANTIC:
class SourceInfo(BaseModel):
"""Информация об источнике"""
page: int = Field(description="Номер страницы в отчете")
relevance_score: float = Field(description="Оценка релевантности от 0 до 1")
content_preview: str = Field(description="Краткое описание содержимого")
class ThinkingProcess(BaseModel):
"""Процесс рассуждений (Chain-of-Thought)"""
question_analysis: str = Field(description="Анализ вопроса пользователя")
information_found: str = Field(description="Найденная в источниках информация")
reasoning_steps: List[str] = Field(description="Шаги логических рассуждений")
conclusion: str = Field(description="Выводы на основе анализа")
class FinancialAnswer(BaseModel):
"""Структурированный ответ по финансовой отчетности"""
thinking: ThinkingProcess = Field(description="Процесс рассуждений")
answer: str = Field(description="Основной ответ на вопрос")
confidence: float = Field(description="Уверенность в ответе от 0 до 1")
sources: List[SourceInfo] = Field(description="Использованные источники")
key_metrics: Optional[Dict[str, Any]] = Field(description="Ключевые числовые показатели", default=None)
timestamp: str = Field(default_factory=lambda: datetime.now().isoformat())
class VectorRAGSystem:
"""RAG система с векторным поиском и резервным режимом"""
def __init__(self):
self.chunks = []
self.word_index = {}
self.faiss_index = None
self.metadata = {}
self.client = None
self.is_initialized = False
self.pdf_doc = None
self.page_texts = {} # Кеш текстов страниц
# Модели и параметры
self.embedding_model = "text-embedding-3-large"
self.embedding_dim = 3072
self.generation_model = "gpt-4o"
self.reranking_model = "gpt-4o-mini"
# Параметры поиска
self.max_chunks_for_rerank = 15
self.final_chunks_count = 5
self.vector_search_k = 20
# Режим работы
self.vector_mode = HAS_FAISS
def initialize_with_api_key(self, api_key: str) -> Tuple[str, str]:
"""Инициализация системы с API ключом"""
try:
if not api_key.strip():
return "❌ Введите OpenAI API ключ", ""
# Инициализация OpenAI клиента
self.client = OpenAI(api_key=api_key.strip())
# Загрузка данных
if not self.load_data():
return "❌ Ошибка загрузки данных", ""
self.is_initialized = True
stats = self._generate_stats()
return "✅ Векторная RAG система инициализирована", stats
except Exception as e:
return f"❌ Ошибка инициализации: {str(e)}", ""
def load_data(self) -> bool:
"""Загрузка векторных данных"""
try:
# Загружаем только векторные данные
if self.vector_mode and self.load_vector_data():
return True
print("❌ Векторные данные не найдены или не удалось загрузить")
return False
except Exception as e:
print(f"❌ Ошибка загрузки данных: {e}")
return False
def load_vector_data(self) -> bool:
"""Загрузка векторных данных и сохранение полной metadata_list с caption."""
try:
print("🔄 Попытка загрузки векторных данных...")
faiss_file = "chunks_flatip.faiss"
metadata_file = "metadata.json"
if not all(os.path.exists(f) for f in [faiss_file, metadata_file]):
print("📁 Векторные файлы не найдены")
return False
# 1) Читаем весь список метаданных, сохраняем его
with open(metadata_file, 'r', encoding='utf-8') as f:
metadata_list = json.load(f)
self.metadata_list = metadata_list
# 2) Строим self.chunks, сохраняя каждый item целиком
self.chunks = []
for i, item in enumerate(metadata_list):
chunk_id = item.get("chunk_id",
item.get("table_id",
item.get("img_id", None)))
self.chunks.append({
"page": item["page"],
"chunk_id": chunk_id,
"chunk_index": i,
"text": "", # заполним в vector_search
"metadata": item # здесь есть caption, type и т.д.
})
# 3) Загружаем FAISS-индекс
if HAS_FAISS:
self.faiss_index = faiss.read_index(faiss_file)
# 4) Загружаем PDF для parent-page enrichment
pdf_path = "data/Сбер 2023.pdf"
if os.path.exists(pdf_path):
import fitz
self.pdf_doc = fitz.open(pdf_path)
print(f"✅ PDF загружен: {self.pdf_doc.page_count} страниц")
else:
print("❌ PDF не найден для enrichment")
self.pdf_doc = None
print(f"✅ Загружены векторы: {len(self.chunks)} чанков")
return True
except Exception as e:
print(f"❌ Ошибка load_vector_data: {e}")
return False
def get_page_text(self, page_num: int) -> str:
"""Получение полного текста страницы с кешированием"""
if page_num in self.page_texts:
return self.page_texts[page_num]
try:
if not self.pdf_doc or page_num < 1 or page_num > self.pdf_doc.page_count:
return ""
page = self.pdf_doc[page_num - 1] # PyMuPDF использует 0-based индексы
text = page.get_text()
# Кешируем текст
self.page_texts[page_num] = text
return text
except Exception as e:
print(f"❌ Ошибка получения текста страницы {page_num}: {e}")
return ""
def _generate_stats(self) -> str:
"""Генерация статистики системы"""
total_chunks = len(self.chunks)
mode = "Векторный поиск" if self.vector_mode and self.faiss_index else "Базовый режим"
structured_output = "✅ Pydantic" if HAS_PYDANTIC else "❌ Недоступно"
pdf_enrichment = "✅ Активен" if self.pdf_doc else "❌ Недоступен"
stats = f"""🧠 **Advanced RAG система с Chain-of-Thought готова!**
📊 **Технические характеристики:**
- 📦 Векторных эмбеддингов: {total_chunks}
- 🔍 Режим поиска: {mode}
- 🧠 Модель генерации: {self.generation_model}
- 🎯 LLM реранкинг: {self.reranking_model}
- 📄 Parent-page enrichment: {pdf_enrichment}
- 📋 Структурированный вывод: {structured_output}
🚀 **Архитектурные особенности:**
- 📚 **Предобработка** PDF файла (текст и таблицы) через pdfplumber
- 🔎 **Векторный поиск** с text-embedding-3-large
- 📄 **Parent-page enrichment** через PyMuPDF
- 🧠 **LLM реранкинг** для повышения релевантности
- 🤔 **Chain-of-Thought** рассуждения
- 📋 **JSON Schema** для структурированных ответов
- 📊 **Confidence scoring** и детальная аналитика
💡 **Готова к интеллектуальному анализу отчета ПАО Сбербанк 2023!**"""
return stats
def search(self, query: str, k: int = 20) -> List[Tuple[Dict, float]]:
"""Основной метод поиска"""
if self.vector_mode and self.faiss_index and self.client:
return self.vector_search(query, k)
else:
print("⚠️ Векторный режим отключен")
return []
def vector_search(self, query: str, k: int = 20) -> List[Tuple[Dict, float]]:
"""Векторный поиск + enrichment с caption из metadata_list."""
try:
response = self.client.embeddings.create(
model=self.embedding_model,
input=[query]
)
q_emb = np.array(response.data[0].embedding, dtype=np.float32).reshape(1, -1)
faiss.normalize_L2(q_emb)
scores, indices = self.faiss_index.search(q_emb, k)
results = []
for score, idx in zip(scores[0], indices[0]):
if 0 <= idx < len(self.chunks):
record = self.chunks[idx].copy()
meta_item = self.metadata_list[idx]
# базовый текст страницы
page_text = self.get_page_text(record["page"]) or ""
# если это картинка и есть caption — добавляем его сверху
if meta_item.get("type") == "image" and meta_item.get("caption"):
caption = meta_item["caption"]
record["text"] = caption + "\n\n" + page_text
else:
record["text"] = page_text
results.append((record, float(score)))
return results
except Exception as e:
print(f"❌ Ошибка vector_search: {e}")
return []
def rerank_with_llm(self, query: str, chunks: List[Tuple[Dict, float]]) -> List[Tuple[Dict, float]]:
"""LLM реранкинг результатов"""
if not chunks or not self.client:
return chunks
try:
chunks_to_rerank = chunks[:self.max_chunks_for_rerank]
docs_text = ""
for i, (chunk, _) in enumerate(chunks_to_rerank):
preview = chunk['text'][:300] + "..." if len(chunk['text']) > 300 else chunk['text']
docs_text += f"\nДокумент {i+1} (стр. {chunk['page']}):\n{preview}\n"
prompt = f"""Оцени релевантность каждого документа для ответа на вопрос по шкале 1-10.
Вопрос: {query}
Документы:{docs_text}
Инструкции:
1. Оценивай точность и полноту информации для ответа
2. Высшие баллы (8-10) - прямой ответ на вопрос
3. Средние баллы (5-7) - частично релевантная информация
4. Низкие баллы (1-4) - слабо связано с вопросом
Верни только числа через запятую (например: 8,6,9,4,7):"""
response = self.client.chat.completions.create(
model=self.reranking_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=100,
temperature=0
)
scores_text = response.choices[0].message.content.strip()
numbers = re.findall(r'\d+\.?\d*', scores_text)
scores = [max(0, min(10, float(num))) for num in numbers]
reranked = []
for i, (chunk, original_score) in enumerate(chunks):
rerank_score = scores[i] if i < len(scores) else 0
reranked.append((chunk, rerank_score))
reranked.sort(key=lambda x: x[1], reverse=True)
return reranked
except Exception as e:
print(f"❌ Ошибка реранкинга: {e}")
return chunks
def generate_answer(self, query: str, context_chunks: List[Tuple[Dict, float]]) -> str:
"""Генерация ответа с Chain-of-Thought и структурированным выводом"""
if not self.client:
return "❌ OpenAI API не настроен"
try:
# Подготавливаем контекст с метаинформацией
context_parts = []
sources_info = []
for i, (chunk, score) in enumerate(context_chunks[:self.final_chunks_count]):
text = chunk.get('text', '')
clean_text = text.encode('utf-8', errors='ignore').decode('utf-8')
# Ограничиваем длину для лучшей обработки
if len(clean_text) > 1500:
clean_text = clean_text[:1500] + "..."
context_parts.append(f"Источник {i+1} (страница {chunk['page']}):\n{clean_text}")
sources_info.append({
"page": chunk['page'],
"score": float(score),
"preview": clean_text[:200] + "..." if len(clean_text) > 200 else clean_text
})
context = "\n\n".join(context_parts)
clean_query = query.encode('utf-8', errors='ignore').decode('utf-8')
# Используем структурированный вывод, если доступен Pydantic
if HAS_PYDANTIC:
return self._generate_structured_answer(clean_query, context, sources_info)
else:
return self._generate_simple_answer(clean_query, context)
except Exception as e:
return f"❌ Ошибка генерации ответа: {str(e)}"
def _generate_structured_answer(self, query: str, context: str, sources_info: List[Dict]) -> str:
"""Генерация структурированного ответа с Chain-of-Thought"""
try:
# JSON Schema для принуждения к структуре
schema = FinancialAnswer.model_json_schema()
prompt = f"""Ты — эксперт по анализу финансовых отчетов ПАО Сбербанк. Проанализируй вопрос пользователя, используя Chain-of-Thought.
ВОПРОС: {query}
КОНТЕКСТ ИЗ ОТЧЕТА:
{context}
Отвечай строго JSON со следующими полями:
{{
"thinking": {{
"question_analysis": "<анализ вопроса>",
"information_found": "<что найдено в источниках>",
"reasoning_steps": ["<шаг 1>", "<шаг 2>", "..."],
"conclusion": "<краткий вывод>"
}},
"answer": "<основной ответ на русском языке>",
"confidence": <число от 0 до 1>,
"sources": [
{{
"page": <номер страницы>,
"relevance_score": <0–1>,
"content_preview": "<превью текста>"
}},
]
}}"""
response = self.client.chat.completions.create(
model=self.generation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=2000,
temperature=0.1,
response_format={"type": "json_object"}
)
json_response = response.choices[0].message.content.strip()
print("Raw JSON response:", json_response)
# Парсим и валидируем JSON
parsed = json.loads(json_response)
validated = FinancialAnswer(**parsed)
return self._format_structured_response(validated)
except Exception as e:
print(f"⚠️ Ошибка в _generate_structured_answer: {e}")
return self._generate_simple_answer(query, context)
def _generate_simple_answer(self, query: str, context: str) -> str:
"""Генерация простого ответа с Chain-of-Thought (fallback)"""
prompt = f"""Ты - эксперт по анализу финансовых отчетов. Ответь на вопрос, используя Chain-of-Thought рассуждения.
ВОПРОС: {query}
КОНТЕКСТ ИЗ ОТЧЕТА:
{context}
ФОРМАТ ОТВЕТА:
🤔 **АНАЛИЗ ВОПРОСА:**
[Что именно спрашивает пользователь]
📊 **НАЙДЕННАЯ ИНФОРМАЦИЯ:**
[Какие данные есть в источниках]
🔍 **РАССУЖДЕНИЯ:**
[Логические шаги анализа]
✅ **ВЫВОДЫ:**
[Финальный ответ с конкретными данными]
ИНСТРУКЦИИ:
- Отвечай только на основе предоставленной информации
- Используй конкретные данные и цифры из отчета
- Указывай номера страниц при цитировании
- Отвечай на русском языке"""
response = self.client.chat.completions.create(
model=self.generation_model,
messages=[{"role": "user", "content": prompt}],
max_tokens=1500,
temperature=0.1
)
return response.choices[0].message.content.strip()
def _format_structured_response(self, response: 'FinancialAnswer') -> str:
"""Форматирование структурированного ответа для отображения"""
formatted = f"""🤔 **ПРОЦЕСС РАССУЖДЕНИЙ:**
📝 **Анализ вопроса:** {response.thinking.question_analysis}
📊 **Найденная информация:** {response.thinking.information_found}
🔍 **Шаги рассуждений:**
"""
for i, step in enumerate(response.thinking.reasoning_steps, 1):
formatted += f"{i}. {step}\n"
formatted += f"\n💡 **Выводы:** {response.thinking.conclusion}\n"
formatted += f"\n✅ **ФИНАЛЬНЫЙ ОТВЕТ:**\n{response.answer}\n"
formatted += f"\n📊 **Уверенность:** {response.confidence:.1%}\n"
if response.key_metrics:
formatted += f"\n📈 **Ключевые показатели:**\n"
for key, value in response.key_metrics.items():
formatted += f"- {key}: {value}\n"
formatted += f"\n📚 **Источники:**\n"
for i, source in enumerate(response.sources, 1):
formatted += f"{i}. Страница {source.page} (релевантность: {source.relevance_score:.1%})\n"
formatted += f" {source.content_preview}\n"
return formatted
def process_query(self, query: str) -> Dict[str, Any]:
"""Обработка пользовательского запроса"""
if not self.is_initialized:
return {
"answer": "❌ Система не инициализирована. Введите API ключ.",
"sources": [],
"debug_info": {}
}
if not query.strip():
return {
"answer": "Пожалуйста, введите ваш вопрос.",
"sources": [],
"debug_info": {}
}
try:
# Поиск
search_results = self.search(query, k=self.vector_search_k)
if not search_results:
return {
"answer": "К сожалению, не удалось найти релевантную информацию по вашему вопросу.",
"sources": [],
"debug_info": {"step": "search", "results_count": 0}
}
# Реранкинг
reranked_results = self.rerank_with_llm(query, search_results)
# Генерация ответа
answer = self.generate_answer(query, reranked_results)
# Подготовка источников
sources = []
# Создаем словарь для быстрого поиска search_score по chunk_index
search_scores = {}
for chunk, score in search_results:
search_scores[chunk.get("chunk_index", -1)] = score
for chunk, score in reranked_results[:self.final_chunks_count]:
sources.append({
"page": chunk["page"],
"search_score": search_scores.get(chunk.get("chunk_index", -1), 0),
"rerank_score": score,
"preview": chunk["text"][:200] + "..." if len(chunk["text"]) > 200 else chunk["text"]
})
debug_info = {
"search_results": len(search_results),
"reranked_results": len(reranked_results),
"final_chunks": len(sources),
"search_method": "vector" if self.vector_mode else "keyword"
}
return {
"answer": answer,
"sources": sources,
"debug_info": debug_info
}
except Exception as e:
print(f"❌ Ошибка обработки запроса: {e}")
traceback.print_exc()
return {
"answer": f"❌ Ошибка обработки запроса: {str(e)}",
"sources": [],
"debug_info": {"error": str(e)}
}
# Глобальная переменная системы
rag_system = VectorRAGSystem()
def initialize_system(api_key: str) -> Tuple[str, str]:
"""Инициализация системы"""
return rag_system.initialize_with_api_key(api_key)
def ask_question(question: str) -> Tuple[str, str]:
"""Обработка вопроса"""
result = rag_system.process_query(question)
answer = result["answer"]
# Форматируем информацию об источниках
sources_info = ""
if result["sources"]:
sources_info = "\n📚 **Источники:**\n"
for i, source in enumerate(result["sources"], 1):
sources_info += f"\n**{i}.** Страница {source['page']} "
sources_info += f"(поиск: {source['search_score']:.3f}, "
sources_info += f"релевантность: {source['rerank_score']:.1f}/10)\n"
sources_info += f"*Превью:* {source['preview']}\n"
# Добавляем отладочную информацию
if result.get("debug_info"):
debug = result["debug_info"]
sources_info += f"\n🔍 **Статистика поиска:**\n"
sources_info += f"- Метод поиска: {debug.get('search_method', 'unknown')}\n"
sources_info += f"- Найдено результатов: {debug.get('search_results', 0)}\n"
sources_info += f"- После реранкинга: {debug.get('reranked_results', 0)}\n"
sources_info += f"- Использовано в ответе: {debug.get('final_chunks', 0)}\n"
return answer, sources_info
def create_demo_interface():
"""Создание демо интерфейса"""
if not HAS_GRADIO:
print("❌ Gradio не установлен. Установите: pip install gradio")
return None
with gr.Blocks(
title="Vector RAG Demo - Сбер 2023",
theme=gr.themes.Soft(),
css="""
.main-header { text-align: center; margin-bottom: 2rem; }
.feature-box { background-color: #f8f9fa; padding: 1rem; border-radius: 8px; margin: 1rem 0; }
"""
) as demo:
gr.Markdown("""
<div class="main-header">
<h1>🧠 Advanced RAG with Chain-of-Thought: Анализ отчета Сбера 2023</h1>
<p>Интеллектуальная система с векторным поиском, LLM реранкингом и структурированными рассуждениями</p>
<p><strong>text-embedding-3-large • FAISS • GPT-4o • JSON Schema • Chain-of-Thought • Parent-page enrichment</strong></p>
</div>
""")
with gr.Row():
with gr.Column(scale=1):
gr.Markdown("### ⚙️ Настройка")
api_key_input = gr.Textbox(
label="OpenAI API Key",
placeholder="sk-proj-...",
type="password",
info="Введите ваш OpenAI API ключ"
)
init_btn = gr.Button("🚀 Инициализировать", variant="primary")
status_output = gr.Textbox(
label="Статус",
interactive=False,
lines=2
)
with gr.Column(scale=1):
stats_output = gr.Markdown("### 📊 Ожидание инициализации...")
gr.Markdown("### 💬 Задайте вопрос")
with gr.Row():
question_input = gr.Textbox(
label="Ваш вопрос",
placeholder="Например: Каковы основные финансовые показатели Сбера за 2023 год?",
lines=2,
scale=4
)
ask_btn = gr.Button("🔍 Поиск", variant="primary", scale=1)
with gr.Row():
with gr.Column(scale=2):
answer_output = gr.Textbox(
label="Ответ системы",
lines=12,
interactive=False
)
with gr.Column(scale=1):
sources_output = gr.Textbox(
label="Источники и статистика",
lines=12,
interactive=False
)
# Примеры вопросов
gr.Markdown("""
### 💡 Примеры вопросов:
- Каковы основные финансовые показатели Сбера за 2023 год?
- Какова чистая прибыль банка в 2023 году?
- Расскажите о кредитном портфеле Сбербанка
- Какие технологические инициативы развивает Сбер?
- Каковы показатели рентабельности банка?
- Какие ESG инициативы реализует Сбер?
""")
# Event handlers
init_btn.click(
fn=initialize_system,
inputs=[api_key_input],
outputs=[status_output, stats_output]
)
ask_btn.click(
fn=ask_question,
inputs=[question_input],
outputs=[answer_output, sources_output]
)
question_input.submit(
fn=ask_question,
inputs=[question_input],
outputs=[answer_output, sources_output]
)
return demo
# Запуск для Hugging Face Spaces
demo = create_demo_interface()
if __name__ == "__main__":
if demo:
demo.launch()
else:
print("❌ Не удалось создать интерфейс")