Spaces:
Runtime error
Runtime error
import os | |
import logging | |
import pandas as pd | |
import json | |
from datetime import datetime | |
from pathlib import Path | |
import importlib | |
import requests | |
logger = logging.getLogger(__name__) | |
class AppManager: | |
""" | |
Класс, який керує роботою додатку Jira AI Assistant | |
""" | |
def __init__(self): | |
""" | |
Ініціалізація менеджера додатку | |
""" | |
self.config = self._load_config() | |
self.setup_logging() | |
self.data = None | |
self.analyses = {} | |
self.reports = {} | |
# Створення директорій для даних, якщо вони не існують | |
self._create_directories() | |
def _load_config(self): | |
""" | |
Завантаження конфігурації додатку | |
Returns: | |
dict: Конфігурація додатку | |
""" | |
try: | |
# Спочатку спробуємо завантажити з файлу | |
config_path = Path("config.json") | |
if config_path.exists(): | |
with open(config_path, 'r', encoding='utf-8') as f: | |
config = json.load(f) | |
logger.info("Конфігурація завантажена з файлу") | |
return config | |
# Якщо файл не існує, використовуємо стандартну конфігурацію | |
config = { | |
"app_name": "Jira AI Assistant", | |
"version": "1.0.0", | |
"data_dir": "data", | |
"reports_dir": "reports", | |
"temp_dir": "temp", | |
"log_dir": "logs", | |
"log_level": "INFO", | |
"default_inactive_days": 14, | |
"openai_model": "gpt-3.5-turbo", | |
"gemini_model": "gemini-pro", | |
"max_results": 500 | |
} | |
# Зберігаємо стандартну конфігурацію у файл | |
with open(config_path, 'w', encoding='utf-8') as f: | |
json.dump(config, f, indent=2) | |
logger.info("Створено стандартну конфігурацію") | |
return config | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні конфігурації: {e}") | |
# Аварійна конфігурація | |
return { | |
"app_name": "Jira AI Assistant", | |
"version": "1.0.0", | |
"data_dir": "data", | |
"reports_dir": "reports", | |
"temp_dir": "temp", | |
"log_dir": "logs", | |
"log_level": "INFO", | |
"default_inactive_days": 14, | |
"openai_model": "gpt-3.5-turbo", | |
"gemini_model": "gemini-pro", | |
"max_results": 500 | |
} | |
def setup_logging(self): | |
""" | |
Налаштування логування | |
""" | |
try: | |
log_dir = Path(self.config.get("log_dir", "logs")) | |
log_dir.mkdir(exist_ok=True, parents=True) | |
log_file = log_dir / f"app_{datetime.now().strftime('%Y%m%d')}.log" | |
# Рівень логування з конфігурації | |
log_level_str = self.config.get("log_level", "INFO") | |
log_level = getattr(logging, log_level_str, logging.INFO) | |
# Налаштування логера | |
logging.basicConfig( | |
level=log_level, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', | |
handlers=[ | |
logging.FileHandler(log_file), | |
logging.StreamHandler() | |
] | |
) | |
logger.info(f"Логування налаштовано. Рівень: {log_level_str}, файл: {log_file}") | |
except Exception as e: | |
print(f"Помилка при налаштуванні логування: {e}") | |
# Аварійне налаштування логування | |
logging.basicConfig( | |
level=logging.INFO, | |
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
def _create_directories(self): | |
""" | |
Створення необхідних директорій | |
""" | |
try: | |
directories = [ | |
self.config.get("data_dir", "data"), | |
self.config.get("reports_dir", "reports"), | |
self.config.get("temp_dir", "temp"), | |
self.config.get("log_dir", "logs") | |
] | |
for directory in directories: | |
Path(directory).mkdir(exist_ok=True, parents=True) | |
logger.info("Створено необхідні директорії") | |
except Exception as e: | |
logger.error(f"Помилка при створенні директорій: {e}") | |
def load_csv_data(self, file_path): | |
""" | |
Завантаження даних з CSV файлу | |
Args: | |
file_path (str): Шлях до CSV файлу | |
Returns: | |
pandas.DataFrame: Завантажені дані або None у випадку помилки | |
""" | |
try: | |
logger.info(f"Завантаження даних з CSV файлу: {file_path}") | |
# Імпортуємо необхідний модуль | |
from modules.data_import.csv_importer import JiraCsvImporter | |
# Створюємо імпортер та завантажуємо дані | |
importer = JiraCsvImporter(file_path) | |
self.data = importer.load_data() | |
if self.data is None: | |
logger.error("Не вдалося завантажити дані з CSV файлу") | |
return None | |
logger.info(f"Успішно завантажено {len(self.data)} записів") | |
# Зберігаємо копію даних | |
self._save_data_copy(file_path) | |
return self.data | |
except Exception as e: | |
logger.error(f"Помилка при завантаженні даних з CSV: {e}") | |
return None | |
def _save_data_copy(self, original_file_path): | |
""" | |
Збереження копії даних | |
Args: | |
original_file_path (str): Шлях до оригінального файлу | |
""" | |
try: | |
if self.data is None: | |
return | |
# Створюємо ім'я файлу на основі оригінального | |
file_name = os.path.basename(original_file_path) | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
new_file_name = f"{os.path.splitext(file_name)[0]}_{timestamp}.csv" | |
# Шлях для збереження | |
data_dir = Path(self.config.get("data_dir", "data")) | |
save_path = data_dir / new_file_name | |
# Зберігаємо дані | |
self.data.to_csv(save_path, index=False, encoding='utf-8') | |
logger.info(f"Збережено копію даних у {save_path}") | |
except Exception as e: | |
logger.error(f"Помилка при збереженні копії даних: {e}") | |
def connect_to_jira(self, jira_url, username, api_token): | |
""" | |
Підключення до Jira API | |
Args: | |
jira_url (str): URL Jira сервера | |
username (str): Ім'я користувача | |
api_token (str): API токен | |
Returns: | |
bool: True, якщо підключення успішне, False у іншому випадку | |
""" | |
try: | |
logger.info(f"Тестування підключення до Jira: {jira_url}") | |
# Спроба прямого HTTP запиту до сервера | |
response = requests.get( | |
f"{jira_url}/rest/api/2/serverInfo", | |
auth=(username, api_token), | |
timeout=10, | |
verify=True | |
) | |
if response.status_code == 200: | |
logger.info("Успішне підключення до Jira API") | |
# Зберігаємо дані про підключення | |
self.jira_connection = { | |
"url": jira_url, | |
"username": username, | |
"api_token": api_token | |
} | |
return True | |
else: | |
logger.error(f"Помилка підключення до Jira: {response.status_code}, {response.text}") | |
return False | |
except Exception as e: | |
logger.error(f"Помилка при підключенні до Jira: {e}") | |
return False | |
def get_jira_data(self, project_key, board_id=None, max_results=None): | |
""" | |
Отримання даних з Jira API | |
Args: | |
project_key (str): Ключ проекту | |
board_id (int): ID дошки (необов'язково) | |
max_results (int): Максимальна кількість результатів | |
Returns: | |
pandas.DataFrame: Отримані дані або None у випадку помилки | |
""" | |
try: | |
if not hasattr(self, 'jira_connection'): | |
logger.error("Немає з'єднання з Jira") | |
return None | |
logger.info(f"Отримання даних з Jira для проекту {project_key}") | |
# Імпортуємо необхідний модуль | |
from modules.data_import.jira_api import JiraConnector | |
# Параметри з'єднання | |
jira_url = self.jira_connection["url"] | |
username = self.jira_connection["username"] | |
api_token = self.jira_connection["api_token"] | |
# Створюємо коннектор | |
connector = JiraConnector(jira_url, username, api_token) | |
# Отримуємо дані | |
if board_id: | |
issues = connector.get_board_issues( | |
board_id, | |
project_key, | |
max_results=max_results or self.config.get("max_results", 500) | |
) | |
else: | |
issues = connector.get_project_issues( | |
project_key, | |
max_results=max_results or self.config.get("max_results", 500) | |
) | |
if not issues: | |
logger.error("Не вдалося отримати тікети з Jira") | |
return None | |
# Експортуємо у CSV та завантажуємо дані | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
temp_dir = Path(self.config.get("temp_dir", "temp")) | |
temp_csv_path = temp_dir / f"jira_export_{project_key}_{timestamp}.csv" | |
df = connector.export_issues_to_csv(issues, temp_csv_path) | |
self.data = df | |
logger.info(f"Успішно отримано {len(df)} тікетів з Jira") | |
return df | |
except Exception as e: | |
logger.error(f"Помилка при отриманні даних з Jira: {e}") | |
return None | |
def analyze_data(self, inactive_days=None): | |
""" | |
Аналіз завантажених даних | |
Args: | |
inactive_days (int): Кількість днів для визначення неактивних тікетів | |
Returns: | |
dict: Результати аналізу | |
""" | |
try: | |
if self.data is None: | |
logger.error("Немає даних для аналізу") | |
return None | |
logger.info("Аналіз даних...") | |
# Параметри аналізу | |
if inactive_days is None: | |
inactive_days = self.config.get("default_inactive_days", 14) | |
# Імпортуємо необхідний модуль | |
from modules.data_analysis.statistics import JiraDataAnalyzer | |
# Створюємо аналізатор та виконуємо аналіз | |
analyzer = JiraDataAnalyzer(self.data) | |
# Генеруємо базову статистику | |
stats = analyzer.generate_basic_statistics() | |
# Аналізуємо неактивні тікети | |
inactive_issues = analyzer.analyze_inactive_issues(days=inactive_days) | |
# Аналізуємо часову шкалу | |
timeline = analyzer.analyze_timeline() | |
# Аналізуємо час виконання | |
lead_time = analyzer.analyze_lead_time() | |
# Зберігаємо результати аналізу | |
analysis_result = { | |
"stats": stats, | |
"inactive_issues": inactive_issues, | |
"timeline": timeline.to_dict() if isinstance(timeline, pd.DataFrame) else None, | |
"lead_time": lead_time | |
} | |
# Зберігаємо в історії аналізів | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
self.analyses[timestamp] = analysis_result | |
logger.info("Аналіз даних успішно завершено") | |
return analysis_result | |
except Exception as e: | |
logger.error(f"Помилка при аналізі даних: {e}") | |
return None | |
def generate_visualizations(self, analysis_result=None): | |
""" | |
Генерація візуалізацій на основі аналізу | |
Args: | |
analysis_result (dict): Результати аналізу або None для використання останнього аналізу | |
Returns: | |
dict: Об'єкти Figure для різних візуалізацій | |
""" | |
try: | |
if self.data is None: | |
logger.error("Немає даних для візуалізації") | |
return None | |
# Якщо аналіз не вказано, використовуємо останній | |
if analysis_result is None: | |
if not self.analyses: | |
logger.error("Немає результатів аналізу для візуалізації") | |
return None | |
# Отримуємо останній аналіз | |
last_timestamp = max(self.analyses.keys()) | |
analysis_result = self.analyses[last_timestamp] | |
logger.info("Генерація візуалізацій...") | |
# Імпортуємо необхідний модуль | |
from modules.data_analysis.visualizations import JiraVisualizer | |
# Створюємо візуалізатор | |
visualizer = JiraVisualizer(self.data) | |
# Генеруємо візуалізації | |
visualizations = visualizer.plot_all() | |
logger.info("Візуалізації успішно згенеровано") | |
return visualizations | |
except Exception as e: | |
logger.error(f"Помилка при генерації візуалізацій: {e}") | |
return None | |
def analyze_with_ai(self, analysis_result=None, api_key=None, model_type="openai"): | |
""" | |
Аналіз даних за допомогою AI | |
Args: | |
analysis_result (dict): Результати аналізу або None для використання останнього аналізу | |
api_key (str): API ключ для LLM | |
model_type (str): Тип моделі ("openai" або "gemini") | |
Returns: | |
str: Результат AI аналізу | |
""" | |
try: | |
# Якщо аналіз не вказано, використовуємо останній | |
if analysis_result is None: | |
if not self.analyses: | |
logger.error("Немає результатів аналізу для AI") | |
return None | |
# Отримуємо останній аналіз | |
last_timestamp = max(self.analyses.keys()) | |
analysis_result = self.analyses[last_timestamp] | |
logger.info(f"Аналіз даних за допомогою AI ({model_type})...") | |
# Імпортуємо необхідний модуль | |
from modules.ai_analysis.llm_connector import LLMConnector | |
# Створюємо коннектор до LLM | |
llm = LLMConnector(api_key=api_key, model_type=model_type) | |
# Виконуємо аналіз | |
stats = analysis_result.get("stats", {}) | |
inactive_issues = analysis_result.get("inactive_issues", {}) | |
ai_analysis = llm.analyze_jira_data(stats, inactive_issues) | |
logger.info("AI аналіз успішно завершено") | |
return ai_analysis | |
except Exception as e: | |
logger.error(f"Помилка при AI аналізі: {e}") | |
return f"Помилка при виконанні AI аналізу: {str(e)}" | |
def generate_report(self, analysis_result=None, ai_analysis=None, format="markdown", include_visualizations=True): | |
""" | |
Генерація звіту на основі аналізу | |
Args: | |
analysis_result (dict): Результати аналізу або None для використання останнього аналізу | |
ai_analysis (str): Результат AI аналізу | |
format (str): Формат звіту ("markdown", "html", "pdf") | |
include_visualizations (bool): Чи включати візуалізації у звіт | |
Returns: | |
str: Текст звіту | |
""" | |
try: | |
if self.data is None: | |
logger.error("Немає даних для генерації звіту") | |
return None | |
# Якщо аналіз не вказано, використовуємо останній | |
if analysis_result is None: | |
if not self.analyses: | |
logger.error("Немає результатів аналізу для звіту") | |
return None | |
# Отримуємо останній аналіз | |
last_timestamp = max(self.analyses.keys()) | |
analysis_result = self.analyses[last_timestamp] | |
logger.info(f"Генерація звіту у форматі {format}...") | |
# Імпортуємо необхідний модуль | |
from modules.reporting.report_generator import ReportGenerator | |
# Отримуємо дані з аналізу | |
stats = analysis_result.get("stats", {}) | |
inactive_issues = analysis_result.get("inactive_issues", {}) | |
# Генеруємо візуалізації, якщо потрібно | |
visualization_data = None | |
if include_visualizations: | |
visualization_data = self.generate_visualizations(analysis_result) | |
# Створюємо генератор звітів | |
report_generator = ReportGenerator(self.data, stats, inactive_issues, ai_analysis) | |
# Генеруємо звіт у потрібному форматі | |
if format.lower() == "markdown": | |
report = report_generator.create_markdown_report() | |
elif format.lower() == "html": | |
report = report_generator.create_html_report(include_visualizations=include_visualizations, | |
visualization_data=visualization_data) | |
else: | |
# Для інших форматів спочатку генеруємо HTML | |
temp_html = report_generator.create_html_report(include_visualizations=include_visualizations, | |
visualization_data=visualization_data) | |
report = temp_html | |
# Зберігаємо звіт в історії | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
self.reports[timestamp] = { | |
"format": format, | |
"report": report | |
} | |
logger.info(f"Звіт успішно згенеровано у форматі {format}") | |
return report | |
except Exception as e: | |
logger.error(f"Помилка при генерації звіту: {e}") | |
return f"Помилка при генерації звіту: {str(e)}" | |
def save_report(self, report=None, filepath=None, format="markdown", include_visualizations=True): | |
""" | |
Збереження звіту у файл | |
Args: | |
report (str): Текст звіту або None для генерації нового | |
filepath (str): Шлях для збереження файлу | |
format (str): Формат звіту ("markdown", "html", "pdf") | |
include_visualizations (bool): Чи включати візуалізації у звіт | |
Returns: | |
str: Шлях до збереженого файлу | |
""" | |
try: | |
# Якщо звіт не вказано, генеруємо новий | |
if report is None: | |
report = self.generate_report(format=format, include_visualizations=include_visualizations) | |
if report is None: | |
logger.error("Не вдалося згенерувати звіт") | |
return None | |
# Якщо шлях не вказано, створюємо стандартний | |
if filepath is None: | |
reports_dir = Path(self.config.get("reports_dir", "reports")) | |
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') | |
if format.lower() == "markdown": | |
filepath = reports_dir / f"jira_report_{timestamp}.md" | |
elif format.lower() == "html": | |
filepath = reports_dir / f"jira_report_{timestamp}.html" | |
else: | |
filepath = reports_dir / f"jira_report_{timestamp}.pdf" | |
# Імпортуємо необхідний модуль | |
from modules.reporting.report_generator import ReportGenerator | |
# Створюємо генератор звітів (лише для використання методу save_report) | |
report_generator = ReportGenerator(self.data) | |
# Генеруємо візуалізації, якщо потрібно | |
visualization_data = None | |
if include_visualizations: | |
visualization_data = self.generate_visualizations() | |
# Зберігаємо звіт | |
saved_path = report_generator.save_report( | |
filepath=str(filepath), | |
format=format, | |
include_visualizations=include_visualizations, | |
visualization_data=visualization_data | |
) | |
if saved_path: | |
logger.info(f"Звіт успішно збережено у {saved_path}") | |
return saved_path | |
else: | |
logger.error("Не вдалося зберегти звіт") | |
return None | |
except Exception as e: | |
logger.error(f"Помилка при збереженні звіту: {e}") | |
return None | |
def send_to_slack(self, channel, message, report=None, api_token=None): | |
""" | |
Відправлення повідомлення в Slack | |
Args: | |
channel (str): Назва каналу (наприклад, '#general') | |
message (str): Текст повідомлення | |
report (str): URL або шлях до звіту (необов'язково) | |
api_token (str): Slack Bot Token | |
Returns: | |
bool: True, якщо повідомлення успішно відправлено, False у іншому випадку | |
""" | |
try: | |
logger.info(f"Відправлення повідомлення в Slack канал {channel}...") | |
# Отримуємо токен | |
token = api_token or os.getenv("SLACK_BOT_TOKEN") | |
if not token: | |
logger.error("Не вказано Slack Bot Token") | |
return False | |
# Формуємо дані для запиту | |
slack_message = { | |
"channel": channel, | |
"text": message | |
} | |
# Якщо є звіт, додаємо його як вкладення | |
if report and report.startswith(("http://", "https://")): | |
slack_message["attachments"] = [ | |
{ | |
"title": "Звіт аналізу Jira", | |
"title_link": report, | |
"text": "Завантажити звіт" | |
} | |
] | |
# Відправляємо запит до Slack API | |
headers = { | |
"Authorization": f"Bearer {token}", | |
"Content-Type": "application/json" | |
} | |
response = requests.post( | |
"https://slack.com/api/chat.postMessage", | |
headers=headers, | |
json=slack_message | |
) | |
if response.status_code == 200 and response.json().get("ok"): | |
logger.info("Повідомлення успішно відправлено в Slack") | |
return True | |
else: | |
logger.error(f"Помилка при відправленні повідомлення в Slack: {response.text}") | |
return False | |
except Exception as e: | |
logger.error(f"Помилка при відправленні повідомлення в Slack: {e}") | |
return False |