# Captured from a Hugging Face Space (status at capture time: Sleeping).
# os.system("pip install langchain-openai") | |
from langchain_openai import AzureChatOpenAI | |
import os | |
import pdfplumber | |
from langchain.chains.mapreduce import MapReduceChain | |
from langchain.text_splitter import CharacterTextSplitter | |
from langchain.chains.summarize import load_summarize_chain | |
from langchain_community.document_loaders import UnstructuredFileLoader | |
from langchain.prompts import PromptTemplate | |
import logging | |
import json | |
from typing import List | |
import mimetypes | |
import validators | |
import requests | |
import tempfile | |
from langchain.chains import create_extraction_chain | |
from GoogleNews import GoogleNews | |
import pandas as pd | |
import requests | |
import gradio as gr | |
import re | |
from langchain_community.document_loaders import WebBaseLoader | |
from langchain.chains.combine_documents.stuff import StuffDocumentsChain | |
from transformers import pipeline | |
import plotly.express as px | |
from langchain_community.document_loaders import CSVLoader | |
from langchain_community.chat_models import ChatOpenAI | |
from langchain.chains.llm import LLMChain | |
import yfinance as yf | |
import pandas as pd | |
import nltk | |
from nltk.tokenize import sent_tokenize | |
from openai import AzureOpenAI | |
from langchain.prompts import PromptTemplate | |
from langchain.chains import load_summarize_chain | |
from langchain.chat_models import AzureChatOpenAI | |
class KeyValueExtractor:
    """Gradio app that summarises a listed company's stock performance.

    The app scrapes the company's Yahoo Finance quote page, asks an Azure
    OpenAI chat deployment for a last-day summary and key/value pairs,
    downloads historical OHLC prices through ``yfinance`` for a longer-term
    summary, and scores either summary with a zero-shot sentiment classifier.
    """

    # Azure OpenAI deployment used for every chat / summarisation call.
    DEPLOYMENT_NAME = "GPT-4o"

    # Candidate labels handed to the zero-shot sentiment classifier.
    SENTIMENT_LABELS = ("Positive", "Negative", "Neutral")

    # Default window for the historical price download.
    DEFAULT_START_DATE = "2022-08-19"
    DEFAULT_END_DATE = "2023-08-19"

    # Initial prompt for summarising the Yahoo Finance quote page.
    _QUOTE_PROMPT = (
        "The give text is Finance Stock Details for one company i want to get values for\n"
        "Previous Close : [value]\n"
        "Open : [value]\n"
        "Bid : [value]\n"
        "Ask : [value]\n"
        "Day's Range : [value]\n"
        "52 Week Range : [value]\n"
        "Volume : [value]\n"
        "Avg. Volume : [value]\n"
        "Market Cap : [value]\n"
        "Beta (5Y Monthly) : [value]\n"
        "PE Ratio (TTM) : [value]\n"
        "EPS (TTM) : [value]\n"
        "Earnings Date : [value]\n"
        "Forward Dividend & Yield : [value]\n"
        "Ex-Dividend Date : [value]\n"
        "1y Target Est : [value]\n"
        "these details form that and Write a abractive summary about those details:\n"
        "Given Text: {text}\n"
        "CONCISE SUMMARY:"
    )

    # Initial prompt for summarising the historical OHLC table.
    _YEAR_PROMPT = (
        "Analyze the Financial Details and Write a brief and concise summary "
        "of how the company performed:\n"
        "{text}\n"
        "CONCISE SUMMARY:"
    )

    # Refine prompt shared by both chains. Each fragment ends with a space or
    # newline so adjacent sentences are not fused when the pieces are joined.
    _REFINE_TEMPLATE = (
        "Your job is to produce a final summary\n"
        "We have provided an existing summary up to a certain point: {existing_answer}\n"
        "We have the opportunity to refine the existing summary "
        "(only if needed) with some more context below.\n"
        "------------\n"
        "{text}\n"
        "------------\n"
        "Given the new context, refine the original summary. "
        "If the context isn't useful, return the original summary."
    )
    _YEAR_REFINE_TEMPLATE = _REFINE_TEMPLATE + " 10 lines of summary are enough."

    def __init__(self):
        """Create the Azure OpenAI client and record the sentiment model name.

        Credentials are read from the ``AZURE_OPENAI_KEY`` and
        ``AZURE_OPENAI_ENDPOINT`` environment variables.
        """
        self.model = "facebook/bart-large-mnli"
        # The classifier is loaded lazily on first use (see
        # analyze_sentiment_for_graph) so start-up stays cheap.
        self._classifier = None
        self.client = AzureOpenAI(
            api_key=os.getenv("AZURE_OPENAI_KEY"),
            api_version="2024-02-01",
            azure_endpoint=os.getenv("AZURE_OPENAI_ENDPOINT"),
        )

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _ask_model(self, user_prompt):
        """Send one user message to the chat deployment and return the reply text."""
        completion = self.client.chat.completions.create(
            model=self.DEPLOYMENT_NAME,
            messages=[
                {"role": "system", "content": "You are a helpful assistant."},
                {"role": "user", "content": user_prompt},
            ],
            max_tokens=1000,
            temperature=0,
        )
        return completion.choices[0].message.content

    def _refine_summarize(self, split_docs, question_template, refine_template):
        """Run a LangChain "refine" summarisation chain and return its final text."""
        chain = load_summarize_chain(
            llm=AzureChatOpenAI(azure_deployment=self.DEPLOYMENT_NAME),
            chain_type="refine",
            question_prompt=PromptTemplate.from_template(question_template),
            refine_prompt=PromptTemplate.from_template(refine_template),
            return_intermediate_steps=True,
            input_key="input_documents",
            output_key="output_text",
        )
        result = chain({"input_documents": split_docs}, return_only_outputs=True)
        return result["output_text"]

    # ------------------------------------------------------------------
    # Last-day analysis
    # ------------------------------------------------------------------
    def get_url(self, keyword):
        """Return the Yahoo Finance quote-page URL for ticker ``keyword``.

        The ticker is stripped and percent-encoded so symbols such as
        ``^GSPC`` or ``EURUSD=X`` produce a valid URL.
        """
        from urllib.parse import quote

        symbol = quote(keyword.strip(), safe="")
        return f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"

    def get_each_link_summary(self, url):
        """Load the page at ``url`` and return a refine-chain summary of it."""
        docs = WebBaseLoader(url).load()
        text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=3000, chunk_overlap=200
        )
        split_docs = text_splitter.split_documents(docs)
        summary = self._refine_summarize(
            split_docs, self._QUOTE_PROMPT, self._REFINE_TEMPLATE
        )
        print(summary)
        return summary

    def one_day_summary(self, content):
        """Ask the chat model for a bullish/bearish style summary of ``content``.

        Returns:
            The model's reply text.
        """
        return self._ask_model(
            f"i want detailed Summary from given finance details. i want information like what happen today comparing last day good or bad Bullish or Bearish like these details i want summary. content in backticks.```{content}```."
        )

    def extract_key_value_pair(self, content):
        """Ask the chat model for key/value pairs found in ``content``.

        Returns:
            The model's reply text, or ``None`` if the request failed (the
            failure is logged rather than raised so the UI keeps working).
        """
        try:
            return self._ask_model(
                f"Get maximum count meaningfull key value pairs. content in backticks.```{content}```."
            )
        except Exception as e:
            logging.error(f"Error while extracting key-value pairs: {e}")
            print("Error:", e)
            return None

    # ------------------------------------------------------------------
    # Sentiment
    # ------------------------------------------------------------------
    def analyze_sentiment_for_graph(self, text):
        """Score ``text`` against SENTIMENT_LABELS with the zero-shot model.

        Returns:
            Dict mapping each label to its score, in the order the classifier
            returns them.
        """
        # Loading the model is slow, so build the pipeline once and reuse it.
        if getattr(self, "_classifier", None) is None:
            self._classifier = pipeline(
                "zero-shot-classification", model=self.model
            )
        result = self._classifier(text, list(self.SENTIMENT_LABELS))
        return dict(zip(result["labels"], result["scores"]))

    def display_graph(self, text):
        """Return a horizontal Plotly bar chart of the sentiment scores of ``text``."""
        sentiment_scores = self.analyze_sentiment_for_graph(text)
        labels = list(sentiment_scores)
        scores = list(sentiment_scores.values())
        fig = px.bar(
            x=scores,
            y=labels,
            orientation="h",
            color=labels,
            color_discrete_map={
                "Negative": "red",
                "Positive": "green",
                "Neutral": "gray",
            },
        )
        # Scores are fractions in [0, 1]; the d3 "%" format multiplies by 100
        # so 0.85 is shown as "85.00%" instead of the misleading "0.85%".
        fig.update_traces(texttemplate="%{x:.2%}", textposition="outside")
        fig.update_layout(title="Sentiment Analysis", width=800)
        return fig

    # ------------------------------------------------------------------
    # Historical (one-year) analysis
    # ------------------------------------------------------------------
    def get_finance_data(self, symbol, start_date=None, end_date=None):
        """Download OHLC prices for ``symbol`` and save them as a CSV file.

        Parameters:
            symbol: Ticker understood by ``yfinance``.
            start_date: ISO date string; defaults to DEFAULT_START_DATE.
            end_date: ISO date string; defaults to DEFAULT_END_DATE.

        Returns:
            Path of the CSV file that was written (``ohlc_data.csv``).
        """
        start_date = start_date or self.DEFAULT_START_DATE
        end_date = end_date or self.DEFAULT_END_DATE
        data = yf.download(symbol, start=start_date, end=end_date)
        ohlc_data = data[["Open", "High", "Low", "Close"]]
        csv_path = "ohlc_data.csv"
        ohlc_data.to_csv(csv_path)
        return csv_path

    def csv_to_dataframe(self, csv_path):
        """Read the CSV file at ``csv_path`` into a pandas DataFrame."""
        return pd.read_csv(csv_path)

    def save_dataframe_in_text_file(self, df):
        """Write ``df`` as tab-separated text to ``output.txt`` and return that path."""
        output_file_path = "output.txt"
        df.to_csv(output_file_path, sep="\t", index=False)
        return output_file_path

    def csv_loader(self, output_file_path):
        """Load the text file at ``output_file_path`` as LangChain documents."""
        loader = UnstructuredFileLoader(output_file_path, strategy="fast")
        return loader.load()

    def document_text_spilliter(self, docs):
        """Split ``docs`` into token-bounded chunks.

        Returns:
            The list of chunk documents produced by the text splitter
            (chunk size 1000 tokens, overlap 200).
        """
        text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=1000, chunk_overlap=200
        )
        return text_splitter.split_documents(docs)

    def change_bullet_points(self, text):
        """Return ``text`` with each sentence on its own ``* `` bullet line."""
        # Ensures the sentence tokenizer data is present.
        nltk.download("punkt")
        return "".join(f"* {sentence}\n" for sentence in sent_tokenize(text))

    def one_year_summary(self, keyword):
        """Summarise the historical price performance of ``keyword``.

        ``keyword`` may be a bare ticker or a "Company - TICKER" string.

        Returns:
            A bullet-point summary, or ``None`` if any step failed (the
            failure is logged so the UI shows an empty box, not a crash).
        """
        try:
            symbol = self.company_names(keyword)

            # Step 1: download prices and load them back as a DataFrame.
            csv_path = self.get_finance_data(symbol)
            print(f"CSV path: {csv_path}")
            df = self.csv_to_dataframe(csv_path)
            if df is None or df.empty:
                raise ValueError("The DataFrame is empty. Please check the CSV content.")

            # Step 2: persist as plain text for the document loader.
            output_file_path = self.save_dataframe_in_text_file(df)
            print(f"Output file saved at: {output_file_path}")

            # Step 3: load and chunk the text.
            docs = self.csv_loader(output_file_path)
            if not docs:
                raise ValueError("No content was loaded from the CSV file.")
            split_docs = self.document_text_spilliter(docs)
            if not split_docs:
                raise ValueError("Document splitting failed. No valid chunks were created.")

            # Step 4: summarise with the refine chain and bullet the result.
            summary_text = self._refine_summarize(
                split_docs, self._YEAR_PROMPT, self._YEAR_REFINE_TEMPLATE
            )
            one_year_perfomance_summary = self.change_bullet_points(summary_text)
            print(f"Generated Summary: {one_year_perfomance_summary}")
            return one_year_perfomance_summary
        except Exception as e:
            logging.exception("Error during one_year_summary processing")
            print(f"Error during one_year_summary processing: {str(e)}")
            return None

    # ------------------------------------------------------------------
    # Orchestration and UI
    # ------------------------------------------------------------------
    def main(self, keyword):
        """Run the last-day pipeline for ``keyword``.

        ``keyword`` may be a bare ticker or a "Company - TICKER" string.

        Returns:
            Tuple ``(summary, key_value_pairs)`` for the two result text boxes.
        """
        symbol = self.company_names(keyword)
        clean_url = self.get_url(symbol)
        link_summary = self.get_each_link_summary(clean_url)
        clean_summary = self.one_day_summary(link_summary)
        key_value = self.extract_key_value_pair(clean_summary)
        return clean_summary, key_value

    def company_names(self, input_text):
        """Extract the ticker from a "Company Name - TICKER" string.

        The text after the last " - " separator is returned with surrounding
        whitespace removed. Splitting on the spaced separator keeps hyphenated
        company names ("Coca-Cola Co - KO") and hyphenated tickers ("BRK-B")
        intact. Input without the separator is treated as a bare ticker.
        """
        _, _, ticker = input_text.rpartition(" - ")
        return ticker.strip()

    def gradio_interface(self):
        """Build the Gradio Blocks UI, wire the button callbacks and launch it."""
        with gr.Blocks(css="style.css", theme='SherlockRamos/Feliz') as app:
            gr.HTML("""
            <style>
            .footer {
            display: none !important;
            }
            footer {
            display: none !important;
            }
            #foot {
            display: none !important;
            }
            .svelte-1fzp3xt {
            display: none !important;
            }
            #root > div > div > div {
            padding-bottom: 0 !important;
            }
            .custom-footer {
            text-align: center;
            padding: 10px;
            font-size: 14px;
            color: #333;
            }
            </style>
            """)
            gr.HTML("""<div><center><img src="https://seeklogo.com/images/I/intercontinental-exchange-logo-5117BA0846-seeklogo.com.png" alt="Broadridge" style="width:100px;height:100px;"></center></div>""")
            gr.HTML("""<center class="darkblue" text-align:center;padding:30px;'><center>
            <center><h1 class ="center" style="color:#fff"></h1></center>
            <br><center><h1 style="color:#000">Finance Tool for Investors</h1></center>""")

            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    input_news = gr.Textbox(label="Company Name")

            with gr.Accordion("List_of_Companies", open=False):
                with gr.Row(elem_id="col-container"):
                    with gr.Column(scale=1.0, min_width=150):
                        # Selecting an example fills the textbox with the
                        # ticker produced by company_names.
                        gr.Examples(
                            [
                                ["Apple Inc. - AAPL"],
                                ["Microsoft Corporation - MSFT"],
                                ["Amazon.com Inc. - AMZN"],
                                # The FB ticker was retired in June 2022; META
                                # is the symbol that still resolves.
                                ["Meta Platforms Inc. - META"],
                                ["Tesla Inc. - TSLA"],
                            ],
                            [input_news],
                            input_news,
                            fn=self.company_names,
                            cache_examples=True,
                        )

            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    analyse = gr.Button("Analyse")

            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=0.50, min_width=150):
                    result_summary = gr.Textbox(label="Summary For Last Day Perfomance", lines=12)
                with gr.Column(scale=0.50, min_width=150):
                    key_value_pair_result = gr.Textbox(label="Discussed Topics", lines=12)

            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=0):
                    plot_for_day = gr.Plot(label="Sentiment for Last Day")
                    plot_for_day.width = 500
                    plot_for_day.height = 600

            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    analyse_sentiment = gr.Button("Analyse Sentiment For Last Day")

            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    one_year_summary = gr.Textbox(label="Summary For One Year Performance", lines=12)

            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    one_year = gr.Button("Analyse One Year Summary")

            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=0):
                    plot_for_year = gr.Plot(label="Sentiment for One Year")
                    # Size the yearly plot itself; the day plot was sized above.
                    plot_for_year.width = 500
                    plot_for_year.height = 600

            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    analyse_sentiment_for_year = gr.Button("Analyse Sentiment For One Year")

            # Event wiring must happen inside the Blocks context.
            analyse.click(self.main, input_news, [result_summary, key_value_pair_result])
            analyse_sentiment.click(self.display_graph, result_summary, [plot_for_day])
            one_year.click(self.one_year_summary, input_news, one_year_summary)
            analyse_sentiment_for_year.click(self.display_graph, one_year_summary, [plot_for_year])

        app.launch(debug=True)
if __name__ == "__main__":
    # Script entry point: build the extractor and serve the Gradio UI.
    KeyValueExtractor().gradio_interface()