import gradio as gr import spaces import pandas as pd import torch from transformers import pipeline, AutoModelForSeq2SeqLM, AutoTokenizer import plotly.graph_objects as go import logging import io from rapidfuzz import fuzz import time import os groq_key = os.environ['groq_key'] from langchain_openai import ChatOpenAI from langchain.prompts import PromptTemplate from openpyxl import load_workbook from openpyxl.utils.dataframe import dataframe_to_rows def fuzzy_deduplicate(df, column, threshold=55): """Deduplicate rows based on fuzzy matching of text content""" seen_texts = [] indices_to_keep = [] for i, text in enumerate(df[column]): if pd.isna(text): indices_to_keep.append(i) continue text = str(text) if not seen_texts or all(fuzz.ratio(text, seen) < threshold for seen in seen_texts): seen_texts.append(text) indices_to_keep.append(i) return df.iloc[indices_to_keep] logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ProcessControl: def __init__(self): self.stop_requested = False def request_stop(self): self.stop_requested = True def should_stop(self): return self.stop_requested def reset(self): self.stop_requested = False class ProcessControl: def __init__(self): self.stop_requested = False self.error = None def request_stop(self): self.stop_requested = True def should_stop(self): return self.stop_requested def reset(self): self.stop_requested = False self.error = None def set_error(self, error): self.error = error self.stop_requested = True class EventDetector: def __init__(self): try: # Initialize models device = "cuda" if torch.cuda.is_available() else "cpu" logger.info(f"Initializing models on device: {device}") # Initialize all models self.initialize_models(device) # Move initialization to separate method self.device = device self.initialized = True logger.info("All models initialized successfully") except Exception as e: logger.error(f"Error in EventDetector initialization: {str(e)}") raise @spaces.GPU(duration=30) def initialize_models(self, device): """Initialize all models with GPU support""" # Initialize translation model self.translator = pipeline( "translation", model="Helsinki-NLP/opus-mt-ru-en", device=device ) self.rutranslator = pipeline( "translation", model="Helsinki-NLP/opus-mt-en-ru", device=device ) # Initialize sentiment models self.finbert = pipeline( "sentiment-analysis", model="ProsusAI/finbert", device=device, truncation=True, max_length=512 ) self.roberta = pipeline( "sentiment-analysis", model="cardiffnlp/twitter-roberta-base-sentiment", device=device, truncation=True, max_length=512 ) self.finbert_tone = pipeline( "sentiment-analysis", model="yiyanghkust/finbert-tone", device=device, truncation=True, max_length=512 ) # Initialize MT5 model self.model_name = "google/mt5-small" self.tokenizer = AutoTokenizer.from_pretrained( self.model_name, legacy=True ) self.model = AutoModelForSeq2SeqLM.from_pretrained(self.model_name).to(device) # Initialize Groq if 'groq_key': self.groq = ChatOpenAI( base_url="https://api.groq.com/openai/v1", model="llama-3.1-70b-versatile", openai_api_key=groq_key, temperature=0.0 ) else: logger.warning("Groq API key not found, impact estimation will be limited") self.groq = None @spaces.GPU(duration=20) def _translate_text(self, text): """Translate Russian text to English""" try: if not text or not isinstance(text, str): return "" text = text.strip() if not text: return "" # Split into manageable chunks max_length = 450 chunks = [text[i:i + max_length] for i in range(0, len(text), max_length)] translated_chunks = [] for chunk in chunks: result = self.translator(chunk)[0]['translation_text'] translated_chunks.append(result) time.sleep(0.1) # Rate limiting return " ".join(translated_chunks) except Exception as e: logger.error(f"Translation error: {str(e)}") return text @spaces.GPU(duration=20) def analyze_sentiment(self, text): """Analyze sentiment of text (should be in English)""" try: if not text or not isinstance(text, str): return "Neutral" text = text.strip() if not text: return "Neutral" # Get predictions from all models finbert_result = self.finbert(text)[0] roberta_result = self.roberta(text)[0] finbert_tone_result = self.finbert_tone(text)[0] # Map labels to standard format def map_sentiment(result): label = result['label'].lower() if label in ['positive', 'pos', 'positive tone']: return "Positive" elif label in ['negative', 'neg', 'negative tone']: return "Negative" return "Neutral" # Get mapped sentiments sentiments = [ map_sentiment(finbert_result), map_sentiment(roberta_result), map_sentiment(finbert_tone_result) ] # Use majority voting sentiment_counts = pd.Series(sentiments).value_counts() if sentiment_counts.iloc[0] >= 2: return sentiment_counts.index[0] return "Neutral" except Exception as e: logger.error(f"Sentiment analysis error: {str(e)}") return "Neutral" def estimate_impact(self, text, entity): """Estimate impact using Groq for negative sentiment texts""" try: if not self.groq: return "Неопределенный эффект", "Groq API недоступен" template = """ You are a financial analyst. Analyze this news about {entity} and assess its potential impact. News: {news} Classify the impact into one of these categories: 1. "Значительный риск убытков" (Significant loss risk) 2. "Умеренный риск убытков" (Moderate loss risk) 3. "Незначительный риск убытков" (Minor loss risk) 4. "Вероятность прибыли" (Potential profit) 5. "Неопределенный эффект" (Uncertain effect) Format your response exactly as: Impact: [category] Reasoning: [explanation in 2-3 sentences] """ prompt = PromptTemplate(template=template, input_variables=["entity", "news"]) chain = prompt | self.groq response = chain.invoke({ "entity": entity, "news": text }) # Parse response response_text = response.content if hasattr(response, 'content') else str(response) if "Impact:" in response_text and "Reasoning:" in response_text: parts = response_text.split("Reasoning:") impact = parts[0].split("Impact:")[1].strip() reasoning = parts[1].strip() else: impact = "Неопределенный эффект" reasoning = "Не удалось определить влияние" return impact, reasoning except Exception as e: logger.error(f"Impact estimation error: {str(e)}") return "Неопределенный эффект", f"Ошибка анализа: {str(e)}" @spaces.GPU(duration=60) def process_text(self, text, entity): """Process text through translation, sentiment, and impact analysis""" try: # Translate text translated_text = self._translate_text(text) # Analyze sentiment sentiment = self.analyze_sentiment(translated_text) # Initialize impact and reasoning impact = "Неопределенный эффект" reasoning = "" # If sentiment is negative, estimate impact and produce impact reasoning translated back into russian if sentiment == "Negative": impact, reasoning = self.estimate_impact(translated_text, entity) reasoning = self.rutranslator(reasoning)[0]['translation_text'] # Detect events event_type, event_summary = self.detect_events(text, entity) #translate Reasoning back return { 'translated_text': translated_text, 'sentiment': sentiment, 'impact': impact, 'reasoning': reasoning, 'event_type': event_type, 'event_summary': event_summary } except Exception as e: logger.error(f"Text processing error: {str(e)}") return { 'translated_text': '', 'sentiment': 'Neutral', 'impact': 'Неопределенный эффект', 'reasoning': f'Ошибка обработки: {str(e)}', 'event_type': 'Нет', 'event_summary': '' } @spaces.GPU(duration=20) def detect_events(self, text, entity): """Rest of the detect_events method remains the same""" if not text or not entity: return "Нет", "Invalid input" try: text = str(text).strip() entity = str(entity).strip() if not text or not entity: return "Нет", "Empty input" # First check for keyword matches text_lower = text.lower() keywords = { 'Отчетность': ['отчет', 'выручка', 'прибыль', 'ebitda', 'финансов', 'результат', 'показател'], 'РЦБ': ['облигаци', 'купон', 'дефолт', 'реструктуризац', 'ценные бумаги', 'долг'], 'Суд': ['суд', 'иск', 'арбитраж', 'разбирательств', 'банкрот'] } # Check keywords first detected_event = None for event_type, terms in keywords.items(): if any(term in text_lower for term in terms): detected_event = event_type break if detected_event: # Prepare prompt for summary prompt = f"""Summarize this {detected_event} news about {entity}: Text: {text} Create a brief, factual summary focusing on the main points. Format: Summary: [2-3 sentence summary]""" # Generate summary inputs = self.tokenizer( prompt, return_tensors="pt", padding=True, truncation=True, max_length=512 ).to(self.device) outputs = self.model.generate( **inputs, max_length=200, num_return_sequences=1, do_sample=False, pad_token_id=self.tokenizer.pad_token_id, eos_token_id=self.tokenizer.eos_token_id, no_repeat_ngram_size=3 ) response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) # Extract summary if "Summary:" in response: summary = response.split("Summary:")[1].strip() summary = summary.replace('', '').replace('', '').strip() else: if detected_event == 'Отчетность': summary = f"Компания {entity} опубликовала финансовые показатели." elif detected_event == 'РЦБ': summary = f"Обнаружена информация о ценных бумагах компании {entity}." elif detected_event == 'Суд': summary = f"Компания {entity} участвует в судебном разбирательстве." return detected_event, summary return "Нет", "No significant event detected" except Exception as e: logger.error(f"Event detection error: {str(e)}") return "Нет", f"Error in event detection: {str(e)}" def cleanup(self): """Clean up GPU resources""" try: self.model = None self.translator = None self.finbert = None self.roberta = None self.finbert_tone = None torch.cuda.empty_cache() self.initialized = False logger.info("Cleaned up GPU resources") except Exception as e: logger.error(f"Error in cleanup: {str(e)}") def create_visualizations(df): if df is None or df.empty: return None, None try: sentiments = df['Sentiment'].value_counts() fig_sentiment = go.Figure(data=[go.Pie( labels=sentiments.index, values=sentiments.values, marker_colors=['#FF6B6B', '#4ECDC4', '#95A5A6'] )]) fig_sentiment.update_layout(title="Распределение тональности") events = df['Event_Type'].value_counts() fig_events = go.Figure(data=[go.Bar( x=events.index, y=events.values, marker_color='#2196F3' )]) fig_events.update_layout(title="Распределение событий") return fig_sentiment, fig_events except Exception as e: logger.error(f"Visualization error: {e}") return None, None @spaces.GPU def process_file(file_obj): try: logger.info("Starting to read Excel file...") df = pd.read_excel(file_obj, sheet_name='Публикации') logger.info(f"Successfully read Excel file. Shape: {df.shape}") # Deduplication original_count = len(df) df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55) logger.info(f"Removed {original_count - len(df)} duplicate entries") detector = EventDetector() processed_rows = [] total = len(df) # Process in smaller batches with quota management BATCH_SIZE = 3 # Reduced batch size QUOTA_WAIT_TIME = 60 # Wait time when quota is exceeded for batch_start in range(0, total, BATCH_SIZE): try: batch_end = min(batch_start + BATCH_SIZE, total) batch = df.iloc[batch_start:batch_end] # Initialize models for batch if not detector.initialized: detector.initialize_models() time.sleep(1) # Wait after initialization for idx, row in batch.iterrows(): try: text = str(row.get('Выдержки из текста', '')) if not text.strip(): continue entity = str(row.get('Объект', '')) if not entity.strip(): continue # Process with GPU quota management event_type = "Нет" event_summary = "" sentiment = "Neutral" try: event_type, event_summary = detector.detect_events(text, entity) time.sleep(1) # Wait between GPU operations sentiment = detector.analyze_sentiment(text) except Exception as e: if "GPU quota" in str(e): logger.warning("GPU quota exceeded, waiting...") time.sleep(QUOTA_WAIT_TIME) continue else: raise e processed_rows.append({ 'Объект': entity, 'Заголовок': str(row.get('Заголовок', '')), 'Sentiment': sentiment, 'Event_Type': event_type, 'Event_Summary': event_summary, 'Текст': text[:1000] }) logger.info(f"Processed {idx + 1}/{total} rows") except Exception as e: logger.error(f"Error processing row {idx}: {str(e)}") continue # Create intermediate results if processed_rows: intermediate_df = pd.DataFrame(processed_rows) yield ( intermediate_df, None, None, f"Обработано {len(processed_rows)}/{total} строк" ) # Wait between batches time.sleep(2) # Cleanup GPU resources after each batch torch.cuda.empty_cache() except Exception as e: logger.error(f"Batch processing error: {str(e)}") if "GPU quota" in str(e): time.sleep(QUOTA_WAIT_TIME) continue # Final results if processed_rows: result_df = pd.DataFrame(processed_rows) fig_sentiment, fig_events = create_visualizations(result_df) return result_df, fig_sentiment, fig_events, "Обработка завершена!" else: return None, None, None, "Нет обработанных данных" except Exception as e: logger.error(f"File processing error: {str(e)}") raise def create_output_file(df, uploaded_file): """Create Excel file with multiple sheets from processed DataFrame""" try: wb = load_workbook("sample_file.xlsx") # 1. Update 'Публикации' sheet ws = wb['Публикации'] for r_idx, row in enumerate(dataframe_to_rows(df, index=False, header=True), start=1): for c_idx, value in enumerate(row, start=1): ws.cell(row=r_idx, column=c_idx, value=value) # 2. Update 'Мониторинг' sheet with events ws = wb['Мониторинг'] row_idx = 4 events_df = df[df['Event_Type'] != 'Нет'].copy() for _, row in events_df.iterrows(): ws.cell(row=row_idx, column=5, value=row['Объект']) ws.cell(row=row_idx, column=6, value=row['Заголовок']) ws.cell(row=row_idx, column=7, value=row['Event_Type']) ws.cell(row=row_idx, column=8, value=row['Event_Summary']) ws.cell(row=row_idx, column=9, value=row['Выдержки из текста']) row_idx += 1 # 3. Update 'Сводка' sheet ws = wb['Сводка'] unique_entities = df['Объект'].unique() entity_stats = [] for entity in unique_entities: entity_df = df[df['Объект'] == entity] stats = { 'Объект': entity, 'Всего': len(entity_df), 'Негативные': len(entity_df[entity_df['Sentiment'] == 'Negative']), 'Позитивные': len(entity_df[entity_df['Sentiment'] == 'Positive']) } # Get most severe impact for entity negative_df = entity_df[entity_df['Sentiment'] == 'Negative'] if len(negative_df) > 0: impacts = negative_df['Impact'].dropna() if len(impacts) > 0: stats['Impact'] = impacts.iloc[0] else: stats['Impact'] = 'Неопределенный эффект' else: stats['Impact'] = 'Неопределенный эффект' entity_stats.append(stats) # Sort by number of negative mentions entity_stats = sorted(entity_stats, key=lambda x: x['Негативные'], reverse=True) # Write to sheet row_idx = 4 # Starting row in Сводка sheet for stats in entity_stats: ws.cell(row=row_idx, column=5, value=stats['Объект']) ws.cell(row=row_idx, column=6, value=stats['Всего']) ws.cell(row=row_idx, column=7, value=stats['Негативные']) ws.cell(row=row_idx, column=8, value=stats['Позитивные']) ws.cell(row=row_idx, column=9, value=stats['Impact']) row_idx += 1 # 4. Update 'Значимые' sheet ws = wb['Значимые'] row_idx = 3 sentiment_df = df[df['Sentiment'].isin(['Negative', 'Positive'])].copy() for _, row in sentiment_df.iterrows(): ws.cell(row=row_idx, column=3, value=row['Объект']) ws.cell(row=row_idx, column=4, value='релевантно') ws.cell(row=row_idx, column=5, value=row['Sentiment']) ws.cell(row=row_idx, column=6, value=row.get('Impact', '-')) ws.cell(row=row_idx, column=7, value=row['Заголовок']) ws.cell(row=row_idx, column=8, value=row['Выдержки из текста']) row_idx += 1 # 5. Update 'Анализ' sheet ws = wb['Анализ'] row_idx = 4 negative_df = df[df['Sentiment'] == 'Negative'].copy() for _, row in negative_df.iterrows(): ws.cell(row=row_idx, column=5, value=row['Объект']) ws.cell(row=row_idx, column=6, value=row['Заголовок']) ws.cell(row=row_idx, column=7, value="Риск убытка") ws.cell(row=row_idx, column=8, value=row.get('Reasoning', '-')) ws.cell(row=row_idx, column=9, value=row['Выдержки из текста']) row_idx += 1 # 6. Update 'Тех.приложение' sheet if 'Тех.приложение' not in wb.sheetnames: wb.create_sheet('Тех.приложение') ws = wb['Тех.приложение'] tech_cols = ['Объект', 'Заголовок', 'Выдержки из текста', 'Translated', 'Sentiment', 'Impact', 'Reasoning'] tech_df = df[tech_cols].copy() for r_idx, row in enumerate(dataframe_to_rows(tech_df, index=False, header=True), start=1): for c_idx, value in enumerate(row, start=1): ws.cell(row=r_idx, column=c_idx, value=value) # Save workbook output = io.BytesIO() wb.save(output) output.seek(0) return output except Exception as e: logger.error(f"Error creating output file: {str(e)}") logger.error(f"DataFrame shape: {df.shape}") logger.error(f"Available columns: {df.columns.tolist()}") return None def create_interface(): control = ProcessControl() with gr.Blocks(theme=gr.themes.Soft()) as app: # Create state for file data current_file = gr.State(None) gr.Markdown("# AI-анализ мониторинга новостей v.1.51") with gr.Row(): file_input = gr.File( label="Загрузите Excel файл", file_types=[".xlsx"], type="binary" ) with gr.Row(): with gr.Column(scale=1): analyze_btn = gr.Button( "▶️ Начать анализ", variant="primary", size="lg" ) with gr.Column(scale=1): stop_btn = gr.Button( "⏹️ Остановить", variant="stop", size="lg" ) with gr.Row(): status_box = gr.Textbox( label="Статус дедупликации", interactive=False, value="" ) with gr.Row(): progress = gr.Textbox( label="Статус обработки", interactive=False, value="Ожидание файла..." ) with gr.Row(): stats = gr.DataFrame( label="Результаты анализа", interactive=False, wrap=True ) with gr.Row(): with gr.Column(scale=1): sentiment_plot = gr.Plot(label="Распределение тональности") with gr.Column(scale=1): events_plot = gr.Plot(label="Распределение событий") # Create a download row with file component only with gr.Row(): file_output = gr.File( label="Скачать результаты", visible=True, interactive=True ) def stop_processing(): control.request_stop() return "Остановка обработки..." @spaces.GPU(duration=300) def process_and_download(file_bytes): if file_bytes is None: gr.Warning("Пожалуйста, загрузите файл") return ( pd.DataFrame(), None, None, None, "Ожидание файла...", "" ) try: file_obj = io.BytesIO(file_bytes) logger.info("File loaded into BytesIO successfully") detector = EventDetector() # Read and deduplicate data df = pd.read_excel(file_obj, sheet_name='Публикации') original_count = len(df) df = fuzzy_deduplicate(df, 'Выдержки из текста', threshold=55) removed_count = original_count - len(df) dedup_message = f"Удалено {removed_count} дубликатов из {original_count} записей" logger.info(f"Removed {removed_count} duplicate entries") processed_rows = [] total = len(df) batch_size = 3 for batch_start in range(0, total, batch_size): if control.should_stop(): if processed_rows: result_df = pd.DataFrame(processed_rows) output_bytes_io = create_output_file(result_df, file_obj) if output_bytes_io: fig_sentiment, fig_events = create_visualizations(result_df) # Create temporary file temp_file = "partial_results.xlsx" with open(temp_file, "wb") as f: f.write(output_bytes_io.getvalue()) return ( result_df, fig_sentiment, fig_events, temp_file, # Return path to temporary file f"Обработка остановлена. Обработано {len(processed_rows)}/{total} строк", dedup_message ) break batch_end = min(batch_start + batch_size, total) batch = df.iloc[batch_start:batch_end] for idx, row in batch.iterrows(): try: text = str(row.get('Выдержки из текста', '')).strip() entity = str(row.get('Объект', '')).strip() if not text or not entity: continue # Process with GPU results = detector.process_text(text, entity) processed_rows.append({ 'Объект': entity, 'Заголовок': str(row.get('Заголовок', '')), 'Translated': results['translated_text'], 'Sentiment': results['sentiment'], 'Impact': results['impact'], 'Reasoning': results['reasoning'], 'Event_Type': results['event_type'], 'Event_Summary': results['event_summary'], 'Выдержки из текста': text }) except Exception as e: logger.error(f"Error processing row {idx}: {str(e)}") continue if processed_rows: result_df = pd.DataFrame(processed_rows) output_bytes_io = create_output_file(result_df, file_obj) fig_sentiment, fig_events = create_visualizations(result_df) if output_bytes_io: # Create temporary file temp_file = "results.xlsx" with open(temp_file, "wb") as f: f.write(output_bytes_io.getvalue()) return ( result_df, fig_sentiment, fig_events, temp_file, # Return path to temporary file "Обработка завершена!", dedup_message ) return ( pd.DataFrame(), None, None, None, "Нет обработанных данных", dedup_message ) except Exception as e: error_msg = f"Ошибка анализа: {str(e)}" logger.error(error_msg) gr.Error(error_msg) return ( pd.DataFrame(), None, None, None, error_msg, "" ) finally: if detector: detector.cleanup() stop_btn.click(fn=stop_processing, outputs=[progress]) # Main processing - simplified outputs analyze_btn.click( fn=process_and_download, inputs=[file_input], outputs=[ stats, sentiment_plot, events_plot, file_output, progress, status_box ] ) return app if __name__ == "__main__": app = create_interface() app.launch(share=True)