# Page residue from the Hugging Face Spaces listing ("Spaces: Sleeping"); not part of the program.
import json
import logging
import mimetypes
import os
import re
import tempfile
from typing import List, Optional, Tuple
from urllib.parse import quote

import gradio as gr
import nltk
import openai
import pandas as pd
import pdfplumber
import plotly.express as px
import requests
import validators
import yfinance as yf
from bs4 import BeautifulSoup
from GoogleNews import GoogleNews
from langchain.chains import create_extraction_chain
from langchain.chains.combine_documents.stuff import StuffDocumentsChain
from langchain.chains.llm import LLMChain
from langchain.chains.mapreduce import MapReduceChain
from langchain.chains.summarize import load_summarize_chain
from langchain.chat_models import ChatOpenAI
from langchain.document_loaders import UnstructuredFileLoader
from langchain.document_loaders import WebBaseLoader
from langchain.prompts import PromptTemplate
from langchain.text_splitter import CharacterTextSplitter
from nltk.tokenize import sent_tokenize
from transformers import pipeline
class KeyValueExtractor:
    """Summarise company news and stock data with LLMs and chart the sentiment.

    The class bundles three pipelines behind a Gradio UI:

    * news: Google News links -> per-article summary -> combined summary,
      key/value pairs and a sentiment bar chart (``main_for_news``);
    * last day: Yahoo Finance quote page -> summary, key/value pairs and
      sentiment chart (``main_for_finance_tool``);
    * one year: OHLC history -> bullet-point summary and sentiment chart
      (``one_year_summary_for_finance``).
    """

    # Browser-like User-Agent sent when probing article URLs.
    _REQUEST_HEADERS = {
        "user-agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/114.0.0.0 Safari/537.36",
    }

    # Matches the first http/https URL embedded in a piece of text.
    _URL_PATTERN = re.compile(r"(https?://[^\s]+)")

    _SENTIMENT_LABELS = ("Positive", "Negative", "Neutral")
    _SENTIMENT_COLORS = {"Negative": "red", "Positive": "green", "Neutral": "gray"}

    _NO_NEWS_MESSAGE = "Sorry No URL Found!! Please Try Again"

    _KEY_VALUE_PROMPT = (
        "Get maximum count meaningfull key value pairs. "
        "content in backticks.```{content}```."
    )

    _LINK_SUMMARY_PROMPT = (
        "Write a concise summary of the following:\n"
        "{text}\n"
        "CONCISE SUMMARY:"
    )

    _NEWS_SUMMARY_PROMPT = (
        "Write a detalied broad abractive summary of the following:\n"
        "{text}\n"
        "CONCISE SUMMARY:"
    )

    _FINANCE_QUOTE_PROMPT = (
        "The give text is Finance Stock Details for one company i want to get values for\n"
        "Previous Close : [value]\n"
        "Open : [value]\n"
        "Bid : [value]\n"
        "Ask : [value]\n"
        "Day's Range : [value]\n"
        "52 Week Range : [value]\n"
        "Volume : [value]\n"
        "Avg. Volume : [value]\n"
        "Market Cap : [value]\n"
        "Beta (5Y Monthly) : [value]\n"
        "PE Ratio (TTM) : [value]\n"
        "EPS (TTM) : [value]\n"
        "Earnings Date : [value]\n"
        "Forward Dividend & Yield : [value]\n"
        "Ex-Dividend Date : [value]\n"
        "1y Target Est : [value]\n"
        "these details form that and Write a abractive summary about those details:\n"
        "Given Text: {text}\n"
        "CONCISE SUMMARY:"
    )

    _ONE_YEAR_PROMPT = (
        "Analyze the Financial Details and Write a abractive quick short summary "
        "how the company perform up and down,Bullish/Bearish of the following:\n"
        "{text}\n"
        "CONCISE SUMMARY:"
    )

    # The original literals were concatenated without separators, which glued
    # words together ("summary(only if needed)", "summaryIf the context").
    # The separators below are the fix; the wording is otherwise unchanged.
    _REFINE_TEMPLATE = (
        "Your job is to produce a final summary\n"
        "We have provided an existing summary up to a certain point: {existing_answer}\n"
        "We have the opportunity to refine the existing summary "
        "(only if needed) with some more context below.\n"
        "------------\n"
        "{text}\n"
        "------------\n"
        "Given the new context, refine the original summary.\n"
        "If the context isn't useful, return the original summary."
    )

    def __init__(self):
        """Set the zero-shot model name; the model itself is loaded lazily."""
        self.model = "facebook/bart-large-mnli"
        # Populated on first sentiment call so the model is loaded only once.
        self._classifier = None

    # ------------------------------------------------------------------
    # Shared private helpers
    # ------------------------------------------------------------------
    def _build_refine_chain(self, question_template, refine_template=None):
        """Build a LangChain "refine" summarisation chain for the given prompts."""
        return load_summarize_chain(
            llm=ChatOpenAI(temperature=0),
            chain_type="refine",
            question_prompt=PromptTemplate.from_template(question_template),
            refine_prompt=PromptTemplate.from_template(
                refine_template or self._REFINE_TEMPLATE
            ),
            return_intermediate_steps=True,
            input_key="input_documents",
            output_key="output_text",
        )

    @staticmethod
    def _run_chain(chain, docs):
        """Run a summarisation chain over ``docs`` and return its final text."""
        result = chain({"input_documents": docs}, return_only_outputs=True)
        return result["output_text"]

    @staticmethod
    def _complete(prompt):
        """Send ``prompt`` to the OpenAI Completion API and return the stripped reply."""
        response = openai.Completion.create(
            engine="text-davinci-003",
            temperature=0,
            prompt=prompt,
            max_tokens=1000,
        )
        return response["choices"][0]["text"].strip()

    def _extract_key_value_pairs(self, content) -> Optional[str]:
        """Ask the LLM for key/value pairs; return ``None`` if the call fails."""
        try:
            return self._complete(self._KEY_VALUE_PROMPT.format(content=content))
        except Exception:
            # Best effort: the summary is still useful without the pairs.
            logging.exception("Error while extracting key-value pairs")
            return None

    def _score_sentiment(self, text):
        """Return ``{label: score}`` from the zero-shot classifier for ``text``."""
        classifier = getattr(self, "_classifier", None)
        if classifier is None:
            # Loading the model is expensive; keep it for later calls.
            classifier = pipeline("zero-shot-classification", model=self.model)
            self._classifier = classifier
        result = classifier(text, list(self._SENTIMENT_LABELS))
        return dict(zip(result["labels"], result["scores"]))

    def _sentiment_figure(self, sentiment_scores):
        """Render sentiment scores as a horizontal Plotly bar chart."""
        labels = list(sentiment_scores)
        scores = list(sentiment_scores.values())
        fig = px.bar(
            x=scores,
            y=labels,
            orientation="h",
            color=labels,
            color_discrete_map=self._SENTIMENT_COLORS,
        )
        fig.update_traces(
            texttemplate="%{x:.1%}", textposition="outside", textfont=dict(size=6)
        )
        fig.update_layout(title="Sentiment Analysis", width=600)
        return fig

    # ------------------------------------------------------------------
    # News pipeline
    # ------------------------------------------------------------------
    def get_news(self, keyword):
        """Search Google News (last day, US/English) and return the result links.

        Parameters:
            keyword (str): Search term.

        Returns:
            List[str]: The ``link`` field of every result that has one.
        """
        googlenews = GoogleNews(lang='en', region='US', period='1d', encode='utf-8')
        googlenews.clear()
        googlenews.search(keyword)
        googlenews.get_page(2)
        news_result = googlenews.result(sort=True)
        return [str(item["link"]) for item in news_result if item.get("link")]

    def url_format(self, urls):
        """Extract the first http(s) URL from each text entry.

        Entries without a URL are skipped (and logged).

        Returns:
            List[str]: Extracted URLs, in input order.
        """
        tot_url_links = []
        for url_text in urls:
            match = self._URL_PATTERN.search(url_text)
            if match:
                tot_url_links.append(match.group(1))
            else:
                logging.warning("No URL found in the given text.")
        return tot_url_links

    def clear_error_ulr(self, urls, timeout=10):
        """Keep only the URLs that are well-formed and answer with HTTP 200.

        Parameters:
            urls (List[str]): Candidate URLs.
            timeout (float): Seconds to wait for each request before giving up.

        Returns:
            List[str]: Reachable URLs, in input order.
        """
        reachable = []
        for url in urls:
            if not validators.url(url):
                logging.warning("Skipping invalid URL: %s", url)
                continue
            try:
                response = requests.get(
                    url, headers=self._REQUEST_HEADERS, timeout=timeout
                )
            except requests.RequestException as exc:
                # One dead link must not abort the whole run.
                logging.warning("Error fetching %s: %s", url, exc)
                continue
            if response.status_code != 200:
                logging.warning(
                    "Error fetching %s: status %s", url, response.status_code
                )
                continue
            reachable.append(url)
        return reachable

    def get_each_link_summary(self, urls):
        """Summarise every URL and return the summaries as one text.

        Summaries are separated by a blank line so the text can later be
        split into chunks again.

        Returns:
            str: Combined summaries ("" when ``urls`` is empty).
        """
        if not urls:
            return ""
        # The chain does not depend on the URL, so build it once.
        chain = self._build_refine_chain(self._LINK_SUMMARY_PROMPT)
        summaries = []
        for url in urls:
            docs = WebBaseLoader(url).load()
            split_docs = self.document_text_spilliter(docs)
            if not split_docs:
                logging.warning("No text extracted from %s; skipping", url)
                continue
            summary = self._run_chain(chain, split_docs)
            logging.info("Summary for %s: %s", url, summary)
            summaries.append(summary)
        return "\n\n".join(summaries)

    def save_text_to_file(self, each_link_summary) -> str:
        """Write the combined summaries to ``extracted_text.txt``.

        Returns:
            str: Path of the written file.

        Raises:
            OSError: If the file cannot be written (logged, then re-raised).
        """
        file_path = "extracted_text.txt"
        try:
            with open(file_path, "w", encoding="utf-8") as file:
                file.write(each_link_summary)
        except OSError:
            logging.exception("Error while saving text to file")
            raise
        return file_path

    def document_loader(self, file_path) -> List:
        """Load ``file_path`` with ``UnstructuredFileLoader`` (fast strategy).

        Returns:
            List: Whatever ``loader.load()`` returns (LangChain documents).
        """
        loader = UnstructuredFileLoader(file_path, strategy="fast")
        return loader.load()

    def document_text_spilliter(self, docs, chunk_size=3000, chunk_overlap=200) -> List:
        """Split documents into token-bounded chunks.

        Parameters:
            docs: Documents as returned by a LangChain loader.
            chunk_size (int): Maximum tokens per chunk.
            chunk_overlap (int): Tokens shared between neighbouring chunks.

        Returns:
            List: The split document chunks.
        """
        text_splitter = CharacterTextSplitter.from_tiktoken_encoder(
            chunk_size=chunk_size, chunk_overlap=chunk_overlap
        )
        return text_splitter.split_documents(docs)

    def extract_key_value_pair_for_news(self, content) -> Optional[str]:
        """Extract key/value pairs from a news summary.

        Returns:
            Optional[str]: The model's reply, or ``None`` if the call failed.
        """
        return self._extract_key_value_pairs(content)

    def refine_summary(self, split_docs) -> Tuple[str, Optional[str]]:
        """Produce the overall news summary and its key/value pairs.

        Returns:
            Tuple[str, Optional[str]]: ``(summary, key_value_pairs)``.
        """
        chain = self._build_refine_chain(self._NEWS_SUMMARY_PROMPT)
        summary = self._run_chain(chain, split_docs)
        key_value_pair = self.extract_key_value_pair_for_news(summary)
        return summary, key_value_pair

    def analyze_sentiment_for_graph(self, text):
        """Score ``text`` as Positive/Negative/Neutral; returns ``{label: score}``."""
        return self._score_sentiment(text)

    def display_graph_for_news(self, text):
        """Return a sentiment bar chart (Plotly figure) for a news summary."""
        return self._sentiment_figure(self.analyze_sentiment_for_graph(text))

    def main_for_news(self, keyword):
        """Run the whole news pipeline for ``keyword``.

        Returns:
            tuple: ``(summary, key_value_pairs, figure)``; on any failure the
            fallback ``("Sorry No URL Found!! Please Try Again", "", None)``.
        """
        try:
            urls = self.get_news(keyword)
            tot_urls = self.url_format(urls)
            clean_url = self.clear_error_ulr(tot_urls)
            if not clean_url:
                return self._NO_NEWS_MESSAGE, "", None
            each_link_summary = self.get_each_link_summary(clean_url)
            file_path = self.save_text_to_file(each_link_summary)
            docs = self.document_loader(file_path)
            split_docs = self.document_text_spilliter(docs)
            result_summary_for_news, key_value_pair_for_news = self.refine_summary(split_docs)
            fig = self.display_graph_for_news(result_summary_for_news)
            return result_summary_for_news, key_value_pair_for_news, fig
        except Exception:
            # UI boundary: log the real cause, show the friendly message.
            logging.exception("News analysis failed for %r", keyword)
            return self._NO_NEWS_MESSAGE, "", None

    # ------------------------------------------------------------------
    # Finance pipeline
    # ------------------------------------------------------------------
    def get_url(self, keyword):
        """Return the Yahoo Finance quote URL for a ticker symbol.

        The symbol is stripped and percent-encoded so stray whitespace or
        characters such as ``^`` cannot produce a malformed URL.
        """
        symbol = quote(keyword.strip(), safe="")
        return f"https://finance.yahoo.com/quote/{symbol}?p={symbol}"

    def get_link_summary_for_finance(self, url):
        """Load a quote page and summarise the listed stock details.

        Returns:
            str: The chain's final summary text.
        """
        docs = WebBaseLoader(url).load()
        split_docs = self.document_text_spilliter(docs)
        chain = self._build_refine_chain(self._FINANCE_QUOTE_PROMPT)
        summary = self._run_chain(chain, split_docs)
        logging.info("Finance page summary: %s", summary)
        return summary

    def one_day_summary_finance(self, content) -> str:
        """Ask the LLM for a last-day (bullish/bearish) summary of ``content``.

        Returns:
            str: The model's stripped reply.
        """
        result = self._complete(
            f"i want detailed Summary from given finance details. i want information like what happen today comparing last day good or bad Bullish or Bearish like these details i want summary. content in backticks.```{content}```."
        )
        logging.info("One-day finance summary: %s", result)
        return result

    def extract_key_value_pair_for_finance(self, content) -> Optional[str]:
        """Extract key/value pairs from a finance summary.

        Returns:
            Optional[str]: The model's reply, or ``None`` if the call failed.
        """
        return self._extract_key_value_pairs(content)

    def analyze_sentiment_for_graph_finance(self, text):
        """Score ``text`` as Positive/Negative/Neutral; returns ``{label: score}``."""
        return self._score_sentiment(text)

    def display_graph_for_finance(self, text):
        """Return a sentiment bar chart (Plotly figure) for a finance summary."""
        return self._sentiment_figure(self.analyze_sentiment_for_graph_finance(text))

    def get_finance_data(self, symbol, start_date='2022-08-19', end_date='2023-08-19'):
        """Download OHLC history with yfinance and save it to ``ohlc_data.csv``.

        Parameters:
            symbol (str): Ticker symbol.
            start_date (str): First day of the range (YYYY-MM-DD).
            end_date (str): End of the range (YYYY-MM-DD).

        Returns:
            str: Path of the written CSV file.

        Raises:
            ValueError: If no price data was returned for the symbol.
        """
        data = yf.download(symbol, start=start_date, end=end_date)
        if data.empty:
            raise ValueError(f"No price data returned for symbol {symbol!r}")
        ohlc_data = data[['Open', 'High', 'Low', 'Close']]
        csv_path = "ohlc_data.csv"
        ohlc_data.to_csv(csv_path)
        return csv_path

    def csv_to_dataframe(self, csv_path):
        """Read the CSV at ``csv_path`` into a pandas DataFrame."""
        return pd.read_csv(csv_path)

    def save_dataframe_in_text_file(self, df):
        """Write ``df`` tab-separated (no index) to ``output.txt``; return the path."""
        output_file_path = 'output.txt'
        df.to_csv(output_file_path, sep='\t', index=False)
        return output_file_path

    def csv_loader(self, output_file_path):
        """Load the exported text file as LangChain documents."""
        return self.document_loader(output_file_path)

    def document_text_spilliter_finance(self, docs):
        """Split documents into smaller chunks (1000 tokens, 200 overlap).

        Returns:
            List: The split document chunks.
        """
        return self.document_text_spilliter(docs, chunk_size=1000, chunk_overlap=200)

    def change_bullet_points(self, text):
        """Turn a passage into one ``* sentence`` line per sentence.

        Returns:
            str: Bullet list, each line terminated by a newline.
        """
        try:
            sentences = sent_tokenize(text)
        except LookupError:
            # Tokenizer data missing: fetch it once, then retry.
            nltk.download('punkt')
            sentences = sent_tokenize(text)
        return "".join(f"* {sentence}\n" for sentence in sentences)

    def one_year_summary_for_finance(self, keyword):
        """Summarise the stock's OHLC history and chart its sentiment.

        Returns:
            tuple: ``(bullet_point_summary, figure)``.
        """
        csv_path = self.get_finance_data(keyword.strip())
        df = self.csv_to_dataframe(csv_path)
        output_file_path = self.save_dataframe_in_text_file(df)
        docs = self.csv_loader(output_file_path)
        # NOTE(review): the original uses the 3000-token splitter here, not
        # document_text_spilliter_finance; kept as-is, confirm the intent.
        split_docs = self.document_text_spilliter(docs)
        chain = self._build_refine_chain(
            self._ONE_YEAR_PROMPT,
            self._REFINE_TEMPLATE + "\n10 line summary is enough",
        )
        one_year_perfomance_summary = self.change_bullet_points(
            self._run_chain(chain, split_docs)
        )
        plot_for_year = self.display_graph_for_finance(one_year_perfomance_summary)
        return one_year_perfomance_summary, plot_for_year

    def main_for_finance_tool(self, keyword):
        """Run the last-day analysis for a ticker symbol.

        Returns:
            tuple: ``(summary, key_value_pairs, figure)``.
        """
        clean_url = self.get_url(keyword)
        link_summary = self.get_link_summary_for_finance(clean_url)
        clean_summary = self.one_day_summary_finance(link_summary)
        key_value = self.extract_key_value_pair_for_finance(clean_summary)
        sentiment_plot_for_one_day = self.display_graph_for_finance(clean_summary)
        return clean_summary, key_value, sentiment_plot_for_one_day

    # ------------------------------------------------------------------
    # UI
    # ------------------------------------------------------------------
    def company_names(self, input_text):
        """Return the ticker from a ``"Company Name - TICKER"`` label.

        Splits on the last ``" - "`` so hyphenated names and tickers survive,
        and strips whitespace. Text without the separator is returned stripped.
        """
        return input_text.rpartition(" - ")[2].strip()

    def clear(self, input_news, result_summary_for_news, key_value_pair_result_for_news, sentiment_plot):
        """Return four ``None`` values to reset the four UI components."""
        return None, None, None, None

    def gradio_interface(self):
        """Build the Gradio Blocks UI and launch it (debug mode).

        NOTE(review): the original indentation was lost, so the container
        nesting below is reconstructed from the ``with`` statements; confirm
        against the deployed layout.
        """
        with gr.Blocks(css="style.css", theme='karthikeyan-adople/hudsonhayes-gray') as app:
            gr.HTML("""<center class="darkblue" style='background-color:rgb(0,1,36); text-align:center;padding:25px;'><center>
<h1 style="color:#fff" class ="center">ADOPLE AI</h1></center>
<br><h1 style="color:#fff">Company performance summarisation and sentiment analysis</h1></center>""")
            with gr.Row(elem_id="col-container"):
                with gr.Column(scale=1.0, min_width=150):
                    input_news = gr.Textbox(label="Company Name")
            with gr.Accordion("Sample Inputs", open=True):
                with gr.Row(elem_id="col-container"):
                    with gr.Column(scale=1.0, min_width=150):
                        gr.Examples(
                            [["Apple Inc. - AAPL"], ["Microsoft Corporation - MSFT"], ["Amazon.com Inc. - AMZN"], ["Tesla Inc. - TSLA"], ["Alphabet Inc. - GOOG"], [" NVIDIA Corporation - NVDA"]],
                            [input_news],
                            input_news,
                            fn=self.company_names,
                            cache_examples=True,
                        )
            with gr.Tabs():
                with gr.TabItem("Last Day Analysis"):
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=1.0, min_width=150):
                            analyse_summary_for_finance = gr.Button("Analyse")
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=1, min_width=150):
                            result_summary = gr.Textbox(label="Summary", lines=10)
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=0.50, min_width=0):
                            key_value_pair_result = gr.Textbox(label="Topic Reflected", lines=10)
                        with gr.Column(scale=0.50, min_width=0):
                            plot_for_one_day = gr.Plot(label="Sentiment", size=(500, 500))
                with gr.TabItem("One Year Analyis"):
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=1.0, min_width=150):
                            one_year = gr.Button("Analyse")
                    with gr.Row(elem_id="col-container"):
                        with gr.Column(scale=1.0, min_width=150):
                            one_year_summary = gr.Textbox(label="Summary Of One Year Perfomance", lines=20)
                        with gr.Column(scale=1.0, min_width=0):
                            plot_for_year = gr.Plot(label="Sentiment", size=(500, 500))
            analyse_summary_for_finance.click(self.main_for_finance_tool, input_news, [result_summary, key_value_pair_result, plot_for_one_day])
            one_year.click(self.one_year_summary_for_finance, input_news, [one_year_summary, plot_for_year])
        app.launch(debug=True)
if __name__ == "__main__":
    # Script entry point: build the extractor and start the Gradio UI.
    text_process = KeyValueExtractor()
    text_process.gradio_interface()