import pandas as pd import numpy as np from datetime import datetime, timedelta import logging logger = logging.getLogger(__name__) class JiraDataAnalyzer: """ Клас для аналізу даних Jira """ def __init__(self, df): """ Ініціалізація аналізатора даних. Args: df (pandas.DataFrame): DataFrame з даними Jira """ self.df = df def generate_basic_statistics(self): """ Генерація базової статистики по даним Jira. Returns: dict: Словник з базовою статистикою """ try: stats = { 'total_tickets': len(self.df), 'status_counts': {}, 'type_counts': {}, 'priority_counts': {}, 'assignee_counts': {}, 'created_stats': {}, 'updated_stats': {} } # Статистика за статусами if 'Status' in self.df.columns: status_counts = self.df['Status'].value_counts() stats['status_counts'] = status_counts.to_dict() # Статистика за типами if 'Issue Type' in self.df.columns: type_counts = self.df['Issue Type'].value_counts() stats['type_counts'] = type_counts.to_dict() # Статистика за пріоритетами if 'Priority' in self.df.columns: priority_counts = self.df['Priority'].value_counts() stats['priority_counts'] = priority_counts.to_dict() # Статистика за призначеними користувачами if 'Assignee' in self.df.columns: assignee_counts = self.df['Assignee'].value_counts().head(10) # Топ 10 stats['assignee_counts'] = assignee_counts.to_dict() # Статистика за часом створення if 'Created' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Created']): created_min = self.df['Created'].min() created_max = self.df['Created'].max() # Групування за місяцями if 'Created_Month' in self.df.columns: created_by_month = self.df['Created_Month'].value_counts().sort_index() stats['created_by_month'] = {str(k): v for k, v in created_by_month.items()} stats['created_stats'] = { 'min': created_min.strftime('%Y-%m-%d') if created_min else None, 'max': created_max.strftime('%Y-%m-%d') if created_max else None, 'last_7_days': len(self.df[self.df['Created'] > (datetime.now() - timedelta(days=7))]) } # Статистика за часом оновлення if 'Updated' in self.df.columns and pd.api.types.is_datetime64_dtype(self.df['Updated']): updated_min = self.df['Updated'].min() updated_max = self.df['Updated'].max() stats['updated_stats'] = { 'min': updated_min.strftime('%Y-%m-%d') if updated_min else None, 'max': updated_max.strftime('%Y-%m-%d') if updated_max else None, 'last_7_days': len(self.df[self.df['Updated'] > (datetime.now() - timedelta(days=7))]) } logger.info("Базова статистика успішно згенерована") return stats except Exception as e: logger.error(f"Помилка при генерації базової статистики: {e}") return {'error': str(e)} def analyze_inactive_issues(self, days=14): """ Аналіз неактивних тікетів (не оновлювались протягом певної кількості днів). Args: days (int): Кількість днів неактивності Returns: dict: Інформація про неактивні тікети """ try: if 'Updated' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Updated']): logger.warning("Колонка 'Updated' відсутня або не містить дат") return {'error': "Неможливо аналізувати неактивні тікети"} # Визначення неактивних тікетів cutoff_date = datetime.now() - timedelta(days=days) inactive_issues = self.df[self.df['Updated'] < cutoff_date] inactive_data = { 'total_count': len(inactive_issues), 'percentage': round(len(inactive_issues) / len(self.df) * 100, 2) if len(self.df) > 0 else 0, 'by_status': {}, 'by_priority': {}, 'top_inactive': [] } # Розподіл за статусами if 'Status' in inactive_issues.columns: inactive_data['by_status'] = inactive_issues['Status'].value_counts().to_dict() # Розподіл за пріоритетами if 'Priority' in inactive_issues.columns: inactive_data['by_priority'] = inactive_issues['Priority'].value_counts().to_dict() # Топ 5 неактивних тікетів if len(inactive_issues) > 0: top_inactive = inactive_issues.sort_values('Updated', ascending=True).head(5) for _, row in top_inactive.iterrows(): issue_data = { 'key': row.get('Issue key', 'Unknown'), 'summary': row.get('Summary', 'Unknown'), 'status': row.get('Status', 'Unknown'), 'last_updated': row['Updated'].strftime('%Y-%m-%d') if pd.notna(row['Updated']) else 'Unknown', 'days_inactive': (datetime.now() - row['Updated']).days if pd.notna(row['Updated']) else 'Unknown' } inactive_data['top_inactive'].append(issue_data) logger.info(f"Знайдено {len(inactive_issues)} неактивних тікетів (>{days} днів)") return inactive_data except Exception as e: logger.error(f"Помилка при аналізі неактивних тікетів: {e}") return {'error': str(e)} def analyze_timeline(self): """ Аналіз часової шкали проекту (зміна стану тікетів з часом). Returns: pandas.DataFrame: Дані для візуалізації або None у випадку помилки """ try: if 'Created' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Created']): logger.warning("Колонка 'Created' відсутня або не містить дат") return None if 'Updated' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Updated']): logger.warning("Колонка 'Updated' відсутня або не містить дат") return None # Визначення часового діапазону min_date = self.df['Created'].min().date() max_date = self.df['Updated'].max().date() # Створення часового ряду для кожного дня date_range = pd.date_range(start=min_date, end=max_date, freq='D') # Збір статистики для кожної дати timeline_data = [] for date in date_range: date_str = date.strftime('%Y-%m-%d') # Тікети, створені до цієї дати created_until = self.df[self.df['Created'].dt.date <= date.date()] # Статуси тікетів на цю дату status_counts = {} # Для кожного тікета визначаємо його статус на цю дату for _, row in created_until.iterrows(): # Якщо тікет був оновлений після цієї дати, використовуємо його поточний статус if row['Updated'].date() >= date.date(): status = row.get('Status', 'Unknown') status_counts[status] = status_counts.get(status, 0) + 1 # Додаємо запис для цієї дати timeline_data.append({ 'Date': date_str, 'Total': len(created_until), **status_counts }) # Створення DataFrame timeline_df = pd.DataFrame(timeline_data) logger.info("Часова шкала успішно проаналізована") return timeline_df except Exception as e: logger.error(f"Помилка при аналізі часової шкали: {e}") return None def analyze_lead_time(self): """ Аналіз часу виконання тікетів (Lead Time). Returns: dict: Статистика по часу виконання """ try: if 'Created' not in self.df.columns or not pd.api.types.is_datetime64_dtype(self.df['Created']): logger.warning("Колонка 'Created' відсутня або не містить дат") return {'error': "Неможливо аналізувати час виконання"} if 'Resolved' not in self.df.columns: logger.warning("Колонка 'Resolved' відсутня") return {'error': "Неможливо аналізувати час виконання"} # Конвертація колонки Resolved до datetime, якщо потрібно if not pd.api.types.is_datetime64_dtype(self.df['Resolved']): self.df['Resolved'] = pd.to_datetime(self.df['Resolved'], errors='coerce') # Фільтрація завершених тікетів completed_issues = self.df.dropna(subset=['Resolved']) if len(completed_issues) == 0: logger.warning("Немає завершених тікетів для аналізу") return {'total_count': 0} # Обчислення Lead Time (в днях) completed_issues['Lead_Time_Days'] = (completed_issues['Resolved'] - completed_issues['Created']).dt.days # Фільтрація некоректних значень valid_lead_time = completed_issues[completed_issues['Lead_Time_Days'] >= 0] lead_time_stats = { 'total_count': len(valid_lead_time), 'avg_lead_time': round(valid_lead_time['Lead_Time_Days'].mean(), 2), 'median_lead_time': round(valid_lead_time['Lead_Time_Days'].median(), 2), 'min_lead_time': valid_lead_time['Lead_Time_Days'].min(), 'max_lead_time': valid_lead_time['Lead_Time_Days'].max(), 'by_type': {}, 'by_priority': {} } # Розподіл за типами if 'Issue Type' in valid_lead_time.columns: lead_time_by_type = valid_lead_time.groupby('Issue Type')['Lead_Time_Days'].mean().round(2) lead_time_stats['by_type'] = lead_time_by_type.to_dict() # Розподіл за пріоритетами if 'Priority' in valid_lead_time.columns: lead_time_by_priority = valid_lead_time.groupby('Priority')['Lead_Time_Days'].mean().round(2) lead_time_stats['by_priority'] = lead_time_by_priority.to_dict() logger.info("Час виконання успішно проаналізований") return lead_time_stats except Exception as e: logger.error(f"Помилка при аналізі часу виконання: {e}") return {'error': str(e)}