import logging
import re
from typing import Dict, Any

import torch
from transformers import pipeline


# Module-level logger named after this module, so log output can be
# filtered/configured per module by the application.
logger = logging.getLogger(__name__)


class SentimentPipeline:
    """Score the sentiment of news/tweet text with a Hugging Face pipeline.

    Attributes:
        model_name: Model identifier handed to ``transformers.pipeline``.
        device: ``0`` when CUDA is available, ``"mps"`` when the MPS backend
            is available, otherwise ``-1``.
        classifier: The loaded pipeline, or ``None`` when loading failed (in
            which case every analysis returns a neutral result).
    """

    # Compiled once at class creation; used by preprocess_text.
    _URL_RE = re.compile(r'http\S+')
    _MENTION_RE = re.compile(r'@\w+')

    def __init__(self, model_name: str = "ProsusAI/finbert"):
        """
        Initialize NLP pipeline for sentiment analysis of news/tweets.
        Using FinBERT as it's tuned for financial/market sentiment.

        Loading is best-effort: if the model cannot be loaded the failure is
        logged and ``self.classifier`` is set to ``None`` instead of raising.

        Args:
            model_name: Model identifier passed to ``transformers.pipeline``.
        """
        self.model_name = model_name
        self.device = self._select_device()
        logger.info(
            "Loading NLP Pipeline '%s' on device '%s'...", model_name, self.device
        )

        try:
            self.classifier = pipeline(
                "sentiment-analysis",
                model=self.model_name,
                device=self.device,
            )
            logger.info("NLP Pipeline loaded successfully.")
        except Exception as e:
            # Deliberate best-effort: keep the object usable (neutral results)
            # even when the model cannot be downloaded or initialised.
            logger.error("Failed to load NLP model: %s", e)
            self.classifier = None

    @staticmethod
    def _select_device():
        """Return 0 for CUDA, "mps" for Apple MPS, otherwise -1."""
        if torch.cuda.is_available():
            return 0
        # Older torch builds have no `torch.backends.mps`; treat that the
        # same as "MPS not available" instead of raising AttributeError.
        mps_backend = getattr(torch.backends, "mps", None)
        if mps_backend is not None and mps_backend.is_available():
            return "mps"
        return -1

    @staticmethod
    def _neutral_result() -> Dict[str, Any]:
        """Return a fresh neutral result dict (never shared between calls)."""
        return {"score": 0.0, "confidence": 0.0, "label": "neutral"}

    def preprocess_text(self, text: str) -> str:
        """Clean up social media artifacts.

        Removes every ``http...`` token and every ``@word`` mention, then
        collapses all runs of whitespace into single spaces and strips the
        ends.

        Args:
            text: Raw text.

        Returns:
            The cleaned text; may be an empty string.
        """
        without_urls = self._URL_RE.sub('', text)
        without_mentions = self._MENTION_RE.sub('', without_urls)
        return ' '.join(without_mentions.split())

    def analyze_sentiment(self, text: str) -> Dict[str, Any]:
        """
        Analyze sentiment of a single text.
        Returns score from -1.0 (Negative) to +1.0 (Positive) and raw confidence.

        Args:
            text: Raw text. Non-string input (e.g. ``None``) is treated like
                empty text.

        Returns:
            A dict with keys ``"score"``, ``"confidence"`` and ``"label"``.
            ``score`` is ``+confidence`` for a "positive" label,
            ``-confidence`` for "negative" and ``0.0`` for any other label.
            A neutral result with zero confidence is returned when the model
            is not loaded or there is no text left after preprocessing. If
            the classifier call fails, the label is ``"error"`` with zero
            score and confidence.
        """
        if self.classifier is None or not isinstance(text, str):
            return self._neutral_result()

        clean_text = self.preprocess_text(text)
        if not clean_text:
            return self._neutral_result()

        try:
            # truncation=True: inputs longer than the model's maximum
            # sequence length are truncated by the tokenizer instead of
            # making the pipeline raise.
            result = self.classifier(clean_text, truncation=True)[0]
            label = result['label'].lower()
            confidence = result['score']
        except Exception as e:
            logger.error("Sentiment analysis failed: %s", e)
            return {"score": 0.0, "confidence": 0.0, "label": "error"}

        # Map the predicted label onto a signed score.
        if label == "positive":
            score = confidence
        elif label == "negative":
            score = -confidence
        else:
            score = 0.0

        return {
            "score": score,
            "confidence": confidence,
            "label": label,
        }

    def aggregate_stream_sentiment(self, text_stream: list[str]) -> float:
        """Calculate average sentiment from a batch of texts.

        Results labelled ``"error"`` are left out of the average so that a
        failed analysis is not counted as a neutral opinion.

        Args:
            text_stream: Texts to score. Any iterable of strings works,
                including generators.

        Returns:
            The mean of the individual scores, or ``0.0`` when the input is
            empty/``None`` or no text could be analysed successfully.
        """
        if not text_stream:
            return 0.0

        results = (self.analyze_sentiment(text) for text in text_stream)
        scores = [res['score'] for res in results if res.get('label') != "error"]

        # Also covers an exhausted/empty iterator, which is truthy above.
        if not scores:
            return 0.0
        return sum(scores) / len(scores)

|