Spaces:
Runtime error
Runtime error
import pandas as pd | |
import numpy as np | |
from datetime import datetime, timedelta | |
import logging | |
logger = logging.getLogger(__name__) | |
class JiraDataAnalyzer: | |
""" | |
Клас для аналізу даних Jira | |
""" | |
def __init__(self, df): | |
""" | |
Ініціалізація аналізатора даних. | |
Args: | |
df (pandas.DataFrame): DataFrame з даними Jira | |
""" | |
self.df = df | |
def generate_basic_statistics(self): | |
""" | |
Генерація базової статистики по даним Jira. | |
Returns: | |
dict: Словник з базовою статистикою | |
""" | |
try: | |
stats = { | |
'total_tickets': len(self.df), | |
'status_counts': {}, | |
'type_counts': {}, | |
'priority_counts': {}, | |
'assignee_counts': {}, | |
'created_stats': {}, | |
'updated_stats': {} | |
} | |
# Статистика за статусами | |
if 'Status' in self.df.columns: | |
status_counts = self.df['Status'].value_counts() | |
stats['status_counts'] = status_counts.to_dict() | |
# Статистика за типами | |
if 'Issue Type' in self.df.columns: | |
type_counts = self.df['Issue Type'].value_counts() | |
stats['type_counts'] = type_counts.to_dict() | |
# Статистика за пріоритетами | |
if 'Priority' in self.df.columns: | |
priority_counts = self.df['Priority'].value_counts() | |
stats['priority_counts'] = priority_counts.to_dict() | |
# Статистика за призначеними користувачами | |
if 'Assignee' in self.df.columns: | |
assignee_counts = self.df['Assignee'].value_counts().head(10) # Топ 10 | |
stats['assignee_counts'] = assignee_counts.to_dict() | |
# Статистика за часом створення | |
if 'Created' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Created']): | |
created_min = self.df['Created'].min() | |
created_max = self.df['Created'].max() | |
# Групування за місяцями | |
if 'Created_Month' in self.df.columns: | |
created_by_month = self.df['Created_Month'].value_counts().sort_index() | |
stats['created_by_month'] = {str(k): v for k, v in created_by_month.items()} | |
stats['created_stats'] = { | |
'min': created_min.strftime('%Y-%m-%d') if created_min else None, | |
'max': created_max.strftime('%Y-%m-%d') if created_max else None, | |
'last_7_days': len(self.df[self.df['Created'] > (datetime.now() - timedelta(days=7))]) | |
} | |
# Статистика за часом оновлення | |
if 'Updated' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Updated']): | |
updated_min = self.df['Updated'].min() | |
updated_max = self.df['Updated'].max() | |
stats['updated_stats'] = { | |
'min': updated_min.strftime('%Y-%m-%d') if updated_min else None, | |
'max': updated_max.strftime('%Y-%m-%d') if updated_max else None, | |
'last_7_days': len(self.df[self.df['Updated'] > (datetime.now() - timedelta(days=7))]) | |
} | |
logger.info("Базова статистика успішно згенерована") | |
return stats | |
except Exception as e: | |
logger.error(f"Помилка при генерації базової статистики: {e}") | |
return {'error': str(e)} | |
def analyze_inactive_issues(self, days=14): | |
""" | |
Аналіз неактивних тікетів (не оновлювались протягом певної кількості днів). | |
Args: | |
days (int): Кількість днів неактивності | |
Returns: | |
dict: Інформація про неактивні тікети | |
""" | |
try: | |
if 'Updated' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Updated']): | |
logger.warning("Колонка 'Updated' відсутня або не містить дат") | |
return {'error': "Неможливо аналізувати неактивні тікети"} | |
# Визначення неактивних тікетів | |
cutoff_date = datetime.now() - timedelta(days=days) | |
inactive_issues = self.df[self.df['Updated'] < cutoff_date] | |
inactive_data = { | |
'total_count': len(inactive_issues), | |
'percentage': round(len(inactive_issues) / len(self.df) * 100, 2) if len(self.df) > 0 else 0, | |
'by_status': {}, | |
'by_priority': {}, | |
'top_inactive': [] | |
} | |
# Розподіл за статусами | |
if 'Status' in inactive_issues.columns: | |
inactive_data['by_status'] = inactive_issues['Status'].value_counts().to_dict() | |
# Розподіл за пріоритетами | |
if 'Priority' in inactive_issues.columns: | |
inactive_data['by_priority'] = inactive_issues['Priority'].value_counts().to_dict() | |
# Топ 5 неактивних тікетів | |
if len(inactive_issues) > 0: | |
top_inactive = inactive_issues.sort_values('Updated', ascending=True).head(5) | |
for _, row in top_inactive.iterrows(): | |
issue_data = { | |
'key': row.get('Issue key', 'Unknown'), | |
'summary': row.get('Summary', 'Unknown'), | |
'status': row.get('Status', 'Unknown'), | |
'last_updated': row['Updated'].strftime('%Y-%m-%d') if pd.notna(row['Updated']) else 'Unknown', | |
'days_inactive': (datetime.now() - row['Updated']).days if pd.notna(row['Updated']) else 'Unknown' | |
} | |
inactive_data['top_inactive'].append(issue_data) | |
logger.info(f"Знайдено {len(inactive_issues)} неактивних тікетів (>{days} днів)") | |
return inactive_data | |
except Exception as e: | |
logger.error(f"Помилка при аналізі неактивних тікетів: {e}") | |
return {'error': str(e)} | |
def analyze_timeline(self): | |
""" | |
Аналіз часової шкали проекту (зміна стану тікетів з часом). | |
Returns: | |
pandas.DataFrame: Дані для візуалізації або None у випадку помилки | |
""" | |
try: | |
if 'Created' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Created']): | |
logger.warning("Колонка 'Created' відсутня або не містить дат") | |
return None | |
if 'Updated' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Updated']): | |
logger.warning("Колонка 'Updated' відсутня або не містить дат") | |
return None | |
# Визначення часового діапазону | |
min_date = self.df['Created'].min().date() | |
max_date = self.df['Updated'].max().date() | |
# Створення часового ряду для кожного дня | |
date_range = pd.date_range(start=min_date, end=max_date, freq='D') | |
# Збір статистики для кожної дати | |
timeline_data = [] | |
for date in date_range: | |
date_str = date.strftime('%Y-%m-%d') | |
# Тікети, створені до цієї дати | |
created_until = self.df[self.df['Created'].dt.date <= date.date()] | |
# Статуси тікетів на цю дату | |
status_counts = {} | |
# Для кожного тікета визначаємо його статус на цю дату | |
for _, row in created_until.iterrows(): | |
# Якщо тікет був оновлений після цієї дати, використовуємо його поточний статус | |
if row['Updated'].date() >= date.date(): | |
status = row.get('Status', 'Unknown') | |
status_counts[status] = status_counts.get(status, 0) + 1 | |
# Додаємо запис для цієї дати | |
timeline_data.append({ | |
'Date': date_str, | |
'Total': len(created_until), | |
**status_counts | |
}) | |
# Створення DataFrame | |
timeline_df = pd.DataFrame(timeline_data) | |
logger.info("Часова шкала успішно проаналізована") | |
return timeline_df | |
except Exception as e: | |
logger.error(f"Помилка при аналізі часової шкали: {e}") | |
return None | |
def analyze_lead_time(self): | |
""" | |
Аналіз часу виконання тікетів (Lead Time). | |
Returns: | |
dict: Статистика по часу виконання | |
""" | |
try: | |
if 'Created' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Created']): | |
logger.warning("Колонка 'Created' відсутня або не містить дат") | |
return {'error': "Неможливо аналізувати час виконання"} | |
if 'Resolved' not in self.df.columns: | |
logger.warning("Колонка 'Resolved' відсутня") | |
return {'error': "Неможливо аналізувати час виконання"} | |
# Конвертація колонки Resolved до datetime, якщо потрібно | |
if not pd.api.types.is_datetime64_dtype(self.df['Resolved']): | |
self.df['Resolved'] = pd.to_datetime(self.df['Resolved'], errors='coerce') | |
# Фільтрація завершених тікетів | |
completed_issues = self.df.dropna(subset=['Resolved']) | |
if len(completed_issues) == 0: | |
logger.warning("Немає завершених тікетів для аналізу") | |
return {'total_count': 0} | |
# Обчислення Lead Time (в днях) | |
completed_issues['Lead_Time_Days'] = (completed_issues['Resolved'] - completed_issues['Created']).dt.days | |
# Фільтрація некоректних значень | |
valid_lead_time = completed_issues[completed_issues['Lead_Time_Days'] >= 0] | |
lead_time_stats = { | |
'total_count': len(valid_lead_time), | |
'avg_lead_time': round(valid_lead_time['Lead_Time_Days'].mean(), 2), | |
'median_lead_time': round(valid_lead_time['Lead_Time_Days'].median(), 2), | |
'min_lead_time': valid_lead_time['Lead_Time_Days'].min(), | |
'max_lead_time': valid_lead_time['Lead_Time_Days'].max(), | |
'by_type': {}, | |
'by_priority': {} | |
} | |
# Розподіл за типами | |
if 'Issue Type' in valid_lead_time.columns: | |
lead_time_by_type = valid_lead_time.groupby('Issue Type')['Lead_Time_Days'].mean().round(2) | |
lead_time_stats['by_type'] = lead_time_by_type.to_dict() | |
# Розподіл за пріоритетами | |
if 'Priority' in valid_lead_time.columns: | |
lead_time_by_priority = valid_lead_time.groupby('Priority')['Lead_Time_Days'].mean().round(2) | |
lead_time_stats['by_priority'] = lead_time_by_priority.to_dict() | |
logger.info("Час виконання успішно проаналізований") | |
return lead_time_stats | |
except Exception as e: | |
logger.error(f"Помилка при аналізі часу виконання: {e}") | |
return {'error': str(e)} |