Spaces:
Sleeping
Sleeping
| import gradio as gr | |
| import pandas as pd | |
| from urllib.parse import urlparse | |
| import time | |
| import requests | |
| from io import BytesIO | |
| import json | |
| import seaborn as sns | |
| import matplotlib.pyplot as plt | |
# Optional Plotly support: import it if present, otherwise make a single
# pip-install attempt. PLOTLY_AVAILABLE records whether the import succeeded.
PLOTLY_AVAILABLE = False
try:
    import plotly.express as px
    import plotly.graph_objects as go
except ImportError:
    print("⚠️ Plotly not found, attempting to install...")
    try:
        import subprocess
        import sys

        subprocess.check_call(
            [sys.executable, "-m", "pip", "install", "plotly", "--quiet"]
        )
        import plotly.express as px
        import plotly.graph_objects as go
    except Exception as e:
        # Installation or the second import failed; keep the flag False.
        print(f"❌ Failed to install Plotly: {e}")
        print("📊 Charts will be disabled, but app will work normally")
    else:
        PLOTLY_AVAILABLE = True
        print("✅ Plotly installed and imported successfully")
else:
    PLOTLY_AVAILABLE = True
    print("✅ Plotly successfully imported")
| # ===================== URL Validation ===================== | |
def is_instagram_url(url: str) -> bool:
    """Return True if *url* points at a non-root page on instagram.com.

    The host must be exactly ``instagram.com`` or one of its subdomains
    (e.g. ``www.instagram.com``). A plain substring test is not enough:
    it would also accept look-alike hosts such as ``notinstagram.com`` or
    ``instagram.com.evil.example``.

    Args:
        url: Candidate URL; the scheme may be omitted, in which case
            ``https://`` is assumed.

    Returns:
        True when the host is an Instagram host and the path is more than
        the bare homepage; False for anything else, including non-string
        or empty input.
    """
    if not isinstance(url, str):
        return False

    candidate = url.strip()
    if not candidate:
        return False

    # Add https if missing so urlparse puts the host into the netloc part
    if not candidate.lower().startswith(('http://', 'https://')):
        candidate = 'https://' + candidate

    try:
        parsed = urlparse(candidate)
        # .hostname is lower-cased and excludes any port or userinfo
        host = parsed.hostname or ''
    except ValueError as e:
        print(f"URL validation error: {e}")
        return False

    host = host.rstrip('.')  # tolerate a fully-qualified trailing dot
    if host != 'instagram.com' and not host.endswith('.instagram.com'):
        return False

    # Require a real path (not just the homepage)
    return parsed.path not in ('', '/')
def check_url_accessible(url: str, timeout: int = 5) -> bool:
    """Best-effort reachability probe for *url* using an HTTP HEAD request.

    The check is deliberately lenient: only a 5xx response counts as
    "not accessible". Timeouts, connection errors and any other exception
    are logged and treated as accessible.

    Args:
        url: Target URL; ``https://`` is prepended when no scheme is given.
        timeout: Request timeout in seconds.

    Returns:
        False only when the server answered with a status code >= 500.
    """
    browser_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 '
                      '(KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
        'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
        'Accept-Language': 'en-US,en;q=0.5',
        'Connection': 'keep-alive',
    }
    try:
        has_scheme = url.startswith(('http://', 'https://'))
        target = url if has_scheme else 'https://' + url
        reply = requests.head(
            target,
            allow_redirects=True,
            timeout=timeout,
            headers=browser_headers,
        )
        print(f"URL check response: {reply.status_code}")
        # Every status below 500 (including 403/429 bot blocks) passes.
        return reply.status_code < 500
    except requests.exceptions.Timeout:
        print("URL check timed out - assuming URL is valid")
        return True
    except requests.exceptions.ConnectionError:
        print("Connection error during URL check - assuming URL is valid")
        return True
    except Exception as e:
        print(f"URL accessibility check failed: {e}")
        return True
def test_file_generation():
    """Generate report files from three fake comments as a smoke test.

    Returns:
        A ``(status_message, file_paths)`` pair; ``file_paths`` is an empty
        list when no files were produced or an exception occurred.
    """
    try:
        fake_comments = pd.DataFrame({
            'Пользователь': ['user1', 'user2', 'user3'],
            'Комментарий': ['Тест 1', 'Тест 2', 'Тест 3'],
            'Дата': ['2025-09-20'] * 3,
            'Тональность': ['позитивный', 'нейтральный', 'негативный'],
            'Категория': ['вопрос', 'отзыв', 'жалоба'],
            'Модерация': ['безопасно'] * 3,
            'Автоответ': ['Ответ 1', 'Ответ 2', 'Ответ 3'],
        })
        category_counts = fake_comments['Категория'].value_counts()
        sentiment_counts = fake_comments['Тональность'].value_counts()
        generated = create_report_files(fake_comments, category_counts, sentiment_counts)
        if not generated:
            return "❌ Ошибка создания тестовых файлов", []
        return f"✅ Тестовые файлы созданы: {len(generated)} файлов", generated
    except Exception as e:
        return f"❌ Ошибка тестирования: {str(e)}", []
# ===================== Charts (Matplotlib / Seaborn) =====================
def make_charts(categories: pd.Series, sentiments: pd.Series):
    """Build the category pie chart and the sentiment bar chart.

    Both charts are Matplotlib figures; Seaborn supplies the pie colours
    and draws the bar chart.

    Args:
        categories: Counts indexed by category label.
        sentiments: Counts indexed by sentiment label.

    Returns:
        ``(fig_categories, fig_sentiments)``, or ``(None, None)`` if chart
        creation failed for any reason.
    """
    known_sentiment_colors = {
        'позитивный': '#28a745',
        'негативный': '#dc3545',
        'нейтральный': '#6c757d',
        'positive': '#28a745',
        'negative': '#dc3545',
        'neutral': '#6c757d',
    }
    unknown_sentiment_color = '#adb5bd'

    fig_cat = fig_sent = None
    try:
        # Category pie chart (Matplotlib only, since Seaborn has no native pie)
        fig_cat, ax_cat = plt.subplots()
        if not categories.empty:
            ax_cat.pie(
                categories.values,
                labels=categories.index,
                autopct='%1.1f%%',
                startangle=140,
                colors=sns.color_palette("Set3", len(categories)),
            )
            ax_cat.set_title("Распределение по категориям", color="#E6007E")
        else:
            ax_cat.text(0.5, 0.5, "Нет данных для отображения",
                        ha="center", va="center")

        # Sentiment bar chart
        fig_sent, ax_sent = plt.subplots()
        if not sentiments.empty:
            # Build the palette for exactly the labels being plotted. A fixed
            # dict that lacks one of the labels makes seaborn raise, which
            # previously discarded BOTH charts for any unexpected sentiment.
            palette = {
                label: known_sentiment_colors.get(label, unknown_sentiment_color)
                for label in sentiments.index
            }
            sns.barplot(
                x=sentiments.index,
                y=sentiments.values,
                palette=palette,
                ax=ax_sent,
            )
            ax_sent.set_title("Распределение по тональности", color="#E6007E")
            ax_sent.set_ylabel("Количество")
            ax_sent.set_xlabel("Тональность")
        else:
            ax_sent.text(0.5, 0.5, "Нет данных для отображения",
                         ha="center", va="center")

        return fig_cat, fig_sent
    except Exception as e:
        print(f"Chart creation error: {e}")
        # Figures made via pyplot stay registered until closed; release the
        # ones created before the failure so they do not accumulate.
        for fig in (fig_cat, fig_sent):
            if fig is not None:
                plt.close(fig)
        return None, None
| # ===================== File Generation ===================== | |
def _write_excel_report(xlsx_path, engine, df, summary_rows, categories, sentiments):
    """Write the full multi-sheet workbook to *xlsx_path* using *engine*."""
    with pd.ExcelWriter(xlsx_path, engine=engine) as writer:
        df.to_excel(writer, sheet_name="Комментарии", index=False)
        pd.DataFrame(summary_rows).to_excel(writer, sheet_name="Сводка", index=False)
        breakdowns = (
            (categories, 'Категория', "Категории"),
            (sentiments, 'Тональность', "Тональности"),
        )
        for counts, label_column, sheet_name in breakdowns:
            if isinstance(counts, pd.Series) and not counts.empty:
                counts_df = counts.reset_index()
                counts_df.columns = [label_column, 'Количество']
                counts_df.to_excel(writer, sheet_name=sheet_name, index=False)


def create_report_files(df: pd.DataFrame, categories: pd.Series, sentiments: pd.Series):
    """Create CSV and Excel report files and return their paths.

    The CSV is always written. The Excel workbook is attempted with
    openpyxl, then xlsxwriter, then pandas' default single-sheet export.
    If every Excel attempt fails, the workbook is simply left out of the
    result instead of raising, so the CSV is still delivered.

    Args:
        df: Comment table; ``None`` or an empty frame is replaced by a
            one-row placeholder.
        categories: Counts per category (sheet skipped if empty / not a Series).
        sentiments: Counts per sentiment (sheet skipped if empty / not a Series).

    Returns:
        List of paths to the non-empty files that were produced (CSV first).
        The files are temporary files that are NOT deleted automatically.
    """
    import contextlib
    import os
    import tempfile

    # Ensure non-empty frame for saving
    if df is None or df.empty:
        df = pd.DataFrame({"Сообщение": ["Нет данных для отображения"]})

    # Reserve stable temp paths that persist after this function returns
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as csv_tmp:
        csv_path = csv_tmp.name
    with tempfile.NamedTemporaryFile(delete=False, suffix=".xlsx") as xlsx_tmp:
        xlsx_path = xlsx_tmp.name

    # Save CSV (UTF-8 with BOM so Excel opens Cyrillic cleanly)
    df.to_csv(csv_path, index=False, encoding="utf-8-sig")

    summary_rows = {
        'Метрика': [
            'Всего комментариев',
            'Уникальных пользователей',
            'Уникальных категорий',
            'Уникальных тональностей',
        ],
        'Значение': [
            int(len(df)),
            int(df["Пользователь"].nunique()) if "Пользователь" in df.columns else 0,
            int(len(categories)) if isinstance(categories, pd.Series) and not categories.empty else 0,
            int(len(sentiments)) if isinstance(sentiments, pd.Series) and not sentiments.empty else 0,
        ],
    }

    # Try each engine with the same full workbook layout
    excel_written = False
    for engine in ("openpyxl", "xlsxwriter"):
        try:
            _write_excel_report(xlsx_path, engine, df, summary_rows, categories, sentiments)
        except Exception as exc:
            print(f"Excel export with engine '{engine}' failed: {exc}")
        else:
            excel_written = True
            break

    if not excel_written:
        # As a very last resort, at least save the main sheet
        try:
            df.to_excel(xlsx_path, index=False)
            excel_written = True
        except Exception as exc:
            print(f"Excel export failed, returning CSV only: {exc}")

    if not excel_written:
        # Drop the empty or partially written workbook so it is never offered
        with contextlib.suppress(OSError):
            os.remove(xlsx_path)

    return [
        path for path in (csv_path, xlsx_path)
        if os.path.exists(path) and os.path.getsize(path) > 0
    ]
| # ===================== Main Processing Function ===================== | |
# Analysis webhook endpoint and how long to wait for it (seconds).
_WEBHOOK_URL = "https://azamat-m.app.n8n.cloud/webhook/instagram"
_WEBHOOK_TIMEOUT_SECONDS = 200

# Column layout of the three result tables.
_ALL_COLUMNS = ["Пользователь", "Комментарий", "Дата", "Тональность", "Категория", "Модерация", "Автоответ"]
_MODERATION_COLUMNS = ["Пользователь", "Комментарий", "Модерация"]
_ANSWER_COLUMNS = ["Пользователь", "Комментарий", "Автоответ"]

# English -> Russian label translations; unknown labels pass through unchanged.
_SENTIMENT_RU = {
    "positive": "позитивный",
    "negative": "негативный",
    "neutral": "нейтральный",
}
_CATEGORY_RU = {
    "question": "вопрос",
    "complaint": "жалоба",
    "review": "отзыв",
    "general": "общее",
}
_MODERATION_RU = {
    "none": "безопасно",
    "toxic": "токсичный",
    "spam": "спам",
}


def _empty_result(status: str, details: str):
    """Build the 8-tuple returned when there are no comments to show."""
    return (
        status,
        pd.DataFrame(columns=_ALL_COLUMNS),
        pd.DataFrame(columns=_MODERATION_COLUMNS),
        pd.DataFrame(columns=_ANSWER_COLUMNS),
        details,
        None, None, [],
    )


def _translate(value, mapping, default):
    """Map a label through *mapping* case-insensitively; None falls back to *default*."""
    if value is None:
        # JSON null behaves like a missing key instead of crashing on .lower()
        value = default
    text = value if isinstance(value, str) else str(value)
    return mapping.get(text.lower(), text)


def _format_created_at(created_at):
    """Format an ISO-8601 timestamp as 'YYYY-MM-DD HH:MM'; return the raw value if unparsable."""
    from datetime import datetime

    if not created_at:
        return ""
    try:
        parsed = datetime.fromisoformat(created_at.replace('Z', '+00:00'))
    except (AttributeError, TypeError, ValueError):
        return created_at
    return parsed.strftime("%Y-%m-%d %H:%M")


def _normalize_comment(item):
    """Convert one webhook record (a dict) into a row of the result table."""
    comment = item.get("comment")
    if comment is None:
        comment = item.get("chatInput")
    return {
        "Пользователь": item.get("user") or "",
        "Комментарий": "" if comment is None else comment,
        "Дата": _format_created_at(item.get("created_at")),
        "Тональность": _translate(item.get("sentiment"), _SENTIMENT_RU, "neutral"),
        "Категория": _translate(item.get("category"), _CATEGORY_RU, "общее"),
        "Модерация": _translate(item.get("harmful_content"), _MODERATION_RU, "none"),
        "Автоответ": item.get("output") or "",
    }


def _build_stats_markdown(total_comments, unique_users, categories, sentiments):
    """Render the summary statistics as Markdown."""
    category_lines = "\n".join(f"- **{name}:** {count}" for name, count in categories.items())
    sentiment_lines = "\n".join(f"- **{name}:** {count}" for name, count in sentiments.items())
    # Blank lines separate the sections: without them Markdown folds each
    # section title into the last bullet of the list above it.
    return "\n".join([
        "**📊 Общая статистика:**",
        f"- **Всего комментариев:** {total_comments}",
        f"- **Уникальных пользователей:** {unique_users}",
        "",
        "**📂 По категориям:**",
        category_lines,
        "",
        "**💭 По тональности:**",
        sentiment_lines,
    ]).strip()


def process_instagram_url(url: str):
    """Analyse the comments of an Instagram post via the analysis webhook.

    Validates the URL, posts it to the webhook, converts the returned
    records into tables, and builds statistics, charts and report files.
    Failures are never raised: they are reported through the status
    message with empty tables.

    Args:
        url: Instagram post URL entered by the user.

    Returns:
        An 8-tuple ``(status_message, all_comments_df, moderation_df,
        answers_df, stats_markdown, categories_figure, sentiments_figure,
        report_file_paths)``. On any failure the three frames are empty,
        both figures are ``None`` and the file list is empty.
    """
    # Validate URL
    if not url or not url.strip():
        return _empty_result(
            "❌ Пожалуйста, введите Instagram URL",
            "Введите валидную Instagram ссылку для начала анализа.",
        )
    url = url.strip()

    if not is_instagram_url(url):
        return _empty_result(
            "❌ Это не валидная Instagram ссылка. Пожалуйста, введите корректную ссылку.",
            "Неверный формат ссылки Instagram.",
        )

    # Check URL accessibility, but never fail on it: Instagram often blocks
    # automated checks while the webhook may still work.
    print(f"Checking URL accessibility: {url}")
    if not check_url_accessible(url):
        print("⚠️ URL accessibility check failed, but continuing anyway...")

    # Send request to webhook
    payload = {"urls": [url]}
    request_headers = {
        "Content-Type": "application/json",
        "User-Agent": "InstagramAnalyzer/1.0",
    }
    try:
        print(f"Sending request to webhook: {payload}")
        response = requests.post(
            _WEBHOOK_URL,
            json=payload,
            headers=request_headers,
            timeout=_WEBHOOK_TIMEOUT_SECONDS,
        )
        print(f"Webhook response status: {response.status_code}")
        print(f"Response headers: {dict(response.headers)}")
        # Log first 200 chars of response for debugging
        print(f"Response preview: {response.text[:200]}...")
    except requests.exceptions.Timeout:
        return _empty_result(
            f"❌ Превышено время ожидания ответа от сервера ({_WEBHOOK_TIMEOUT_SECONDS} сек). Попробуйте позже.",
            "Тайм-аут запроса к серверу.",
        )
    except requests.exceptions.ConnectionError:
        return _empty_result(
            "❌ Ошибка подключения к серверу анализа. Проверьте интернет-соединение.",
            "Ошибка подключения к серверу.",
        )
    except Exception as e:
        return _empty_result(
            f"❌ Ошибка при отправке запроса: {str(e)}",
            f"Ошибка запроса: {str(e)}",
        )

    # Check response status
    if response.status_code != 200:
        return _empty_result(
            f"⚠️ Сервер вернул код ошибки {response.status_code}. Попробуйте позже.",
            f"Ошибка сервера: HTTP {response.status_code}",
        )

    # Parse response. ValueError is the common base of every JSON decode
    # error Requests can raise, whichever JSON backend it uses.
    try:
        data = response.json()
        print(f"Received data type: {type(data)}, length: {len(data) if isinstance(data, list) else 'N/A'}")
    except ValueError as e:
        return _empty_result(
            "❌ Сервер вернул некорректный ответ. Попробуйте позже.",
            f"Ошибка парсинга ответа: {str(e)}",
        )

    # Validate data format
    if not isinstance(data, list) or len(data) == 0:
        return _empty_result(
            "✅ Запрос выполнен успешно, но комментарии не найдены.",
            "Комментарии не найдены. Возможно, пост не содержит комментариев или они скрыты.",
        )

    # Process data
    try:
        df_all = pd.DataFrame(
            [_normalize_comment(item) for item in data],
            columns=_ALL_COLUMNS,
        )
        df_moderation = df_all[_MODERATION_COLUMNS].copy()
        df_answers = df_all[_ANSWER_COLUMNS].copy()

        # Calculate statistics
        total_comments = len(df_all)
        unique_users = df_all["Пользователь"].nunique()
        categories = df_all["Категория"].value_counts()
        sentiments = df_all["Тональность"].value_counts()
        stats_text = _build_stats_markdown(total_comments, unique_users, categories, sentiments)

        # Create charts
        fig_categories, fig_sentiments = make_charts(categories, sentiments)

        # Create report files
        print("Creating report files...")
        report_files = create_report_files(df_all, categories, sentiments)
        print(f"Report files created: {report_files}")

        success_message = f"✅ Успешно обработано {total_comments} комментариев от {unique_users} пользователей!"
        return (
            success_message,
            df_all,
            df_moderation,
            df_answers,
            stats_text,
            fig_categories,
            fig_sentiments,
            report_files,
        )
    except Exception as e:
        print(f"Data processing error: {e}")
        return _empty_result(
            f"❌ Ошибка при обработке данных: {str(e)}",
            f"Ошибка обработки: {str(e)}",
        )
| # ===================== Custom CSS ===================== | |
# Stylesheet passed to gr.Blocks(css=...): brand colour #E6007E applied to
# headings, primary buttons, tabs, table headers and read-only text areas.
custom_css = """
/* Altel brand colors and styling */
.gradio-container {
font-family: 'Segoe UI', Tahoma, Geneva, Verdana, sans-serif;
max-width: 1200px;
margin: 0 auto;
}
/* Headers and titles */
.gradio-container h1, .gradio-container h2, .gradio-container h3 {
color: #E6007E !important;
font-weight: 600;
}
/* Primary buttons */
.gradio-container .primary {
background: linear-gradient(135deg, #E6007E 0%, #C5006C 100%) !important;
color: white !important;
border: none !important;
border-radius: 8px !important;
font-weight: 600 !important;
box-shadow: 0 2px 4px rgba(230, 0, 126, 0.3) !important;
transition: all 0.3s ease !important;
}
.gradio-container .primary:hover {
transform: translateY(-1px) !important;
box-shadow: 0 4px 8px rgba(230, 0, 126, 0.4) !important;
}
/* Tab styling */
.gradio-container .tab-nav button {
color: #E6007E !important;
border-bottom: 2px solid transparent !important;
}
.gradio-container .tab-nav button.selected {
color: #E6007E !important;
border-bottom: 2px solid #E6007E !important;
font-weight: 600 !important;
}
/* Table headers */
.gradio-container table thead th {
background-color: #E6007E !important;
color: white !important;
font-weight: 600 !important;
}
/* Cards and blocks */
.gradio-container .block {
border-radius: 12px !important;
border: 1px solid #f0f0f0 !important;
box-shadow: 0 1px 3px rgba(0,0,0,0.1) !important;
}
/* Status messages */
.gradio-container .textbox textarea[readonly] {
background-color: #f8f9fa !important;
border-left: 4px solid #E6007E !important;
}
"""
| # ===================== Gradio Interface ===================== | |
def create_app():
    """Assemble the Gradio Blocks UI and wire its buttons to the handlers.

    Returns:
        The configured ``gr.Blocks`` application (not yet launched).
    """
    all_headers = ["Пользователь", "Комментарий", "Дата", "Тональность", "Категория", "Модерация", "Автоответ"]
    moderation_headers = ["Пользователь", "Комментарий", "Модерация"]
    answer_headers = ["Пользователь", "Комментарий", "Автоответ"]

    intro_markdown = """
# 📸 Instagram Comment Analyzer
Анализ комментариев Instagram с помощью ИИ. Получите детальную аналитику тональности,
категоризацию и модерацию контента.
**Как использовать:**
1. Вставьте ссылку на публичный пост Instagram
2. Нажмите "Анализировать комментарии"
3. Просмотрите результаты в различных вкладках
"""
    examples_markdown = """
### 📝 Примеры ссылок:
- `https://www.instagram.com/p/XXXXXXXXX/` - обычный пост
- `https://www.instagram.com/reel/XXXXXXXXX/` - рилс
- `https://www.instagram.com/tv/XXXXXXXXX/` - IGTV
⚠️ **Важно:** Ссылка должна вести на публичный контент
### 🔧 Отладка:
Если файлы не скачиваются, проверьте логи в консоли Hugging Face Spaces.
"""

    def _reveal_test_outputs():
        # Make both hidden test widgets visible once the test has run.
        return gr.update(visible=True), gr.update(visible=True)

    with gr.Blocks(css=custom_css, title="Instagram Comment Analyzer", theme=gr.themes.Soft()) as app:
        # Header
        gr.Markdown(intro_markdown)

        # Input section: URL box next to the analyse button
        with gr.Row():
            with gr.Column(scale=4):
                url_input = gr.Textbox(
                    label="🔗 Instagram URL",
                    placeholder="https://www.instagram.com/p/XXXXXXXXX/",
                    info="Введите ссылку на пост, рилс или IGTV",
                )
            with gr.Column(scale=1):
                analyze_btn = gr.Button(
                    "🚀 Анализировать комментарии",
                    variant="primary",
                    size="lg",
                )

        # Status output
        status_output = gr.Textbox(
            label="📋 Статус обработки",
            interactive=False,
            lines=2,
        )

        # Results tabs
        with gr.Tabs():
            with gr.Tab("💬 Все комментарии"):
                comments_df = gr.Dataframe(headers=all_headers, interactive=False, wrap=True)
            with gr.Tab("🛡️ Модерация"):
                moderation_df = gr.Dataframe(headers=moderation_headers, interactive=False, wrap=True)
            with gr.Tab("🤖 Автоответы"):
                answers_df = gr.Dataframe(headers=answer_headers, interactive=False, wrap=True)
            with gr.Tab("📊 Аналитика"):
                with gr.Row():
                    stats_markdown = gr.Markdown("Загрузите Instagram ссылку для просмотра статистики.")
                # gr.Plot displays the Matplotlib figures returned by make_charts
                with gr.Row():
                    with gr.Column():
                        categories_chart = gr.Plot(label="Распределение по категориям")
                    with gr.Column():
                        sentiments_chart = gr.Plot(label="Распределение по тональности")
                # NOTE(review): the source's indentation was lost; the download
                # widget is assumed to sit inside the analytics tab - confirm.
                download_files = gr.File(
                    label="📁 Скачать отчеты (CSV + Excel)",
                    file_count="multiple",
                    file_types=[".csv", ".xlsx"],
                    interactive=False,
                    visible=True,
                )

        # Example section
        gr.Markdown(examples_markdown)

        # Test file generation controls (outputs start hidden)
        # NOTE(review): all three widgets are assumed to share one row - confirm.
        with gr.Row():
            test_files_btn = gr.Button("🧪 Создать тестовые файлы", variant="secondary")
            test_status = gr.Textbox(label="Статус теста", interactive=False, visible=False)
            test_files_output = gr.File(label="Тестовые файлы", file_count="multiple", visible=False)

        # Connect the processing function; output order matches its 8-tuple
        analysis_outputs = [
            status_output,
            comments_df,
            moderation_df,
            answers_df,
            stats_markdown,
            categories_chart,
            sentiments_chart,
            download_files,
        ]
        analyze_btn.click(
            fn=process_instagram_url,
            inputs=[url_input],
            outputs=analysis_outputs,
        )

        # Connect test function, then reveal its outputs
        test_run = test_files_btn.click(
            fn=test_file_generation,
            inputs=[],
            outputs=[test_status, test_files_output],
        )
        test_run.then(
            _reveal_test_outputs,
            outputs=[test_status, test_files_output],
        )

    return app
| # ===================== Launch Application ===================== | |
if __name__ == "__main__":
    # Serve on all interfaces, port 7860, without a public share link.
    app = create_app()
    launch_options = {
        "share": False,
        "server_name": "0.0.0.0",
        "server_port": 7860,
    }
    app.launch(**launch_options)