eaglelandsonce's picture
Update app.py
e5dc61f
raw
history blame
21.9 kB
import os
import time
import uuid
from typing import List, Tuple, Optional, Dict, Union
import google.generativeai as genai
import gradio as gr
from PIL import Image
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.chains import LLMChain, SequentialChain
from textwrap import dedent
import google.generativeai as genai
import yfinance as yf
from pypfopt.discrete_allocation import DiscreteAllocation, get_latest_prices
from pypfopt import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
from pypfopt import plotting
import copy
import numpy as np
import pandas as pd
import plotly.express as px
import matplotlib.pyplot as plt
from datetime import datetime
import datetime
# Tool import
from crewai.tools.gemini_tools import GeminiSearchTools
from langchain.tools.yahoo_finance_news import YahooFinanceNewsTool
from crewai.tools.browser_tools import BrowserTools
from crewai.tools.sec_tools import SECTools
# Google Langchain
from langchain_google_genai import GoogleGenerativeAI
#Crew imports
from crewai import Agent, Task, Crew, Process
# Retrieve API Key from Environment Variable
GOOGLE_AI_STUDIO = os.environ.get('GOOGLE_API_KEY')
# Ensure the API key is available
if not GOOGLE_AI_STUDIO:
raise ValueError("API key not found. Please set the GOOGLE_AI_STUDIO2 environment variable.")
# LangChain function for company analysis
def company_analysis(api_key: str, company_name: str) -> dict:
os.environ['OPENAI_API_KEY'] = api_key # Set the OpenAI API key as an environment variable
llm = ChatOpenAI()
'''
# Identify the email's language
template1 = "Return the language this email is written in:\n{email}.\nONLY return the language it was written in."
prompt1 = ChatPromptTemplate.from_template(template1)
chain_1 = LLMChain(llm=llm, prompt=prompt1, output_key="language")
# Translate the email to English
template2 = "Translate this email from {language} to English. Here is the email:\n" + email
prompt2 = ChatPromptTemplate.from_template(template2)
chain_2 = LLMChain(llm=llm, prompt=prompt2, output_key="translated_email")
# Provide a summary in English
template3 = "Create a short summary of this email:\n{translated_email}"
prompt3 = ChatPromptTemplate.from_template(template3)
chain_3 = LLMChain(llm=llm, prompt=prompt3, output_key="summary")
# Provide a reply in English
template4 = "Reply to the sender of the email giving a plausible reply based on the {summary} and a promise to address issues"
prompt4 = ChatPromptTemplate.from_template(template4)
chain_4 = LLMChain(llm=llm, prompt=prompt4, output_key="reply")
# Provide a translation back to the original language
template5 = "Translate the {reply} back to the original {language} of the email."
prompt5 = ChatPromptTemplate.from_template(template5)
chain_5 = LLMChain(llm=llm, prompt=prompt5, output_key="translated_reply")
seq_chain = SequentialChain(chains=[chain_1, chain_2, chain_3, chain_4, chain_5],
input_variables=['email'],
output_variables=['language', 'translated_email', 'summary', 'reply', 'translated_reply'],
verbose=True)
'''
return seq_chain(email)
print("google-generativeai:", genai.__version__)
GOOGLE_API_KEY = os.environ.get("GOOGLE_API_KEY")
TITLE_INTRO = """<h1 align="center">Introduction to Financial Manager</h1>"""
TITLE1 = """<h1 align="center">Company Analysis</h1>"""
TITLE2 = """<h1 align="center">Investment Strategy</h1>"""
TITLE3 = """<h1 align="center">Profit Prophet</h1>"""
SUBTITLE = """<h2 align="center">Strategy Agent built with Gemini Pro and Gemini Pro Vision API</h2>"""
GETKEY = """
<div style="text-align: center; display: flex; justify-content: center; align-items: center;">
<span>Get an API key
<a href="https://makersuite.google.com/app/apikey">GOOGLE API KEY</a>.
</span>
</div>
"""
AVATAR_IMAGES = (
None,
"https://media.roboflow.com/spaces/gemini-icon.png"
)
movie_script_analysis = ""
IMAGE_CACHE_DIRECTORY = "/tmp"
IMAGE_WIDTH = 512
CHAT_HISTORY = List[Tuple[Optional[Union[Tuple[str], str]], Optional[str]]]
def preprocess_stop_sequences(stop_sequences: str) -> Optional[List[str]]:
if not stop_sequences:
return None
return [sequence.strip() for sequence in stop_sequences.split(",")]
def preprocess_image(image: Image.Image) -> Optional[Image.Image]:
image_height = int(image.height * IMAGE_WIDTH / image.width)
return image.resize((IMAGE_WIDTH, image_height))
def cache_pil_image(image: Image.Image) -> str:
image_filename = f"{uuid.uuid4()}.jpeg"
os.makedirs(IMAGE_CACHE_DIRECTORY, exist_ok=True)
image_path = os.path.join(IMAGE_CACHE_DIRECTORY, image_filename)
image.save(image_path, "JPEG")
return image_path
def preprocess_chat_history(
history: CHAT_HISTORY
) -> List[Dict[str, Union[str, List[str]]]]:
messages = []
for user_message, model_message in history:
if isinstance(user_message, tuple):
pass
elif user_message is not None:
messages.append({'role': 'user', 'parts': [user_message]})
if model_message is not None:
messages.append({'role': 'model', 'parts': [model_message]})
return messages
def upload(files: Optional[List[str]], chatbot: CHAT_HISTORY) -> CHAT_HISTORY:
for file in files:
image = Image.open(file).convert('RGB')
image = preprocess_image(image)
image_path = cache_pil_image(image)
chatbot.append(((image_path,), None))
return chatbot
def user(text_prompt: str, chatbot: CHAT_HISTORY):
if text_prompt:
chatbot.append((text_prompt, None))
return "", chatbot
def bot(
google_key: str,
files: Optional[List[str]],
temperature: float,
max_output_tokens: int,
stop_sequences: str,
top_k: int,
top_p: float,
chatbot: CHAT_HISTORY
):
if len(chatbot) == 0:
return chatbot
google_key = google_key if google_key else GOOGLE_API_KEY
if not google_key:
raise ValueError(
"GOOGLE_API_KEY is not set. "
"Please follow the instructions in the README to set it up.")
genai.configure(api_key=google_key)
generation_config = genai.types.GenerationConfig(
temperature=temperature,
max_output_tokens=max_output_tokens,
stop_sequences=preprocess_stop_sequences(stop_sequences=stop_sequences),
top_k=top_k,
top_p=top_p)
if files:
text_prompt = [chatbot[-1][0]] \
if chatbot[-1][0] and isinstance(chatbot[-1][0], str) \
else []
image_prompt = [Image.open(file).convert('RGB') for file in files]
model = genai.GenerativeModel('gemini-pro-vision')
response = model.generate_content(
text_prompt + image_prompt,
stream=True,
generation_config=generation_config)
else:
messages = preprocess_chat_history(chatbot)
model = genai.GenerativeModel('gemini-pro')
response = model.generate_content(
messages,
stream=True,
generation_config=generation_config)
# streaming effect
chatbot[-1][1] = ""
for chunk in response:
for i in range(0, len(chunk.text), 10):
section = chunk.text[i:i + 10]
chatbot[-1][1] += section
time.sleep(0.01)
yield chatbot
google_key_component = gr.Textbox(
label="GOOGLE API KEY",
value="",
type="password",
placeholder="...",
info="You have to provide your own GOOGLE_API_KEY for this app to function properly",
visible=GOOGLE_API_KEY is None
)
chatbot_component = gr.Chatbot(
label='Gemini Pro Vision',
bubble_full_width=False,
avatar_images=AVATAR_IMAGES,
scale=2,
height=400
)
text_prompt_component = gr.Textbox(value=movie_script_analysis,
show_label=False, autofocus=True, scale=8, lines=8
)
upload_button_component = gr.UploadButton(
label="Upload Images", file_count="multiple", file_types=["image"], scale=1
)
run_button_component = gr.Button(value="Run", variant="primary", scale=1)
run_button_analysis = gr.Button(value="Run", variant="primary", scale=1)
temperature_component = gr.Slider(
minimum=0,
maximum=1.0,
value=0.4,
step=0.05,
label="Temperature",
info=(
"Temperature controls the degree of randomness in token selection. Lower "
"temperatures are good for prompts that expect a true or correct response, "
"while higher temperatures can lead to more diverse or unexpected results. "
))
max_output_tokens_component = gr.Slider(
minimum=1,
maximum=2048,
value=1024,
step=1,
label="Token limit",
info=(
"Token limit determines the maximum amount of text output from one prompt. A "
"token is approximately four characters. The default value is 2048."
))
stop_sequences_component = gr.Textbox(
label="Add stop sequence",
value="",
type="text",
placeholder="STOP, END",
info=(
"A stop sequence is a series of characters (including spaces) that stops "
"response generation if the model encounters it. The sequence is not included "
"as part of the response. You can add up to five stop sequences."
))
top_k_component = gr.Slider(
minimum=1,
maximum=40,
value=32,
step=1,
label="Top-K",
info=(
"Top-k changes how the model selects tokens for output. A top-k of 1 means the "
"selected token is the most probable among all tokens in the model’s "
"vocabulary (also called greedy decoding), while a top-k of 3 means that the "
"next token is selected from among the 3 most probable tokens (using "
"temperature)."
))
top_p_component = gr.Slider(
minimum=0,
maximum=1,
value=1,
step=0.01,
label="Top-P",
info=(
"Top-p changes how the model selects tokens for output. Tokens are selected "
"from most probable to least until the sum of their probabilities equals the "
"top-p value. For example, if tokens A, B, and C have a probability of .3, .2, "
"and .1 and the top-p value is .5, then the model will select either A or B as "
"the next token (using temperature). "
))
user_inputs = [
text_prompt_component,
chatbot_component
]
bot_inputs = [
google_key_component,
upload_button_component,
temperature_component,
max_output_tokens_component,
stop_sequences_component,
top_k_component,
top_p_component,
chatbot_component
]
# Gmix ++++++++++++++++++++++++++++++++++++++++++++++++
#Crew imports
from crewai import Agent, Task, Crew, Process
run_button_crewai = gr.Button(value="Run", variant="primary", scale=1)
research_topic_component = gr.Textbox(lines=2, placeholder="Enter Research Topic Here...")
# Set gemini_llm
gemini_llm = GoogleGenerativeAI(model="gemini-pro", google_api_key=GOOGLE_AI_STUDIO)
def crewai_process(research_topic):
# Define your agents with roles and goals
researcher = Agent(
role='Senior Research Analyst',
goal=f'Uncover cutting-edge developments in {research_topic}',
backstory="""You are a Senior Research Analyst at a leading think tank.
Your expertise lies in identifying emerging trends. You have a knack for dissecting complex data and presenting
actionable insights.""",
verbose=True,
allow_delegation=False,
llm = gemini_llm,
tools=[
GeminiSearchTools.gemini_search
]
)
writer = Agent(
role='Tech Content Strategist',
goal='Craft compelling content on tech advancements',
backstory="""You are a renowned Tech Social Media Content Writer and Strategist, known for your insightful
and engaging articles on technology and innovation. With a deep understanding of
the tech industry and how people are impacted by it, you transform complex concepts into compelling narratives.""",
verbose=True,
allow_delegation=True,
llm = gemini_llm
# Add tools and other optional parameters as needed
)
# Create tasks for your agents
task1 = Task(
description=f"""Conduct a comprehensive analysis of the latest advancements in {research_topic}.
Compile your findings in a detailed report. Your final answer MUST be a full analysis report""",
agent=researcher
)
task2 = Task(
description="""Using the insights from the researcher's report, develop an engaging blog
post that highlights the most significant advancements.
Your post should be informative yet accessible, catering to a tech-savvy audience.
Aim for a narrative that captures the essence of these breakthroughs and their
implications for the future. Your final answer MUST be the full blog post of at least 3 paragraphs.""",
agent=writer
)
# Instantiate your crew with a sequential process
crew = Crew(
agents=[researcher, writer],
tasks=[task1, task2],
verbose=2,
process=Process.sequential
)
# Get your crew to work!
result = crew.kickoff()
return result
# Portfolio Analysis +++++++++++++++++++++++++++++++++++
def plot_cum_returns(data, title):
daily_cum_returns = 1 + data.dropna().pct_change()
daily_cum_returns = daily_cum_returns.cumprod()*100
fig = px.line(daily_cum_returns, title=title)
return fig
def plot_efficient_frontier_and_max_sharpe(mu, S):
# Optimize portfolio for max Sharpe ratio and plot it out with efficient frontier curve
ef = EfficientFrontier(mu, S)
fig, ax = plt.subplots(figsize=(6,4))
ef_max_sharpe = copy.deepcopy(ef)
plotting.plot_efficient_frontier(ef, ax=ax, show_assets=False)
# Find the max sharpe portfolio
ef_max_sharpe.max_sharpe(risk_free_rate=0.02)
ret_tangent, std_tangent, _ = ef_max_sharpe.portfolio_performance()
ax.scatter(std_tangent, ret_tangent, marker="*", s=100, c="r", label="Max Sharpe")
# Generate random portfolios with random weights
n_samples = 1000
w = np.random.dirichlet(np.ones(ef.n_assets), n_samples)
rets = w.dot(ef.expected_returns)
stds = np.sqrt(np.diag(w @ ef.cov_matrix @ w.T))
sharpes = rets / stds
ax.scatter(stds, rets, marker=".", c=sharpes, cmap="viridis_r")
# Output
ax.legend()
return fig
def output_results(start_date, end_date, tickers_string):
tickers = tickers_string.split(',')
# Get Stock Prices
stocks_df = yf.download(tickers, start=start_date, end=end_date)['Adj Close']
# Plot Individual Stock Prices
fig_indiv_prices = px.line(stocks_df, title='Price of Individual Stocks')
# Plot Individual Cumulative Returns
fig_cum_returns = plot_cum_returns(stocks_df, 'Cumulative Returns of Individual Stocks Starting with $100')
# Calculatge and Plot Correlation Matrix between Stocks
corr_df = stocks_df.corr().round(2)
fig_corr = px.imshow(corr_df, text_auto=True, title = 'Correlation between Stocks')
# Calculate expected returns and sample covariance matrix for portfolio optimization later
mu = expected_returns.mean_historical_return(stocks_df)
S = risk_models.sample_cov(stocks_df)
# Plot efficient frontier curve
fig_efficient_frontier = plot_efficient_frontier_and_max_sharpe(mu, S)
# Get optimized weights
ef = EfficientFrontier(mu, S)
ef.max_sharpe(risk_free_rate=0.04)
weights = ef.clean_weights()
expected_annual_return, annual_volatility, sharpe_ratio = ef.portfolio_performance()
expected_annual_return, annual_volatility, sharpe_ratio = '{}%'.format((expected_annual_return*100).round(2)), \
'{}%'.format((annual_volatility*100).round(2)), \
'{}%'.format((sharpe_ratio*100).round(2))
weights_df = pd.DataFrame.from_dict(weights, orient = 'index')
weights_df = weights_df.reset_index()
weights_df.columns = ['Tickers', 'Weights']
# Calculate returns of portfolio with optimized weights
stocks_df['Optimized Portfolio'] = 0
for ticker, weight in weights.items():
stocks_df['Optimized Portfolio'] += stocks_df[ticker]*weight
# Plot Cumulative Returns of Optimized Portfolio
fig_cum_returns_optimized = plot_cum_returns(stocks_df['Optimized Portfolio'], 'Cumulative Returns of Optimized Portfolio Starting with $100')
return fig_cum_returns_optimized, weights_df, fig_efficient_frontier, fig_corr, \
expected_annual_return, annual_volatility, sharpe_ratio, fig_indiv_prices, fig_cum_returns
# Interface =============================================
with gr.Blocks() as demo:
with gr.Tab("Introduction"):
gr.HTML(TITLE_INTRO)
with gr.Tab("Your Portfolio"):
run_button_crewai.click(
fn=crewai_process,
inputs=research_topic_component,
outputs=gr.Textbox(label="Stock Analysis and Stocks")
queue=False
)
with gr.Tab("Portfolio Analysis"):
with gr.Blocks() as app:
with gr.Row():
gr.HTML("<h1>Bohmian's Stock Portfolio Optimizer</h1>")
with gr.Row():
start_date = gr.Textbox("2013-01-01", label="Start Date")
end_date = gr.Textbox(datetime.datetime.now().date(), label="End Date")
with gr.Row():
tickers_string = gr.Textbox("MA,META,V,AMZN,JPM,BA",
label='Enter all stock tickers to be included in portfolio separated \
by commas WITHOUT spaces, e.g. "MA,META,V,AMZN,JPM,BA"')
btn = gr.Button("Get Optimized Portfolio")
with gr.Row():
gr.HTML("<h3>Optimizied Portfolio Metrics</h3>")
with gr.Row():
expected_annual_return = gr.Text(label="Expected Annual Return")
annual_volatility = gr.Text(label="Annual Volatility")
sharpe_ratio = gr.Text(label="Sharpe Ratio")
with gr.Row():
fig_cum_returns_optimized = gr.Plot(label="Cumulative Returns of Optimized Portfolio (Starting Price of $100)")
weights_df = gr.DataFrame(label="Optimized Weights of Each Ticker")
with gr.Row():
fig_efficient_frontier = gr.Plot(label="Efficient Frontier")
fig_corr = gr.Plot(label="Correlation between Stocks")
with gr.Row():
fig_indiv_prices = gr.Plot(label="Price of Individual Stocks")
fig_cum_returns = gr.Plot(label="Cumulative Returns of Individual Stocks Starting with $100")
btn.click(fn=output_results, inputs=[start_date, end_date, tickers_string],
outputs=[fig_cum_returns_optimized, weights_df, fig_efficient_frontier, fig_corr, \
expected_annual_return, annual_volatility, sharpe_ratio, fig_indiv_prices, fig_cum_returns])
with gr.Tab("Company Analysis"):
gr.HTML(TITLE1)
run_button_analysis.click(
fn=company_analysis,
inputs=[
gr.Textbox(label="Enter your OpenAI API Key:", type="password"),
gr.Textbox(label="Enter the Company Name:")
],
outputs=[
gr.Textbox(label="Language"),
gr.Textbox(label="Summary"),
gr.Textbox(label="Translated Email"),
gr.Textbox(label="Reply in English"),
gr.Textbox(label="Reply in Original Language")
]
)
with gr.Tab("Investment Strategy"):
gr.HTML(TITLE2)
gr.HTML(SUBTITLE)
gr.HTML(GETKEY)
with gr.Column():
google_key_component.render()
chatbot_component.render()
with gr.Row():
text_prompt_component.render()
upload_button_component.render()
run_button_component.render()
with gr.Accordion("Parameters", open=False):
temperature_component.render()
max_output_tokens_component.render()
stop_sequences_component.render()
with gr.Accordion("Advanced", open=False):
top_k_component.render()
top_p_component.render()
run_button_component.click(
fn=user,
inputs=user_inputs,
outputs=[text_prompt_component, chatbot_component],
queue=False
).then(
fn=bot, inputs=bot_inputs, outputs=[chatbot_component],
)
text_prompt_component.submit(
fn=user,
inputs=user_inputs,
outputs=[text_prompt_component, chatbot_component],
queue=False
).then(
fn=bot, inputs=bot_inputs, outputs=[chatbot_component],
)
upload_button_component.upload(
fn=upload,
inputs=[upload_button_component, chatbot_component],
outputs=[chatbot_component],
queue=False
)
with gr.Tab("Profit Prophet"):
gr.HTML(TITLE3)
with gr.Row():
with gr.Column(scale=1):
gr.Image(value = "resources/holder.png")
with gr.Column(scale=1):
gr.Image(value = "resources/holder.png")
demo.queue(max_size=99).launch(debug=False, show_error=True)