Spaces:
Running
Running
import os | |
import logging | |
import json | |
import shutil | |
from pathlib import Path | |
import pandas as pd | |
from datetime import datetime, timedelta | |
import hashlib | |
import uuid | |
import faiss | |
from modules.data_management.index_utils import validate_index_directory | |
from modules.data_management.index_utils import check_indexing_availability, initialize_embedding_model | |
from modules.data_management.hash_utils import generate_data_hash | |
from modules.data_management.index_utils import check_index_integrity | |
from modules.config.paths import INDICES_DIR | |
from modules.config.ai_settings import ( | |
CHUNK_SIZE, | |
CHUNK_OVERLAP, | |
EXCLUDED_EMBED_METADATA_KEYS, | |
EXCLUDED_LLM_METADATA_KEYS | |
) | |
logger = logging.getLogger(__name__) | |
# Перевірка доступності модулів для індексування | |
INDEXING_AVAILABLE = check_indexing_availability() | |
INDEXING_MODULES = { | |
"VectorStoreIndex": None, | |
"StorageContext": None, | |
"SimpleDocumentStore": None, | |
"TokenTextSplitter": None, | |
"BM25Retriever": None, | |
"FaissVectorStore": None, | |
"Settings": None | |
} | |
def _generate_data_hash(self, df): | |
""" | |
Генерація хешу для DataFrame для ідентифікації унікальних даних. | |
Args: | |
df (pandas.DataFrame): DataFrame для хешування | |
Returns: | |
str: Хеш даних | |
""" | |
# Використовуємо основні колонки для хешування | |
key_columns = ['Issue key', 'Summary', 'Status', 'Issue Type', 'Created', 'Updated'] | |
return generate_data_hash(df, key_columns) | |
class IndexManager: | |
""" | |
Менеджер для створення та управління індексами даних (FAISS, BM25). | |
""" | |
def __init__(self, base_indices_dir="temp/indices"): | |
""" | |
Ініціалізація менеджера індексів. | |
Args: | |
base_indices_dir (str): Базова директорія для зберігання індексів | |
""" | |
self.base_indices_dir = Path(base_indices_dir) if base_indices_dir else INDICES_DIR | |
self.base_indices_dir.mkdir(exist_ok=True, parents=True) | |
# Перевірка доступності модулів для індексування | |
self.indexing_available = INDEXING_AVAILABLE | |
if not self.indexing_available: | |
logger.warning("Функціональність індексування недоступна. Встановіть необхідні пакети.") | |
def create_indices_for_session(self, session_id, merged_df, indices_dir=None): | |
""" | |
Створення індексів для даних сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
merged_df (pandas.DataFrame): DataFrame з об'єднаними даними | |
indices_dir (str, optional): Директорія для збереження індексів. | |
Якщо None, використовується директорія сесії. | |
Returns: | |
dict: Інформація про створені індекси | |
""" | |
if not self.indexing_available: | |
return {"error": "Функціональність індексування недоступна. Встановіть необхідні пакети."} | |
try: | |
# Визначаємо директорію для індексів | |
indices_path = Path(indices_dir) if indices_dir else self.base_indices_dir / session_id | |
indices_path.mkdir(exist_ok=True, parents=True) | |
# Генеруємо хеш для даних | |
data_hash = self._generate_data_hash(merged_df) | |
# Перевіряємо, чи існують індекси для цих даних | |
existing_indices = self._find_indices_by_hash(data_hash) | |
if existing_indices: | |
return self._reuse_existing_indices(existing_indices, indices_path, session_id, data_hash, merged_df) | |
# Створюємо нові індекси | |
return self._create_new_indices(indices_path, session_id, data_hash, merged_df) | |
except Exception as e: | |
logger.error(f"Помилка при створенні індексів: {e}") | |
return {"error": f"Помилка при створенні індексів: {str(e)}"} | |
def _reuse_existing_indices(self, existing_indices, indices_path, session_id, data_hash, merged_df): | |
""" | |
Повторне використання існуючих індексів. | |
Args: | |
existing_indices (str): Шлях до існуючих індексів | |
indices_path (Path): Шлях для нових індексів | |
session_id (str): Ідентифікатор сесії | |
data_hash (str): Хеш даних | |
merged_df (pandas.DataFrame): DataFrame з даними | |
Returns: | |
dict: Інформація про скопійовані індекси | |
""" | |
logger.info(f"Знайдено існуючі індекси для даних з хешем {data_hash}") | |
try: | |
# Спочатку очищаємо цільову директорію | |
if indices_path.exists(): | |
for item in indices_path.iterdir(): | |
if item.is_file(): | |
item.unlink() | |
elif item.is_dir(): | |
shutil.rmtree(item) | |
# Копіюємо індекси | |
for item in Path(existing_indices).iterdir(): | |
if item.is_file(): | |
shutil.copy2(item, indices_path) | |
elif item.is_dir(): | |
shutil.copytree(item, indices_path / item.name) | |
logger.info(f"Індекси успішно скопійовано в {indices_path}") | |
# Оновлюємо метадані | |
metadata = { | |
"session_id": session_id, | |
"created_at": datetime.now().isoformat(), | |
"data_hash": data_hash, | |
"rows_count": len(merged_df), | |
"columns_count": len(merged_df.columns), | |
"copied_from": str(existing_indices) | |
} | |
with open(indices_path / "metadata.json", "w", encoding="utf-8") as f: | |
json.dump(metadata, f, ensure_ascii=False, indent=2) | |
return { | |
"success": True, | |
"indices_dir": str(indices_path), | |
"data_hash": data_hash, | |
"reused_existing": True, | |
"source": str(existing_indices) | |
} | |
except Exception as copy_err: | |
logger.error(f"Помилка при копіюванні індексів: {copy_err}") | |
# Продовжуємо створення нових індексів | |
return self._create_new_indices(indices_path, session_id, data_hash, merged_df) | |
def _create_new_indices(self, indices_path, session_id, data_hash, merged_df): | |
""" | |
Створення нових індексів. | |
Зберігає індекси у форматі, сумісному з jira_hybrid_chat.py. | |
""" | |
if not INDEXING_AVAILABLE: | |
return {"error": "Функціональність індексування недоступна"} | |
try: | |
logger.info(f"Створення нових індексів для сесії {session_id}") | |
# Імпортуємо необхідні модулі напряму | |
from llama_index.core import VectorStoreIndex, StorageContext, Settings | |
from llama_index.core.storage.docstore import SimpleDocumentStore | |
from llama_index.core.node_parser import TokenTextSplitter | |
from llama_index.retrievers.bm25 import BM25Retriever | |
from llama_index.vector_stores.faiss import FaissVectorStore | |
import faiss | |
# Ініціалізуємо модель ембедингів | |
from modules.data_management.index_utils import initialize_embedding_model | |
embed_model = initialize_embedding_model() | |
# Отримуємо розмірність ембедингів динамічно | |
import numpy as np | |
test_embedding = embed_model.get_text_embedding("Тестовий текст") | |
embed_dim = len(test_embedding) | |
logger.info(f"Розмірність ембедингів: {embed_dim}") | |
# Конвертуємо DataFrame в документи | |
documents = self._convert_dataframe_to_documents(merged_df) | |
# Створюємо розділювач тексту | |
text_splitter = TokenTextSplitter( | |
chunk_size=CHUNK_SIZE, | |
chunk_overlap=CHUNK_OVERLAP | |
) | |
# Встановлюємо формат збереження на JSON через глобальні налаштування | |
# Це важливо для сумісності з jira_hybrid_chat.py | |
Settings.persist_json_format = True | |
# Створюємо FAISS індекс | |
faiss_index = faiss.IndexFlatL2(embed_dim) | |
# Створюємо контекст зберігання | |
docstore = SimpleDocumentStore() | |
vector_store = FaissVectorStore(faiss_index=faiss_index) | |
storage_context = StorageContext.from_defaults( | |
docstore=docstore, | |
vector_store=vector_store | |
) | |
# Встановлюємо модель ембедингів у налаштуваннях | |
Settings.embed_model = embed_model | |
# Створюємо індекс | |
index = VectorStoreIndex.from_documents( | |
documents, | |
storage_context=storage_context, | |
transformations=[text_splitter] | |
) | |
# Зберігаємо індекс у форматі JSON (через глобальні налаштування) | |
# НЕ передаємо json_format як аргумент | |
index.storage_context.persist(persist_dir=str(indices_path)) | |
# Створюємо BM25 retriever | |
bm25_retriever = BM25Retriever.from_defaults( | |
docstore=docstore, | |
similarity_top_k=10 | |
) | |
# Зберігаємо параметри BM25 | |
bm25_dir = indices_path / "bm25" | |
bm25_dir.mkdir(exist_ok=True) | |
with open(bm25_dir / "params.json", "w", encoding="utf-8") as f: | |
json.dump({"similarity_top_k": 10}, f) | |
# Зберігаємо метадані | |
metadata = { | |
"session_id": session_id, | |
"created_at": datetime.now().isoformat(), | |
"data_hash": data_hash, | |
"rows_count": len(merged_df), | |
"columns_count": len(merged_df.columns), | |
"embedding_model": embed_model.__class__.__name__, | |
"embedding_dim": embed_dim, | |
"format": "json" # Вказуємо використаний формат збереження | |
} | |
with open(indices_path / "metadata.json", "w", encoding="utf-8") as f: | |
json.dump(metadata, f, ensure_ascii=False, indent=2) | |
with open(indices_path / "indices.valid", "w", encoding="utf-8") as f: | |
f.write(f"Indices created at {datetime.now().isoformat()}") | |
logger.info(f"Створено файл-маркер indices.valid") | |
return { | |
"success": True, | |
"indices_dir": str(indices_path), | |
"data_hash": data_hash | |
} | |
except Exception as e: | |
logger.error(f"Помилка при створенні нових індексів: {e}") | |
return {"error": f"Помилка при створенні нових індексів: {str(e)}"} | |
def _save_bm25_data(self, indices_path, bm25_retriever): | |
""" | |
Збереження даних для BM25 retriever. | |
Args: | |
indices_path (Path): Шлях до директорії індексів | |
bm25_retriever (BM25Retriever): Об'єкт BM25Retriever | |
Returns: | |
bool: True, якщо дані успішно збережені, False у випадку помилки | |
""" | |
try: | |
# Створюємо директорію для BM25 | |
bm25_dir = indices_path / "bm25" | |
bm25_dir.mkdir(exist_ok=True) | |
# Зберігаємо параметри BM25 | |
bm25_params = { | |
"similarity_top_k": bm25_retriever.similarity_top_k, | |
"alpha": getattr(bm25_retriever, "alpha", 0.75), | |
"beta": getattr(bm25_retriever, "beta", 0.75), | |
"index_creation_time": datetime.now().isoformat() | |
} | |
with open(bm25_dir / "params.json", "w", encoding="utf-8") as f: | |
json.dump(bm25_params, f, ensure_ascii=False, indent=2) | |
logger.info(f"Дані BM25 збережено в {bm25_dir}") | |
return True | |
except Exception as e: | |
logger.error(f"Помилка при збереженні даних BM25: {e}") | |
return False | |
def _convert_dataframe_to_documents(self, df): | |
""" | |
Конвертує DataFrame в документи для індексування. | |
Args: | |
df (pandas.DataFrame): DataFrame для конвертації | |
Returns: | |
list: Список документів | |
""" | |
try: | |
# Імпортуємо Document напряму | |
from llama_index.core import Document | |
documents = [] | |
# Перебираємо рядки DataFrame | |
for idx, row in df.iterrows(): | |
# Створюємо текст документа | |
text = f"Issue Key: {row.get('Issue key', '')}\n" | |
text += f"Summary: {row.get('Summary', '')}\n" | |
text += f"Status: {row.get('Status', '')}\n" | |
text += f"Issue Type: {row.get('Issue Type', '')}\n" | |
# Додаємо опис, якщо він є | |
if 'Description' in row and pd.notna(row['Description']): | |
text += f"Description: {row['Description']}\n" | |
# Додаємо коментарі, якщо вони є | |
if 'Comments' in row and pd.notna(row['Comments']): | |
text += f"Comments: {row['Comments']}\n" | |
# Створюємо метадані | |
metadata = { | |
"issue_key": row.get('Issue key', ''), | |
"summary": row.get('Summary', ''), | |
"status": row.get('Status', ''), | |
"issue_type": row.get('Issue Type', ''), | |
"created": str(row.get('Created', '')), | |
"updated": str(row.get('Updated', '')) | |
} | |
# Створюємо документ | |
doc = Document( | |
text=text, | |
metadata=metadata | |
) | |
documents.append(doc) | |
logger.info(f"Створено {len(documents)} документів з DataFrame") | |
return documents | |
except Exception as e: | |
logger.error(f"Помилка при конвертації DataFrame в документи: {e}") | |
raise | |
def _generate_data_hash(self, df): | |
""" | |
Генерація хешу для DataFrame для ідентифікації унікальних даних. | |
Args: | |
df (pandas.DataFrame): DataFrame для хешування | |
Returns: | |
str: Хеш даних | |
""" | |
try: | |
# Використовуємо основні колонки для хешування | |
key_columns = ['Issue key', 'Summary', 'Status', 'Issue Type', 'Created', 'Updated'] | |
# Фільтруємо тільки наявні колонки | |
available_columns = [col for col in key_columns if col in df.columns] | |
if not available_columns: | |
# Якщо немає жодної ключової колонки, використовуємо всі дані | |
data_str = df.to_json() | |
else: | |
# Інакше використовуємо тільки ключові колонки | |
data_str = df[available_columns].to_json() | |
# Створюємо хеш | |
hash_object = hashlib.sha256(data_str.encode()) | |
data_hash = hash_object.hexdigest() | |
return data_hash | |
except Exception as e: | |
logger.error(f"Помилка при генерації хешу даних: {e}") | |
# У випадку помилки повертаємо випадковий хеш | |
return str(uuid.uuid4()) | |
def _find_indices_by_hash(self, data_hash): | |
""" | |
Пошук існуючих індексів за хешем даних. | |
Args: | |
data_hash (str): Хеш даних | |
Returns: | |
str: Шлях до директорії з індексами або None, якщо не знайдено | |
""" | |
try: | |
# Перебираємо всі піддиректорії в базовій директорії індексів | |
for index_dir in self.base_indices_dir.iterdir(): | |
if not index_dir.is_dir(): | |
continue | |
# Перевіряємо метадані | |
metadata_file = index_dir / "metadata.json" | |
if not metadata_file.exists(): | |
continue | |
try: | |
with open(metadata_file, "r", encoding="utf-8") as f: | |
metadata = json.load(f) | |
# Перевіряємо хеш | |
if metadata.get("data_hash") == data_hash: | |
# Перевіряємо наявність необхідних файлів | |
if validate_index_directory(index_dir): | |
logger.info(f"Знайдено існуючі індекси з відповідним хешем: {index_dir}") | |
return str(index_dir) | |
else: | |
logger.warning(f"Знайдено індекси з відповідним хешем, але вони неповні: {index_dir}") | |
except Exception as e: | |
logger.error(f"Помилка при перевірці метаданих {metadata_file}: {e}") | |
logger.info(f"Не знайдено існуючих індексів з хешем {data_hash}") | |
return None | |
except Exception as e: | |
logger.error(f"Помилка при пошуку індексів за хешем: {e}") | |
return None | |
def cleanup_old_indices(self, max_age_days=7, max_indices=20): | |
""" | |
Очищення застарілих індексів. | |
Args: | |
max_age_days (int): Максимальний вік індексів у днях | |
max_indices (int): Максимальна кількість індексів для зберігання | |
Returns: | |
int: Кількість видалених директорій індексів | |
""" | |
try: | |
# Перевіряємо, чи існує базова директорія | |
if not self.base_indices_dir.exists(): | |
return 0 | |
# Отримуємо список директорій індексів | |
index_dirs = [] | |
for index_dir in self.base_indices_dir.iterdir(): | |
if not index_dir.is_dir(): | |
continue | |
# Перевіряємо метадані для отримання часу створення | |
metadata_file = index_dir / "metadata.json" | |
created_at = None | |
if metadata_file.exists(): | |
try: | |
with open(metadata_file, "r", encoding="utf-8") as f: | |
metadata = json.load(f) | |
created_at = metadata.get("created_at") | |
except Exception: | |
pass | |
# Якщо немає метаданих, використовуємо час створення директорії | |
if not created_at: | |
created_at = datetime.fromtimestamp(index_dir.stat().st_mtime).isoformat() | |
# Додаємо інформацію про директорію | |
index_dirs.append({ | |
"path": str(index_dir), | |
"created_at": created_at | |
}) | |
# Якщо немає директорій для обробки, повертаємо 0 | |
if not index_dirs: | |
return 0 | |
# Сортуємо директорії за часом створення (від найновіших до найстаріших) | |
index_dirs.sort(key=lambda x: x["created_at"], reverse=True) | |
# Визначаємо директорії для видалення | |
dirs_to_delete = [] | |
# 1. Залишаємо max_indices найновіших директорій | |
if len(index_dirs) > max_indices: | |
dirs_to_delete.extend(index_dirs[max_indices:]) | |
# 2. Перевіряємо, чи є серед залишених застарілі директорії | |
cutoff_date = (datetime.now() - timedelta(days=max_age_days)).isoformat() | |
for index_info in index_dirs[:max_indices]: | |
if index_info["created_at"] < cutoff_date: | |
dirs_to_delete.append(index_info) | |
# Видаляємо директорії | |
deleted_count = 0 | |
for dir_info in dirs_to_delete: | |
try: | |
dir_path = Path(dir_info["path"]) | |
if dir_path.exists(): | |
shutil.rmtree(dir_path) | |
logger.info(f"Видалено застарілу директорію індексів: {dir_path}") | |
deleted_count += 1 | |
except Exception as e: | |
logger.error(f"Помилка при видаленні директорії {dir_info['path']}: {e}") | |
return deleted_count | |
except Exception as e: | |
logger.error(f"Помилка при очищенні застарілих індексів: {e}") | |
return 0 | |
def load_indices(self, indices_dir): | |
""" | |
Завантаження індексів з директорії. | |
Args: | |
indices_dir (str): Шлях до директорії з індексами | |
Returns: | |
tuple: (VectorStoreIndex, BM25Retriever) або (None, None) у випадку помилки | |
""" | |
if not self.indexing_available: | |
logger.warning("Функціональність індексування недоступна. Встановіть необхідні пакети.") | |
return None, None | |
try: | |
# Перевіряємо цілісність індексів | |
is_valid, message = check_index_integrity(indices_dir) | |
if not is_valid: | |
logger.error(f"Індекси не пройшли перевірку цілісності: {message}") | |
return None, None | |
indices_path = Path(indices_dir) | |
if not indices_path.exists(): | |
logger.error(f"Директорія індексів не існує: {indices_dir}") | |
return None, None | |
# Перевіряємо наявність необхідних файлів | |
if not (indices_path / "docstore.json").exists(): | |
logger.error(f"Директорія індексів не містить необхідних файлів: {indices_dir}") | |
return None, None | |
# Імпортуємо необхідні модулі | |
StorageContext = INDEXING_MODULES.get("StorageContext") | |
VectorStoreIndex = INDEXING_MODULES.get("VectorStoreIndex") | |
BM25Retriever = INDEXING_MODULES.get("BM25Retriever") | |
# Завантажуємо контекст зберігання | |
storage_context = StorageContext.from_defaults(persist_dir=str(indices_path)) | |
# Завантажуємо індекс | |
index = VectorStoreIndex.from_storage_context(storage_context) | |
# Створюємо BM25 retriever | |
bm25_retriever = BM25Retriever.from_defaults( | |
docstore=storage_context.docstore, | |
similarity_top_k=10 | |
) | |
# Завантажуємо параметри BM25, якщо вони є | |
bm25_params_file = indices_path / "bm25" / "params.json" | |
if bm25_params_file.exists(): | |
try: | |
with open(bm25_params_file, "r", encoding="utf-8") as f: | |
bm25_params = json.load(f) | |
# Встановлюємо параметри | |
if "similarity_top_k" in bm25_params: | |
bm25_retriever.similarity_top_k = bm25_params["similarity_top_k"] | |
except Exception as e: | |
logger.warning(f"Помилка при завантаженні параметрів BM25: {e}") | |
logger.info(f"Індекси успішно завантажено з {indices_dir}") | |
return index, bm25_retriever | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні індексів: {e}") | |
return None, None |