Spaces:
Sleeping
Sleeping
import gradio as gr | |
from huggingface_hub import InferenceClient | |
from GoogleNews import GoogleNews | |
import logging | |
import warnings | |
import textwrap | |
from tabulate import tabulate | |
import yfinance as yf | |
import pandas as pd | |
import numpy as np | |
import matplotlib.pyplot as plt | |
import requests | |
from fuzzywuzzy import process | |
import re | |
# Suppress warnings | |
warnings.filterwarnings("ignore", category=UserWarning, module="fuzzywuzzy") | |
# Set up logging | |
logging.basicConfig( | |
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" | |
) | |
class FinancialAnalyzer: | |
def __init__(self): | |
# Load the DeepSeek model directly from Hugging Face Hub | |
self.client = InferenceClient("deepseek-ai/DeepSeek-R1-Distill-Qwen-32B") | |
self.ta_config = { | |
'rsi_window': 14, | |
'macd_fast': 12, | |
'macd_slow': 26, | |
'macd_signal': 9, | |
'bollinger_window': 20, | |
'sma_windows': [20, 50, 200], | |
'ema_windows': [12, 26], | |
'volatility_window': 30 | |
} | |
logging.info("Initialized Financial Analyzer") | |
def resolve_ticker_symbol(self, query: str) -> str: | |
"""Convert company names to valid Yahoo Finance tickers""" | |
logging.info(f"Resolving ticker symbol for query: {query}") | |
url = "https://query2.finance.yahoo.com/v1/finance/search" | |
headers = {"User-Agent": "Mozilla/5.0"} | |
params = {"q": query, "quotesCount": 5, "country": "India"} | |
try: | |
response = requests.get(url, headers=headers, params=params, timeout=10) | |
response.raise_for_status() | |
data = response.json() | |
if not data.get("quotes"): | |
raise ValueError(f"No ticker found for: {query}") | |
quotes = data["quotes"] | |
names = [quote.get("longname") or quote.get("shortname", "") for quote in quotes] | |
best_match, score = process.extractOne(query, names) | |
if not best_match or score < 60: | |
raise ValueError(f"No matching ticker found for: {query}") | |
index = names.index(best_match) | |
best_quote = quotes[index] | |
resolved_ticker = best_quote["symbol"] | |
exchange_code = best_quote.get("exchange", "").upper() | |
exchange_suffix_map = { | |
"NSI": ".NS", # NSE | |
"BOM": ".BO", # BSE | |
"BSE": ".BO", | |
"NSE": ".NS", | |
} | |
suffix = exchange_suffix_map.get(exchange_code, ".NS") | |
if not resolved_ticker.endswith(suffix): | |
resolved_ticker += suffix | |
logging.info(f"Resolved ticker symbol: {resolved_ticker}") | |
return resolved_ticker | |
except Exception as e: | |
logging.error(f"Ticker resolution failed: {str(e)}") | |
raise | |
def fetch_stock_data(self, ticker): | |
"""Fetch historical data and technical indicators""" | |
logging.info(f"Fetching stock data for ticker: {ticker}") | |
try: | |
stock = yf.Ticker(ticker) | |
history = stock.history(period="1y", interval="1d") | |
if history.empty: | |
logging.error(f"No data found for {ticker}") | |
return {"error": f"No data found for {ticker}"} | |
logging.info(f"Successfully fetched stock data for {ticker}") | |
return { | |
'history': history, | |
'current_price': history['Close'].iloc[-1], | |
'indicators': self.calculate_technical_indicators(history), | |
'info': stock.info | |
} | |
except Exception as e: | |
logging.error(f"Error fetching stock data: {str(e)}") | |
return {"error": str(e)} | |
def calculate_technical_indicators(self, history): | |
"""Calculate technical analysis metrics""" | |
logging.info("Calculating technical indicators") | |
ta = {} | |
# RSI | |
delta = history['Close'].diff() | |
gain = delta.where(delta > 0, 0) | |
loss = -delta.where(delta < 0, 0) | |
avg_gain = gain.rolling(self.ta_config['rsi_window']).mean() | |
avg_loss = loss.rolling(self.ta_config['rsi_window']).mean() | |
rs = avg_gain / avg_loss | |
ta['rsi'] = 100 - (100 / (1 + rs)).iloc[-1] | |
# MACD | |
ema_fast = history['Close'].ewm(span=self.ta_config['macd_fast'], adjust=False).mean() | |
ema_slow = history['Close'].ewm(span=self.ta_config['macd_slow'], adjust=False).mean() | |
macd = ema_fast - ema_slow | |
signal = macd.ewm(span=self.ta_config['macd_signal'], adjust=False).mean() | |
ta['macd'] = macd.iloc[-1] | |
ta['macd_signal'] = signal.iloc[-1] | |
# Bollinger Bands | |
sma = history['Close'].rolling(self.ta_config['bollinger_window']).mean() | |
std = history['Close'].rolling(self.ta_config['bollinger_window']).std() | |
ta['bollinger_upper'] = (sma + 2 * std).iloc[-1] | |
ta['bollinger_lower'] = (sma - 2 * std).iloc[-1] | |
logging.info("Technical indicators calculated") | |
return ta | |
def generate_price_chart(self, history): | |
"""Generate interactive price chart""" | |
logging.info("Generating price chart") | |
try: | |
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 6), sharex=True) | |
# Price plot | |
history['Close'].plot(ax=ax1, label='Price') | |
ax1.set_title('Price Trend') | |
ax1.legend() | |
# Volume plot | |
history['Volume'].plot(ax=ax2, kind='bar', color='skyblue') | |
ax2.set_title('Trading Volume') | |
plt.tight_layout() | |
logging.info("Price chart generated") | |
return fig | |
except Exception as e: | |
logging.error(f"Chart generation failed: {str(e)}") | |
return self.create_error_plot("Chart unavailable") | |
def create_error_plot(self, message): | |
"""Create a placeholder plot for error messages""" | |
fig, ax = plt.subplots(figsize=(10, 2)) | |
ax.text(0.5, 0.5, message, | |
ha='center', va='center', | |
fontsize=12, color='red') | |
ax.axis('off') | |
return fig | |
def fetch_articles(self, query): | |
"""Fetch news articles from Google News""" | |
logging.info(f"Fetching news articles for query: {query}") | |
try: | |
googlenews = GoogleNews(lang="en") | |
googlenews.search(query) | |
articles = googlenews.result() | |
logging.info(f"Fetched {len(articles)} news articles") | |
return articles[:5] # Limit to 5 articles | |
except Exception as e: | |
logging.error(f"Error fetching articles: {str(e)}") | |
return [] | |
def analyze_article_sentiment(self, article): | |
"""Analyze article sentiment using DeepSeek model with improved parsing""" | |
logging.info(f"Analyzing sentiment for article: {article['title']}") | |
prompt = f""" | |
Analyze the sentiment and provide a brief analysis of this news article about a financial asset. | |
Respond EXACTLY in this format: | |
SENTIMENT: [POSITIVE/NEGATIVE/NEUTRAL] | |
ANALYSIS: [2-3 sentence analysis] | |
Title: {article['title']} | |
Description: {article['desc']} | |
""" | |
try: | |
response = self.client.chat.completions.create( | |
model="deepseek-ai/DeepSeek-R1-Distill-Qwen-32B", | |
messages=[{"role": "user", "content": prompt}], | |
temperature=0.1, | |
max_tokens=150 | |
) | |
response_text = response.choices[0].message.content.strip() | |
# Improved parsing using regular expressions | |
sentiment_match = re.search(r"SENTIMENT:\s*(POSITIVE|NEGATIVE|NEUTRAL)", response_text, re.IGNORECASE) | |
analysis_match = re.search(r"ANALYSIS:\s*(.+)$", response_text, re.DOTALL) | |
sentiment = "neutral" # Default value | |
if sentiment_match: | |
sentiment = sentiment_match.group(1).lower() | |
else: | |
logging.warning(f"Failed to parse sentiment from response: {response_text}") | |
analysis = "Sentiment analysis unavailable" | |
if analysis_match: | |
analysis = analysis_match.group(1).strip() | |
# Validate sentiment value | |
if sentiment not in ['positive', 'negative', 'neutral']: | |
sentiment = 'neutral' | |
logging.warning(f"Invalid sentiment value: {sentiment}") | |
logging.info(f"Sentiment analysis complete: {sentiment}") | |
return { | |
**article, | |
"sentiment": sentiment, | |
"analysis": analysis | |
} | |
except Exception as e: | |
logging.error(f"Sentiment analysis failed: {str(e)}") | |
return { | |
**article, | |
"sentiment": "neutral", | |
"analysis": "Sentiment analysis failed" | |
} | |
def generate_recommendation(self, articles, stock_data): | |
"""Generate investment recommendation with fallback values""" | |
logging.info("Generating investment recommendation") | |
# Initialize sentiment scores with default values | |
sentiment_scores = { | |
'positive': 0, | |
'negative': 0, | |
'neutral': 0 | |
} | |
for article in articles: | |
sentiment = article.get('sentiment', 'neutral') | |
if sentiment in sentiment_scores: | |
sentiment_scores[sentiment] += 1 | |
# Technical analysis with fallback values | |
ta = stock_data.get('indicators', {}) | |
price_change = stock_data['history']['Close'].pct_change().iloc[-1] if not stock_data['history'].empty else 0 | |
# Recommendation logic with safeguards | |
recommendation = "HOLD" | |
reasons = [] | |
try: | |
rsi = ta.get('rsi', 50) | |
if rsi < 30 and sentiment_scores['positive'] > sentiment_scores['negative']: | |
recommendation = "BUY" | |
reasons.append("Oversold condition with positive news sentiment") | |
elif rsi > 70 and sentiment_scores['negative'] > sentiment_scores['positive']: | |
recommendation = "SELL" | |
reasons.append("Overbought condition with negative news sentiment") | |
elif price_change > 0.05 and sentiment_scores['positive'] > 3: | |
recommendation = "STRONG BUY" | |
reasons.append("Strong positive momentum and news sentiment") | |
elif price_change < -0.05 and sentiment_scores['negative'] > 3: | |
recommendation = "STRONG SELL" | |
reasons.append("Significant downward pressure and negative news") | |
except Exception as e: | |
logging.error(f"Recommendation logic failed: {str(e)}") | |
recommendation = "HOLD" | |
reasons.append("Analysis incomplete due to data issues") | |
logging.info(f"Recommendation generated: {recommendation}") | |
return { | |
"recommendation": recommendation, | |
"reasons": reasons, | |
"sentiment_distribution": sentiment_scores, | |
"technical_indicators": ta | |
} | |
def format_analysis_output(analyzer, articles, stock_data, recommendation): | |
"""Format all analysis components for display with error handling""" | |
logging.info("Formatting analysis output") | |
try: | |
# News table | |
news_table = [] | |
for article in articles: | |
news_table.append([ | |
article.get('date', 'N/A'), | |
textwrap.fill(article.get('title', 'No title'), 40), | |
textwrap.fill(article.get('analysis', 'No analysis'), 60), | |
"π’" if article.get('sentiment') == 'positive' else "π΄" if article.get('sentiment') == 'negative' else "βͺ" | |
]) | |
# Stock info with fallback values | |
info = stock_data.get('info', {}) | |
stock_info = f""" | |
<div style="padding: 20px; background: #f8f9fa; border-radius: 10px;"> | |
<h3>{info.get('longName', 'N/A')} ({info.get('symbol', 'N/A')})</h3> | |
<p>Price: ${stock_data.get('current_price', 0):.2f}</p> | |
<p>Market Cap: {info.get('marketCap', 'N/A')}</p> | |
<p>PE Ratio: {info.get('trailingPE', 'N/A')}</p> | |
</div> | |
""" | |
# Recommendation styling | |
rec_style = { | |
"BUY": ("#d4edda", "π’"), | |
"STRONG BUY": ("#d4edda", "π’"), | |
"SELL": ("#f8d7da", "π΄"), | |
"STRONG SELL": ("#f8d7da", "π΄"), | |
"HOLD": ("#fff3cd", "βͺ") | |
}.get(recommendation['recommendation'].split()[0], ("#ffffff", "βͺ")) | |
rec_html = f""" | |
<div style="padding: 20px; background: {rec_style[0]}; border-radius: 10px;"> | |
<h2>{rec_style[1]} Recommendation: {recommendation['recommendation']}</h2> | |
<ul> | |
{"".join(f'<li>{reason}</li>' for reason in recommendation.get('reasons', ['No analysis available']))} | |
</ul> | |
</div> | |
""" | |
# Generate chart | |
chart = analyzer.generate_price_chart(stock_data['history']) | |
return { | |
"news_table": tabulate(news_table, headers=["Date", "Title", "Analysis", "Sentiment"], tablefmt="html"), | |
"stock_info": stock_info, | |
"recommendation": rec_html, | |
"chart": chart | |
} | |
except Exception as e: | |
logging.error(f"Formatting failed: {str(e)}") | |
return { | |
"error": f"Output formatting failed: {str(e)}" | |
} | |
def analyze_asset(asset_input): | |
logging.info(f"Analyzing asset: {asset_input}") | |
analyzer = FinancialAnalyzer() | |
try: | |
# Resolve ticker symbol | |
ticker = analyzer.resolve_ticker_symbol(asset_input) | |
# Fetch data | |
stock_data = analyzer.fetch_stock_data(ticker) | |
if 'error' in stock_data: | |
raise ValueError(stock_data['error']) | |
articles = analyzer.fetch_articles(asset_input) | |
analyzed_articles = [analyzer.analyze_article_sentiment(a) for a in articles] | |
# Generate recommendation | |
recommendation = analyzer.generate_recommendation(analyzed_articles, stock_data) | |
# Format output | |
results = format_analysis_output(analyzer, analyzed_articles, stock_data, recommendation) | |
logging.info(f"Analysis complete for asset: {asset_input}") | |
return results | |
except Exception as e: | |
logging.error(f"Analysis failed: {str(e)}") | |
return {"error": f"Analysis failed: {str(e)}"} | |
def main(): | |
with gr.Blocks(theme=gr.themes.Default()) as app: | |
gr.Markdown("# Advanced Stock Analysis Suite") | |
with gr.Row(): | |
asset_input = gr.Textbox(label="Stock/Company Name", placeholder="Enter stock name or symbol...") | |
analyze_btn = gr.Button("Analyze", variant="primary") | |
with gr.Tabs(): | |
with gr.Tab("News Sentiment"): | |
news_table = gr.HTML(label="News Analysis") | |
with gr.Tab("Technical Analysis"): | |
stock_info = gr.HTML() | |
price_chart = gr.Plot() | |
with gr.Tab("Recommendation"): | |
recommendation = gr.HTML() | |
def update_analysis(asset): | |
logging.info(f"Update analysis triggered for asset: {asset}") | |
results = analyze_asset(asset) | |
if 'error' in results: | |
logging.error(f"Error in analysis: {results['error']}") | |
return [f"<div style='color: red'>{results['error']}</div>"]*4 | |
logging.info(f"Analysis results returned for asset: {asset}") | |
return [ | |
results["news_table"], | |
results["stock_info"], | |
results["chart"], | |
results["recommendation"] | |
] | |
app.launch() | |
if __name__ == "__main__": | |
main() |