Spaces:
Running
Running
import pandas as pd | |
from datetime import datetime | |
import logging | |
import os | |
from pathlib import Path | |
import io | |
import hashlib | |
logger = logging.getLogger(__name__) | |
class JiraCsvImporter: | |
""" | |
Клас для імпорту даних з CSV-файлів Jira | |
""" | |
def __init__(self, file_path): | |
""" | |
Ініціалізація імпортера CSV. | |
Args: | |
file_path (str): Шлях до CSV-файлу | |
""" | |
self.file_path = file_path | |
self.df = None | |
self.file_hash = None | |
def load_data(self): | |
""" | |
Завантаження даних з CSV-файлу. | |
Returns: | |
pandas.DataFrame: Завантажені дані або None у випадку помилки | |
""" | |
try: | |
logger.info(f"Завантаження CSV-файлу: {self.file_path}") | |
print(f"Завантаження CSV-файлу: {self.file_path}") # Додаткове логування в консоль | |
# Перевірка існування файлу | |
if not os.path.exists(self.file_path): | |
logger.error(f"Файл не знайдено: {self.file_path}") | |
print(f"Файл не знайдено: {self.file_path}") | |
return None | |
# Перевірка розміру файлу | |
file_size = os.path.getsize(self.file_path) | |
logger.info(f"Розмір файлу: {file_size} байт") | |
if file_size == 0: | |
logger.error("Файл порожній") | |
return None | |
# Генеруємо хеш файлу для відстеження змін | |
self.file_hash = self._generate_file_hash() | |
if self.file_hash: | |
logger.info(f"Згенеровано хеш CSV файлу: {self.file_hash}") | |
# Додаткове логування дозволів на файл | |
try: | |
import stat | |
st = os.stat(self.file_path) | |
permissions = stat.filemode(st.st_mode) | |
logger.info(f"Дозволи файлу: {permissions}") | |
except Exception as e: | |
logger.warning(f"Не вдалося отримати дозволи файлу: {e}") | |
# Спробуємо різні методи зчитування файлу | |
success = False | |
# Метод 1: Стандартний pandas.read_csv | |
try: | |
self.df = pd.read_csv(self.file_path) | |
logger.info("Метод 1 (стандартний read_csv) успішний") | |
success = True | |
except Exception as e1: | |
logger.warning(f"Помилка методу 1: {e1}") | |
# Метод 2: Явно вказуємо кодування | |
if not success: | |
try: | |
self.df = pd.read_csv(self.file_path, encoding='utf-8') | |
logger.info("Метод 2 (utf-8) успішний") | |
success = True | |
except Exception as e2: | |
logger.warning(f"Помилка методу 2: {e2}") | |
# Метод 3: Альтернативне кодування | |
if not success: | |
try: | |
self.df = pd.read_csv(self.file_path, encoding='latin1') | |
logger.info("Метод 3 (latin1) успішний") | |
success = True | |
except Exception as e3: | |
logger.warning(f"Помилка методу 3: {e3}") | |
# Метод 4: Читаємо вміст файлу та використовуємо StringIO | |
if not success: | |
try: | |
with open(self.file_path, 'rb') as f: | |
content = f.read() | |
self.df = pd.read_csv(io.StringIO(content.decode('utf-8', errors='replace'))) | |
logger.info("Метод 4 (StringIO з utf-8 і errors='replace') успішний") | |
success = True | |
except Exception as e4: | |
logger.warning(f"Помилка методу 4: {e4}") | |
# Метод 5: Спроба з latin1 і StringIO | |
if not success: | |
try: | |
with open(self.file_path, 'rb') as f: | |
content = f.read() | |
self.df = pd.read_csv(io.StringIO(content.decode('latin1', errors='replace'))) | |
logger.info("Метод 5 (StringIO з latin1 і errors='replace') успішний") | |
success = True | |
except Exception as e5: | |
logger.warning(f"Помилка методу 5: {e5}") | |
if not success: | |
logger.error("Всі методи зчитування файлу невдалі") | |
return None | |
# Відображення наявних колонок для діагностики | |
print(f"Наявні колонки: {self.df.columns.tolist()}") | |
print(f"Кількість рядків: {len(self.df)}") | |
logger.info(f"Наявні колонки: {self.df.columns.tolist()}") | |
logger.info(f"Кількість рядків: {len(self.df)}") | |
# Обробка дат | |
self._process_dates() | |
# Очищення та підготовка даних | |
self._clean_data() | |
# Перевіряємо наявність індексів для цього CSV | |
if self.file_hash: | |
# Перевіряємо та оновлюємо метадані файлу | |
self._check_indices_metadata() | |
logger.info(f"Успішно завантажено {len(self.df)} записів") | |
print(f"Успішно завантажено {len(self.df)} записів") | |
return self.df | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні CSV-файлу: {e}") | |
import traceback | |
error_details = traceback.format_exc() | |
print(f"Помилка при завантаженні CSV-файлу: {e}") | |
print(f"Деталі помилки: {error_details}") | |
logger.error(error_details) | |
return None | |
def _generate_file_hash(self): | |
""" | |
Генерує хеш для CSV файлу на основі його вмісту | |
Returns: | |
str: Хеш файлу або None у випадку помилки | |
""" | |
try: | |
# Читаємо файл блоками для ефективного хешування великих файлів | |
sha256 = hashlib.sha256() | |
with open(self.file_path, "rb") as f: | |
for byte_block in iter(lambda: f.read(4096), b""): | |
sha256.update(byte_block) | |
return sha256.hexdigest() | |
except Exception as e: | |
logger.error(f"Помилка при генерації хешу CSV: {e}") | |
return None | |
def _check_indices_metadata(self): | |
""" | |
Перевіряє наявність індексів для поточного CSV файлу | |
та оновлює метадані при необхідності. | |
""" | |
try: | |
import json | |
from pathlib import Path | |
# Шлях до директорії індексів | |
indices_dir = Path("temp/indices") | |
if not indices_dir.exists(): | |
return | |
# Отримання списку піддиректорій з індексами | |
subdirs = [d for d in indices_dir.iterdir() if d.is_dir()] | |
if not subdirs: | |
return | |
# Перевіряємо кожну директорію на відповідність хешу | |
for directory in subdirs: | |
metadata_path = directory / "metadata.json" | |
if metadata_path.exists(): | |
try: | |
with open(metadata_path, "r", encoding="utf-8") as f: | |
metadata = json.load(f) | |
# Якщо знайдено відповідні індекси, додаємо інформацію про колонки | |
if "csv_hash" in metadata and metadata["csv_hash"] == self.file_hash: | |
# Оновлюємо інформацію про колонки | |
metadata["columns"] = self.df.columns.tolist() | |
metadata["rows_count"] = len(self.df) | |
metadata["last_used"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S') | |
# Оновлюємо файл метаданих | |
with open(metadata_path, "w", encoding="utf-8") as f: | |
json.dump(metadata, f, ensure_ascii=False, indent=2) | |
logger.info(f"Оновлено метадані для індексів: {directory}") | |
break | |
except Exception as md_err: | |
logger.warning(f"Помилка при перевірці метаданих {metadata_path}: {md_err}") | |
except Exception as e: | |
logger.warning(f"Помилка при перевірці метаданих індексів: {e}") | |
def _check_required_columns(self): | |
""" | |
Перевірка наявності необхідних колонок у CSV-файлі. | |
Returns: | |
bool: True, якщо всі необхідні колонки присутні | |
""" | |
# Основні колонки, які очікуються у файлі Jira | |
basic_columns = ['Summary', 'Issue key', 'Status', 'Issue Type', 'Priority', 'Created', 'Updated'] | |
# Альтернативні назви колонок | |
alternative_columns = { | |
'Summary': ['Summary', 'Короткий опис'], | |
'Issue key': ['Issue key', 'Key', 'Ключ'], | |
'Status': ['Status', 'Статус'], | |
'Issue Type': ['Issue Type', 'Type', 'Тип'], | |
'Priority': ['Priority', 'Пріоритет'], | |
'Created': ['Created', 'Створено'], | |
'Updated': ['Updated', 'Оновлено'] | |
} | |
# Перевірка наявності колонок | |
missing_columns = [] | |
for col in basic_columns: | |
found = False | |
# Перевірка основної назви | |
if col in self.df.columns: | |
found = True | |
else: | |
# Перевірка альтернативних назв | |
for alt_col in alternative_columns.get(col, []): | |
if alt_col in self.df.columns: | |
# Перейменування колонки до стандартного імені | |
self.df.rename(columns={alt_col: col}, inplace=True) | |
found = True | |
break | |
if not found: | |
missing_columns.append(col) | |
if missing_columns: | |
logger.warning(f"Відсутні колонки: {', '.join(missing_columns)}") | |
print(f"Відсутні колонки: {', '.join(missing_columns)}") | |
return False | |
return True | |
def _process_dates(self): | |
""" | |
Обробка дат у DataFrame. | |
""" | |
try: | |
# Перетворення колонок з датами | |
date_columns = ['Created', 'Updated', 'Resolved', 'Due Date'] | |
for col in date_columns: | |
if col in self.df.columns: | |
try: | |
self.df[col] = pd.to_datetime(self.df[col], errors='coerce') | |
print(f"Колонку {col} успішно конвертовано до datetime") | |
except Exception as e: | |
logger.warning(f"Не вдалося конвертувати колонку {col} до datetime: {e}") | |
print(f"Не вдалося конвертувати колонку {col} до datetime: {e}") | |
except Exception as e: | |
logger.error(f"Помилка при обробці дат: {e}") | |
print(f"Помилка при обробці дат: {e}") | |
def _clean_data(self): | |
""" | |
Очищення та підготовка даних. | |
""" | |
try: | |
# Видалення порожніх рядків | |
if 'Issue key' in self.df.columns: | |
self.df.dropna(subset=['Issue key'], inplace=True) | |
print(f"Видалено порожні рядки за колонкою 'Issue key'") | |
# Додаткова обробка даних | |
if 'Status' in self.df.columns: | |
self.df['Status'] = self.df['Status'].fillna('Unknown') | |
print(f"Заповнено відсутні значення в колонці 'Status'") | |
if 'Priority' in self.df.columns: | |
self.df['Priority'] = self.df['Priority'].fillna('Not set') | |
print(f"Заповнено відсутні значення в колонці 'Priority'") | |
# Створення додаткових колонок для аналізу | |
if 'Created' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Created']): | |
self.df['Created_Date'] = self.df['Created'].dt.date | |
self.df['Created_Month'] = self.df['Created'].dt.to_period('M') | |
print(f"Створено додаткові колонки для дат створення") | |
if 'Updated' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Updated']): | |
self.df['Updated_Date'] = self.df['Updated'].dt.date | |
self.df['Days_Since_Update'] = (datetime.now() - self.df['Updated']).dt.days | |
print(f"Створено додаткові колонки для дат оновлення") | |
except Exception as e: | |
logger.error(f"Помилка при очищенні даних: {e}") | |
print(f"Помилка при очищенні даних: {e}") | |
def export_to_csv(self, output_path=None): | |
""" | |
Експорт оброблених даних у CSV-файл. | |
Args: | |
output_path (str): Шлях для збереження файлу. Якщо None, створюється автоматично. | |
Returns: | |
str: Шлях до збереженого файлу або None у випадку помилки | |
""" | |
if self.df is None: | |
logger.error("Немає даних для експорту") | |
return None | |
try: | |
if output_path is None: | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
output_dir = Path("exported_data") | |
output_dir.mkdir(exist_ok=True) | |
output_path = output_dir / f"jira_data_{timestamp}.csv" | |
self.df.to_csv(output_path, index=False, encoding='utf-8') | |
logger.info(f"Дані успішно експортовано у {output_path}") | |
return str(output_path) | |
except Exception as e: | |
logger.error(f"Помилка при експорті даних: {e}") | |
return None |