"""Fin-Vision: a Gradio app for chatting about financial PDFs and stocks.

The module wires together three features:

* PDF ingestion -- pages are rendered to images for an in-app viewer and the
  extracted text is chunked and indexed in a per-upload FAISS vector store.
* Chat -- user questions are answered by a Groq-hosted LLM, optionally
  augmented with the most similar chunks of the uploaded PDF. Short messages
  of the form ``$TICKER`` are answered directly from yfinance data.
* Stock analysis -- a candlestick/volume chart plus a key-figure summary.
"""

import base64
import io
import json
import logging
import numbers
import os
import tempfile
import uuid

import fitz  # PyMuPDF
import gradio as gr
import groq
import pandas as pd
import plotly.graph_objects as go
import requests
import yfinance as yf
from dotenv import load_dotenv
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from PIL import Image

logger = logging.getLogger(__name__)

# Load environment variables
load_dotenv()
client = groq.Client(api_key=os.getenv("GROQ_LEGAL_API_KEY"))
embeddings = HuggingFaceEmbeddings(model_name="sentence-transformers/all-MiniLM-L6-v2")

# Directory to store FAISS indexes
FAISS_INDEX_DIR = "faiss_indexes_finance"
os.makedirs(FAISS_INDEX_DIR, exist_ok=True)

# Dictionary to store user-specific vectorstores, keyed by session id
user_vectorstores = {}

# Placeholder shown for any stock figure the data provider did not return
NOT_AVAILABLE = "N/A"

# Upper bound (seconds) for the FRED HTTP call so a stalled request cannot
# hang the caller indefinitely
FRED_TIMEOUT_SECONDS = 10

# Custom CSS for Finance theme
custom_css = """
:root {
    --primary-color: #FFD700; /* Gold */
    --secondary-color: #008000; /* Dark Green */
    --light-background: #F0FFF0; /* Honeydew */
    --dark-text: #333333;
    --white: #FFFFFF;
    --border-color: #E5E7EB;
}
body {
    background-color: var(--light-background);
    font-family: 'Inter', sans-serif;
}
.container {
    max-width: 1200px !important;
    margin: 0 auto !important;
    padding: 10px;
}
.header {
    background-color: var(--white);
    border-bottom: 2px solid var(--border-color);
    padding: 15px 0;
    margin-bottom: 20px;
    border-radius: 12px 12px 0 0;
    box-shadow: 0 2px 4px rgba(0,0,0,0.05);
}
.header-title {
    color: var(--secondary-color);
    font-size: 1.8rem;
    font-weight: 700;
    text-align: center;
}
.header-subtitle {
    color: var(--dark-text);
    font-size: 1rem;
    text-align: center;
    margin-top: 5px;
}
.chat-container {
    border-radius: 12px !important;
    box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important;
    background-color: var(--white) !important;
    border: 1px solid var(--border-color) !important;
    min-height: 500px;
}
.message-user {
    background-color: var(--primary-color) !important;
    color: var(--dark-text) !important;
    border-radius: 18px 18px 4px 18px !important;
    padding: 12px 16px !important;
    margin-left: auto !important;
    max-width: 80% !important;
}
.message-bot {
    background-color: #F0F0F0 !important;
    color: var(--dark-text) !important;
    border-radius: 18px 18px 18px 4px !important;
    padding: 12px 16px !important;
    margin-right: auto !important;
    max-width: 80% !important;
}
.input-area {
    background-color: var(--white) !important;
    border-top: 1px solid var(--border-color) !important;
    padding: 12px !important;
    border-radius: 0 0 12px 12px !important;
}
.input-box {
    border: 1px solid var(--border-color) !important;
    border-radius: 24px !important;
    padding: 12px 16px !important;
    box-shadow: 0 2px 4px rgba(0,0,0,0.05) !important;
}
.send-btn {
    background-color: var(--secondary-color) !important;
    border-radius: 24px !important;
    color: var(--white) !important;
    padding: 10px 20px !important;
    font-weight: 500 !important;
}
.clear-btn {
    background-color: #F0F0F0 !important;
    border: 1px solid var(--border-color) !important;
    border-radius: 24px !important;
    color: var(--dark-text) !important;
    padding: 8px 16px !important;
    font-weight: 500 !important;
}
.pdf-viewer-container {
    border-radius: 12px !important;
    box-shadow: 0 4px 6px rgba(0,0,0,0.1) !important;
    background-color: var(--white) !important;
    border: 1px solid var(--border-color) !important;
    padding: 20px;
}
.pdf-viewer-image {
    max-width: 100%;
    height: auto;
    border: 1px solid var(--border-color);
    border-radius: 12px;
    box-shadow: 0 2px 4px rgba(0,0,0,0.05);
}
.stats-box {
    background-color: #E6F2E6;
    padding: 10px;
    border-radius: 8px;
    margin-top: 10px;
}
.tool-container {
    background-color: var(--white);
    border-radius: 12px;
    box-shadow: 0 4px 6px rgba(0,0,0,0.1);
    padding: 15px;
    margin-bottom: 20px;
}
.tool-title {
    color: var(--secondary-color);
    font-size: 1.2rem;
    font-weight: 600;
    margin-bottom: 10px;
}
.chart-container {
    height: 400px;
    width: 100%;
    border-radius: 8px;
    overflow: hidden;
}
"""

# Static HTML fragments rendered at the top and bottom of the page
HEADER_HTML = """
Fin-Vision
Analyze financial documents with Groq's LLM API.
"""

FOOTER_HTML = """
Created by Calvin Allen Crawford
"""


def _empty_pdf_state():
    """Return a fresh "no PDF loaded" viewer state.

    A new dict is built on every call so callers never share (and
    accidentally mutate) one module-level instance.
    """
    return {"page_images": [], "total_pages": 0, "total_words": 0}


def _reset_slider():
    """Return a Gradio update that puts the page slider back to its 1..1 range."""
    return gr.update(maximum=1, value=1)


def _decode_page_image(encoded_png):
    """Turn one base64-encoded PNG string from the PDF state into a PIL image."""
    return Image.open(io.BytesIO(base64.b64decode(encoded_png)))


def _is_number(value):
    """Return True for real numeric values; bools do not count as numbers."""
    return isinstance(value, numbers.Real) and not isinstance(value, bool)


def _format_number(value):
    """Format *value* with thousands separators, tolerating missing data.

    Stock fields fall back to the string "N/A"; applying the ``,`` format
    option to a string raises ValueError, so non-numeric values are returned
    as plain text instead.
    """
    if value is None:
        return NOT_AVAILABLE
    if _is_number(value):
        return f"{value:,}"
    return str(value)


def _format_percent(fraction):
    """Format a fractional value as a percentage, or "N/A" when missing.

    NOTE(review): like the code this replaces, this assumes the provider
    reports the yield as a fraction (0.02 == 2%) -- confirm against the
    installed yfinance version.
    """
    if not _is_number(fraction):
        return NOT_AVAILABLE
    return f"{fraction * 100:.2f}%"


def _first_available(info, *keys):
    """Return the first non-None ``info[key]`` among *keys*, else "N/A"."""
    for key in keys:
        value = info.get(key)
        if value is not None:
            return value
    return NOT_AVAILABLE


def process_pdf(pdf_file):
    """Render, index and summarise an uploaded PDF.

    Args:
        pdf_file: Raw bytes of the uploaded PDF, or None when nothing was
            uploaded.

    Returns:
        A ``(session_id, status_message, pdf_state)`` tuple. ``session_id``
        is the key of the new vector store in ``user_vectorstores`` (None
        when no index was built). ``pdf_state`` holds the base64 PNG of every
        page plus the page and word counts used by the viewer.
    """
    if pdf_file is None:
        return None, "No file uploaded", _empty_pdf_state()

    pdf_path = None
    try:
        session_id = str(uuid.uuid4())
        with tempfile.NamedTemporaryFile(suffix=".pdf", delete=False) as temp_file:
            # Record the path before writing so a failed write is still cleaned up.
            pdf_path = temp_file.name
            temp_file.write(pdf_file)

        texts = []
        page_images = []
        doc = fitz.open(pdf_path)
        try:
            # Single pass: collect the text and a PNG rendering of each page.
            for page in doc:
                texts.append(page.get_text())
                png_bytes = page.get_pixmap().tobytes("png")
                page_images.append(base64.b64encode(png_bytes).decode("utf-8"))
        finally:
            doc.close()

        pdf_state = {
            "page_images": page_images,
            "total_pages": len(page_images),
            "total_words": sum(len(text.split()) for text in texts),
        }

        text_splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
        chunks = text_splitter.create_documents(texts)
        if not chunks:
            # Nothing to embed (e.g. a scanned, image-only PDF). Skip the index
            # instead of letting the vector store fail on empty input, and
            # keep the rendered pages so the viewer still works.
            return (
                None,
                "⚠️ No extractable text found in this PDF; the viewer is "
                "available but document Q&A is not",
                pdf_state,
            )

        vectorstore = FAISS.from_documents(chunks, embeddings)
        vectorstore.save_local(os.path.join(FAISS_INDEX_DIR, session_id))
        user_vectorstores[session_id] = vectorstore

        return (
            session_id,
            f"✅ Successfully processed {len(chunks)} text chunks from your PDF",
            pdf_state,
        )
    except Exception as e:
        # UI boundary: report the failure in the status line instead of crashing.
        logger.exception("Error processing PDF")
        return None, f"Error processing PDF: {str(e)}", _empty_pdf_state()
    finally:
        if pdf_path and os.path.exists(pdf_path):
            os.unlink(pdf_path)


def generate_response(message, session_id, model_name, history):
    """Answer a chat message and append the exchange to the history.

    Messages that look like ``$TICKER`` (a dollar sign followed by one to
    five characters) are answered with a stock snapshot. Anything else goes
    to the Groq chat model, with the three most similar chunks of the
    session's PDF added to the system prompt when a vector store exists.

    Args:
        message: The user's text.
        session_id: Key into ``user_vectorstores``, or None.
        model_name: Groq model identifier.
        history: List of ``(user, bot)`` tuples shown in the chatbot.

    Returns:
        The history list with the new exchange appended (unchanged when the
        message is empty).
    """
    # A None history would otherwise crash inside the error handler below.
    history = history if history is not None else []
    if not message or not message.strip():
        return history

    try:
        context = ""
        if session_id and session_id in user_vectorstores:
            docs = user_vectorstores[session_id].similarity_search(message, k=3)
            if docs:
                context = "\n\nRelevant information from uploaded PDF:\n" + "\n".join(
                    f"- {doc.page_content}" for doc in docs
                )

        # Check if it's a stock ticker query
        query = message.strip()
        if query.startswith("$") and 1 < len(query) <= 6:
            ticker = query[1:].upper()
            try:
                stock_data = get_stock_data(ticker)
                response = f"**Stock Information for {ticker}**\n\n"
                response += f"Current Price: ${stock_data['current_price']}\n"
                response += f"52-Week High: ${stock_data['52wk_high']}\n"
                response += f"Market Cap: ${_format_number(stock_data['market_cap'])}\n"
                response += f"P/E Ratio: {stock_data['pe_ratio']}\n"
                response += "More data available in the Stock Analysis tab."
                history.append((message, response))
            except Exception as e:
                history.append(
                    (message, f"Error retrieving stock data for {ticker}: {str(e)}")
                )
            return history

        system_prompt = (
            "You are a financial assistant specializing in analyzing financial reports, "
            "statements, and market trends."
            " You can help with stock market information, financial terminology, "
            "ratio analysis, and investment concepts."
        )
        if context:
            system_prompt += (
                " Use the following context to answer the question if relevant: " + context
            )

        completion = client.chat.completions.create(
            model=model_name,
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": message},
            ],
            temperature=0.7,
            max_tokens=1024,
        )
        history.append((message, completion.choices[0].message.content))
        return history
    except Exception as e:
        # UI boundary: surface the failure as the bot's reply.
        logger.exception("Error generating response")
        history.append((message, f"Error generating response: {str(e)}"))
        return history


def update_pdf_viewer(pdf_state):
    """Refresh the viewer widgets after a PDF has been processed.

    Args:
        pdf_state: Dict with "page_images", "total_pages" and "total_words".

    Returns:
        A ``(slider_update, first_page_image, stats_markdown)`` tuple. The
        slider update widens the slider's range to the page count and selects
        page 1; returning the bare page count would only set the slider's
        value and leave its maximum at 1, making other pages unreachable.
    """
    total_pages = pdf_state["total_pages"]
    if not total_pages:
        return _reset_slider(), None, "No PDF uploaded yet"

    try:
        first_page = _decode_page_image(pdf_state["page_images"][0])
    except Exception:
        logger.exception("Error decoding image")
        return _reset_slider(), None, "Error displaying PDF"

    stats = (
        f"**Total Pages:** {total_pages}\n"
        f"**Total Words:** {pdf_state['total_words']}"
    )
    return gr.update(maximum=total_pages, value=1), first_page, stats


def update_image(page_num, pdf_state):
    """Return the PIL image for a 1-based page number, or None if unavailable.

    Args:
        page_num: Page selected on the slider (1-based); may be None.
        pdf_state: Dict with "page_images" and "total_pages".
    """
    total_pages = pdf_state["total_pages"]
    if not total_pages or page_num is None:
        return None

    # Coerce defensively in case the slider delivers a float.
    page_index = int(page_num)
    if page_index < 1 or page_index > total_pages:
        return None

    try:
        return _decode_page_image(pdf_state["page_images"][page_index - 1])
    except Exception:
        logger.exception("Error decoding image")
        return None


# Finance-specific tools
def get_stock_data(ticker):
    """Fetch a snapshot of key figures for *ticker* from yfinance.

    Returns:
        Dict with the keys "current_price", "52wk_high", "market_cap",
        "pe_ratio", "dividend_yield", "beta" and "average_volume". A figure
        the provider did not supply is the string "N/A".

    Raises:
        Exception: Whatever yfinance raises is logged and re-raised.
    """
    try:
        info = yf.Ticker(ticker).info
        return {
            "current_price": _first_available(info, "currentPrice", "regularMarketPrice"),
            "52wk_high": _first_available(info, "fiftyTwoWeekHigh"),
            "market_cap": _first_available(info, "marketCap"),
            "pe_ratio": _first_available(info, "trailingPE"),
            "dividend_yield": _first_available(info, "dividendYield"),
            "beta": _first_available(info, "beta"),
            "average_volume": _first_available(info, "averageVolume"),
        }
    except Exception:
        logger.exception("Error fetching stock data")
        raise


def get_stock_history(ticker, period="1y"):
    """Return the price history of *ticker* for charting.

    Args:
        ticker: Ticker symbol.
        period: yfinance period string such as "1mo" or "1y".

    Returns:
        The history DataFrame, or an empty DataFrame when the lookup fails.
    """
    try:
        return yf.Ticker(ticker).history(period=period)
    except Exception:
        logger.exception("Error fetching stock history")
        return pd.DataFrame()


def get_fred_data(indicator):
    """Fetch the latest observations of a FRED series.

    Args:
        indicator: FRED series id.

    Returns:
        The list under "observations" in the API response (up to 100 entries,
        newest first), an empty list when the request or decoding fails, or
        the string "FRED API key not configured" when FRED_API_KEY is unset.
    """
    api_key = os.getenv("FRED_API_KEY", "")
    if not api_key:
        return "FRED API key not configured"

    base_url = "https://api.stlouisfed.org/fred/series/observations"
    params = {
        "series_id": indicator,
        "api_key": api_key,
        "file_type": "json",
        "sort_order": "desc",
        "limit": 100,
    }
    try:
        response = requests.get(base_url, params=params, timeout=FRED_TIMEOUT_SECONDS)
        # Treat HTTP errors as failures rather than parsing an error payload.
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError):
        logger.exception("Error fetching FRED data")
        return []

    if not isinstance(data, dict):
        return []
    return data.get("observations", [])


def create_stock_chart(ticker, period="1y"):
    """Build a Plotly candlestick chart with a volume panel underneath.

    Args:
        ticker: Ticker symbol, used for the data lookup and the title.
        period: yfinance period string.

    Returns:
        A ``plotly.graph_objects.Figure``, or None when no history is
        available or the chart cannot be built.
    """
    try:
        df = get_stock_history(ticker, period)
        if df.empty:
            return None

        candles = go.Candlestick(
            x=df.index,
            open=df["Open"],
            high=df["High"],
            low=df["Low"],
            close=df["Close"],
            name=ticker,
        )
        # Volume goes on a second y-axis that occupies the bottom of the figure.
        volume = go.Bar(
            x=df.index,
            y=df["Volume"],
            name="Volume",
            marker_color="rgba(0, 128, 0, 0.3)",
            yaxis="y2",
        )

        fig = go.Figure()
        fig.add_trace(candles)
        fig.add_trace(volume)
        fig.update_layout(
            title=f"{ticker} Stock Price",
            yaxis_title="Price (USD)",
            xaxis_title="Date",
            template="plotly_white",
            yaxis=dict(domain=[0.3, 1.0]),
            yaxis2=dict(domain=[0, 0.2], title="Volume"),
            legend=dict(
                orientation="h",
                yanchor="bottom",
                y=1.02,
                xanchor="right",
                x=1,
            ),
            height=500,
        )
        return fig
    except Exception:
        logger.exception("Error creating stock chart")
        return None


def analyze_ticker(ticker_input, period):
    """Produce the chart and Markdown summary for the Stock Analysis tab.

    Args:
        ticker_input: Raw text from the ticker box; surrounding whitespace
            and a leading "$" are ignored.
        period: yfinance period string for the chart.

    Returns:
        A ``(figure, summary_markdown, ticker)`` tuple. On invalid input or
        lookup failure the figure and ticker are None and the summary carries
        the message.
    """
    ticker = (ticker_input or "").strip().upper()
    if ticker.startswith("$"):
        ticker = ticker[1:]
    if not ticker:
        return None, "Please enter a valid ticker symbol", None

    try:
        stock_data = get_stock_data(ticker)
        chart = create_stock_chart(ticker, period)

        # Blank lines between entries make Markdown render one figure per line.
        summary = "\n\n".join(
            [
                f"### {ticker} Analysis",
                f"**Current Price:** ${stock_data['current_price']}",
                f"**52-Week High:** ${stock_data['52wk_high']}",
                f"**Market Cap:** ${_format_number(stock_data['market_cap'])}",
                f"**P/E Ratio:** {stock_data['pe_ratio']}",
                f"**Dividend Yield:** {_format_percent(stock_data['dividend_yield'])}",
                f"**Beta:** {stock_data['beta']}",
                f"**Avg Volume:** {_format_number(stock_data['average_volume'])}",
            ]
        )
        return chart, summary, ticker
    except Exception as e:
        return None, f"Error analyzing ticker {ticker}: {str(e)}", None


def _reset_app():
    """Return cleared values for every widget wired to the clear button.

    The order matches the ``outputs`` list of ``clear_btn.click``. The slider
    is reset to its 1..1 range rather than to 0, which is below its minimum.
    """
    return (
        [],
        None,
        "No PDF uploaded yet",
        _empty_pdf_state(),
        _reset_slider(),
        None,
        "No PDF uploaded yet",
        None,
    )


# Gradio interface
with gr.Blocks(css=custom_css, theme=gr.themes.Soft()) as demo:
    current_session_id = gr.State(None)
    pdf_state = gr.State(_empty_pdf_state())
    current_ticker = gr.State(None)

    gr.HTML(HEADER_HTML)

    with gr.Row(elem_classes="container"):
        with gr.Column(scale=1, min_width=300):
            pdf_file = gr.File(label="Upload PDF Document", file_types=[".pdf"], type="binary")
            upload_button = gr.Button("Process PDF", variant="primary")
            pdf_status = gr.Markdown("No PDF uploaded yet")
            model_dropdown = gr.Dropdown(
                choices=["llama3-70b-8192", "llama3-8b-8192", "mixtral-8x7b-32768", "gemma-7b-it"],
                value="llama3-70b-8192",
                label="Select Groq Model",
            )

            # Finance Tools Section
            gr.Markdown("### Financial Tools", elem_classes="tool-title")
            with gr.Group(elem_classes="tool-container"):
                with gr.Tabs():
                    with gr.TabItem("Stock Analysis"):
                        ticker_input = gr.Textbox(
                            label="Enter Ticker Symbol (e.g., AAPL)",
                            placeholder="AAPL",
                        )
                        period_dropdown = gr.Dropdown(
                            choices=["1mo", "3mo", "6mo", "1y", "2y", "5y", "max"],
                            value="1y",
                            label="Time Period",
                        )
                        analyze_button = gr.Button("Analyze Stock")

        with gr.Column(scale=2, min_width=600):
            with gr.Tabs():
                with gr.TabItem("PDF Viewer"):
                    with gr.Column(elem_classes="pdf-viewer-container"):
                        page_slider = gr.Slider(
                            minimum=1, maximum=1, step=1, label="Page Number", value=1
                        )
                        pdf_image = gr.Image(
                            label="PDF Page", type="pil", elem_classes="pdf-viewer-image"
                        )
                        stats_display = gr.Markdown(
                            "No PDF uploaded yet", elem_classes="stats-box"
                        )
                with gr.TabItem("Stock Analysis"):
                    with gr.Column(elem_classes="pdf-viewer-container"):
                        stock_chart = gr.Plot(
                            label="Stock Price Chart", elem_classes="chart-container"
                        )
                        stock_summary = gr.Markdown("Enter a ticker symbol to see analysis")

    with gr.Row(elem_classes="container"):
        with gr.Column(scale=2, min_width=600):
            chatbot = gr.Chatbot(
                height=500,
                bubble_full_width=False,
                show_copy_button=True,
                elem_classes="chat-container",
            )
            with gr.Row():
                msg = gr.Textbox(
                    show_label=False,
                    placeholder="Ask about your financial document or type $TICKER for stock info...",
                    scale=5,
                )
                send_btn = gr.Button("Send", scale=1)
            clear_btn = gr.Button("Clear Conversation")

    # Event Handlers
    upload_button.click(
        process_pdf,
        inputs=[pdf_file],
        outputs=[current_session_id, pdf_status, pdf_state],
    ).then(
        update_pdf_viewer,
        inputs=[pdf_state],
        outputs=[page_slider, pdf_image, stats_display],
    )

    # Pressing Enter in the textbox and clicking "Send" behave identically:
    # answer the message, then empty the input box.
    for send_trigger in (msg.submit, send_btn.click):
        send_trigger(
            generate_response,
            inputs=[msg, current_session_id, model_dropdown, chatbot],
            outputs=[chatbot],
        ).then(lambda: "", None, [msg])

    clear_btn.click(
        _reset_app,
        None,
        [
            chatbot,
            current_session_id,
            pdf_status,
            pdf_state,
            page_slider,
            pdf_image,
            stats_display,
            current_ticker,
        ],
    )

    page_slider.change(
        update_image,
        inputs=[page_slider, pdf_state],
        outputs=[pdf_image],
    )

    # Stock analysis handler
    analyze_button.click(
        analyze_ticker,
        inputs=[ticker_input, period_dropdown],
        outputs=[stock_chart, stock_summary, current_ticker],
    )

    # Add footer with attribution
    gr.HTML(FOOTER_HTML)

# Launch the app
if __name__ == "__main__":
    demo.launch()