import os
import time
import uuid
from typing import List, Tuple, Optional, Dict, Union
import google.generativeai as genai
import gradio as gr
from PIL import Image
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.chains import LLMChain, SequentialChain
from textwrap import dedent
import google.generativeai as genai
import yfinance as yf
from pypfopt.discrete_allocation import DiscreteAllocation, get_latest_prices
from pypfopt import EfficientFrontier
from pypfopt import risk_models
from pypfopt import expected_returns
from pypfopt import plotting
import copy
import numpy as np
import pandas as pd
import plotly.express as px
import matplotlib.pyplot as plt
from datetime import datetime
import datetime
# Tool import
from crewai.tools.gemini_tools import GeminiSearchTools
from langchain.tools.yahoo_finance_news import YahooFinanceNewsTool
from crewai.tools.browser_tools import BrowserTools
from crewai.tools.sec_tools import SECTools
# Google Langchain
from langchain_google_genai import GoogleGenerativeAI
#Crew imports
from crewai import Agent, Task, Crew, Process
# Read the Google AI Studio key from the GOOGLE_API_KEY environment variable.
GOOGLE_AI_STUDIO = os.environ.get('GOOGLE_API_KEY')
# Fail fast at import time: the key is needed later in this module when the
# Gemini LLM wrapper is constructed. The message names the variable that is
# actually read above so the operator knows what to set.
if not GOOGLE_AI_STUDIO:
    raise ValueError("API key not found. Please set the GOOGLE_API_KEY environment variable.")
# LangChain function for company analysis
def company_analysis(api_key: str, company_name: str) -> dict:
    """Run the five-step LangChain pipeline on the supplied text.

    The pipeline (1) detects the language of the text, (2) translates it to
    English, (3) summarises it, (4) drafts a reply and (5) translates the
    reply back into the detected language.

    NOTE(review): the prompts are written for e-mails although the function
    and its second parameter are named after companies; the text passed as
    ``company_name`` is fed to the chain as its ``email`` input. Confirm that
    this is the intended behaviour.

    Args:
        api_key: OpenAI API key. It is exported as ``OPENAI_API_KEY`` for the
            whole process so that ``ChatOpenAI()`` can pick it up.
        company_name: The text handed to the chain as ``email``.

    Returns:
        The mapping produced by the sequential chain, which carries the keys
        ``language``, ``translated_email``, ``summary``, ``reply`` and
        ``translated_reply``.
    """
    os.environ['OPENAI_API_KEY'] = api_key  # Set the OpenAI API key as an environment variable
    llm = ChatOpenAI()

    # Identify the email's language
    template1 = "Return the language this email is written in:\n{email}.\nONLY return the language it was written in."
    prompt1 = ChatPromptTemplate.from_template(template1)
    chain_1 = LLMChain(llm=llm, prompt=prompt1, output_key="language")

    # Translate the email to English. The text is injected through the
    # {email} placeholder rather than concatenated into the template, so
    # braces inside the text cannot be mistaken for template variables.
    template2 = "Translate this email from {language} to English. Here is the email:\n{email}"
    prompt2 = ChatPromptTemplate.from_template(template2)
    chain_2 = LLMChain(llm=llm, prompt=prompt2, output_key="translated_email")

    # Provide a summary in English
    template3 = "Create a short summary of this email:\n{translated_email}"
    prompt3 = ChatPromptTemplate.from_template(template3)
    chain_3 = LLMChain(llm=llm, prompt=prompt3, output_key="summary")

    # Provide a reply in English
    template4 = "Reply to the sender of the email giving a plausible reply based on the {summary} and a promise to address issues"
    prompt4 = ChatPromptTemplate.from_template(template4)
    chain_4 = LLMChain(llm=llm, prompt=prompt4, output_key="reply")

    # Provide a translation back to the original language
    template5 = "Translate the {reply} back to the original {language} of the email."
    prompt5 = ChatPromptTemplate.from_template(template5)
    chain_5 = LLMChain(llm=llm, prompt=prompt5, output_key="translated_reply")

    seq_chain = SequentialChain(
        chains=[chain_1, chain_2, chain_3, chain_4, chain_5],
        input_variables=['email'],
        output_variables=['language', 'translated_email', 'summary', 'reply', 'translated_reply'],
        verbose=True,
    )
    return seq_chain({'email': company_name})
# Log the installed google-generativeai version at start-up.
print("google-generativeai:", genai.__version__)
# Key taken from the environment; bot() falls back to it when the UI key field is empty.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
# Static text blocks passed to gr.HTML in the individual tabs.
TITLE_INTRO = "\nIntroduction to Financial Manager\n"
TITLE1 = "Company Analysis\n"
TITLE2 = "Investment Strategy\n"
TITLE3 = "Profit Prophet\n"
SUBTITLE = "Strategy Agent built with Gemini Pro and Gemini Pro Vision API\n"
GETKEY = "\n"

# (user avatar, bot avatar) for the chatbot widget; None means no user avatar.
AVATAR_IMAGES = (None, "https://media.roboflow.com/spaces/gemini-icon.png")

# Initial content of the chat prompt box.
movie_script_analysis = ""

# Uploaded images are resized to IMAGE_WIDTH pixels wide and stored here.
IMAGE_CACHE_DIRECTORY = "/tmp"
IMAGE_WIDTH = 512

# One chat turn is (user_part, model_part): the user part is either a text
# string or a 1-tuple holding an image path, and either part may be None.
_UserTurn = Optional[Union[Tuple[str], str]]
_ModelTurn = Optional[str]
CHAT_HISTORY = List[Tuple[_UserTurn, _ModelTurn]]
def preprocess_stop_sequences(stop_sequences: str) -> Optional[List[str]]:
    """Split a comma-separated string of stop sequences into a clean list.

    Each entry is stripped of surrounding whitespace, and entries that are
    empty after stripping (for example from a trailing comma) are dropped so
    that no empty stop sequence is ever produced.

    Args:
        stop_sequences: Raw text such as ``"STOP, END"``.

    Returns:
        The list of non-empty stop sequences, or ``None`` when the input is
        empty or contains no usable entry.
    """
    if not stop_sequences:
        return None
    stripped = (part.strip() for part in stop_sequences.split(","))
    cleaned = [part for part in stripped if part]
    # An all-blank input such as " , " is treated like no input at all.
    return cleaned or None
def preprocess_image(image: Image.Image) -> Optional[Image.Image]:
    """Return a copy of ``image`` resized to ``IMAGE_WIDTH`` pixels wide.

    The height is scaled by the same factor as the width and truncated to an
    integer, so the aspect ratio is kept up to rounding.
    """
    target_size = (IMAGE_WIDTH, int(image.height * IMAGE_WIDTH / image.width))
    return image.resize(target_size)
def cache_pil_image(image: Image.Image) -> str:
    """Save ``image`` as a JPEG under ``IMAGE_CACHE_DIRECTORY`` and return its path.

    The file name is a freshly generated UUID4 with a ``.jpeg`` suffix; the
    cache directory is created when it does not exist yet.
    """
    os.makedirs(IMAGE_CACHE_DIRECTORY, exist_ok=True)
    destination = os.path.join(IMAGE_CACHE_DIRECTORY, f"{uuid.uuid4()}.jpeg")
    image.save(destination, "JPEG")
    return destination
def preprocess_chat_history(
    history: CHAT_HISTORY
) -> List[Dict[str, Union[str, List[str]]]]:
    """Convert the chatbot history into a flat list of role/parts messages.

    For every ``(user_part, model_part)`` turn, a text user part becomes a
    ``'user'`` message and a non-``None`` model part becomes a ``'model'``
    message. User parts that are tuples (image uploads) or ``None`` are
    skipped.
    """
    def _message(role, text):
        return {'role': role, 'parts': [text]}

    converted = []
    for user_part, model_part in history:
        is_text_prompt = user_part is not None and not isinstance(user_part, tuple)
        if is_text_prompt:
            converted.append(_message('user', user_part))
        if model_part is not None:
            converted.append(_message('model', model_part))
    return converted
def upload(files: Optional[List[str]], chatbot: CHAT_HISTORY) -> CHAT_HISTORY:
    """Append every uploaded image to the chat history as an image turn.

    Each file is opened, converted to RGB, resized with ``preprocess_image``
    and written to the cache with ``cache_pil_image``; the cached path is
    then appended to ``chatbot`` as ``((image_path,), None)``.

    Args:
        files: Paths of the uploaded files; ``None`` or an empty list leaves
            the history untouched.
        chatbot: Current chat history. It is modified in place.

    Returns:
        The same ``chatbot`` list that was passed in.
    """
    for file in files or []:
        # convert() returns a new image, so the source file handle can be
        # released as soon as the conversion is done.
        with Image.open(file) as source:
            image = source.convert('RGB')
        image = preprocess_image(image)
        image_path = cache_pil_image(image)
        chatbot.append(((image_path,), None))
    return chatbot
def user(text_prompt: str, chatbot: CHAT_HISTORY):
    """Record the submitted prompt as a new, not yet answered chat turn.

    A non-empty ``text_prompt`` is appended to ``chatbot`` (in place) as
    ``(text_prompt, None)``. The return value is ``("", chatbot)``: the empty
    string is the new content of the prompt box.
    """
    has_prompt = bool(text_prompt)
    if has_prompt:
        pending_turn = (text_prompt, None)
        chatbot.append(pending_turn)
    cleared_prompt = ""
    return cleared_prompt, chatbot
def bot(
    google_key: str,
    files: Optional[List[str]],
    temperature: float,
    max_output_tokens: int,
    stop_sequences: str,
    top_k: int,
    top_p: float,
    chatbot: CHAT_HISTORY
):
    """Stream a Gemini answer into the last turn of ``chatbot``.

    This is a generator: it yields ``chatbot`` repeatedly while the answer of
    the last turn grows in slices of at most ten characters. With an empty
    history it finishes without yielding anything.

    When ``files`` is given, the 'gemini-pro-vision' model receives the last
    text prompt (if the last turn holds one) followed by the images;
    otherwise 'gemini-pro' receives the text history.

    Raises:
        ValueError: If neither ``google_key`` nor ``GOOGLE_API_KEY`` is set.
    """
    if len(chatbot) == 0:
        return chatbot

    # The key typed into the UI wins over the one from the environment.
    google_key = google_key or GOOGLE_API_KEY
    if not google_key:
        raise ValueError(
            "GOOGLE_API_KEY is not set. "
            "Please follow the instructions in the README to set it up.")

    genai.configure(api_key=google_key)
    stop_list = preprocess_stop_sequences(stop_sequences=stop_sequences)
    generation_config = genai.types.GenerationConfig(
        temperature=temperature,
        max_output_tokens=max_output_tokens,
        stop_sequences=stop_list,
        top_k=top_k,
        top_p=top_p)

    if files:
        last_user_part = chatbot[-1][0]
        if last_user_part and isinstance(last_user_part, str):
            text_parts = [last_user_part]
        else:
            text_parts = []
        image_parts = [Image.open(path).convert('RGB') for path in files]
        contents = text_parts + image_parts
        model_name = 'gemini-pro-vision'
    else:
        contents = preprocess_chat_history(chatbot)
        model_name = 'gemini-pro'

    model = genai.GenerativeModel(model_name)
    response = model.generate_content(
        contents,
        stream=True,
        generation_config=generation_config)

    # streaming effect: reveal each chunk ten characters at a time.
    # The item assignment requires the last turn to be a mutable sequence.
    chatbot[-1][1] = ""
    for chunk in response:
        start = 0
        while start < len(chunk.text):
            chatbot[-1][1] += chunk.text[start:start + 10]
            time.sleep(0.01)
            yield chatbot
            start += 10
# Chat widgets are created up front and placed into the layout later with .render().

# Key field; only shown when no GOOGLE_API_KEY was found in the environment.
google_key_component = gr.Textbox(
    type="password",
    label="GOOGLE API KEY",
    value="",
    placeholder="...",
    info="You have to provide your own GOOGLE_API_KEY for this app to function properly",
    visible=GOOGLE_API_KEY is None,
)

# Conversation view.
chatbot_component = gr.Chatbot(
    label='Gemini Pro Vision',
    avatar_images=AVATAR_IMAGES,
    bubble_full_width=False,
    height=400,
    scale=2,
)

# Multi-line prompt box, pre-filled with movie_script_analysis.
text_prompt_component = gr.Textbox(
    value=movie_script_analysis,
    lines=8,
    scale=8,
    show_label=False,
    autofocus=True,
)

# Image upload button accepting several files at once.
upload_button_component = gr.UploadButton(
    label="Upload Images",
    file_types=["image"],
    file_count="multiple",
    scale=1,
)

# "Run" buttons for the chat tab and the company-analysis tab.
run_button_component = gr.Button(variant="primary", value="Run", scale=1)
run_button_analysis = gr.Button(variant="primary", value="Run", scale=1)
# Widgets for the generation parameters that bot() forwards to
# genai.types.GenerationConfig. They are created here and placed into the
# layout later with .render().

temperature_component = gr.Slider(
    minimum=0,
    maximum=1.0,
    value=0.4,
    step=0.05,
    label="Temperature",
    info=(
        "Temperature controls the degree of randomness in token selection. Lower "
        "temperatures are good for prompts that expect a true or correct response, "
        "while higher temperatures can lead to more diverse or unexpected results. "
    ))

# The help text states the default that the slider actually starts with
# (value=1024); 2048 is only the upper bound.
max_output_tokens_component = gr.Slider(
    minimum=1,
    maximum=2048,
    value=1024,
    step=1,
    label="Token limit",
    info=(
        "Token limit determines the maximum amount of text output from one prompt. A "
        "token is approximately four characters. The default value is 1024."
    ))

# Free-text field; its content is split on commas by preprocess_stop_sequences.
stop_sequences_component = gr.Textbox(
    label="Add stop sequence",
    value="",
    type="text",
    placeholder="STOP, END",
    info=(
        "A stop sequence is a series of characters (including spaces) that stops "
        "response generation if the model encounters it. The sequence is not included "
        "as part of the response. You can add up to five stop sequences."
    ))

top_k_component = gr.Slider(
    minimum=1,
    maximum=40,
    value=32,
    step=1,
    label="Top-K",
    info=(
        "Top-k changes how the model selects tokens for output. A top-k of 1 means the "
        "selected token is the most probable among all tokens in the model’s "
        "vocabulary (also called greedy decoding), while a top-k of 3 means that the "
        "next token is selected from among the 3 most probable tokens (using "
        "temperature)."
    ))

top_p_component = gr.Slider(
    minimum=0,
    maximum=1,
    value=1,
    step=0.01,
    label="Top-P",
    info=(
        "Top-p changes how the model selects tokens for output. Tokens are selected "
        "from most probable to least until the sum of their probabilities equals the "
        "top-p value. For example, if tokens A, B, and C have a probability of .3, .2, "
        "and .1 and the top-p value is .5, then the model will select either A or B as "
        "the next token (using temperature). "
    ))
# Inputs of user(): the prompt text and the current chat history.
user_inputs = [text_prompt_component, chatbot_component]

# Inputs of bot(), listed in the order of its parameters.
bot_inputs = [
    google_key_component,         # google_key
    upload_button_component,      # files
    temperature_component,        # temperature
    max_output_tokens_component,  # max_output_tokens
    stop_sequences_component,     # stop_sequences
    top_k_component,              # top_k
    top_p_component,              # top_p
    chatbot_component,            # chatbot
]
# Gmix ++++++++++++++++++++++++++++++++++++++++++++++++
# Crew imports
from crewai import Agent, Task, Crew, Process

# Widgets of the research crew: trigger button and topic input.
run_button_crewai = gr.Button(variant="primary", value="Run", scale=1)
research_topic_component = gr.Textbox(
    placeholder="Enter Research Topic Here...",
    lines=2,
)

# Gemini Pro LLM shared by all crew agents.
gemini_llm = GoogleGenerativeAI(
    google_api_key=GOOGLE_AI_STUDIO,
    model="gemini-pro",
)
def crewai_process(research_topic):
    """Run a two-agent crew (research analyst, then content writer) on a topic.

    A 'Senior Research Analyst' agent equipped with the Gemini search tool is
    asked for a full analysis report on ``research_topic``; a 'Tech Content
    Strategist' agent is then asked to turn the report into a blog post. Both
    agents use ``gemini_llm`` and the tasks run sequentially.

    Returns:
        Whatever ``Crew.kickoff()`` returns for the configured crew.
    """
    # --- Agents -----------------------------------------------------------
    analyst_backstory = (
        "You are a Senior Research Analyst at a leading think tank.\n"
        "Your expertise lies in identifying emerging trends. "
        "You have a knack for dissecting complex data and presenting\n"
        "actionable insights."
    )
    analyst = Agent(
        role='Senior Research Analyst',
        goal=f'Uncover cutting-edge developments in {research_topic}',
        backstory=analyst_backstory,
        verbose=True,
        allow_delegation=False,
        llm=gemini_llm,
        tools=[GeminiSearchTools.gemini_search],
    )

    strategist_backstory = (
        "You are a renowned Tech Social Media Content Writer and Strategist, "
        "known for your insightful\n"
        "and engaging articles on technology and innovation. "
        "With a deep understanding of\n"
        "the tech industry and how people are impacted by it, "
        "you transform complex concepts into compelling narratives."
    )
    strategist = Agent(
        role='Tech Content Strategist',
        goal='Craft compelling content on tech advancements',
        backstory=strategist_backstory,
        verbose=True,
        allow_delegation=True,
        llm=gemini_llm,
        # Add tools and other optional parameters as needed
    )

    # --- Tasks ------------------------------------------------------------
    research_brief = (
        f"Conduct a comprehensive analysis of the latest advancements in {research_topic}.\n"
        "Compile your findings in a detailed report. "
        "Your final answer MUST be a full analysis report"
    )
    research_task = Task(description=research_brief, agent=analyst)

    writing_brief = (
        "Using the insights from the researcher's report, develop an engaging blog\n"
        "post that highlights the most significant advancements.\n"
        "Your post should be informative yet accessible, catering to a tech-savvy audience.\n"
        "Aim for a narrative that captures the essence of these breakthroughs and their\n"
        "implications for the future. "
        "Your final answer MUST be the full blog post of at least 3 paragraphs."
    )
    writing_task = Task(description=writing_brief, agent=strategist)

    # --- Crew: tasks are executed one after the other ---------------------
    crew = Crew(
        agents=[analyst, strategist],
        tasks=[research_task, writing_task],
        verbose=2,
        process=Process.sequential,
    )
    return crew.kickoff()
# Portfolio Analysis +++++++++++++++++++++++++++++++++++
def plot_cum_returns(data, title):
    """Return a Plotly line figure of cumulative returns computed from ``data``.

    Rows with missing values are dropped, period-over-period percentage
    changes are turned into growth factors, and the running product of those
    factors is scaled by 100 before plotting.
    """
    growth_factors = data.dropna().pct_change() + 1
    scaled_to_100 = growth_factors.cumprod() * 100
    return px.line(scaled_to_100, title=title)
def plot_efficient_frontier_and_max_sharpe(mu, S, risk_free_rate=0.02):
    """Plot the efficient frontier, the max-Sharpe portfolio and random portfolios.

    Args:
        mu: Expected returns, passed to ``EfficientFrontier``.
        S: Covariance matrix, passed to ``EfficientFrontier``.
        risk_free_rate: Risk-free rate used to locate the max-Sharpe
            portfolio. Defaults to 0.02.

    Returns:
        The Matplotlib figure holding the plot. It is detached from pyplot's
        figure registry before being returned, so repeated calls do not
        accumulate open figures.
    """
    # Optimize portfolio for max Sharpe ratio and plot it out with efficient frontier curve
    ef = EfficientFrontier(mu, S)
    fig, ax = plt.subplots(figsize=(6, 4))
    # A copy is optimised separately because `ef` itself is handed to the
    # frontier plotting helper.
    ef_max_sharpe = copy.deepcopy(ef)
    plotting.plot_efficient_frontier(ef, ax=ax, show_assets=False)

    # Find the max sharpe portfolio
    ef_max_sharpe.max_sharpe(risk_free_rate=risk_free_rate)
    ret_tangent, std_tangent, _ = ef_max_sharpe.portfolio_performance(
        risk_free_rate=risk_free_rate
    )
    ax.scatter(std_tangent, ret_tangent, marker="*", s=100, c="r", label="Max Sharpe")

    # Generate random portfolios with random weights
    n_samples = 1000
    w = np.random.dirichlet(np.ones(ef.n_assets), n_samples)
    rets = w.dot(ef.expected_returns)
    # Row-wise quadratic form w_i^T C w_i; equal to the diagonal of
    # w @ C @ w.T without building the full n_samples x n_samples matrix.
    stds = np.sqrt(((w @ ef.cov_matrix) * w).sum(axis=1))
    sharpes = rets / stds
    ax.scatter(stds, rets, marker=".", c=sharpes, cmap="viridis_r")

    # Output
    ax.legend()
    # Remove the figure from pyplot's global registry; the Figure object
    # itself stays usable by the caller.
    plt.close(fig)
    return fig
def output_results(start_date, end_date, tickers_string):
    """Download prices, optimise a max-Sharpe portfolio and build all outputs.

    Args:
        start_date: Start of the price history, forwarded to ``yf.download``.
        end_date: End of the price history, forwarded to ``yf.download``.
        tickers_string: Comma-separated ticker symbols. Surrounding
            whitespace and empty entries are ignored.

    Returns:
        A 9-tuple in the order expected by the UI wiring:
        (cumulative-returns figure of the optimised portfolio, weights
        DataFrame with columns 'Tickers'/'Weights', efficient-frontier
        figure, correlation figure, expected annual return, annual
        volatility, Sharpe ratio, individual-prices figure, individual
        cumulative-returns figure). The three metrics are strings formatted
        as percentages.

    Raises:
        ValueError: If no ticker is given or no price data is returned.
    """
    # Risk-free rate used for both the optimisation and the reported metrics.
    risk_free_rate = 0.04

    tickers = [symbol.strip() for symbol in tickers_string.split(',')]
    tickers = [symbol for symbol in tickers if symbol]
    if not tickers:
        raise ValueError("Please enter at least one stock ticker.")

    # Get Stock Prices. auto_adjust=False is requested explicitly so that the
    # 'Adj Close' column exists regardless of the library's default.
    stocks_df = yf.download(
        tickers, start=start_date, end=end_date, auto_adjust=False
    )['Adj Close']
    if isinstance(stocks_df, pd.Series):
        # A single column may come back as a Series; the code below needs a frame.
        stocks_df = stocks_df.to_frame(tickers[0])
    if stocks_df.dropna(how='all').empty:
        raise ValueError("No price data was returned for the given tickers and dates.")

    # Plot Individual Stock Prices
    fig_indiv_prices = px.line(stocks_df, title='Price of Individual Stocks')
    # Plot Individual Cumulative Returns
    fig_cum_returns = plot_cum_returns(stocks_df, 'Cumulative Returns of Individual Stocks Starting with $100')
    # Calculate and Plot Correlation Matrix between Stocks
    corr_df = stocks_df.corr().round(2)
    fig_corr = px.imshow(corr_df, text_auto=True, title='Correlation between Stocks')

    # Calculate expected returns and sample covariance matrix for portfolio optimization later
    mu = expected_returns.mean_historical_return(stocks_df)
    S = risk_models.sample_cov(stocks_df)

    # Plot efficient frontier curve
    fig_efficient_frontier = plot_efficient_frontier_and_max_sharpe(mu, S)

    # Get optimized weights
    ef = EfficientFrontier(mu, S)
    ef.max_sharpe(risk_free_rate=risk_free_rate)
    weights = ef.clean_weights()
    # Report the performance with the same risk-free rate as the optimisation.
    ret_value, vol_value, sharpe_value = ef.portfolio_performance(risk_free_rate=risk_free_rate)
    # The built-in round() works for plain floats as well as NumPy scalars.
    # NOTE(review): the Sharpe ratio is a ratio, not a percentage; it is still
    # rendered as a percentage to keep the existing output format.
    expected_annual_return = '{}%'.format(round(float(ret_value) * 100, 2))
    annual_volatility = '{}%'.format(round(float(vol_value) * 100, 2))
    sharpe_ratio = '{}%'.format(round(float(sharpe_value) * 100, 2))

    weights_df = pd.DataFrame(list(weights.items()), columns=['Tickers', 'Weights'])

    # Weighted sum of the price series using the optimised weights.
    # skipna=False keeps a missing price as a missing portfolio value, as an
    # element-wise running sum would.
    weight_series = pd.Series(weights, dtype=float)
    optimized_portfolio = (
        stocks_df[list(weight_series.index)]
        .mul(weight_series, axis=1)
        .sum(axis=1, skipna=False)
    )
    optimized_portfolio.name = 'Optimized Portfolio'

    # Plot Cumulative Returns of Optimized Portfolio
    fig_cum_returns_optimized = plot_cum_returns(optimized_portfolio, 'Cumulative Returns of Optimized Portfolio Starting with $100')

    return fig_cum_returns_optimized, weights_df, fig_efficient_frontier, fig_corr, \
        expected_annual_return, annual_volatility, sharpe_ratio, fig_indiv_prices, fig_cum_returns
# Interface =============================================
def _company_analysis_outputs(api_key, company_name):
    """Adapt the dict returned by company_analysis to the five output boxes.

    The values are returned in the order in which the output textboxes of the
    "Company Analysis" tab are listed; a missing key yields an empty string.
    """
    result = company_analysis(api_key, company_name)
    ordered_keys = ('language', 'summary', 'translated_email', 'reply', 'translated_reply')
    return tuple(result.get(key, "") for key in ordered_keys)


with gr.Blocks() as demo:
    with gr.Tab("Introduction"):
        gr.HTML(TITLE_INTRO)

    with gr.Tab("Your Portfolio"):
        # These widgets are created outside of the Blocks context, so they
        # have to be rendered explicitly to appear in this tab.
        research_topic_component.render()
        run_button_crewai.render()
        portfolio_analysis_output = gr.Textbox(label="Portfolio Analysis")
        run_button_crewai.click(
            fn=crewai_process,
            inputs=research_topic_component,
            outputs=portfolio_analysis_output,
            queue=False
        )

    with gr.Tab("Portfolio Analysis"):
        with gr.Blocks() as app:
            with gr.Row():
                gr.HTML("Bohmian's Stock Portfolio Optimizer")
            with gr.Row():
                start_date = gr.Textbox("2013-01-01", label="Start Date")
                # The Textbox value is passed as an ISO date string (YYYY-MM-DD).
                end_date = gr.Textbox(datetime.datetime.now().date().isoformat(), label="End Date")
            with gr.Row():
                tickers_string = gr.Textbox(
                    "MA,META,V,AMZN,JPM,BA",
                    label=(
                        'Enter all stock tickers to be included in portfolio separated '
                        'by commas WITHOUT spaces, e.g. "MA,META,V,AMZN,JPM,BA"'
                    ),
                )
                btn = gr.Button("Get Optimized Portfolio")
            with gr.Row():
                gr.HTML("Optimized Portfolio Metrics")
            with gr.Row():
                expected_annual_return = gr.Text(label="Expected Annual Return")
                annual_volatility = gr.Text(label="Annual Volatility")
                sharpe_ratio = gr.Text(label="Sharpe Ratio")
            with gr.Row():
                fig_cum_returns_optimized = gr.Plot(label="Cumulative Returns of Optimized Portfolio (Starting Price of $100)")
                weights_df = gr.DataFrame(label="Optimized Weights of Each Ticker")
            with gr.Row():
                fig_efficient_frontier = gr.Plot(label="Efficient Frontier")
                fig_corr = gr.Plot(label="Correlation between Stocks")
            with gr.Row():
                fig_indiv_prices = gr.Plot(label="Price of Individual Stocks")
                fig_cum_returns = gr.Plot(label="Cumulative Returns of Individual Stocks Starting with $100")
            # The output order matches the tuple returned by output_results.
            btn.click(
                fn=output_results,
                inputs=[start_date, end_date, tickers_string],
                outputs=[
                    fig_cum_returns_optimized, weights_df, fig_efficient_frontier, fig_corr,
                    expected_annual_return, annual_volatility, sharpe_ratio,
                    fig_indiv_prices, fig_cum_returns,
                ],
            )

    with gr.Tab("Company Analysis"):
        gr.HTML(TITLE1)
        openai_key_input = gr.Textbox(label="Enter your OpenAI API Key:", type="password")
        company_name_input = gr.Textbox(label="Enter the Company Name:")
        # Created outside of the Blocks context; render it so it can be clicked.
        run_button_analysis.render()
        analysis_outputs = [
            gr.Textbox(label="Language"),
            gr.Textbox(label="Summary"),
            gr.Textbox(label="Translated Email"),
            gr.Textbox(label="Reply in English"),
            gr.Textbox(label="Reply in Original Language"),
        ]
        # company_analysis returns a dict; the adapter turns it into one value
        # per output textbox.
        run_button_analysis.click(
            fn=_company_analysis_outputs,
            inputs=[openai_key_input, company_name_input],
            outputs=analysis_outputs,
        )

    with gr.Tab("Investment Strategy"):
        gr.HTML(TITLE2)
        gr.HTML(SUBTITLE)
        gr.HTML(GETKEY)
        with gr.Column():
            google_key_component.render()
            chatbot_component.render()
            with gr.Row():
                text_prompt_component.render()
                upload_button_component.render()
                run_button_component.render()
            with gr.Accordion("Parameters", open=False):
                temperature_component.render()
                max_output_tokens_component.render()
                stop_sequences_component.render()
                with gr.Accordion("Advanced", open=False):
                    top_k_component.render()
                    top_p_component.render()

        # Button click and Enter in the prompt box do the same thing: store
        # the prompt in the history first, then stream the model answer.
        run_button_component.click(
            fn=user,
            inputs=user_inputs,
            outputs=[text_prompt_component, chatbot_component],
            queue=False
        ).then(
            fn=bot, inputs=bot_inputs, outputs=[chatbot_component],
        )
        text_prompt_component.submit(
            fn=user,
            inputs=user_inputs,
            outputs=[text_prompt_component, chatbot_component],
            queue=False
        ).then(
            fn=bot, inputs=bot_inputs, outputs=[chatbot_component],
        )
        upload_button_component.upload(
            fn=upload,
            inputs=[upload_button_component, chatbot_component],
            outputs=[chatbot_component],
            queue=False
        )

    with gr.Tab("Profit Prophet"):
        gr.HTML(TITLE3)
        with gr.Row():
            with gr.Column(scale=1):
                gr.Image(value="resources/holder.png")
            with gr.Column(scale=1):
                gr.Image(value="resources/holder.png")

demo.queue(max_size=99).launch(debug=False, show_error=True)