DocUA's picture
Єдиний коміт - очищення історії
4ad5efa
import pandas as pd
from datetime import datetime
import logging
import os
from pathlib import Path
import io
import hashlib
logger = logging.getLogger(__name__)
class JiraCsvImporter:
"""
Клас для імпорту даних з CSV-файлів Jira
"""
def __init__(self, file_path):
"""
Ініціалізація імпортера CSV.
Args:
file_path (str): Шлях до CSV-файлу
"""
self.file_path = file_path
self.df = None
self.file_hash = None
def load_data(self):
"""
Завантаження даних з CSV-файлу.
Returns:
pandas.DataFrame: Завантажені дані або None у випадку помилки
"""
try:
logger.info(f"Завантаження CSV-файлу: {self.file_path}")
print(f"Завантаження CSV-файлу: {self.file_path}") # Додаткове логування в консоль
# Перевірка існування файлу
if not os.path.exists(self.file_path):
logger.error(f"Файл не знайдено: {self.file_path}")
print(f"Файл не знайдено: {self.file_path}")
return None
# Перевірка розміру файлу
file_size = os.path.getsize(self.file_path)
logger.info(f"Розмір файлу: {file_size} байт")
if file_size == 0:
logger.error("Файл порожній")
return None
# Генеруємо хеш файлу для відстеження змін
self.file_hash = self._generate_file_hash()
if self.file_hash:
logger.info(f"Згенеровано хеш CSV файлу: {self.file_hash}")
# Додаткове логування дозволів на файл
try:
import stat
st = os.stat(self.file_path)
permissions = stat.filemode(st.st_mode)
logger.info(f"Дозволи файлу: {permissions}")
except Exception as e:
logger.warning(f"Не вдалося отримати дозволи файлу: {e}")
# Спробуємо різні методи зчитування файлу
success = False
# Метод 1: Стандартний pandas.read_csv
try:
self.df = pd.read_csv(self.file_path)
logger.info("Метод 1 (стандартний read_csv) успішний")
success = True
except Exception as e1:
logger.warning(f"Помилка методу 1: {e1}")
# Метод 2: Явно вказуємо кодування
if not success:
try:
self.df = pd.read_csv(self.file_path, encoding='utf-8')
logger.info("Метод 2 (utf-8) успішний")
success = True
except Exception as e2:
logger.warning(f"Помилка методу 2: {e2}")
# Метод 3: Альтернативне кодування
if not success:
try:
self.df = pd.read_csv(self.file_path, encoding='latin1')
logger.info("Метод 3 (latin1) успішний")
success = True
except Exception as e3:
logger.warning(f"Помилка методу 3: {e3}")
# Метод 4: Читаємо вміст файлу та використовуємо StringIO
if not success:
try:
with open(self.file_path, 'rb') as f:
content = f.read()
self.df = pd.read_csv(io.StringIO(content.decode('utf-8', errors='replace')))
logger.info("Метод 4 (StringIO з utf-8 і errors='replace') успішний")
success = True
except Exception as e4:
logger.warning(f"Помилка методу 4: {e4}")
# Метод 5: Спроба з latin1 і StringIO
if not success:
try:
with open(self.file_path, 'rb') as f:
content = f.read()
self.df = pd.read_csv(io.StringIO(content.decode('latin1', errors='replace')))
logger.info("Метод 5 (StringIO з latin1 і errors='replace') успішний")
success = True
except Exception as e5:
logger.warning(f"Помилка методу 5: {e5}")
if not success:
logger.error("Всі методи зчитування файлу невдалі")
return None
# Відображення наявних колонок для діагностики
print(f"Наявні колонки: {self.df.columns.tolist()}")
print(f"Кількість рядків: {len(self.df)}")
logger.info(f"Наявні колонки: {self.df.columns.tolist()}")
logger.info(f"Кількість рядків: {len(self.df)}")
# Обробка дат
self._process_dates()
# Очищення та підготовка даних
self._clean_data()
# Перевіряємо наявність індексів для цього CSV
if self.file_hash:
# Перевіряємо та оновлюємо метадані файлу
self._check_indices_metadata()
logger.info(f"Успішно завантажено {len(self.df)} записів")
print(f"Успішно завантажено {len(self.df)} записів")
return self.df
except Exception as e:
logger.error(f"Помилка при завантаженні CSV-файлу: {e}")
import traceback
error_details = traceback.format_exc()
print(f"Помилка при завантаженні CSV-файлу: {e}")
print(f"Деталі помилки: {error_details}")
logger.error(error_details)
return None
def _generate_file_hash(self):
"""
Генерує хеш для CSV файлу на основі його вмісту
Returns:
str: Хеш файлу або None у випадку помилки
"""
try:
# Читаємо файл блоками для ефективного хешування великих файлів
sha256 = hashlib.sha256()
with open(self.file_path, "rb") as f:
for byte_block in iter(lambda: f.read(4096), b""):
sha256.update(byte_block)
return sha256.hexdigest()
except Exception as e:
logger.error(f"Помилка при генерації хешу CSV: {e}")
return None
def _check_indices_metadata(self):
"""
Перевіряє наявність індексів для поточного CSV файлу
та оновлює метадані при необхідності.
"""
try:
import json
from pathlib import Path
# Шлях до директорії індексів
indices_dir = Path("temp/indices")
if not indices_dir.exists():
return
# Отримання списку піддиректорій з індексами
subdirs = [d for d in indices_dir.iterdir() if d.is_dir()]
if not subdirs:
return
# Перевіряємо кожну директорію на відповідність хешу
for directory in subdirs:
metadata_path = directory / "metadata.json"
if metadata_path.exists():
try:
with open(metadata_path, "r", encoding="utf-8") as f:
metadata = json.load(f)
# Якщо знайдено відповідні індекси, додаємо інформацію про колонки
if "csv_hash" in metadata and metadata["csv_hash"] == self.file_hash:
# Оновлюємо інформацію про колонки
metadata["columns"] = self.df.columns.tolist()
metadata["rows_count"] = len(self.df)
metadata["last_used"] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
# Оновлюємо файл метаданих
with open(metadata_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, ensure_ascii=False, indent=2)
logger.info(f"Оновлено метадані для індексів: {directory}")
break
except Exception as md_err:
logger.warning(f"Помилка при перевірці метаданих {metadata_path}: {md_err}")
except Exception as e:
logger.warning(f"Помилка при перевірці метаданих індексів: {e}")
def _check_required_columns(self):
"""
Перевірка наявності необхідних колонок у CSV-файлі.
Returns:
bool: True, якщо всі необхідні колонки присутні
"""
# Основні колонки, які очікуються у файлі Jira
basic_columns = ['Summary', 'Issue key', 'Status', 'Issue Type', 'Priority', 'Created', 'Updated']
# Альтернативні назви колонок
alternative_columns = {
'Summary': ['Summary', 'Короткий опис'],
'Issue key': ['Issue key', 'Key', 'Ключ'],
'Status': ['Status', 'Статус'],
'Issue Type': ['Issue Type', 'Type', 'Тип'],
'Priority': ['Priority', 'Пріоритет'],
'Created': ['Created', 'Створено'],
'Updated': ['Updated', 'Оновлено']
}
# Перевірка наявності колонок
missing_columns = []
for col in basic_columns:
found = False
# Перевірка основної назви
if col in self.df.columns:
found = True
else:
# Перевірка альтернативних назв
for alt_col in alternative_columns.get(col, []):
if alt_col in self.df.columns:
# Перейменування колонки до стандартного імені
self.df.rename(columns={alt_col: col}, inplace=True)
found = True
break
if not found:
missing_columns.append(col)
if missing_columns:
logger.warning(f"Відсутні колонки: {', '.join(missing_columns)}")
print(f"Відсутні колонки: {', '.join(missing_columns)}")
return False
return True
def _process_dates(self):
"""
Обробка дат у DataFrame.
"""
try:
# Перетворення колонок з датами
date_columns = ['Created', 'Updated', 'Resolved', 'Due Date']
for col in date_columns:
if col in self.df.columns:
try:
self.df[col] = pd.to_datetime(self.df[col], errors='coerce')
print(f"Колонку {col} успішно конвертовано до datetime")
except Exception as e:
logger.warning(f"Не вдалося конвертувати колонку {col} до datetime: {e}")
print(f"Не вдалося конвертувати колонку {col} до datetime: {e}")
except Exception as e:
logger.error(f"Помилка при обробці дат: {e}")
print(f"Помилка при обробці дат: {e}")
def _clean_data(self):
"""
Очищення та підготовка даних.
"""
try:
# Видалення порожніх рядків
if 'Issue key' in self.df.columns:
self.df.dropna(subset=['Issue key'], inplace=True)
print(f"Видалено порожні рядки за колонкою 'Issue key'")
# Додаткова обробка даних
if 'Status' in self.df.columns:
self.df['Status'] = self.df['Status'].fillna('Unknown')
print(f"Заповнено відсутні значення в колонці 'Status'")
if 'Priority' in self.df.columns:
self.df['Priority'] = self.df['Priority'].fillna('Not set')
print(f"Заповнено відсутні значення в колонці 'Priority'")
# Створення додаткових колонок для аналізу
if 'Created' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Created']):
self.df['Created_Date'] = self.df['Created'].dt.date
self.df['Created_Month'] = self.df['Created'].dt.to_period('M')
print(f"Створено додаткові колонки для дат створення")
if 'Updated' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Updated']):
self.df['Updated_Date'] = self.df['Updated'].dt.date
self.df['Days_Since_Update'] = (datetime.now() - self.df['Updated']).dt.days
print(f"Створено додаткові колонки для дат оновлення")
except Exception as e:
logger.error(f"Помилка при очищенні даних: {e}")
print(f"Помилка при очищенні даних: {e}")
def export_to_csv(self, output_path=None):
"""
Експорт оброблених даних у CSV-файл.
Args:
output_path (str): Шлях для збереження файлу. Якщо None, створюється автоматично.
Returns:
str: Шлях до збереженого файлу або None у випадку помилки
"""
if self.df is None:
logger.error("Немає даних для експорту")
return None
try:
if output_path is None:
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
output_dir = Path("exported_data")
output_dir.mkdir(exist_ok=True)
output_path = output_dir / f"jira_data_{timestamp}.csv"
self.df.to_csv(output_path, index=False, encoding='utf-8')
logger.info(f"Дані успішно експортовано у {output_path}")
return str(output_path)
except Exception as e:
logger.error(f"Помилка при експорті даних: {e}")
return None