Spaces:
Running
Running
import os | |
import shutil | |
import logging | |
import pandas as pd | |
import hashlib | |
from pathlib import Path | |
from datetime import datetime | |
from modules.data_management.session_manager import SessionManager | |
logger = logging.getLogger(__name__) | |
class DataManager: | |
""" | |
Менеджер даних для роботи з файлами CSV та їх обробки. | |
""" | |
def __init__(self, current_data_dir="current_data", session_manager=None): | |
""" | |
Ініціалізація менеджера даних. | |
Args: | |
current_data_dir (str): Директорія з локальними файлами даних | |
session_manager (SessionManager, optional): Менеджер сесій або None для створення нового | |
""" | |
self.current_data_dir = Path(current_data_dir) | |
self.current_data_dir.mkdir(exist_ok=True, parents=True) | |
# Ініціалізація менеджера сесій | |
self.session_manager = session_manager or SessionManager() | |
def get_local_files(self): | |
""" | |
Отримання списку локальних CSV-файлів. | |
Returns: | |
list: Список словників з інформацією про файли | |
""" | |
files_info = [] | |
if not self.current_data_dir.exists(): | |
logger.warning(f"Директорія {self.current_data_dir} не існує") | |
return files_info | |
for file_path in self.current_data_dir.glob("*.csv"): | |
try: | |
# Отримуємо базову інформацію про файл | |
stat = file_path.stat() | |
size_kb = stat.st_size / 1024 | |
modified = datetime.fromtimestamp(stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S') | |
# Спроба зчитати перші рядки для отримання інформації про структуру | |
try: | |
df_preview = pd.read_csv(file_path, nrows=5) | |
rows_preview = len(df_preview) | |
columns_preview = len(df_preview.columns) | |
columns_list = df_preview.columns.tolist() | |
except Exception as e: | |
logger.warning(f"Не вдалося прочитати файл {file_path}: {e}") | |
rows_preview = "?" | |
columns_preview = "?" | |
columns_list = [] | |
# Формуємо інформацію про файл | |
files_info.append({ | |
"path": str(file_path), | |
"name": file_path.name, | |
"size_kb": round(size_kb, 2), | |
"modified": modified, | |
"rows_preview": rows_preview, | |
"columns_preview": columns_preview, | |
"columns_list": columns_list | |
}) | |
except Exception as e: | |
logger.error(f"Помилка при обробці файлу {file_path}: {e}") | |
# Сортуємо за часом модифікації (від найновіших до найстаріших) | |
files_info.sort(key=lambda x: x["modified"], reverse=True) | |
return files_info | |
def validate_csv_file(self, file_path): | |
""" | |
Перевірка валідності CSV-файлу. | |
Args: | |
file_path (str): Шлях до файлу | |
Returns: | |
tuple: (is_valid, info_dict) | |
is_valid - True, якщо файл валідний | |
info_dict - словник з інформацією про файл | |
""" | |
if not Path(file_path).exists(): | |
return False, {"error": f"Файл не знайдено: {file_path}"} | |
try: | |
# Отримуємо інформацію про файл | |
file_stat = Path(file_path).stat() | |
size_kb = file_stat.st_size / 1024 | |
if size_kb == 0: | |
return False, {"error": "Файл порожній"} | |
# Спроба зчитати файл | |
df = pd.read_csv(file_path) | |
# Перевірка наявності очікуваних колонок | |
required_columns = ['Summary', 'Issue key', 'Status'] | |
missing_columns = [col for col in required_columns if col not in df.columns] | |
if missing_columns: | |
return False, { | |
"error": f"Відсутні необхідні колонки: {', '.join(missing_columns)}", | |
"rows": len(df), | |
"columns": len(df.columns), | |
"columns_list": df.columns.tolist() | |
} | |
# Формуємо інформацію про файл | |
info = { | |
"rows": len(df), | |
"columns": len(df.columns), | |
"columns_list": df.columns.tolist(), | |
"size_kb": round(size_kb, 2), | |
"first_rows": df.head(5).to_dict('records') | |
} | |
return True, info | |
except Exception as e: | |
logger.error(f"Помилка при валідації CSV-файлу {file_path}: {e}") | |
return False, {"error": f"Помилка при читанні файлу: {str(e)}"} | |
def copy_files_to_session(self, session_id, file_paths_list): | |
""" | |
Копіювання вибраних файлів до сесії користувача. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
file_paths_list (list): Список шляхів до файлів для копіювання | |
Returns: | |
list: Список скопійованих файлів у сесії | |
""" | |
session_data_dir = self.session_manager.get_session_data_dir(session_id) | |
if not session_data_dir: | |
logger.error(f"Не вдалося отримати директорію даних для сесії {session_id}") | |
return [] | |
copied_files = [] | |
for file_path in file_paths_list: | |
try: | |
source_path = Path(file_path) | |
if not source_path.exists(): | |
logger.warning(f"Файл не знайдено: {file_path}") | |
continue | |
# Створюємо унікальне ім'я файлу в сесії | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
dest_filename = f"local_{timestamp}_{source_path.name}" | |
dest_path = session_data_dir / dest_filename | |
# Копіюємо файл | |
shutil.copyfile(source_path, dest_path) | |
# Додаємо інформацію про файл до сесії | |
if self.session_manager.add_data_file( | |
session_id, | |
str(dest_path), | |
file_type="local", | |
description=f"Local file: {source_path.name}" | |
): | |
copied_files.append(str(dest_path)) | |
logger.info(f"Файл {source_path.name} скопійовано до сесії {session_id}") | |
except Exception as e: | |
logger.error(f"Помилка при копіюванні файлу {file_path} до сесії {session_id}: {e}") | |
return copied_files | |
def merge_dataframes(self, session_id, dataframes, output_name=None): | |
""" | |
Об'єднання кількох DataFrame та збереження результату в сесії. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
dataframes (list): Список DataFrame для об'єднання | |
output_name (str, optional): Ім'я файлу для збереження результату | |
Returns: | |
tuple: (merged_df, output_path) - об'єднаний DataFrame та шлях до збереженого файлу | |
""" | |
if not dataframes: | |
logger.warning("Немає даних для об'єднання") | |
return None, None | |
try: | |
# Якщо є тільки один DataFrame, використовуємо його як базовий | |
if len(dataframes) == 1: | |
merged_df = dataframes[0].copy() | |
else: | |
# Об'єднуємо всі DataFrame по рядках з ігноруванням індексів | |
merged_df = pd.concat(dataframes, ignore_index=True) | |
# Видаляємо дублікати за ключовими колонками | |
if 'Issue key' in merged_df.columns: | |
merged_df.drop_duplicates(subset=['Issue key'], keep='first', inplace=True) | |
# Зберігаємо результат | |
output_path = self.session_manager.save_merged_data(session_id, merged_df, output_name) | |
return merged_df, output_path | |
except Exception as e: | |
logger.error(f"Помилка при об'єднанні даних: {e}") | |
return None, None | |
def load_data_from_files(self, session_id, file_paths_list): | |
""" | |
Завантаження даних з файлів у DataFrame. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
file_paths_list (list): Список шляхів до файлів для завантаження | |
Returns: | |
list: Список кортежів (file_path, dataframe, success) | |
""" | |
results = [] | |
for file_path in file_paths_list: | |
try: | |
# Перевіряємо, чи існує файл | |
if not Path(file_path).exists(): | |
logger.warning(f"Файл не знайдено: {file_path}") | |
results.append((file_path, None, False)) | |
continue | |
# Завантажуємо файл | |
df = pd.read_csv(file_path) | |
# Обробка дат | |
for date_col in ['Created', 'Updated', 'Resolved', 'Due Date']: | |
if date_col in df.columns: | |
df[date_col] = pd.to_datetime(df[date_col], format='%Y-%m-%dT%H:%M:%S', errors='coerce') | |
# Підготовка додаткових колонок для аналізу | |
if 'Created' in df.columns and pd.api.types.is_datetime64_dtype(df[date_col]): | |
df['Created_Date'] = df['Created'].dt.date | |
df['Created_Month'] = df['Created'].dt.to_period('M') | |
if 'Updated' in df.columns and pd.api.types.is_datetime64_dtype(df[date_col]): | |
df['Updated_Date'] = df['Updated'].dt.date | |
df['Days_Since_Update'] = (datetime.now() - df['Updated']).dt.days | |
results.append((file_path, df, True)) | |
logger.info(f"Успішно завантажено файл {file_path}, {len(df)} рядків") | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні файлу {file_path}: {e}") | |
results.append((file_path, None, False)) | |
return results | |
def initialize_session_data(self, session_id, local_files, uploaded_file=None): | |
""" | |
Ініціалізація даних сесії з локальних та завантажених файлів. | |
Args: | |
session_id (str): Ідентифікатор сесії | |
local_files (list): Список шляхів до локальних файлів | |
uploaded_file (str, optional): Шлях до завантаженого файлу | |
Returns: | |
tuple: (success, result_info) - успішність операції та інформація про результат | |
""" | |
try: | |
# Копіюємо локальні файли до сесії | |
copied_files = self.copy_files_to_session(session_id, local_files) | |
# Додаємо завантажений файл, якщо він є | |
if uploaded_file and Path(uploaded_file).exists(): | |
# Копіюємо файл до сесії | |
session_data_dir = self.session_manager.get_session_data_dir(session_id) | |
if not session_data_dir: | |
return False, {"error": "Не вдалося отримати директорію даних сесії"} | |
# Створюємо унікальне ім'я для завантаженого файлу | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
dest_filename = f"uploaded_{timestamp}_{Path(uploaded_file).name}" | |
dest_path = session_data_dir / dest_filename | |
# Копіюємо файл | |
shutil.copyfile(uploaded_file, dest_path) | |
# Додаємо інформацію про файл до сесії | |
self.session_manager.add_data_file( | |
session_id, | |
str(dest_path), | |
file_type="uploaded", | |
description=f"Uploaded file: {Path(uploaded_file).name}" | |
) | |
copied_files.append(str(dest_path)) | |
# Якщо немає файлів для обробки, повертаємо помилку | |
if not copied_files: | |
return False, {"error": "Не вибрано жодного файлу для обробки"} | |
# Завантажуємо дані з усіх файлів | |
loaded_data = self.load_data_from_files(session_id, copied_files) | |
# Фільтруємо тільки успішно завантажені файли | |
valid_data = [(path, df) for path, df, success in loaded_data if success and df is not None] | |
if not valid_data: | |
return False, {"error": "Не вдалося завантажити жодного файлу"} | |
# Отримуємо список DataFrame | |
dataframes = [df for _, df in valid_data] | |
# Об'єднуємо дані | |
merged_df, output_path = self.merge_dataframes( | |
session_id, | |
dataframes, | |
output_name=f"merged_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv" | |
) | |
if merged_df is None or not output_path: | |
return False, {"error": "Не вдалося об'єднати дані"} | |
result_info = { | |
"merged_file": output_path, | |
"rows_count": len(merged_df), | |
"columns_count": len(merged_df.columns), | |
"source_files_count": len(valid_data), | |
"merged_df": merged_df # Передаємо DataFrame для подальшого використання | |
} | |
return True, result_info | |
except Exception as e: | |
logger.error(f"Помилка при ініціалізації даних сесії {session_id}: {e}") | |
return False, {"error": f"Помилка при ініціалізації даних: {str(e)}"} | |
def get_file_preview(self, file_path, max_rows=10): | |
""" | |
Отримання попереднього перегляду файлу CSV. | |
Args: | |
file_path (str): Шлях до файлу | |
max_rows (int): Максимальна кількість рядків для перегляду | |
Returns: | |
dict: Словник з інформацією про файл та його вмістом | |
""" | |
try: | |
if not Path(file_path).exists(): | |
return {"error": f"Файл не знайдено: {file_path}"} | |
# Зчитуємо перші max_rows рядків | |
df = pd.read_csv(file_path, nrows=max_rows) | |
# Отримуємо інформацію про файл | |
file_stat = Path(file_path).stat() | |
size_kb = file_stat.st_size / 1024 | |
modified = datetime.fromtimestamp(file_stat.st_mtime).strftime('%Y-%m-%d %H:%M:%S') | |
# Підраховуємо загальну кількість рядків (обережно з великими файлами) | |
total_rows = sum(1 for _ in open(file_path, 'r')) - 1 # -1 для заголовка | |
# Формуємо результат | |
result = { | |
"filename": Path(file_path).name, | |
"path": file_path, | |
"size_kb": round(size_kb, 2), | |
"modified": modified, | |
"total_rows": total_rows, | |
"columns": df.columns.tolist(), | |
"columns_count": len(df.columns), | |
"preview_rows": df.to_dict('records') | |
} | |
return result | |
except Exception as e: | |
logger.error(f"Помилка при отриманні попереднього перегляду файлу {file_path}: {e}") | |
return {"error": f"Помилка при читанні файлу: {str(e)}"} | |
def cleanup_temp_data(self): | |
""" | |
Очищення тимчасових даних, крім файлів у папці current_data. | |
Returns: | |
dict: Інформація про результати очищення | |
""" | |
try: | |
import shutil | |
import os | |
from pathlib import Path | |
cleanup_stats = { | |
"temp_files_removed": 0, | |
"session_dirs_removed": 0, | |
"indices_dirs_removed": 0, | |
"reports_removed": 0, | |
"temp_directories": [] | |
} | |
# Очищення тимчасових індексів | |
indices_dir = Path("temp/indices") | |
if indices_dir.exists(): | |
for item in indices_dir.iterdir(): | |
if item.is_dir(): | |
try: | |
shutil.rmtree(item) | |
cleanup_stats["indices_dirs_removed"] += 1 | |
except Exception as e: | |
logger.error(f"Помилка при видаленні директорії індексів {item}: {e}") | |
# Очищення тимчасових сесій | |
sessions_dir = Path("temp/sessions") | |
if sessions_dir.exists(): | |
for item in sessions_dir.iterdir(): | |
if item.is_dir(): | |
try: | |
shutil.rmtree(item) | |
cleanup_stats["session_dirs_removed"] += 1 | |
except Exception as e: | |
logger.error(f"Помилка при видаленні директорії сесій {item}: {e}") | |
# Очищення інших файлів у temp | |
temp_dir = Path("temp") | |
if temp_dir.exists(): | |
for item in temp_dir.iterdir(): | |
if item.is_file(): | |
try: | |
item.unlink() | |
cleanup_stats["temp_files_removed"] += 1 | |
except Exception as e: | |
logger.error(f"Помилка при видаленні файлу {item}: {e}") | |
# Очищення тимчасових звітів | |
reports_dir = Path("reports") | |
if reports_dir.exists(): | |
reports_count = 0 | |
# Видаляємо файли у головній директорії reports | |
for item in reports_dir.iterdir(): | |
if item.is_file(): | |
try: | |
item.unlink() | |
reports_count += 1 | |
except Exception as e: | |
logger.error(f"Помилка при видаленні звіту {item}: {e}") | |
# Перевіряємо і очищаємо підпапку візуалізацій | |
viz_dir = reports_dir / "visualizations" | |
if viz_dir.exists(): | |
for item in viz_dir.iterdir(): | |
if item.is_file(): | |
try: | |
item.unlink() | |
reports_count += 1 | |
except Exception as e: | |
logger.error(f"Помилка при видаленні візуалізації {item}: {e}") | |
cleanup_stats["reports_removed"] = reports_count | |
# Запам'ятовуємо всі очищені директорії | |
cleanup_stats["temp_directories"] = ["temp/indices", "temp/sessions", "reports", "temp"] | |
# Створюємо наново всі необхідні директорії | |
for directory in ["temp", "temp/indices", "temp/sessions", "reports", "reports/visualizations"]: | |
Path(directory).mkdir(exist_ok=True, parents=True) | |
logger.info(f"Тимчасові дані успішно очищено: {cleanup_stats}") | |
return { | |
"success": True, | |
"stats": cleanup_stats | |
} | |
except Exception as e: | |
logger.error(f"Помилка при очищенні тимчасових даних: {e}") | |
return { | |
"success": False, | |
"error": str(e) | |
} | |
# Додано функцію в модуль для обробка дат | |
def safe_strftime(date_value, format_str="%Y-%m-%d"): | |
"""Безпечне форматування дати з обробкою None та NaT значень.""" | |
import pandas as pd | |
if date_value is None or pd.isna(date_value): | |
return "Н/Д" # або будь-яке інше значення за замовчуванням | |
try: | |
return date_value.strftime(format_str) | |
except Exception: | |
return "Неправильна дата" |