Shivam29rathore's picture
Update app.py
c818447
raw
history blame contribute delete
No virus
7.9 kB
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
import pickle
import torch
from transformers import PegasusTokenizer, PegasusForConditionalGeneration
import tensorflow as tf
from tensorflow.python.lib.io import file_io
from nltk.tokenize import sent_tokenize
import io
# NOTE(review): the two commented-out lines below read like a reminder to
# replace pickle.load with a CPU-only unpickler. No CPU_Unpickler is defined
# in the visible code -- confirm whether that swap is still needed.
#contents = pickle.load(f) becomes...
#contents = CPU_Unpickler(f).load()
# Path of the pickled object that finbert() uses as `model1`.
model_path = "finbert.sav"
# Load the pickled model from disk.
# NOTE(review): pickle.load can execute arbitrary code embedded in the file;
# only ship a "finbert.sav" that comes from a trusted source.
with open(model_path, "rb") as f:
    model1= pickle.load(f)
# Turn off TensorFlow eager execution for the whole process.
# NOTE(review): nothing else in the visible code calls TensorFlow directly --
# presumably a dependency needs this; confirm before removing.
tf.compat.v1.disable_eager_execution()
# Load the Pegasus tokenizer and model that pegasus() uses as `tokenizer`
# and `model2`.
model_name = "human-centered-summarization/financial-summarization-pegasus"
tokenizer = PegasusTokenizer.from_pretrained(model_name)
model2 = PegasusForConditionalGeneration.from_pretrained(model_name)
# Disabled alternative using the generic Auto* loaders (`checkpoint` is not
# defined anywhere in the visible code).
#tokenizer = AutoTokenizer.from_pretrained(checkpoint)
#model = AutoModelForSeq2SeqLM.from_pretrained(checkpoint)
import nltk
from finbert_embedding.embedding import FinbertEmbedding
import pandas as pd
from nltk.cluster import KMeansClusterer
import numpy as np
import os
from scipy.spatial import distance_matrix
from tensorflow.python.lib.io import file_io
import pickle
# Fetch the NLTK 'punkt' resource once at import time; presumably this is the
# data the sentence tokenizers used further down rely on.
nltk.download('punkt')
def finbert(word):
    """Build an extractive summary of ``word`` by clustering sentence embeddings.

    The text is split into sentences, every sentence is embedded with the
    module-level ``model1.sentence_vector``, the embeddings are grouped with
    k-means (cosine distance, 10 clusters, 8 repeats) and, for each cluster,
    the sentence whose embedding lies closest to the cluster centroid is kept.
    The kept sentences are joined in their original order.

    Args:
        word: The raw input text to summarise.

    Returns:
        On success a 3-tuple of strings
        ``(summary, input_length_message, output_length_message)``.
        If the text is blank, yields no sentences, or the clusterer raises
        ``ValueError`` (which happens when the text is too short to fill the
        clusters), the newline-normalised input text is returned unchanged as
        a plain string.
    """
    # The input used to be written to the fixed path "/tmp/input.txt" and read
    # straight back. That shared scratch file let concurrent requests
    # overwrite each other's text, so it is gone; the only effect it had on
    # the text (universal-newline translation when reading on POSIX) is
    # reproduced here.
    text = word.replace("\r\n", "\n").replace("\r", "\n")

    # Nothing to summarise: hand the text back, exactly as is done below for
    # inputs that are too short to cluster.
    if not text.strip():
        return text

    # Split the text into sentences.
    tokens = nltk.sent_tokenize(text)
    if not tokens:
        return text

    # One row per sentence, with surrounding whitespace removed.
    data = pd.DataFrame({'Sentences': [sentence.strip() for sentence in tokens]})

    # Embed every sentence and convert each tensor to a NumPy array.
    data['Embeddings'] = [model1.sentence_vector(sentence).numpy()
                          for sentence in tokens]

    # Number of expected sentences in the summary / k-means restarts.
    NUM_CLUSTERS = 10
    iterations = 8

    # Stack the embeddings into one array for the clusterer.
    X = np.array(data['Embeddings'].to_list())

    Kclusterer = KMeansClusterer(
        NUM_CLUSTERS,
        distance=nltk.cluster.util.cosine_distance,
        repeats=iterations,
        avoid_empty_clusters=True)

    # If the text is too short k-means raises ValueError; in that case the
    # text itself is returned as the result.
    try:
        assigned_clusters = Kclusterer.cluster(X, assign_clusters=True)
        # Fetch the centroids once instead of once per row.
        centroids = Kclusterer.means()
        data['Cluster'] = pd.Series(assigned_clusters, index=data.index)
        data['Centroid'] = data['Cluster'].apply(lambda cluster: centroids[cluster])
    except ValueError:
        return text

    def distance_from_centroid(row):
        # Distance between a sentence embedding and its cluster centroid
        # (scipy's distance_matrix with its default metric).
        return distance_matrix([row['Embeddings']], [row['Centroid'].tolist()])[0][0]

    data['Distance_From_Centroid'] = data.apply(distance_from_centroid, axis=1)

    # Keep the closest sentence of every cluster, restored to document order.
    closest = (data.sort_values('Distance_From_Centroid', ascending=True)
               .groupby('Cluster')
               .head(1)
               .sort_index())
    summary = " ".join(closest['Sentences'].tolist())

    # split() without arguments drops every run of whitespace (newlines
    # included), so re-joining collapses the summary onto single spaces.
    summary = " ".join(summary.split())

    return (summary,
            " Length of Input:---->" + str(len(word)),
            " Length of Output:----> " + str(len(summary)))
def _chunk_sentences(sentences, max_chars=512):
    """Pack sentences into consecutive chunks of fewer than ``max_chars`` characters.

    Args:
        sentences: Iterable of sentence strings, in document order.
        max_chars: Character budget per chunk (sum of sentence lengths).

    Returns:
        A list of chunks, each a non-empty list of stripped sentences. A
        single sentence that alone reaches the budget gets a chunk of its own.
    """
    chunks = []
    current = []
    length = 0
    for sentence in sentences:
        sentence = sentence.strip()
        if not sentence:
            continue
        length += len(sentence)
        # Flush only a non-empty chunk: an over-long first sentence must not
        # produce an empty chunk for the model to "summarise".
        if length >= max_chars and current:
            chunks.append(current)
            current = []
            length = len(sentence)
        current.append(sentence)
    if current:
        chunks.append(current)
    return chunks


def pegasus(text):
    """Summarise ``text`` abstractively with the module-level Pegasus model.

    The text is split into sentences, the sentences are packed into chunks of
    fewer than 512 characters, each chunk is summarised with
    ``model2.generate`` (sampling is enabled, so results vary between calls)
    and the decoded chunk summaries are joined with single spaces.

    Args:
        text: The raw input text to summarise.

    Returns:
        The concatenated chunk summaries as one string; ``""`` when the input
        is empty or only whitespace.
    """
    # The input used to be written to the fixed path "/tmp/input.txt" and read
    # straight back. That shared scratch file let concurrent requests
    # overwrite each other's text, so it is gone; the only effect it had on
    # the text (universal-newline translation when reading on POSIX) is
    # reproduced here.
    text_ = text.replace("\r\n", "\n").replace("\r", "\n")

    # Blank input has nothing to summarise.
    if not text_.strip():
        return ""

    # The 'punkt' data is downloaded once at module import, so it is not
    # fetched again on every call.
    chunks = _chunk_sentences(sent_tokenize(text_))

    # Use GPU if available; move the model once rather than once per chunk.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model = model2.to(device)

    # Kept local: the accumulator used to be declared ``global``, which leaked
    # per-request state into the module namespace.
    summaries = []
    for chunk in chunks:
        # Encode the chunk; the tokenizer additionally truncates over-long input.
        inputs = tokenizer.encode(' '.join(chunk), truncation=True,
                                  return_tensors='pt').to(device)
        generated = model.generate(inputs, do_sample=True,
                                   max_length=50, top_k=50, top_p=0.95,
                                   num_beams=5, early_stopping=True)
        summaries.extend(
            tokenizer.decode(sequence, skip_special_tokens=True,
                             clean_up_tokenization_spaces=False)
            for sequence in generated)

    # Separate the chunk summaries with a space so consecutive sentences do
    # not run into each other.
    return " ".join(summaries)
import gradio as gr
# Build a one-textbox-in / one-textbox-out demo around finbert() and start
# serving it. Note that `interface1` ends up bound to whatever launch()
# returns, not to the gr.Interface object itself.
_text_in = gr.inputs.Textbox(
    lines=15,
    placeholder="Enter your text !!",
    label='Input-10k Sections',
)
_text_out = gr.outputs.Textbox(label='Output')
interface1 = gr.Interface(fn=finbert, inputs=_text_in, outputs=_text_out).launch()