import os
import pickle

import gradio as gr
import nltk
import numpy as np
import pandas as pd
import tensorflow as tf
import torch
from finbert_embedding.embedding import FinbertEmbedding  # noqa: F401
from nltk.cluster import KMeansClusterer
from nltk.cluster.util import cosine_distance
from nltk.tokenize import sent_tokenize
from scipy.spatial import distance_matrix
from transformers import PegasusForConditionalGeneration, PegasusTokenizer

# Load the pickled FinBERT embedding model from disk.
# Note kept from the original: contents = pickle.load(f) becomes
# contents = CPU_Unpickler(f).load()
model_path = "finbert.sav"
with open(model_path, "rb") as f:
    model1 = pickle.load(f)

tf.compat.v1.disable_eager_execution()

# Load the Pegasus summarization model and its tokenizer.
model_name = "human-centered-summarization/financial-summarization-pegasus"
tokenizer = PegasusTokenizer.from_pretrained(model_name)
model2 = PegasusForConditionalGeneration.from_pretrained(model_name)

nltk.download('punkt')

# Scratch file that each input text is written to before processing.
DATA_PATH = "/tmp/"
INPUT_FILE = "/tmp/input.txt"
# The original notes 512 as the maximum input length for the Pegasus model.
# The chunking below counts characters, not tokens, so this is a rough proxy.
MAX_CHUNK_CHARS = 512


def _roundtrip_through_disk(text):
    """Write the text to the scratch file and read it back."""
    os.makedirs(DATA_PATH, exist_ok=True)
    with open(INPUT_FILE, "w") as file:
        file.write(text)
    with open(INPUT_FILE, "r") as file:
        return file.read()


def finbert(word):
    """Extractive summary: cluster FinBERT sentence embeddings with k-means
    and keep the sentence closest to each cluster centroid."""
    text = _roundtrip_through_disk(word)

    # Split the text into sentences and strip surrounding whitespace.
    tokens = nltk.sent_tokenize(text)
    sentences = [token.strip() for token in tokens]
    data = pd.DataFrame({'Sentences': sentences})

    # One embedding per sentence, converted from tensor to ndarray.
    data['Embeddings'] = [model1.sentence_vector(token).numpy() for token in tokens]

    # Number of expected sentences in the summary.
    NUM_CLUSTERS = 10
    iterations = 8

    # Stack the embeddings into a 2-D array for clustering.
    X = np.array(data['Embeddings'].to_list())

    kclusterer = KMeansClusterer(
        NUM_CLUSTERS,
        distance=cosine_distance,
        repeats=iterations,
        avoid_empty_clusters=True)

    # If the text is too short, k-means raises an error;
    # in that case return the text itself as the result.
    try:
        assigned_clusters = kclusterer.cluster(X, assign_clusters=True)
        means = kclusterer.means()
        data['Cluster'] = pd.Series(assigned_clusters, index=data.index)
        data['Centroid'] = data['Cluster'].apply(lambda x: means[x])
    except ValueError:
        return text

    def distance_from_centroid(row):
        """Distance of a sentence embedding from its cluster centroid."""
        return distance_matrix([row['Embeddings']], [row['Centroid'].tolist()])[0][0]

    data['Distance_From_Centroid'] = data.apply(distance_from_centroid, axis=1)

    # Keep the closest sentence per cluster, restored to document order.
    summary = " ".join(
        data.sort_values('Distance_From_Centroid', ascending=True)
        .groupby('Cluster')
        .head(1)
        .sort_index()['Sentences']
        .tolist())

    # Collapse newlines and repeated whitespace.
    summary = " ".join(summary.split())

    return (summary,
            " Length of Input:---->" + str(len(word)),
            " Length of Output:----> " + str(len(summary)))


def pegasus(text):
    """Abstractive summary: split the text into chunks of sentences,
    summarize each chunk with Pegasus, and join the results."""
    text_ = _roundtrip_through_disk(text)

    def tokenized_sentences(document):
        """Group sentences into chunks of fewer than MAX_CHUNK_CHARS characters."""
        chunks = []
        sentences = []
        length = 0
        for sentence in sent_tokenize(document):
            sentence = sentence.strip()
            length += len(sentence)
            if length < MAX_CHUNK_CHARS:
                sentences.append(sentence)
            else:
                # Start a new chunk; skip the append if the current chunk is
                # empty (happens when the first sentence is already too long).
                if sentences:
                    chunks.append(sentences)
                sentences = [sentence]
                length = len(sentence)
        # Append the final chunk.
        if sentences:
            chunks.append(sentences)
        return chunks

    tokenized = tokenized_sentences(text_)

    # Use GPU if available; move the model once, outside the loop.
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    model2.to(device)

    summaries = []
    for chunk in tokenized:
        # Encode the chunk, truncating anything past the model's input limit.
        inputs = tokenizer.encode(
            ' '.join(chunk), truncation=True, return_tensors='pt').to(device)

        # Generate the summary (beam search combined with sampling).
        # Options left disabled in the original: num_return_sequences=5,
        # length_penalty=0.2, no_repeat_ngram_size=2, min_length=10.
        all_summary = model2.generate(
            inputs,
            do_sample=True,
            max_length=50,
            top_k=50,
            top_p=0.95,
            num_beams=5,
            early_stopping=True)

        # Decode every returned sequence back to text.
        summaries.extend(
            tokenizer.decode(each_summary,
                             skip_special_tokens=True,
                             clean_up_tokenization_spaces=False)
            for each_summary in all_summary)

    # Join with spaces so chunk summaries do not run together.
    return " ".join(summaries)


# gr.inputs / gr.outputs is the legacy Gradio API; newer Gradio releases
# use gr.Textbox for both inputs and outputs.
interface1 = gr.Interface(
    fn=finbert,
    inputs=gr.inputs.Textbox(
        lines=15,
        placeholder="Enter your text !!",
        label='Input-10k Sections'),
    outputs=gr.outputs.Textbox(label='Output'))
interface1.launch()