jira-ai-assistant / modules /data_management /unified_index_manager.py
DocUA's picture
Єдиний коміт - очищення історії
4ad5efa
import os
import logging
import json
import shutil
from pathlib import Path
import pandas as pd
from datetime import datetime, timedelta
# Імпорт LlamaIndex компонентів
from llama_index.core import (
VectorStoreIndex,
Document,
StorageContext,
load_index_from_storage,
Settings
)
from llama_index.core.node_parser import TokenTextSplitter
from llama_index.retrievers.bm25 import BM25Retriever
from llama_index.vector_stores.faiss import FaissVectorStore
from llama_index.core.schema import TextNode
from llama_index.core.storage.docstore import SimpleDocumentStore
import faiss
from modules.config.paths import INDICES_DIR
from modules.data_management.hash_utils import generate_data_hash
from modules.data_management.index_utils import (
check_indexing_availability,
initialize_embedding_model,
check_index_integrity
)
from modules.config.ai_settings import (
get_metadata_csv,
)
# Встановлюємо формат збереження на бінарний (не JSON)
Settings.persist_json_format = False
logger = logging.getLogger(__name__)
class UnifiedIndexManager:
"""
Уніфікований менеджер для створення та управління індексами даних.
"""
def __init__(self, base_indices_dir=None):
"""
Ініціалізація менеджера індексів.
Args:
base_indices_dir (str, optional): Базова директорія для зберігання індексів
"""
self.base_indices_dir = Path(base_indices_dir) if base_indices_dir else INDICES_DIR
self.base_indices_dir.mkdir(exist_ok=True, parents=True)
# Перевірка доступності модулів для індексування
self.indexing_available = check_indexing_availability("temp/indices")
if not self.indexing_available:
logger.warning("Функціональність індексування недоступна. Встановіть необхідні пакети.")
def get_or_create_indices(self, df, session_id=None):
"""
Отримання або створення індексів для даних.
Args:
df (pandas.DataFrame): DataFrame з даними
session_id (str, optional): Ідентифікатор сесії
Returns:
dict: Інформація про індекси
"""
if not self.indexing_available:
return {"error": "Функціональність індексування недоступна. Встановіть необхідні пакети."}
try:
# Генеруємо хеш для даних
data_hash = generate_data_hash(df, key_columns=['Issue key', 'Summary', 'Status', 'Issue Type', 'Created', 'Updated'])
if not data_hash:
return {"error": "Не вдалося згенерувати хеш для даних"}
# Перевіряємо, чи існують індекси для цих даних
existing_indices = self._find_indices_by_hash(data_hash)
if existing_indices:
# Перевіряємо цілісність індексів
is_valid, message = check_index_integrity(existing_indices)
if is_valid:
logger.info(f"Знайдено існуючі індекси для даних з хешем {data_hash}")
return {
"success": True,
"indices_dir": str(existing_indices),
"data_hash": data_hash,
"reused_existing": True
}
else:
logger.warning(f"Знайдено індекси з відповідним хешем, але вони не пройшли перевірку цілісності: {message}")
# Створюємо нові індекси
# Визначаємо директорію для індексів
if session_id:
indices_path = self.base_indices_dir / session_id
else:
# Якщо не вказано session_id, використовуємо поточну дату і час
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
indices_path = self.base_indices_dir / timestamp
indices_path.mkdir(exist_ok=True, parents=True)
# Створюємо нові індекси
result = self._create_new_indices(indices_path, session_id, data_hash, df)
# Форматуємо результат
if isinstance(result, dict):
return result
else:
return {
"success": True,
"indices_dir": str(indices_path),
"data_hash": data_hash
}
except Exception as e:
logger.error(f"Помилка при отриманні або створенні індексів: {e}")
import traceback
logger.error(traceback.format_exc())
return {"error": f"Помилка при отриманні або створенні індексів: {str(e)}"}
def _find_indices_by_hash(self, data_hash):
"""
Пошук існуючих індексів за хешем даних.
Args:
data_hash (str): Хеш даних
Returns:
Path: Шлях до директорії з індексами або None, якщо не знайдено
"""
try:
# Перебираємо всі піддиректорії в базовій директорії індексів
for index_dir in self.base_indices_dir.iterdir():
if not index_dir.is_dir():
continue
# Перевіряємо метадані
metadata_file = index_dir / "metadata.json"
if not metadata_file.exists():
continue
try:
with open(metadata_file, "r", encoding="utf-8") as f:
metadata = json.load(f)
# Перевіряємо хеш
if metadata.get("data_hash") == data_hash:
return index_dir
except Exception as e:
logger.error(f"Помилка при перевірці метаданих {metadata_file}: {e}")
return None
except Exception as e:
logger.error(f"Помилка при пошуку індексів за хешем: {e}")
return None
def _create_new_indices(self, indices_path, session_id, data_hash, df):
"""
Створення нових індексів.
Args:
indices_path (Path): Шлях для збереження індексів
session_id (str): Ідентифікатор сесії
data_hash (str): Хеш даних
df (pandas.DataFrame): DataFrame з даними
Returns:
dict: Інформація про створені індекси
"""
try:
# Ініціалізуємо модель ембедингів
embed_model = initialize_embedding_model()
if not embed_model:
return {"error": "Не вдалося ініціалізувати модель ембедингів"}
# Отримуємо розмірність ембедингів
sample_embedding = embed_model.get_text_embedding("Test")
embedding_dim = len(sample_embedding)
logger.info(f"Розмірність ембедингів: {embedding_dim}")
# Конвертуємо DataFrame в документи
documents = self._convert_dataframe_to_documents(df)
if not documents:
return {"error": "Не вдалося конвертувати дані в документи"}
# Створюємо ноди з документів
nodes = [TextNode(text=doc.text, metadata=doc.metadata) for doc in documents]
# Створюємо FAISS індекс
faiss_index = faiss.IndexFlatL2(embedding_dim)
vector_store = FaissVectorStore(faiss_index=faiss_index)
# Створюємо документне сховище
docstore = SimpleDocumentStore()
docstore.add_documents(nodes)
# Створюємо контекст зберігання
storage_context = StorageContext.from_defaults(
docstore=docstore,
vector_store=vector_store
)
# Встановлюємо модель ембедингів
Settings.embed_model = embed_model
# Створюємо індекс
index = VectorStoreIndex.from_documents(
documents,
storage_context=storage_context
)
# Зберігаємо індекс у файл (бінарний формат)
index.storage_context.persist(str(indices_path))
# Створюємо BM25 retriever і зберігаємо його параметри
bm25_retriever = BM25Retriever.from_defaults(
docstore=docstore,
similarity_top_k=10
)
self._save_bm25_data(indices_path, bm25_retriever)
# Зберігаємо метадані
self._save_indices_metadata(indices_path, {
"session_id": session_id,
"created_at": datetime.now().isoformat(),
"data_hash": data_hash,
"documents_count": len(documents),
"nodes_count": len(nodes),
"rows_count": len(df),
"columns_count": len(df.columns),
"embedding_model": str(embed_model),
"embedding_dim": embedding_dim,
"storage_format": "binary"
})
# Створюємо маркерний файл для перевірки валідності індексів
with open(indices_path / "indices.valid", "w") as f:
f.write(f"Indices created at {datetime.now().isoformat()}")
logger.info(f"Індекси успішно створено в {indices_path}")
# Зберігаємо шлях глобально, якщо доступно
self._save_indices_path_globally(str(indices_path))
return {
"success": True,
"indices_dir": str(indices_path),
"data_hash": data_hash,
"documents_count": len(documents),
"nodes_count": len(nodes),
"rows_count": len(df),
"reused_existing": False
}
except Exception as e:
logger.error(f"Помилка при створенні нових індексів: {e}")
import traceback
logger.error(traceback.format_exc())
return {"error": f"Помилка при створенні нових індексів: {str(e)}"}
def _save_indices_metadata(self, indices_path, metadata):
"""Зберігає метадані індексів у файл."""
try:
with open(indices_path / "metadata.json", "w", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
return True
except Exception as e:
logger.error(f"Помилка при збереженні метаданих: {e}")
return False
def _save_indices_path_globally(self, indices_path):
"""Зберігає шлях до індексів у глобальних об'єктах (app, index_manager)."""
try:
import builtins
if hasattr(builtins, 'app'):
builtins.app.indices_path = indices_path
logger.info(f"Шлях до індексів збережено глобально: {indices_path}")
# Якщо також є глобальний index_manager, зберігаємо в ньому
if hasattr(builtins, 'index_manager'):
builtins.index_manager.last_indices_path = indices_path
return True
except Exception as e:
logger.warning(f"Не вдалося зберегти шлях до індексів глобально: {e}")
return False
def _convert_dataframe_to_documents(self, df):
"""
Конвертує DataFrame у документи для індексування.
Кожен документ представляє один рядок CSV з усіма його полями.
"""
try:
# Перевірка типу даних
if not hasattr(df, 'iterrows'):
logger.error(f"Отримано не DataFrame: {type(df)}")
return None
# Конвертація в документи
documents = []
for idx, row in df.iterrows():
# Формуємо текст документа, включаючи всі основні поля
text_parts = []
# Додаємо основні поля
key_fields = [
('Issue key', 'Ключ задачі'),
('Summary', 'Заголовок'),
('Issue Type', 'Тип задачі'),
('Status', 'Статус'),
('Priority', 'Пріоритет'),
('Assignee', 'Виконавець'),
('Reporter', 'Автор'),
('Created', 'Створено'),
('Updated', 'Оновлено'),
('Project name', 'Проект')
]
for field, title in key_fields:
if field in row and pd.notna(row[field]):
text_parts.append(f"{title}: {str(row[field])}")
# Додаємо опис, якщо він є
if 'Description' in row and pd.notna(row['Description']):
text_parts.append(f"Опис: {str(row['Description'])}")
# Додаємо коментарі, якщо вони є
comments = []
for col in df.columns:
if col.startswith('Comment') and pd.notna(row[col]):
comments.append(str(row[col]))
if comments:
text_parts.append("Коментарі:")
for i, comment in enumerate(comments, 1):
text_parts.append(f"Коментар {i}: {comment}")
# Додаємо інформацію про зв'язки, якщо вона є
links = []
for col in df.columns:
if col.startswith('Outward issue link') and pd.notna(row[col]):
link_type = col.replace('Outward issue link (', '').replace(')', '')
links.append(f"{link_type}: {str(row[col])}")
if links:
text_parts.append("Зв'язки:")
for link in links:
text_parts.append(link)
# Додаємо користувацькі поля
custom_fields = []
for col in df.columns:
if (col.startswith('Custom field') or col.startswith('Sprint')) and pd.notna(row[col]):
field_name = col.replace('Custom field (', '').replace(')', '')
custom_fields.append(f"{field_name}: {str(row[col])}")
if custom_fields:
text_parts.append("Додаткові поля:")
for field in custom_fields:
text_parts.append(field)
# Об'єднуємо все в один текст
text = "\n".join(text_parts)
# Якщо текст порожній, використовуємо хоча б заголовок
if not text and 'Summary' in row and pd.notna(row['Summary']):
text = f"Заголовок: {str(row['Summary'])}"
elif not text:
text = f"Задача {idx}"
# Створюємо метадані - включаємо всі основні поля
metadata = get_metadata_csv(row, idx)
# Додаємо інформацію про зв'язки в метадані
if 'Outward issue link (Relates)' in row and pd.notna(row['Outward issue link (Relates)']):
metadata["related_issues"] = row['Outward issue link (Relates)']
# Створення документа
doc = Document(
text=text,
metadata=metadata
)
documents.append(doc)
logger.info(f"Створено {len(documents)} документів з DataFrame")
return documents
except Exception as e:
logger.error(f"Помилка при конвертації DataFrame в документи: {e}")
import traceback
logger.error(traceback.format_exc())
return []
def _save_bm25_data(self, indices_path, bm25_retriever):
"""
Збереження даних для BM25 retriever.
"""
try:
# Створюємо директорію для BM25
bm25_dir = indices_path / "bm25"
bm25_dir.mkdir(exist_ok=True)
# Зберігаємо параметри BM25
bm25_params = {
"similarity_top_k": bm25_retriever.similarity_top_k,
"alpha": getattr(bm25_retriever, "alpha", 0.75),
"beta": getattr(bm25_retriever, "beta", 0.75),
"index_creation_time": datetime.now().isoformat()
}
with open(bm25_dir / "params.json", "w", encoding="utf-8") as f:
json.dump(bm25_params, f, ensure_ascii=False, indent=2)
logger.info(f"Дані BM25 збережено в {bm25_dir}")
return True
except Exception as e:
logger.error(f"Помилка при збереженні даних BM25: {e}")
return False
def load_indices(self, indices_dir):
"""Завантаження індексів з директорії."""
try:
# Перевірка наявності директорії
indices_path = Path(indices_dir)
if not indices_path.exists():
logger.error(f"Директорія індексів не існує: {indices_dir}")
return None, None
# Перевірка наявності маркерного файлу
marker_path = indices_path / "indices.valid"
if not marker_path.exists():
logger.warning(f"Файл маркера не знайдено в {indices_dir}. Індекси не завантажено.")
return None, None
try:
# Спробуємо завантажити vector_store
vector_store = FaissVectorStore.from_persist_dir(indices_dir)
# Створюємо контекст зберігання
storage_context = StorageContext.from_defaults(
vector_store=vector_store,
persist_dir=indices_dir
)
# Завантажуємо індекс
index = load_index_from_storage(
storage_context=storage_context,
index_cls=VectorStoreIndex
)
# Створюємо BM25 retriever
bm25_retriever = BM25Retriever.from_defaults(
docstore=storage_context.docstore,
similarity_top_k=10
)
# Перевіряємо наявність параметрів BM25
bm25_params_path = indices_path / "bm25" / "params.json"
if bm25_params_path.exists():
try:
with open(bm25_params_path, "r", encoding="utf-8") as f:
bm25_params = json.load(f)
if "similarity_top_k" in bm25_params:
bm25_retriever.similarity_top_k = bm25_params["similarity_top_k"]
except Exception as e:
logger.warning(f"Не вдалося завантажити параметри BM25: {e}")
logger.info(f"Індекси успішно завантажено з {indices_dir}")
return index, bm25_retriever
except Exception as e:
logger.error(f"Помилка при завантаженні індексів: {e}")
import traceback
logger.error(traceback.format_exc())
# Діагностичні повідомлення
logger.info(f"Файли у директорії {indices_dir}: {[f.name for f in indices_path.iterdir() if f.is_file()]}")
return None, None
except Exception as e:
logger.error(f"Помилка при завантаженні індексів: {e}")
return None, None
def cleanup_old_indices(self, max_age_days=7, max_indices=20):
"""
Очищення застарілих індексів.
Args:
max_age_days (int): Максимальний вік індексів у днях
max_indices (int): Максимальна кількість індексів для зберігання
Returns:
int: Кількість видалених директорій
"""
try:
# Збираємо інформацію про всі директорії індексів
index_dirs = []
for index_dir in self.base_indices_dir.iterdir():
if not index_dir.is_dir():
continue
# Перевіряємо метадані
metadata_file = index_dir / "metadata.json"
if not metadata_file.exists():
continue
try:
with open(metadata_file, "r", encoding="utf-8") as f:
metadata = json.load(f)
# Отримуємо час створення
created_at = metadata.get("created_at", "")
index_dirs.append({
"path": str(index_dir),
"created_at": created_at
})
except Exception as e:
logger.error(f"Помилка при перевірці метаданих {metadata_file}: {e}")
# Якщо немає директорій, виходимо
if not index_dirs:
return 0
# Сортуємо директорії за часом створення (від найновіших до найстаріших)
index_dirs.sort(key=lambda x: x["created_at"], reverse=True)
# Визначаємо директорії для видалення
dirs_to_delete = []
# 1. Залишаємо max_indices найновіших директорій
if len(index_dirs) > max_indices:
dirs_to_delete.extend(index_dirs[max_indices:])
# 2. Перевіряємо, чи є серед залишених застарілі директорії
cutoff_date = (datetime.now() - timedelta(days=max_age_days)).isoformat()
for index_info in index_dirs[:max_indices]:
if index_info["created_at"] < cutoff_date:
dirs_to_delete.append(index_info)
# Видаляємо директорії
deleted_count = 0
for dir_info in dirs_to_delete:
try:
dir_path = Path(dir_info["path"])
if dir_path.exists():
shutil.rmtree(dir_path)
logger.info(f"Видалено застарілу директорію індексів: {dir_path}")
deleted_count += 1
except Exception as e:
logger.error(f"Помилка при видаленні директорії {dir_info['path']}: {e}")
return deleted_count
except Exception as e:
logger.error(f"Помилка при очищенні застарілих індексів: {e}")
return 0