DocUA's picture
Initial commit
a7174ff
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class JiraDataAnalyzer:
"""
Клас для аналізу даних Jira
"""
def __init__(self, df):
"""
Ініціалізація аналізатора даних.
Args:
df (pandas.DataFrame): DataFrame з даними Jira
"""
self.df = df
def generate_basic_statistics(self):
"""
Генерація базової статистики по даним Jira.
Returns:
dict: Словник з базовою статистикою
"""
try:
stats = {
'total_tickets': len(self.df),
'status_counts': {},
'type_counts': {},
'priority_counts': {},
'assignee_counts': {},
'created_stats': {},
'updated_stats': {}
}
# Статистика за статусами
if 'Status' in self.df.columns:
status_counts = self.df['Status'].value_counts()
stats['status_counts'] = status_counts.to_dict()
# Статистика за типами
if 'Issue Type' in self.df.columns:
type_counts = self.df['Issue Type'].value_counts()
stats['type_counts'] = type_counts.to_dict()
# Статистика за пріоритетами
if 'Priority' in self.df.columns:
priority_counts = self.df['Priority'].value_counts()
stats['priority_counts'] = priority_counts.to_dict()
# Статистика за призначеними користувачами
if 'Assignee' in self.df.columns:
assignee_counts = self.df['Assignee'].value_counts().head(10) # Топ 10
stats['assignee_counts'] = assignee_counts.to_dict()
# Статистика за часом створення
if 'Created' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Created']):
created_min = self.df['Created'].min()
created_max = self.df['Created'].max()
# Групування за місяцями
if 'Created_Month' in self.df.columns:
created_by_month = self.df['Created_Month'].value_counts().sort_index()
stats['created_by_month'] = {str(k): v for k, v in created_by_month.items()}
stats['created_stats'] = {
'min': created_min.strftime('%Y-%m-%d') if created_min else None,
'max': created_max.strftime('%Y-%m-%d') if created_max else None,
'last_7_days': len(self.df[self.df['Created'] > (datetime.now() - timedelta(days=7))])
}
# Статистика за часом оновлення
if 'Updated' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Updated']):
updated_min = self.df['Updated'].min()
updated_max = self.df['Updated'].max()
stats['updated_stats'] = {
'min': updated_min.strftime('%Y-%m-%d') if updated_min else None,
'max': updated_max.strftime('%Y-%m-%d') if updated_max else None,
'last_7_days': len(self.df[self.df['Updated'] > (datetime.now() - timedelta(days=7))])
}
logger.info("Базова статистика успішно згенерована")
return stats
except Exception as e:
logger.error(f"Помилка при генерації базової статистики: {e}")
return {'error': str(e)}
def analyze_inactive_issues(self, days=14):
"""
Аналіз неактивних тікетів (не оновлювались протягом певної кількості днів).
Args:
days (int): Кількість днів неактивності
Returns:
dict: Інформація про неактивні тікети
"""
try:
if 'Updated' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Updated']):
logger.warning("Колонка 'Updated' відсутня або не містить дат")
return {'error': "Неможливо аналізувати неактивні тікети"}
# Визначення неактивних тікетів
cutoff_date = datetime.now() - timedelta(days=days)
inactive_issues = self.df[self.df['Updated'] < cutoff_date]
inactive_data = {
'total_count': len(inactive_issues),
'percentage': round(len(inactive_issues) / len(self.df) * 100, 2) if len(self.df) > 0 else 0,
'by_status': {},
'by_priority': {},
'top_inactive': []
}
# Розподіл за статусами
if 'Status' in inactive_issues.columns:
inactive_data['by_status'] = inactive_issues['Status'].value_counts().to_dict()
# Розподіл за пріоритетами
if 'Priority' in inactive_issues.columns:
inactive_data['by_priority'] = inactive_issues['Priority'].value_counts().to_dict()
# Топ 5 неактивних тікетів
if len(inactive_issues) > 0:
top_inactive = inactive_issues.sort_values('Updated', ascending=True).head(5)
for _, row in top_inactive.iterrows():
issue_data = {
'key': row.get('Issue key', 'Unknown'),
'summary': row.get('Summary', 'Unknown'),
'status': row.get('Status', 'Unknown'),
'last_updated': row['Updated'].strftime('%Y-%m-%d') if pd.notna(row['Updated']) else 'Unknown',
'days_inactive': (datetime.now() - row['Updated']).days if pd.notna(row['Updated']) else 'Unknown'
}
inactive_data['top_inactive'].append(issue_data)
logger.info(f"Знайдено {len(inactive_issues)} неактивних тікетів (>{days} днів)")
return inactive_data
except Exception as e:
logger.error(f"Помилка при аналізі неактивних тікетів: {e}")
return {'error': str(e)}
def analyze_timeline(self):
"""
Аналіз часової шкали проекту (зміна стану тікетів з часом).
Returns:
pandas.DataFrame: Дані для візуалізації або None у випадку помилки
"""
try:
if 'Created' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Created']):
logger.warning("Колонка 'Created' відсутня або не містить дат")
return None
if 'Updated' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Updated']):
logger.warning("Колонка 'Updated' відсутня або не містить дат")
return None
# Визначення часового діапазону
min_date = self.df['Created'].min().date()
max_date = self.df['Updated'].max().date()
# Створення часового ряду для кожного дня
date_range = pd.date_range(start=min_date, end=max_date, freq='D')
# Збір статистики для кожної дати
timeline_data = []
for date in date_range:
date_str = date.strftime('%Y-%m-%d')
# Тікети, створені до цієї дати
created_until = self.df[self.df['Created'].dt.date <= date.date()]
# Статуси тікетів на цю дату
status_counts = {}
# Для кожного тікета визначаємо його статус на цю дату
for _, row in created_until.iterrows():
# Якщо тікет був оновлений після цієї дати, використовуємо його поточний статус
if row['Updated'].date() >= date.date():
status = row.get('Status', 'Unknown')
status_counts[status] = status_counts.get(status, 0) + 1
# Додаємо запис для цієї дати
timeline_data.append({
'Date': date_str,
'Total': len(created_until),
**status_counts
})
# Створення DataFrame
timeline_df = pd.DataFrame(timeline_data)
logger.info("Часова шкала успішно проаналізована")
return timeline_df
except Exception as e:
logger.error(f"Помилка при аналізі часової шкали: {e}")
return None
def analyze_lead_time(self):
"""
Аналіз часу виконання тікетів (Lead Time).
Returns:
dict: Статистика по часу виконання
"""
try:
if 'Created' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Created']):
logger.warning("Колонка 'Created' відсутня або не містить дат")
return {'error': "Неможливо аналізувати час виконання"}
if 'Resolved' not in self.df.columns:
logger.warning("Колонка 'Resolved' відсутня")
return {'error': "Неможливо аналізувати час виконання"}
# Конвертація колонки Resolved до datetime, якщо потрібно
if not pd.api.types.is_datetime64_dtype(self.df['Resolved']):
self.df['Resolved'] = pd.to_datetime(self.df['Resolved'], errors='coerce')
# Фільтрація завершених тікетів
completed_issues = self.df.dropna(subset=['Resolved'])
if len(completed_issues) == 0:
logger.warning("Немає завершених тікетів для аналізу")
return {'total_count': 0}
# Обчислення Lead Time (в днях)
completed_issues['Lead_Time_Days'] = (completed_issues['Resolved'] - completed_issues['Created']).dt.days
# Фільтрація некоректних значень
valid_lead_time = completed_issues[completed_issues['Lead_Time_Days'] >= 0]
lead_time_stats = {
'total_count': len(valid_lead_time),
'avg_lead_time': round(valid_lead_time['Lead_Time_Days'].mean(), 2),
'median_lead_time': round(valid_lead_time['Lead_Time_Days'].median(), 2),
'min_lead_time': valid_lead_time['Lead_Time_Days'].min(),
'max_lead_time': valid_lead_time['Lead_Time_Days'].max(),
'by_type': {},
'by_priority': {}
}
# Розподіл за типами
if 'Issue Type' in valid_lead_time.columns:
lead_time_by_type = valid_lead_time.groupby('Issue Type')['Lead_Time_Days'].mean().round(2)
lead_time_stats['by_type'] = lead_time_by_type.to_dict()
# Розподіл за пріоритетами
if 'Priority' in valid_lead_time.columns:
lead_time_by_priority = valid_lead_time.groupby('Priority')['Lead_Time_Days'].mean().round(2)
lead_time_stats['by_priority'] = lead_time_by_priority.to_dict()
logger.info("Час виконання успішно проаналізований")
return lead_time_stats
except Exception as e:
logger.error(f"Помилка при аналізі часу виконання: {e}")
return {'error': str(e)}