# FIN-VISION / app.py
# CosmickVisions's picture
# Update app.py
# d5b9a02 verified
import gradio as gr
import groq
import os
import tempfile
import uuid
import yfinance as yf
import pandas as pd
import plotly.graph_objects as go
from dotenv import load_dotenv
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.embeddings import HuggingFaceEmbeddings
import fitz # PyMuPDF
import base64
from PIL import Image
import io
import requests
import json
# Load environment variables (e.g. from a local .env file) before the
# os.getenv() lookups below.
load_dotenv()

# Groq API client used for chat completions.
# NOTE(review): the key is read from GROQ_LEGAL_API_KEY — confirm this
# variable name is the one intended for the finance app.
client = groq.Client(api_key=os.getenv("GROQ_LEGAL_API_KEY"))

# Embedding model used to vectorize PDF chunks for FAISS.
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Directory to store FAISS indexes. exist_ok=True replaces the
# check-then-create pattern, which can race with a concurrent creator.
FAISS_INDEX_DIR = "faiss_indexes_finance"
os.makedirs(FAISS_INDEX_DIR, exist_ok=True)

# Dictionary to store user-specific vectorstores, keyed by session id.
user_vectorstores = {}
# Custom CSS for the Finance theme. Passed to gr.Blocks(css=...); the class
# names defined here are attached to components through elem_classes and to
# the header markup emitted with gr.HTML.
custom_css = """
:root {
--primary-color: #FFD700; /* Gold */
--secondary-color: #008000; /* Dark Green */
--light-background: #F0FFF0; /* Honeydew */
--dark-text: #333333;
--white: #FFFFFF;
--border-color: #E5E7EB;
}
body { background-color: var(--light-background); font-family: 'Inter', sans-serif; }
.container { max-width: 1200px !important; margin: 0 auto !important; padding: 10px; }
.header { background-color: var(--white); border-bottom: 2px solid var(--border-color); padding: 15px 0; margin-bottom: 20px; border-radius: 12px 12px 0 0; box-shadow: 0 2px 4px rgba(0,0,0,0.05); }
.header-title { color: var(--secondary-color); font-size: 1.8rem; font-weight: 700; text-align: center; }
.header-subtitle { color: var(--dark-text); font-size: 1rem; text-align: center; margin-top: 5px; }
.chat-container { border-radius: 12px !important; box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important; background-color: var(--white) !important; border: 1px solid var(--border-color) !important; min-height: 500px; }
.message-user { background-color: var(--primary-color) !important; color: var(--dark-text) !important; border-radius: 18px 18px 4px 18px !important; padding: 12px 16px !important; margin-left: auto !important; max-width: 80% !important; }
.message-bot { background-color: #F0F0F0 !important; color: var(--dark-text) !important; border-radius: 18px 18px 18px 4px !important; padding: 12px 16px !important; margin-right: auto !important; max-width: 80% !important; }
.input-area { background-color: var(--white) !important; border-top: 1px solid var(--border-color) !important; padding: 12px !important; border-radius: 0 0 12px 12px !important; }
.input-box { border: 1px solid var(--border-color) !important; border-radius: 24px !important; padding: 12px 16px !important; box-shadow: 0 2px 4px rgba(0,0,0,0.05) !important; }
.send-btn { background-color: var(--secondary-color) !important; border-radius: 24px !important; color: var(--white) !important; padding: 10px 20px !important; font-weight: 500 !important; }
.clear-btn { background-color: #F0F0F0 !important; border: 1px solid var(--border-color) !important; border-radius: 24px !important; color: var(--dark-text) !important; padding: 8px 16px !important; font-weight: 500 !important; }
.pdf-viewer-container { border-radius: 12px !important; box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important; background-color: var(--white) !important; border: 1px solid var(--border-color) !important; padding: 20px; }
.pdf-viewer-image { max-width: 100%; height: auto; border: 1px solid var(--border-color); border-radius: 12px; box-shadow: 0 2px 4px rgba(0,0,0,0.05); }
.stats-box { background-color: #E6F2E6; padding: 10px; border-radius: 8px; margin-top: 10px; }
.tool-container { background-color: var(--white); border-radius: 12px; box-shadow: 0 4px 6px rgba(0,0,0,0.1); padding: 15px; margin-bottom: 20px; }
.tool-title { color: var(--secondary-color); font-size: 1.2rem; font-weight: 600; margin-bottom: 10px; }
.chart-container { height: 400px; width: 100%; border-radius: 8px; overflow: hidden; }
"""
# Function to process PDF files
def process_pdf(pdf_file):
    """Index an uploaded PDF for retrieval and render its pages as images.

    The bytes are written to a temporary file, opened with PyMuPDF, split
    into overlapping text chunks, embedded into a FAISS vectorstore (saved
    under FAISS_INDEX_DIR and cached in user_vectorstores), and every page is
    rendered to a base64-encoded PNG.

    Args:
        pdf_file: Raw PDF bytes, or None when nothing was uploaded.

    Returns:
        A ``(session_id, status_message, pdf_state)`` tuple. ``session_id`` is
        a new UUID string on success and None on failure. ``pdf_state`` holds
        ``page_images`` (base64 PNG strings), ``total_pages`` and
        ``total_words``; it is zeroed/empty on failure.
    """
    empty_state = {"page_images": [], "total_pages": 0, "total_words": 0}
    if pdf_file is None:
        return None, "No file uploaded", empty_state

    pdf_path = None
    try:
        session_id = str(uuid.uuid4())
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
            # Record the path before writing so a failed write is still cleaned up.
            pdf_path = temp_file.name
            temp_file.write(pdf_file)

        doc = fitz.open(pdf_path)
        try:
            texts = []
            page_images = []
            # Single pass: extract text and render the page image together.
            for page in doc:
                texts.append(page.get_text())
                img_bytes = page.get_pixmap().tobytes("png")
                page_images.append(base64.b64encode(img_bytes).decode("utf-8"))
            total_pages = len(doc)
        finally:
            # Close the document even if a page fails to render, so the
            # temporary file can be removed afterwards.
            doc.close()

        total_words = sum(len(text.split()) for text in texts)

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        chunks = text_splitter.create_documents(texts)
        if not chunks:
            # Nothing to index (e.g. a scanned, image-only PDF); report it
            # clearly instead of surfacing an obscure indexing failure.
            raise ValueError("no extractable text found in PDF")

        vectorstore = FAISS.from_documents(chunks, embeddings)
        index_path = os.path.join(FAISS_INDEX_DIR, session_id)
        vectorstore.save_local(index_path)
        user_vectorstores[session_id] = vectorstore

        pdf_state = {"page_images": page_images, "total_pages": total_pages, "total_words": total_words}
        return session_id, f"✅ Successfully processed {len(chunks)} text chunks from your PDF", pdf_state
    except Exception as e:
        return None, f"Error processing PDF: {str(e)}", empty_state
    finally:
        # The temporary copy is only needed while parsing; always remove it.
        if pdf_path is not None and os.path.exists(pdf_path):
            os.unlink(pdf_path)
# Function to generate chatbot responses with Finance theme
def generate_response(message, session_id, model_name, history):
    """Append the assistant's reply to *message* onto the chat history.

    Messages of the form ``$TICKER`` (a ``$`` followed by 1-5 characters) are
    answered with a short stock summary from get_stock_data. Any other
    message is sent to the Groq chat API, with the top matching chunks of the
    session's uploaded PDF (if any) added to the system prompt.

    Args:
        message: The user's text; an empty/None message is a no-op.
        session_id: Key into user_vectorstores, or None if no PDF is loaded.
        model_name: Groq model identifier passed to the completion call.
        history: List of ``(user_message, bot_reply)`` tuples; None is
            treated as an empty list.

    Returns:
        The history list with one new ``(message, reply)`` tuple appended.
        Errors are reported as the reply text rather than raised.
    """
    history = [] if history is None else history
    if not message:
        return history

    def fmt(value, spec=""):
        """Format a number with *spec*; pass non-numeric placeholders through."""
        if value is None:
            return "N/A"
        try:
            return format(value, spec)
        except (TypeError, ValueError):
            # e.g. the "N/A" placeholder cannot take a ',' format spec.
            return str(value)

    # Check if it's a stock ticker query. Done before the vector search so a
    # ticker lookup does not pay for (or fail on) PDF retrieval.
    if message.startswith("$") and 1 < len(message) <= 6:
        ticker = message[1:].upper()
        try:
            stock_data = get_stock_data(ticker)
            response = f"**Stock Information for {ticker}**\n\n"
            response += f"Current Price: ${fmt(stock_data['current_price'])}\n"
            response += f"52-Week High: ${fmt(stock_data['52wk_high'])}\n"
            response += f"Market Cap: ${fmt(stock_data['market_cap'], ',')}\n"
            response += f"P/E Ratio: {fmt(stock_data['pe_ratio'])}\n"
            response += "More data available in the Stock Analysis tab."
        except Exception as e:
            response = f"Error retrieving stock data for {ticker}: {str(e)}"
        history.append((message, response))
        return history

    try:
        context = ""
        if session_id and session_id in user_vectorstores:
            vectorstore = user_vectorstores[session_id]
            docs = vectorstore.similarity_search(message, k=3)
            if docs:
                context = "\n\nRelevant information from uploaded PDF:\n" + "\n".join(f"- {doc.page_content}" for doc in docs)

        system_prompt = "You are a financial assistant specializing in analyzing financial reports, statements, and market trends."
        system_prompt += " You can help with stock market information, financial terminology, ratio analysis, and investment concepts."
        if context:
            system_prompt += " Use the following context to answer the question if relevant: " + context

        completion = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": message},
            ],
            temperature=0.7,
            max_tokens=1024,
        )
        response = completion.choices[0].message.content
        history.append((message, response))
        return history
    except Exception as e:
        history.append((message, f"Error generating response: {str(e)}"))
        return history
# Functions to update PDF viewer
def update_pdf_viewer(pdf_state):
    """Refresh the page slider, page image and stats after a PDF is processed.

    Args:
        pdf_state: Dict with ``page_images`` (base64 PNG strings),
            ``total_pages`` and ``total_words``.

    Returns:
        A ``(slider_update, image, stats_markdown)`` tuple. The slider update
        sets the range to ``1..total_pages`` and selects page 1, matching the
        first-page image that is returned. With no PDF, or if the image
        cannot be decoded, the slider is reset to a single page and the image
        is None.
    """
    # Returning a bare number would only set the slider *value* and leave its
    # maximum untouched, so the range must be updated explicitly.
    single_page = gr.update(minimum=1, maximum=1, value=1)

    if not pdf_state["total_pages"]:
        return single_page, None, "No PDF uploaded yet"
    try:
        img_data = base64.b64decode(pdf_state["page_images"][0])
        img = Image.open(io.BytesIO(img_data))
        stats = f"**Total Pages:** {pdf_state['total_pages']}\n**Total Words:** {pdf_state['total_words']}"
    except Exception as e:
        print(f"Error decoding image: {e}")
        return single_page, None, "Error displaying PDF"

    slider = gr.update(minimum=1, maximum=pdf_state["total_pages"], value=1)
    return slider, img, stats
def update_image(page_num, pdf_state):
    """Return the rendered image for a 1-based page number, or None.

    None is returned when no PDF is loaded, when *page_num* lies outside
    ``1..total_pages``, or when the stored page image cannot be decoded
    (the decoding error is printed).
    """
    page_count = pdf_state["total_pages"]
    if not page_count:
        return None
    if page_num < 1 or page_num > page_count:
        return None

    try:
        encoded_page = pdf_state["page_images"][page_num - 1]
        raw_png = base64.b64decode(encoded_page)
        return Image.open(io.BytesIO(raw_png))
    except Exception as e:
        print(f"Error decoding image: {e}")
    return None
# New Finance-specific tools
def get_stock_data(ticker):
    """Tool to fetch latest stock data for a given ticker.

    Args:
        ticker: Ticker symbol passed to ``yf.Ticker``.

    Returns:
        Dict with keys ``current_price``, ``52wk_high``, ``market_cap``,
        ``pe_ratio``, ``dividend_yield``, ``beta`` and ``average_volume``.
        Any field that is absent or None in the ticker info is reported as
        the string ``"N/A"``.

    Raises:
        Exception: Whatever the lookup raises is printed and re-raised.
    """
    def pick(info, *keys):
        """Return the first non-None value among *keys*, else "N/A"."""
        for key in keys:
            value = info.get(key)
            # A key that is present but None must not slip past the
            # placeholder, or callers comparing against "N/A" would break.
            if value is not None:
                return value
        return "N/A"

    try:
        info = yf.Ticker(ticker).info
        return {
            "current_price": pick(info, "currentPrice", "regularMarketPrice"),
            "52wk_high": pick(info, "fiftyTwoWeekHigh"),
            "market_cap": pick(info, "marketCap"),
            "pe_ratio": pick(info, "trailingPE"),
            "dividend_yield": pick(info, "dividendYield"),
            "beta": pick(info, "beta"),
            "average_volume": pick(info, "averageVolume"),
        }
    except Exception as e:
        print(f"Error fetching stock data: {e}")
        # Bare raise keeps the original traceback intact.
        raise
def get_stock_history(ticker, period="1y"):
    """Fetch the price history of *ticker* over *period* for charting.

    Returns whatever ``yf.Ticker(ticker).history(period=period)`` yields; if
    that call raises, the error is printed and an empty DataFrame is
    returned instead.
    """
    try:
        return yf.Ticker(ticker).history(period=period)
    except Exception as err:
        print(f"Error fetching stock history: {err}")
    return pd.DataFrame()
def get_fred_data(indicator):
    """Get economic data from FRED API.

    Args:
        indicator: FRED series id sent as the ``series_id`` query parameter.

    Returns:
        The ``observations`` list from the JSON response (up to 100 entries,
        requested in descending order), or ``[]`` if the request or decoding
        fails. When the ``FRED_API_KEY`` environment variable is unset or
        empty, the string ``"FRED API key not configured"`` is returned
        instead.
    """
    api_key = os.getenv("FRED_API_KEY", "")
    if not api_key:
        return "FRED API key not configured"

    base_url = "https://api.stlouisfed.org/fred/series/observations"
    params = {
        "series_id": indicator,
        "api_key": api_key,
        "file_type": "json",
        "sort_order": "desc",
        "limit": 100,
    }
    try:
        # Without a timeout a stalled connection would block the caller
        # indefinitely.
        response = requests.get(base_url, params=params, timeout=10)
        # Surface HTTP errors in the log instead of silently parsing an
        # error payload.
        response.raise_for_status()
        data = response.json()
        return data.get("observations", [])
    except Exception as e:
        print(f"Error fetching FRED data: {e}")
        return []
def create_stock_chart(ticker, period="1y"):
    """Build a Plotly figure with a candlestick trace and a volume bar trace.

    The price history comes from get_stock_history. Returns the figure, or
    None when the history is empty or when building the chart raises (the
    error is printed).
    """
    try:
        history = get_stock_history(ticker, period)
        if history.empty:
            return None

        dates = history.index

        # Candlestick price trace on the primary y-axis.
        price_trace = go.Candlestick(
            x=dates,
            open=history['Open'],
            high=history['High'],
            low=history['Low'],
            close=history['Close'],
            name=ticker,
        )
        # Volume bars, bound to the secondary y-axis.
        volume_trace = go.Bar(
            x=dates,
            y=history['Volume'],
            name='Volume',
            marker_color='rgba(0, 128, 0, 0.3)',
            yaxis='y2',
        )

        figure = go.Figure()
        figure.add_trace(price_trace)
        figure.add_trace(volume_trace)

        # Two stacked y-axis domains: price on top, volume underneath.
        layout_options = dict(
            title=f'{ticker} Stock Price',
            yaxis_title='Price (USD)',
            xaxis_title='Date',
            template='plotly_white',
            yaxis=dict(domain=[0.3, 1.0]),
            yaxis2=dict(domain=[0, 0.2], title='Volume'),
            legend=dict(
                orientation="h",
                yanchor="bottom",
                y=1.02,
                xanchor="right",
                x=1,
            ),
            height=500,
        )
        figure.update_layout(**layout_options)
        return figure
    except Exception as err:
        print(f"Error creating stock chart: {err}")
        return None
def analyze_ticker(ticker_input, period):
    """Process the ticker input and return analysis.

    Args:
        ticker_input: Ticker text from the UI; surrounding whitespace and a
            leading ``$`` are ignored and the symbol is upper-cased.
        period: History period forwarded to create_stock_chart.

    Returns:
        A ``(chart, summary_markdown, ticker)`` tuple. On empty input or on
        any lookup error, chart and ticker are None and the summary carries
        the message.
    """
    if not ticker_input:
        return None, "Please enter a valid ticker symbol", None

    ticker = ticker_input.strip().upper()
    if ticker.startswith("$"):
        ticker = ticker[1:].strip()
    if not ticker:
        # Whitespace-only or a lone "$": nothing to look up.
        return None, "Please enter a valid ticker symbol", None

    def fmt(value, spec="", prefix=""):
        """Format a number with *spec*; render missing values as "N/A"."""
        if value is None or value == "N/A":
            return "N/A"
        try:
            return prefix + format(value, spec)
        except (TypeError, ValueError):
            # Non-numeric value that cannot take the numeric format spec.
            return str(value)

    try:
        stock_data = get_stock_data(ticker)
        chart = create_stock_chart(ticker, period)

        dividend = stock_data['dividend_yield']
        if isinstance(dividend, (int, float)) and not isinstance(dividend, bool):
            dividend_text = f"{dividend * 100:.2f}%"
        else:
            # Avoid rendering a missing yield as "N/A%".
            dividend_text = "N/A"

        # Create a formatted summary. Entries are separated by blank lines so
        # each one renders as its own Markdown paragraph.
        summary = "\n\n".join([
            f"### {ticker} Analysis",
            f"**Current Price:** {fmt(stock_data['current_price'], prefix='$')}",
            f"**52-Week High:** {fmt(stock_data['52wk_high'], prefix='$')}",
            f"**Market Cap:** {fmt(stock_data['market_cap'], ',', prefix='$')}",
            f"**P/E Ratio:** {fmt(stock_data['pe_ratio'])}",
            f"**Dividend Yield:** {dividend_text}",
            f"**Beta:** {fmt(stock_data['beta'])}",
            f"**Avg Volume:** {fmt(stock_data['average_volume'], ',')}",
        ])
        return chart, summary, ticker
    except Exception as e:
        return None, f"Error analyzing ticker {ticker}: {str(e)}", None
def _clear_session(session_id):
    """Reset chat, PDF viewer and ticker state for the Clear button.

    Also drops the session's vectorstore from user_vectorstores so cleared
    sessions do not accumulate in memory (any index saved on disk is left in
    place). Returns one value per output wired to the Clear button.
    """
    user_vectorstores.pop(session_id, None)
    return (
        [],                                               # chatbot
        None,                                             # current_session_id
        "No PDF uploaded yet",                            # pdf_status
        {"page_images": [], "total_pages": 0, "total_words": 0},  # pdf_state
        # Reset the slider range as well as its value; a bare 0 would fall
        # outside the slider's minimum of 1.
        gr.update(minimum=1, maximum=1, value=1),          # page_slider
        None,                                             # pdf_image
        "No PDF uploaded yet",                            # stats_display
        None,                                             # current_ticker
    )


# Gradio interface
# NOTE(review): the container nesting below was reconstructed from a source
# whose indentation had been stripped — confirm it matches the intended layout.
with gr.Blocks(css=custom_css, theme=gr.themes.Soft()) as demo:
    # Per-browser-session state.
    current_session_id = gr.State(None)
    pdf_state = gr.State({"page_images": [], "total_pages": 0, "total_words": 0})
    current_ticker = gr.State(None)

    gr.HTML("""
    <div class="header">
    <div class="header-title">Fin-Vision</div>
    <div class="header-subtitle">Analyze financial documents with Groq's LLM API.</div>
    </div>
    """)

    with gr.Row(elem_classes="container"):
        # Left column: upload, model choice and finance tools.
        with gr.Column(scale=1, min_width=300):
            pdf_file = gr.File(label="Upload PDF Document", file_types=[".pdf"], type="binary")
            upload_button = gr.Button("Process PDF", variant="primary")
            pdf_status = gr.Markdown("No PDF uploaded yet")
            model_dropdown = gr.Dropdown(
                choices=["llama3-70b-8192", "llama3-8b-8192", "mixtral-8x7b-32768", "gemma-7b-it"],
                value="llama3-70b-8192",
                label="Select Groq Model"
            )
            # Finance Tools Section
            gr.Markdown("### Financial Tools", elem_classes="tool-title")
            with gr.Group(elem_classes="tool-container"):
                with gr.Tabs():
                    with gr.TabItem("Stock Analysis"):
                        ticker_input = gr.Textbox(label="Enter Ticker Symbol (e.g., AAPL)", placeholder="AAPL")
                        period_dropdown = gr.Dropdown(
                            choices=["1mo", "3mo", "6mo", "1y", "2y", "5y", "max"],
                            value="1y",
                            label="Time Period"
                        )
                        analyze_button = gr.Button("Analyze Stock")

        # Right column: PDF viewer and stock analysis output.
        with gr.Column(scale=2, min_width=600):
            with gr.Tabs():
                with gr.TabItem("PDF Viewer"):
                    with gr.Column(elem_classes="pdf-viewer-container"):
                        page_slider = gr.Slider(minimum=1, maximum=1, step=1, label="Page Number", value=1)
                        pdf_image = gr.Image(label="PDF Page", type="pil", elem_classes="pdf-viewer-image")
                        stats_display = gr.Markdown("No PDF uploaded yet", elem_classes="stats-box")
                with gr.TabItem("Stock Analysis"):
                    with gr.Column(elem_classes="pdf-viewer-container"):
                        stock_chart = gr.Plot(label="Stock Price Chart", elem_classes="chart-container")
                        stock_summary = gr.Markdown("Enter a ticker symbol to see analysis")

    with gr.Row(elem_classes="container"):
        with gr.Column(scale=2, min_width=600):
            chatbot = gr.Chatbot(height=500, bubble_full_width=False, show_copy_button=True, elem_classes="chat-container")
            with gr.Row():
                msg = gr.Textbox(show_label=False, placeholder="Ask about your financial document or type $TICKER for stock info...", scale=5)
                send_btn = gr.Button("Send", scale=1)
            clear_btn = gr.Button("Clear Conversation")

    # Event Handlers
    # Process the upload, then refresh the viewer from the new pdf_state.
    upload_button.click(
        process_pdf,
        inputs=[pdf_file],
        outputs=[current_session_id, pdf_status, pdf_state]
    ).then(
        update_pdf_viewer,
        inputs=[pdf_state],
        outputs=[page_slider, pdf_image, stats_display]
    )

    # Enter in the textbox and the Send button share the same wiring:
    # generate the reply, then empty the input box.
    for send_event in (msg.submit, send_btn.click):
        send_event(
            generate_response,
            inputs=[msg, current_session_id, model_dropdown, chatbot],
            outputs=[chatbot]
        ).then(lambda: "", None, [msg])

    clear_btn.click(
        _clear_session,
        inputs=[current_session_id],
        outputs=[chatbot, current_session_id, pdf_status, pdf_state, page_slider, pdf_image, stats_display, current_ticker]
    )

    page_slider.change(
        update_image,
        inputs=[page_slider, pdf_state],
        outputs=[pdf_image]
    )

    # Stock analysis handler
    analyze_button.click(
        analyze_ticker,
        inputs=[ticker_input, period_dropdown],
        outputs=[stock_chart, stock_summary, current_ticker]
    )

    # Add footer with attribution
    gr.HTML("""
    <div style="text-align: center; margin-top: 20px; padding: 10px; color: #666; font-size: 0.8rem; border-top: 1px solid #eee;">
    Created by Calvin Allen Crawford
    </div>
    """)
# Launch the app only when this file is executed as a script; importing the
# module builds `demo` without starting the server.
if __name__ == "__main__":
    demo.launch()