DocUA's picture
Єдиний коміт - очищення історії
4ad5efa
import os
import shutil
import logging
import pandas as pd
import hashlib
from pathlib import Path
from datetime import datetime
from modules.data_management.session_manager import SessionManager
logger = logging.getLogger(__name__)
class DataManager:
"""
Менеджер даних для роботи з файлами CSV та їх обробки.
"""
def __init__(self, current_data_dir="current_data", session_manager=None):
"""
Ініціалізація менеджера даних.
Args:
current_data_dir (str): Директорія з локальними файлами даних
session_manager (SessionManager, optional): Менеджер сесій або None для створення нового
"""
self.current_data_dir = Path(current_data_dir)
self.current_data_dir.mkdir(exist_ok=True, parents=True)
# Ініціалізація менеджера сесій
self.session_manager = session_manager or SessionManager()
def get_local_files(self):
"""
Отримання списку локальних CSV-файлів.
Returns:
list: Список словників з інформацією про файли
"""
files_info = []
if not self.current_data_dir.exists():
logger.warning(f"Директорія {self.current_data_dir} не існує")
return files_info
for file_path in self.current_data_dir.glob("*.csv"):
try:
# Отримуємо базову інформацію про файл
stat = file_path.stat()
size_kb = stat.st_size / 1024
modified = datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
# Спроба зчитати перші рядки для отримання інформації про структуру
try:
df_preview = pd.read_csv(file_path, nrows=5)
rows_preview = len(df_preview)
columns_preview = len(df_preview.columns)
columns_list = df_preview.columns.tolist()
except Exception as e:
logger.warning(f"Не вдалося прочитати файл {file_path}: {e}")
rows_preview = "?"
columns_preview = "?"
columns_list = []
# Формуємо інформацію про файл
files_info.append({
"path": str(file_path),
"name": file_path.name,
"size_kb": round(size_kb, 2),
"modified": modified,
"rows_preview": rows_preview,
"columns_preview": columns_preview,
"columns_list": columns_list
})
except Exception as e:
logger.error(f"Помилка при обробці файлу {file_path}: {e}")
# Сортуємо за часом модифікації (від найновіших до найстаріших)
files_info.sort(key=lambda x: x["modified"], reverse=True)
return files_info
def validate_csv_file(self, file_path):
"""
Перевірка валідності CSV-файлу.
Args:
file_path (str): Шлях до файлу
Returns:
tuple: (is_valid, info_dict)
is_valid - True, якщо файл валідний
info_dict - словник з інформацією про файл
"""
if not Path(file_path).exists():
return False, {"error": f"Файл не знайдено: {file_path}"}
try:
# Отримуємо інформацію про файл
file_stat = Path(file_path).stat()
size_kb = file_stat.st_size / 1024
if size_kb == 0:
return False, {"error": "Файл порожній"}
# Спроба зчитати файл
df = pd.read_csv(file_path)
# Перевірка наявності очікуваних колонок
required_columns = ['Summary', 'Issue key', 'Status']
missing_columns = [col for col in required_columns if col not in df.columns]
if missing_columns:
return False, {
"error": f"Відсутні необхідні колонки: {', '.join(missing_columns)}",
"rows": len(df),
"columns": len(df.columns),
"columns_list": df.columns.tolist()
}
# Формуємо інформацію про файл
info = {
"rows": len(df),
"columns": len(df.columns),
"columns_list": df.columns.tolist(),
"size_kb": round(size_kb, 2),
"first_rows": df.head(5).to_dict('records')
}
return True, info
except Exception as e:
logger.error(f"Помилка при валідації CSV-файлу {file_path}: {e}")
return False, {"error": f"Помилка при читанні файлу: {str(e)}"}
def copy_files_to_session(self, session_id, file_paths_list):
"""
Копіювання вибраних файлів до сесії користувача.
Args:
session_id (str): Ідентифікатор сесії
file_paths_list (list): Список шляхів до файлів для копіювання
Returns:
list: Список скопійованих файлів у сесії
"""
session_data_dir = self.session_manager.get_session_data_dir(session_id)
if not session_data_dir:
logger.error(f"Не вдалося отримати директорію даних для сесії {session_id}")
return []
copied_files = []
for file_path in file_paths_list:
try:
source_path = Path(file_path)
if not source_path.exists():
logger.warning(f"Файл не знайдено: {file_path}")
continue
# Створюємо унікальне ім'я файлу в сесії
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
dest_filename = f"local_{timestamp}_{source_path.name}"
dest_path = session_data_dir / dest_filename
# Копіюємо файл
shutil.copyfile(source_path, dest_path)
# Додаємо інформацію про файл до сесії
if self.session_manager.add_data_file(
session_id,
str(dest_path),
file_type="local",
description=f"Local file: {source_path.name}"
):
copied_files.append(str(dest_path))
logger.info(f"Файл {source_path.name} скопійовано до сесії {session_id}")
except Exception as e:
logger.error(f"Помилка при копіюванні файлу {file_path} до сесії {session_id}: {e}")
return copied_files
def merge_dataframes(self, session_id, dataframes, output_name=None):
"""
Об'єднання кількох DataFrame та збереження результату в сесії.
Args:
session_id (str): Ідентифікатор сесії
dataframes (list): Список DataFrame для об'єднання
output_name (str, optional): Ім'я файлу для збереження результату
Returns:
tuple: (merged_df, output_path) - об'єднаний DataFrame та шлях до збереженого файлу
"""
if not dataframes:
logger.warning("Немає даних для об'єднання")
return None, None
try:
# Якщо є тільки один DataFrame, використовуємо його як базовий
if len(dataframes) == 1:
merged_df = dataframes[0].copy()
else:
# Об'єднуємо всі DataFrame по рядках з ігноруванням індексів
merged_df = pd.concat(dataframes, ignore_index=True)
# Видаляємо дублікати за ключовими колонками
if 'Issue key' in merged_df.columns:
merged_df.drop_duplicates(subset=['Issue key'], keep='first', inplace=True)
# Зберігаємо результат
output_path = self.session_manager.save_merged_data(session_id, merged_df, output_name)
return merged_df, output_path
except Exception as e:
logger.error(f"Помилка при об'єднанні даних: {e}")
return None, None
def load_data_from_files(self, session_id, file_paths_list):
"""
Завантаження даних з файлів у DataFrame.
Args:
session_id (str): Ідентифікатор сесії
file_paths_list (list): Список шляхів до файлів для завантаження
Returns:
list: Список кортежів (file_path, dataframe, success)
"""
results = []
for file_path in file_paths_list:
try:
# Перевіряємо, чи існує файл
if not Path(file_path).exists():
logger.warning(f"Файл не знайдено: {file_path}")
results.append((file_path, None, False))
continue
# Завантажуємо файл
df = pd.read_csv(file_path)
# Обробка дат
for date_col in ['Created', 'Updated', 'Resolved', 'Due Date']:
if date_col in df.columns:
df[date_col] = pd.to_datetime(df[date_col], format='%Y-%m-%dT%H:%M:%S', errors='coerce')
# Підготовка додаткових колонок для аналізу
if 'Created' in df.columns and pd.api.types.is_datetime64_dtype(df[date_col]):
df['Created_Date'] = df['Created'].dt.date
df['Created_Month'] = df['Created'].dt.to_period('M')
if 'Updated' in df.columns and pd.api.types.is_datetime64_dtype(df[date_col]):
df['Updated_Date'] = df['Updated'].dt.date
df['Days_Since_Update'] = (datetime.now() - df['Updated']).dt.days
results.append((file_path, df, True))
logger.info(f"Успішно завантажено файл {file_path}, {len(df)} рядків")
except Exception as e:
logger.error(f"Помилка при завантаженні файлу {file_path}: {e}")
results.append((file_path, None, False))
return results
def initialize_session_data(self, session_id, local_files, uploaded_file=None):
"""
Ініціалізація даних сесії з локальних та завантажених файлів.
Args:
session_id (str): Ідентифікатор сесії
local_files (list): Список шляхів до локальних файлів
uploaded_file (str, optional): Шлях до завантаженого файлу
Returns:
tuple: (success, result_info) - успішність операції та інформація про результат
"""
try:
# Копіюємо локальні файли до сесії
copied_files = self.copy_files_to_session(session_id, local_files)
# Додаємо завантажений файл, якщо він є
if uploaded_file and Path(uploaded_file).exists():
# Копіюємо файл до сесії
session_data_dir = self.session_manager.get_session_data_dir(session_id)
if not session_data_dir:
return False, {"error": "Не вдалося отримати директорію даних сесії"}
# Створюємо унікальне ім'я для завантаженого файлу
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
dest_filename = f"uploaded_{timestamp}_{Path(uploaded_file).name}"
dest_path = session_data_dir / dest_filename
# Копіюємо файл
shutil.copyfile(uploaded_file, dest_path)
# Додаємо інформацію про файл до сесії
self.session_manager.add_data_file(
session_id,
str(dest_path),
file_type="uploaded",
description=f"Uploaded file: {Path(uploaded_file).name}"
)
copied_files.append(str(dest_path))
# Якщо немає файлів для обробки, повертаємо помилку
if not copied_files:
return False, {"error": "Не вибрано жодного файлу для обробки"}
# Завантажуємо дані з усіх файлів
loaded_data = self.load_data_from_files(session_id, copied_files)
# Фільтруємо тільки успішно завантажені файли
valid_data = [(path, df) for path, df, success in loaded_data if success and df is not None]
if not valid_data:
return False, {"error": "Не вдалося завантажити жодного файлу"}
# Отримуємо список DataFrame
dataframes = [df for _, df in valid_data]
# Об'єднуємо дані
merged_df, output_path = self.merge_dataframes(
session_id,
dataframes,
output_name=f"merged_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
)
if merged_df is None or not output_path:
return False, {"error": "Не вдалося об'єднати дані"}
result_info = {
"merged_file": output_path,
"rows_count": len(merged_df),
"columns_count": len(merged_df.columns),
"source_files_count": len(valid_data),
"merged_df": merged_df # Передаємо DataFrame для подальшого використання
}
return True, result_info
except Exception as e:
logger.error(f"Помилка при ініціалізації даних сесії {session_id}: {e}")
return False, {"error": f"Помилка при ініціалізації даних: {str(e)}"}
def get_file_preview(self, file_path, max_rows=10):
"""
Отримання попереднього перегляду файлу CSV.
Args:
file_path (str): Шлях до файлу
max_rows (int): Максимальна кількість рядків для перегляду
Returns:
dict: Словник з інформацією про файл та його вмістом
"""
try:
if not Path(file_path).exists():
return {"error": f"Файл не знайдено: {file_path}"}
# Зчитуємо перші max_rows рядків
df = pd.read_csv(file_path, nrows=max_rows)
# Отримуємо інформацію про файл
file_stat = Path(file_path).stat()
size_kb = file_stat.st_size / 1024
modified = datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S')
# Підраховуємо загальну кількість рядків (обережно з великими файлами)
total_rows = sum(1 for _ in open(file_path, 'r')) - 1 # -1 для заголовка
# Формуємо результат
result = {
"filename": Path(file_path).name,
"path": file_path,
"size_kb": round(size_kb, 2),
"modified": modified,
"total_rows": total_rows,
"columns": df.columns.tolist(),
"columns_count": len(df.columns),
"preview_rows": df.to_dict('records')
}
return result
except Exception as e:
logger.error(f"Помилка при отриманні попереднього перегляду файлу {file_path}: {e}")
return {"error": f"Помилка при читанні файлу: {str(e)}"}
def cleanup_temp_data(self):
"""
Очищення тимчасових даних, крім файлів у папці current_data.
Returns:
dict: Інформація про результати очищення
"""
try:
import shutil
import os
from pathlib import Path
cleanup_stats = {
"temp_files_removed": 0,
"session_dirs_removed": 0,
"indices_dirs_removed": 0,
"reports_removed": 0,
"temp_directories": []
}
# Очищення тимчасових індексів
indices_dir = Path("temp/indices")
if indices_dir.exists():
for item in indices_dir.iterdir():
if item.is_dir():
try:
shutil.rmtree(item)
cleanup_stats["indices_dirs_removed"] += 1
except Exception as e:
logger.error(f"Помилка при видаленні директорії індексів {item}: {e}")
# Очищення тимчасових сесій
sessions_dir = Path("temp/sessions")
if sessions_dir.exists():
for item in sessions_dir.iterdir():
if item.is_dir():
try:
shutil.rmtree(item)
cleanup_stats["session_dirs_removed"] += 1
except Exception as e:
logger.error(f"Помилка при видаленні директорії сесій {item}: {e}")
# Очищення інших файлів у temp
temp_dir = Path("temp")
if temp_dir.exists():
for item in temp_dir.iterdir():
if item.is_file():
try:
item.unlink()
cleanup_stats["temp_files_removed"] += 1
except Exception as e:
logger.error(f"Помилка при видаленні файлу {item}: {e}")
# Очищення тимчасових звітів
reports_dir = Path("reports")
if reports_dir.exists():
reports_count = 0
# Видаляємо файли у головній директорії reports
for item in reports_dir.iterdir():
if item.is_file():
try:
item.unlink()
reports_count += 1
except Exception as e:
logger.error(f"Помилка при видаленні звіту {item}: {e}")
# Перевіряємо і очищаємо підпапку візуалізацій
viz_dir = reports_dir / "visualizations"
if viz_dir.exists():
for item in viz_dir.iterdir():
if item.is_file():
try:
item.unlink()
reports_count += 1
except Exception as e:
logger.error(f"Помилка при видаленні візуалізації {item}: {e}")
cleanup_stats["reports_removed"] = reports_count
# Запам'ятовуємо всі очищені директорії
cleanup_stats["temp_directories"] = ["temp/indices", "temp/sessions", "reports", "temp"]
# Створюємо наново всі необхідні директорії
for directory in ["temp", "temp/indices", "temp/sessions", "reports", "reports/visualizations"]:
Path(directory).mkdir(exist_ok=True, parents=True)
logger.info(f"Тимчасові дані успішно очищено: {cleanup_stats}")
return {
"success": True,
"stats": cleanup_stats
}
except Exception as e:
logger.error(f"Помилка при очищенні тимчасових даних: {e}")
return {
"success": False,
"error": str(e)
}
# Додано функцію в модуль для обробка дат
def safe_strftime(date_value, format_str="%Y-%m-%d"):
"""Безпечне форматування дати з обробкою None та NaT значень."""
import pandas as pd
if date_value is None or pd.isna(date_value):
return "Н/Д" # або будь-яке інше значення за замовчуванням
try:
return date_value.strftime(format_str)
except Exception:
return "Неправильна дата"