Spaces:
Running
Running
import os | |
import logging | |
import json | |
import shutil | |
from pathlib import Path | |
import pandas as pd | |
from datetime import datetime, timedelta | |
# Імпорт LlamaIndex компонентів | |
from llama_index.core import ( | |
VectorStoreIndex, | |
Document, | |
StorageContext, | |
load_index_from_storage, | |
Settings | |
) | |
from llama_index.core.node_parser import TokenTextSplitter | |
from llama_index.retrievers.bm25 import BM25Retriever | |
from llama_index.vector_stores.faiss import FaissVectorStore | |
from llama_index.core.schema import TextNode | |
from llama_index.core.storage.docstore import SimpleDocumentStore | |
import faiss | |
from modules.config.paths import INDICES_DIR | |
from modules.data_management.hash_utils import generate_data_hash | |
from modules.data_management.index_utils import ( | |
check_indexing_availability, | |
initialize_embedding_model, | |
check_index_integrity | |
) | |
from modules.config.ai_settings import ( | |
get_metadata_csv, | |
) | |
# Встановлюємо формат збереження на бінарний (не JSON) | |
Settings.persist_json_format = False | |
logger = logging.getLogger(__name__) | |
class UnifiedIndexManager: | |
""" | |
Уніфікований менеджер для створення та управління індексами даних. | |
""" | |
def __init__(self, base_indices_dir=None): | |
""" | |
Ініціалізація менеджера індексів. | |
Args: | |
base_indices_dir (str, optional): Базова директорія для зберігання індексів | |
""" | |
self.base_indices_dir = Path(base_indices_dir) if base_indices_dir else INDICES_DIR | |
self.base_indices_dir.mkdir(exist_ok=True, parents=True) | |
# Перевірка доступності модулів для індексування | |
self.indexing_available = check_indexing_availability("temp/indices") | |
if not self.indexing_available: | |
logger.warning("Функціональність індексування недоступна. Встановіть необхідні пакети.") | |
def get_or_create_indices(self, df, session_id=None): | |
""" | |
Отримання або створення індексів для даних. | |
Args: | |
df (pandas.DataFrame): DataFrame з даними | |
session_id (str, optional): Ідентифікатор сесії | |
Returns: | |
dict: Інформація про індекси | |
""" | |
if not self.indexing_available: | |
return {"error": "Функціональність індексування недоступна. Встановіть необхідні пакети."} | |
try: | |
# Генеруємо хеш для даних | |
data_hash = generate_data_hash(df, key_columns=['Issue key', 'Summary', 'Status', 'Issue Type', 'Created', 'Updated']) | |
if not data_hash: | |
return {"error": "Не вдалося згенерувати хеш для даних"} | |
# Перевіряємо, чи існують індекси для цих даних | |
existing_indices = self._find_indices_by_hash(data_hash) | |
if existing_indices: | |
# Перевіряємо цілісність індексів | |
is_valid, message = check_index_integrity(existing_indices) | |
if is_valid: | |
logger.info(f"Знайдено існуючі індекси для даних з хешем {data_hash}") | |
return { | |
"success": True, | |
"indices_dir": str(existing_indices), | |
"data_hash": data_hash, | |
"reused_existing": True | |
} | |
else: | |
logger.warning(f"Знайдено індекси з відповідним хешем, але вони не пройшли перевірку цілісності: {message}") | |
# Створюємо нові індекси | |
# Визначаємо директорію для індексів | |
if session_id: | |
indices_path = self.base_indices_dir / session_id | |
else: | |
# Якщо не вказано session_id, використовуємо поточну дату і час | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
indices_path = self.base_indices_dir / timestamp | |
indices_path.mkdir(exist_ok=True, parents=True) | |
# Створюємо нові індекси | |
result = self._create_new_indices(indices_path, session_id, data_hash, df) | |
# Форматуємо результат | |
if isinstance(result, dict): | |
return result | |
else: | |
return { | |
"success": True, | |
"indices_dir": str(indices_path), | |
"data_hash": data_hash | |
} | |
except Exception as e: | |
logger.error(f"Помилка при отриманні або створенні індексів: {e}") | |
import traceback | |
logger.error(traceback.format_exc()) | |
return {"error": f"Помилка при отриманні або створенні індексів: {str(e)}"} | |
def _find_indices_by_hash(self, data_hash): | |
""" | |
Пошук існуючих індексів за хешем даних. | |
Args: | |
data_hash (str): Хеш даних | |
Returns: | |
Path: Шлях до директорії з індексами або None, якщо не знайдено | |
""" | |
try: | |
# Перебираємо всі піддиректорії в базовій директорії індексів | |
for index_dir in self.base_indices_dir.iterdir(): | |
if not index_dir.is_dir(): | |
continue | |
# Перевіряємо метадані | |
metadata_file = index_dir / "metadata.json" | |
if not metadata_file.exists(): | |
continue | |
try: | |
with open(metadata_file, "r", encoding="utf-8") as f: | |
metadata = json.load(f) | |
# Перевіряємо хеш | |
if metadata.get("data_hash") == data_hash: | |
return index_dir | |
except Exception as e: | |
logger.error(f"Помилка при перевірці метаданих {metadata_file}: {e}") | |
return None | |
except Exception as e: | |
logger.error(f"Помилка при пошуку індексів за хешем: {e}") | |
return None | |
def _create_new_indices(self, indices_path, session_id, data_hash, df): | |
""" | |
Створення нових індексів. | |
Args: | |
indices_path (Path): Шлях для збереження індексів | |
session_id (str): Ідентифікатор сесії | |
data_hash (str): Хеш даних | |
df (pandas.DataFrame): DataFrame з даними | |
Returns: | |
dict: Інформація про створені індекси | |
""" | |
try: | |
# Ініціалізуємо модель ембедингів | |
embed_model = initialize_embedding_model() | |
if not embed_model: | |
return {"error": "Не вдалося ініціалізувати модель ембедингів"} | |
# Отримуємо розмірність ембедингів | |
sample_embedding = embed_model.get_text_embedding("Test") | |
embedding_dim = len(sample_embedding) | |
logger.info(f"Розмірність ембедингів: {embedding_dim}") | |
# Конвертуємо DataFrame в документи | |
documents = self._convert_dataframe_to_documents(df) | |
if not documents: | |
return {"error": "Не вдалося конвертувати дані в документи"} | |
# Створюємо ноди з документів | |
nodes = [TextNode(text=doc.text, metadata=doc.metadata) for doc in documents] | |
# Створюємо FAISS індекс | |
faiss_index = faiss.IndexFlatL2(embedding_dim) | |
vector_store = FaissVectorStore(faiss_index=faiss_index) | |
# Створюємо документне сховище | |
docstore = SimpleDocumentStore() | |
docstore.add_documents(nodes) | |
# Створюємо контекст зберігання | |
storage_context = StorageContext.from_defaults( | |
docstore=docstore, | |
vector_store=vector_store | |
) | |
# Встановлюємо модель ембедингів | |
Settings.embed_model = embed_model | |
# Створюємо індекс | |
index = VectorStoreIndex.from_documents( | |
documents, | |
storage_context=storage_context | |
) | |
# Зберігаємо індекс у файл (бінарний формат) | |
index.storage_context.persist(str(indices_path)) | |
# Створюємо BM25 retriever і зберігаємо його параметри | |
bm25_retriever = BM25Retriever.from_defaults( | |
docstore=docstore, | |
similarity_top_k=10 | |
) | |
self._save_bm25_data(indices_path, bm25_retriever) | |
# Зберігаємо метадані | |
self._save_indices_metadata(indices_path, { | |
"session_id": session_id, | |
"created_at": datetime.now().isoformat(), | |
"data_hash": data_hash, | |
"documents_count": len(documents), | |
"nodes_count": len(nodes), | |
"rows_count": len(df), | |
"columns_count": len(df.columns), | |
"embedding_model": str(embed_model), | |
"embedding_dim": embedding_dim, | |
"storage_format": "binary" | |
}) | |
# Створюємо маркерний файл для перевірки валідності індексів | |
with open(indices_path / "indices.valid", "w") as f: | |
f.write(f"Indices created at {datetime.now().isoformat()}") | |
logger.info(f"Індекси успішно створено в {indices_path}") | |
# Зберігаємо шлях глобально, якщо доступно | |
self._save_indices_path_globally(str(indices_path)) | |
return { | |
"success": True, | |
"indices_dir": str(indices_path), | |
"data_hash": data_hash, | |
"documents_count": len(documents), | |
"nodes_count": len(nodes), | |
"rows_count": len(df), | |
"reused_existing": False | |
} | |
except Exception as e: | |
logger.error(f"Помилка при створенні нових індексів: {e}") | |
import traceback | |
logger.error(traceback.format_exc()) | |
return {"error": f"Помилка при створенні нових індексів: {str(e)}"} | |
def _save_indices_metadata(self, indices_path, metadata): | |
"""Зберігає метадані індексів у файл.""" | |
try: | |
with open(indices_path / "metadata.json", "w", encoding="utf-8") as f: | |
json.dump(metadata, f, ensure_ascii=False, indent=2) | |
return True | |
except Exception as e: | |
logger.error(f"Помилка при збереженні метаданих: {e}") | |
return False | |
def _save_indices_path_globally(self, indices_path): | |
"""Зберігає шлях до індексів у глобальних об'єктах (app, index_manager).""" | |
try: | |
import builtins | |
if hasattr(builtins, 'app'): | |
builtins.app.indices_path = indices_path | |
logger.info(f"Шлях до індексів збережено глобально: {indices_path}") | |
# Якщо також є глобальний index_manager, зберігаємо в ньому | |
if hasattr(builtins, 'index_manager'): | |
builtins.index_manager.last_indices_path = indices_path | |
return True | |
except Exception as e: | |
logger.warning(f"Не вдалося зберегти шлях до індексів глобально: {e}") | |
return False | |
def _convert_dataframe_to_documents(self, df): | |
""" | |
Конвертує DataFrame у документи для індексування. | |
Кожен документ представляє один рядок CSV з усіма його полями. | |
""" | |
try: | |
# Перевірка типу даних | |
if not hasattr(df, 'iterrows'): | |
logger.error(f"Отримано не DataFrame: {type(df)}") | |
return None | |
# Конвертація в документи | |
documents = [] | |
for idx, row in df.iterrows(): | |
# Формуємо текст документа, включаючи всі основні поля | |
text_parts = [] | |
# Додаємо основні поля | |
key_fields = [ | |
('Issue key', 'Ключ задачі'), | |
('Summary', 'Заголовок'), | |
('Issue Type', 'Тип задачі'), | |
('Status', 'Статус'), | |
('Priority', 'Пріоритет'), | |
('Assignee', 'Виконавець'), | |
('Reporter', 'Автор'), | |
('Created', 'Створено'), | |
('Updated', 'Оновлено'), | |
('Project name', 'Проект') | |
] | |
for field, title in key_fields: | |
if field in row and pd.notna(row[field]): | |
text_parts.append(f"{title}: {str(row[field])}") | |
# Додаємо опис, якщо він є | |
if 'Description' in row and pd.notna(row['Description']): | |
text_parts.append(f"Опис: {str(row['Description'])}") | |
# Додаємо коментарі, якщо вони є | |
comments = [] | |
for col in df.columns: | |
if col.startswith('Comment') and pd.notna(row[col]): | |
comments.append(str(row[col])) | |
if comments: | |
text_parts.append("Коментарі:") | |
for i, comment in enumerate(comments, 1): | |
text_parts.append(f"Коментар {i}: {comment}") | |
# Додаємо інформацію про зв'язки, якщо вона є | |
links = [] | |
for col in df.columns: | |
if col.startswith('Outward issue link') and pd.notna(row[col]): | |
link_type = col.replace('Outward issue link (', '').replace(')', '') | |
links.append(f"{link_type}: {str(row[col])}") | |
if links: | |
text_parts.append("Зв'язки:") | |
for link in links: | |
text_parts.append(link) | |
# Додаємо користувацькі поля | |
custom_fields = [] | |
for col in df.columns: | |
if (col.startswith('Custom field') or col.startswith('Sprint')) and pd.notna(row[col]): | |
field_name = col.replace('Custom field (', '').replace(')', '') | |
custom_fields.append(f"{field_name}: {str(row[col])}") | |
if custom_fields: | |
text_parts.append("Додаткові поля:") | |
for field in custom_fields: | |
text_parts.append(field) | |
# Об'єднуємо все в один текст | |
text = "\n".join(text_parts) | |
# Якщо текст порожній, використовуємо хоча б заголовок | |
if not text and 'Summary' in row and pd.notna(row['Summary']): | |
text = f"Заголовок: {str(row['Summary'])}" | |
elif not text: | |
text = f"Задача {idx}" | |
# Створюємо метадані - включаємо всі основні поля | |
metadata = get_metadata_csv(row, idx) | |
# Додаємо інформацію про зв'язки в метадані | |
if 'Outward issue link (Relates)' in row and pd.notna(row['Outward issue link (Relates)']): | |
metadata["related_issues"] = row['Outward issue link (Relates)'] | |
# Створення документа | |
doc = Document( | |
text=text, | |
metadata=metadata | |
) | |
documents.append(doc) | |
logger.info(f"Створено {len(documents)} документів з DataFrame") | |
return documents | |
except Exception as e: | |
logger.error(f"Помилка при конвертації DataFrame в документи: {e}") | |
import traceback | |
logger.error(traceback.format_exc()) | |
return [] | |
def _save_bm25_data(self, indices_path, bm25_retriever): | |
""" | |
Збереження даних для BM25 retriever. | |
""" | |
try: | |
# Створюємо директорію для BM25 | |
bm25_dir = indices_path / "bm25" | |
bm25_dir.mkdir(exist_ok=True) | |
# Зберігаємо параметри BM25 | |
bm25_params = { | |
"similarity_top_k": bm25_retriever.similarity_top_k, | |
"alpha": getattr(bm25_retriever, "alpha", 0.75), | |
"beta": getattr(bm25_retriever, "beta", 0.75), | |
"index_creation_time": datetime.now().isoformat() | |
} | |
with open(bm25_dir / "params.json", "w", encoding="utf-8") as f: | |
json.dump(bm25_params, f, ensure_ascii=False, indent=2) | |
logger.info(f"Дані BM25 збережено в {bm25_dir}") | |
return True | |
except Exception as e: | |
logger.error(f"Помилка при збереженні даних BM25: {e}") | |
return False | |
def load_indices(self, indices_dir): | |
"""Завантаження індексів з директорії.""" | |
try: | |
# Перевірка наявності директорії | |
indices_path = Path(indices_dir) | |
if not indices_path.exists(): | |
logger.error(f"Директорія індексів не існує: {indices_dir}") | |
return None, None | |
# Перевірка наявності маркерного файлу | |
marker_path = indices_path / "indices.valid" | |
if not marker_path.exists(): | |
logger.warning(f"Файл маркера не знайдено в {indices_dir}. Індекси не завантажено.") | |
return None, None | |
try: | |
# Спробуємо завантажити vector_store | |
vector_store = FaissVectorStore.from_persist_dir(indices_dir) | |
# Створюємо контекст зберігання | |
storage_context = StorageContext.from_defaults( | |
vector_store=vector_store, | |
persist_dir=indices_dir | |
) | |
# Завантажуємо індекс | |
index = load_index_from_storage( | |
storage_context=storage_context, | |
index_cls=VectorStoreIndex | |
) | |
# Створюємо BM25 retriever | |
bm25_retriever = BM25Retriever.from_defaults( | |
docstore=storage_context.docstore, | |
similarity_top_k=10 | |
) | |
# Перевіряємо наявність параметрів BM25 | |
bm25_params_path = indices_path / "bm25" / "params.json" | |
if bm25_params_path.exists(): | |
try: | |
with open(bm25_params_path, "r", encoding="utf-8") as f: | |
bm25_params = json.load(f) | |
if "similarity_top_k" in bm25_params: | |
bm25_retriever.similarity_top_k = bm25_params["similarity_top_k"] | |
except Exception as e: | |
logger.warning(f"Не вдалося завантажити параметри BM25: {e}") | |
logger.info(f"Індекси успішно завантажено з {indices_dir}") | |
return index, bm25_retriever | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні індексів: {e}") | |
import traceback | |
logger.error(traceback.format_exc()) | |
# Діагностичні повідомлення | |
logger.info(f"Файли у директорії {indices_dir}: {[f.name for f in indices_path.iterdir() if f.is_file()]}") | |
return None, None | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні індексів: {e}") | |
return None, None | |
def cleanup_old_indices(self, max_age_days=7, max_indices=20): | |
""" | |
Очищення застарілих індексів. | |
Args: | |
max_age_days (int): Максимальний вік індексів у днях | |
max_indices (int): Максимальна кількість індексів для зберігання | |
Returns: | |
int: Кількість видалених директорій | |
""" | |
try: | |
# Збираємо інформацію про всі директорії індексів | |
index_dirs = [] | |
for index_dir in self.base_indices_dir.iterdir(): | |
if not index_dir.is_dir(): | |
continue | |
# Перевіряємо метадані | |
metadata_file = index_dir / "metadata.json" | |
if not metadata_file.exists(): | |
continue | |
try: | |
with open(metadata_file, "r", encoding="utf-8") as f: | |
metadata = json.load(f) | |
# Отримуємо час створення | |
created_at = metadata.get("created_at", "") | |
index_dirs.append({ | |
"path": str(index_dir), | |
"created_at": created_at | |
}) | |
except Exception as e: | |
logger.error(f"Помилка при перевірці метаданих {metadata_file}: {e}") | |
# Якщо немає директорій, виходимо | |
if not index_dirs: | |
return 0 | |
# Сортуємо директорії за часом створення (від найновіших до найстаріших) | |
index_dirs.sort(key=lambda x: x["created_at"], reverse=True) | |
# Визначаємо директорії для видалення | |
dirs_to_delete = [] | |
# 1. Залишаємо max_indices найновіших директорій | |
if len(index_dirs) > max_indices: | |
dirs_to_delete.extend(index_dirs[max_indices:]) | |
# 2. Перевіряємо, чи є серед залишених застарілі директорії | |
cutoff_date = (datetime.now() - timedelta(days=max_age_days)).isoformat() | |
for index_info in index_dirs[:max_indices]: | |
if index_info["created_at"] < cutoff_date: | |
dirs_to_delete.append(index_info) | |
# Видаляємо директорії | |
deleted_count = 0 | |
for dir_info in dirs_to_delete: | |
try: | |
dir_path = Path(dir_info["path"]) | |
if dir_path.exists(): | |
shutil.rmtree(dir_path) | |
logger.info(f"Видалено застарілу директорію індексів: {dir_path}") | |
deleted_count += 1 | |
except Exception as e: | |
logger.error(f"Помилка при видаленні директорії {dir_info['path']}: {e}") | |
return deleted_count | |
except Exception as e: | |
logger.error(f"Помилка при очищенні застарілих індексів: {e}") | |
return 0 |