|
from fastapi import FastAPI, HTTPException, Query |
|
from fastapi.middleware.cors import CORSMiddleware |
|
from pydantic import BaseModel |
|
from typing import List, Optional, Dict, Any |
|
import asyncio |
|
import logging |
|
from datetime import datetime |
|
import json |
|
|
|
|
|
from scraper import NewsletterScraper |
|
from nlp import SentimentAnalyzer, KeywordExtractor |
|
from summarizer import TextSummarizer |
|
from translator import MultilingualTranslator |
|
from tts import AudioGenerator |
|
from utils import setup_logging, cache_results |
|
|
|
|
|
# Run the project's logging setup before the module-level logger is created.
setup_logging()
logger = logging.getLogger(__name__)

# The ASGI application; title, description and version feed the generated docs.
app = FastAPI(
    title="Global Business News Intelligence API",
    description="Advanced news analysis with sentiment, summarization, and multilingual support",
    version="1.0.0",
)

# The API is open to every origin, method and header. Wildcards must not be
# combined with credentialed requests (cookies / Authorization headers), and
# none of the endpoints in this module rely on credentials, so credentials
# are disabled. To re-enable them, list the allowed origins explicitly.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
class AnalysisRequest(BaseModel):
    # Body schema of the POST /api/analyze endpoint. Every field except
    # `query` has a default, so a request may carry the query alone.

    # Search term forwarded to the analysis pipeline.
    query: str

    # How many articles to request from the scraper.
    num_articles: int = 20

    # Languages in which summaries (and audio) are requested.
    languages: List[str] = ["English"]

    # Whether audio summaries should be generated.
    include_audio: bool = True

    # Names of the sentiment models passed to the sentiment analyzer.
    sentiment_models: List[str] = ["VADER", "Loughran-McDonald", "FinBERT"]
|
|
|
class AnalysisResponse(BaseModel):
    # Response schema shared by the GET and POST /api/analyze endpoints.
    # It is populated from the dict returned by the analysis pipeline.

    # Echo of the analyzed query.
    query: str

    # Number of articles that were analyzed.
    total_articles: int

    # Wall-clock duration of the pipeline, in seconds.
    processing_time: float

    # Mean compound sentiment score over all articles.
    average_sentiment: float

    # Article counts keyed by sentiment label.
    sentiment_distribution: Dict[str, int]

    # Per-article records as produced by the pipeline.
    articles: List[Dict[str, Any]]

    # Keyword records extracted from the combined article text.
    keywords: List[Dict[str, Any]]

    # Aggregate overview of the analyzed articles.
    summary: Dict[str, Any]

    # Languages that were requested.
    languages: List[str]

    # Generated audio files keyed by language; optional.
    audio_files: Optional[Dict[str, str]] = None
|
|
|
class NewsAnalyzer:
    """Main news analysis orchestrator.

    Owns one instance of each pipeline component (scraper, sentiment
    analyzer, keyword extractor, summarizer, translator, audio generator)
    and runs them in sequence for a query.
    """

    # Compound scores strictly above +threshold count as positive, strictly
    # below -threshold as negative; everything in between is neutral.
    SENTIMENT_THRESHOLD = 0.1

    # Languages for which an audio summary is attempted.
    AUDIO_LANGUAGES = ('English', 'Hindi', 'Tamil')

    def __init__(self):
        """Instantiate every pipeline component."""
        self.scraper = NewsletterScraper()
        self.sentiment_analyzer = SentimentAnalyzer()
        self.keyword_extractor = KeywordExtractor()
        self.summarizer = TextSummarizer()
        self.translator = MultilingualTranslator()
        self.audio_generator = AudioGenerator()

        logger.info("NewsAnalyzer initialized successfully")

    async def analyze_news_async(self, config: Dict[str, Any], progress_callback=None) -> Dict[str, Any]:
        """Run :meth:`analyze_news` without blocking the event loop.

        The synchronous pipeline is executed in the event loop's default
        executor, so other requests keep being served while it runs. Note
        that ``progress_callback`` is therefore invoked from a worker thread.

        Args:
            config: Same mapping as accepted by :meth:`analyze_news`.
            progress_callback: Optional ``callable(percent, message)``.

        Returns:
            The result dict produced by :meth:`analyze_news`.

        Raises:
            Exception: Whatever :meth:`analyze_news` raises.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None, self.analyze_news, config, progress_callback
        )

    @staticmethod
    def _report_progress(progress_callback, percent, message):
        """Forward a progress update to the callback, if one was supplied."""
        if progress_callback:
            progress_callback(percent, message)

    def analyze_news(self, config: Dict[str, Any], progress_callback=None) -> Dict[str, Any]:
        """Run the full pipeline: scrape, score, extract, summarize, voice.

        Args:
            config: Mapping with a required ``'query'`` key and optional
                ``'num_articles'`` (default 20), ``'languages'`` (default
                ``['English']``), ``'include_audio'`` (default ``True``) and
                ``'sentiment_models'`` keys.
            progress_callback: Optional ``callable(percent, message)`` called
                at each pipeline stage.

        Returns:
            A dict with the keys ``query``, ``total_articles``,
            ``processing_time``, ``average_sentiment``,
            ``sentiment_distribution``, ``articles``, ``keywords``,
            ``languages``, ``audio_files`` and ``summary``.

        Raises:
            ValueError: If the scraper returns no articles.
            Exception: Any other pipeline error is logged and re-raised.
        """
        start_time = datetime.now()

        try:
            query = config['query']
            num_articles = config.get('num_articles', 20)
            languages = config.get('languages', ['English'])
            include_audio = config.get('include_audio', True)
            sentiment_models = config.get('sentiment_models', ['VADER', 'Loughran-McDonald', 'FinBERT'])

            logger.info(f"Starting analysis for query: {query}")

            self._report_progress(progress_callback, 10, "Scraping articles...")
            articles = self.scraper.scrape_news(query, num_articles)
            logger.info(f"Scraped {len(articles)} articles")

            if not articles:
                raise ValueError("No articles found for the given query")

            self._report_progress(progress_callback, 30, "Analyzing sentiment...")
            for article in articles:
                article['sentiment'] = self.sentiment_analyzer.analyze_sentiment(
                    article['content'],
                    models=sentiment_models,
                )

            self._report_progress(progress_callback, 50, "Extracting keywords...")
            all_text = ' '.join(article['content'] for article in articles)
            keywords = self.keyword_extractor.extract_keywords(all_text)

            self._report_progress(progress_callback, 60, "Generating summaries...")
            # Per-language summaries are needed whenever more than one
            # language is requested OR the only requested language is not
            # English (previously a lone non-English language was skipped).
            needs_localized = len(languages) > 1 or any(
                lang != 'English' for lang in languages
            )
            for article in articles:
                article['summary'] = self.summarizer.summarize(article['content'])

                if needs_localized:
                    article['summaries'] = {
                        lang: (
                            article['summary']
                            if lang == 'English'
                            else self.translator.translate(
                                article['summary'],
                                target_lang=lang,
                            )
                        )
                        for lang in languages
                    }

            self._report_progress(progress_callback, 80, "Generating audio...")
            audio_files = {}
            if include_audio and languages:
                overall_summary = self.create_overall_summary(articles, keywords)

                for lang in languages:
                    if lang not in self.AUDIO_LANGUAGES:
                        continue
                    # Audio is best-effort: a failure for one language is
                    # logged and must not abort the whole analysis.
                    try:
                        if lang != 'English':
                            summary_text = self.translator.translate(overall_summary, target_lang=lang)
                        else:
                            summary_text = overall_summary

                        audio_files[lang] = self.audio_generator.generate_audio(
                            summary_text,
                            language=lang,
                            output_file=f"summary_{lang.lower()}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.mp3",
                        )
                    except Exception as e:
                        logger.error(f"Error generating audio for {lang}: {str(e)}")

            self._report_progress(progress_callback, 90, "Finalizing results...")
            threshold = self.SENTIMENT_THRESHOLD
            sentiments = [article['sentiment']['compound'] for article in articles]
            average_sentiment = sum(sentiments) / len(sentiments) if sentiments else 0.0

            sentiment_distribution = {
                'Positive': sum(1 for s in sentiments if s > threshold),
                'Negative': sum(1 for s in sentiments if s < -threshold),
                'Neutral': sum(1 for s in sentiments if -threshold <= s <= threshold),
            }

            processing_time = (datetime.now() - start_time).total_seconds()

            results = {
                'query': query,
                'total_articles': len(articles),
                'processing_time': processing_time,
                'average_sentiment': average_sentiment,
                'sentiment_distribution': sentiment_distribution,
                'articles': articles,
                'keywords': keywords,
                'languages': languages,
                'audio_files': audio_files,
                'summary': {
                    'average_sentiment': average_sentiment,
                    'total_articles': len(articles),
                    'sources': len({article['source'] for article in articles}),
                    'date_range': self.get_date_range(articles),
                },
            }

            self._report_progress(progress_callback, 100, "Analysis complete!")

            logger.info(f"Analysis completed successfully in {processing_time:.2f} seconds")
            return results

        except Exception as e:
            # Log with traceback, then re-raise the original exception as-is.
            logger.exception(f"Error in analysis pipeline: {str(e)}")
            raise

    def create_overall_summary(self, articles: List[Dict], keywords: List[Dict]) -> str:
        """Create an overall spoken-style summary for audio generation.

        Args:
            articles: Article dicts carrying ``['sentiment']['compound']``
                and ``'title'``.
            keywords: Keyword dicts carrying ``'keyword'``; only the first
                five are mentioned.

        Returns:
            A sentence-style summary. If ``articles`` is empty or the
            summary cannot be built, a generic completion message.
        """
        fallback = f"Analysis of {len(articles)} articles completed successfully."
        if not articles:
            return fallback

        try:
            threshold = self.SENTIMENT_THRESHOLD
            scores = [article['sentiment']['compound'] for article in articles]
            positive_count = sum(1 for score in scores if score > threshold)
            negative_count = sum(1 for score in scores if score < -threshold)
            neutral_count = len(articles) - positive_count - negative_count

            parts = [f"Analysis of {len(articles)} articles reveals "]

            if positive_count > negative_count:
                parts.append(
                    f"predominantly positive sentiment with {positive_count} positive, "
                    f"{negative_count} negative, and {neutral_count} neutral articles. "
                )
            elif negative_count > positive_count:
                parts.append(
                    f"predominantly negative sentiment with {negative_count} negative, "
                    f"{positive_count} positive, and {neutral_count} neutral articles. "
                )
            else:
                parts.append("mixed sentiment with balanced coverage. ")

            top_keywords = [kw['keyword'] for kw in keywords[:5]]
            if top_keywords:
                parts.append(f"Key topics include: {', '.join(top_keywords)}. ")

            # max/min return the first extreme element, matching what a
            # stable sort followed by [0] would pick.
            most_positive = max(articles, key=lambda a: a['sentiment']['compound'])
            most_negative = min(articles, key=lambda a: a['sentiment']['compound'])

            if most_positive['sentiment']['compound'] > threshold:
                parts.append(f"Most positive coverage: {most_positive['title'][:100]}. ")

            if most_negative['sentiment']['compound'] < -threshold:
                parts.append(f"Most concerning coverage: {most_negative['title'][:100]}. ")

            return ''.join(parts)

        except Exception as e:
            # Deliberate best-effort: the summary only feeds audio output,
            # so malformed article data degrades to a generic sentence.
            logger.error(f"Error creating overall summary: {str(e)}")
            return fallback

    def get_date_range(self, articles: List[Dict]) -> Dict[str, str]:
        """Get the earliest and latest article dates as strings.

        Articles without a ``'date'`` key, or with a falsy date, are ignored.

        Returns:
            ``{'start': ..., 'end': ...}``; both values are ``'Unknown'``
            when no usable date exists or the dates cannot be compared.
        """
        unknown = {'start': 'Unknown', 'end': 'Unknown'}
        try:
            dates = [article['date'] for article in articles if article.get('date')]
            if not dates:
                return unknown
            return {'start': str(min(dates)), 'end': str(max(dates))}
        except Exception as e:
            logger.error(f"Error getting date range: {str(e)}")
            return unknown
|
|
|
|
|
# Module-level analyzer instance used by the endpoint handlers in this file;
# it is constructed once, when the module is imported.
analyzer = NewsAnalyzer()
|
|
|
|
|
@app.get("/", response_model=Dict[str, str])
async def root():
    """Describe the service: its name, version, and docs location."""
    return dict(
        message="Global Business News Intelligence API",
        version="1.0.0",
        docs="/docs",
    )
|
|
|
@app.get("/health", response_model=Dict[str, str])
async def health_check():
    """Report a healthy status together with the current server time."""
    current_time = datetime.now().isoformat()
    return {"status": "healthy", "timestamp": current_time}
|
|
|
@app.get("/api/analyze", response_model=AnalysisResponse)
async def analyze_news_endpoint(
    query: str = Query(..., description="Company name, ticker, or keyword to analyze"),
    num_articles: int = Query(20, description="Number of articles to analyze (5-50)", ge=5, le=50),
    languages: List[str] = Query(["English"], description="Languages for summaries"),
    include_audio: bool = Query(True, description="Generate audio summaries"),
    sentiment_models: List[str] = Query(["VADER", "Loughran-McDonald", "FinBERT"], description="Sentiment models to use"),
):
    """Run the analysis pipeline with settings taken from query parameters.

    Any failure in the pipeline or in building the response is logged and
    reported to the client as HTTP 500 with the error text as detail.
    """
    # Collect the query parameters into the mapping the analyzer expects.
    pipeline_config = dict(
        query=query,
        num_articles=num_articles,
        languages=languages,
        include_audio=include_audio,
        sentiment_models=sentiment_models,
    )

    try:
        analysis = await analyzer.analyze_news_async(pipeline_config)
        return AnalysisResponse(**analysis)
    except Exception as e:
        logger.error(f"Error in analyze endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
@app.post("/api/analyze", response_model=AnalysisResponse)
async def analyze_news_post(request: AnalysisRequest):
    """POST version of the analysis endpoint.

    Takes the same settings as the GET endpoint, supplied as a JSON body.

    Raises:
        HTTPException: 422 if ``num_articles`` is outside 5-50 (the same
            bound the GET endpoint enforces); 500 if the pipeline fails.
    """
    # Checked before the try block so the 422 is not rewritten as a 500 by
    # the catch-all handler below.
    if not 5 <= request.num_articles <= 50:
        raise HTTPException(
            status_code=422,
            detail="num_articles must be between 5 and 50",
        )

    try:
        config = request.dict()
        results = await analyzer.analyze_news_async(config)
        return AnalysisResponse(**results)

    except Exception as e:
        logger.error(f"Error in analyze POST endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
@app.get("/api/sources", response_model=List[str])
async def get_available_sources():
    """Return the news sources reported by the analyzer's scraper."""
    sources = analyzer.scraper.get_available_sources()
    return sources
|
|
|
@app.get("/api/models", response_model=Dict[str, List[str]])
async def get_available_models():
    """Return the static catalogue of models and audio languages."""
    catalogue = {}
    catalogue["sentiment_models"] = ["VADER", "Loughran-McDonald", "FinBERT"]
    catalogue["summarization_models"] = ["distilbart-cnn-12-6"]
    catalogue["translation_models"] = ["Helsinki-NLP/opus-mt-en-hi", "Helsinki-NLP/opus-mt-en-fi"]
    catalogue["audio_languages"] = ["English", "Hindi", "Tamil"]
    return catalogue
|
|
|
@app.get("/api/keywords/{query}", response_model=List[Dict[str, Any]])
async def extract_keywords_endpoint(
    query: str,
    num_keywords: int = Query(20, description="Number of keywords to extract", ge=5, le=50)
):
    """Extract keywords from articles scraped for a query.

    Scrapes up to five articles for ``query``, joins their content and
    returns the keywords extracted from the combined text.

    Raises:
        HTTPException: 404 if no articles are found; 500 on any other error.
    """
    try:
        articles = analyzer.scraper.scrape_news(query, 5)
        if not articles:
            raise HTTPException(status_code=404, detail="No articles found for query")

        all_text = ' '.join(article['content'] for article in articles)
        keywords = analyzer.keyword_extractor.extract_keywords(all_text, num_keywords=num_keywords)

        return keywords

    except HTTPException:
        # HTTPException is itself an Exception; re-raise it untouched so the
        # 404 above reaches the client instead of being rewritten as a 500.
        raise
    except Exception as e:
        logger.error(f"Error in keywords endpoint: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
|
|
|
if __name__ == "__main__":
    # Started as a script: serve the app with uvicorn on all interfaces.
    import uvicorn

    uvicorn.run(app, port=8000, host="0.0.0.0")