test_sum / app.py
AkashKhamkar's picture
Update app.py
9ea7c41
raw
history blame
11.4 kB
import streamlit as st
import sentence_transformers
from transformers import AutoTokenizer
from youtube_transcript_api import YouTubeTranscriptApi
import os
import ast
import pandas as pd
from segmentation import SemanticTextSegmentation
import re
from symspellpy import SymSpell
import pkg_resources
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM
from torch import cuda
from transformers import pipeline
import nltk
nltk.download('stopwords')
from PIL import Image
from PIL import ImageDraw
from PIL import ImageFont
if not os.path.exists('./transcripts'):
os.mkdir('./transcripts')
device = 'cuda' if cuda.is_available() else 'cpu'
def clean_text(link,start,end):
tokenizer = AutoTokenizer.from_pretrained("t5-base")
sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
dictionary_path = pkg_resources.resource_filename(
"symspellpy", "frequency_dictionary_en_82_765.txt"
)
sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1)
def id_ts_grabber(link):
youtube_video = link.split("=")
video_id = youtube_video[1]
#print(f""" This is the video ID: {video_id} and this is the Timestamp: {time_stamp}""")
return video_id
#print(f""" This is the video ID: {video_id} and no Timestamp was found""")
def seg_getter(data,ts,es):
starts = []
for line in data:
ccs = ast.literal_eval(line)
starts.append(float(ccs['start']))
#print(starts)
#ts_ = float(ts.strip("s&end"))
#es_ = float(es.strip(es[-1]))
if not(es) :
e_val = starts[-1]
else:
e_val = starts[min(range(len(starts)), key = lambda i: abs(starts[i]-float(es)))]
t_val = starts[min(range(len(starts)), key = lambda i: abs(starts[i]-float(ts)))]
tid = starts.index(t_val)
eid = starts.index(e_val)
ts_list_len = len(starts[tid:eid])
return tid, ts_list_len
def get_cc(video_id):
try:
transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
try:
# filter for manually created transcripts
transcript = transcript_list.find_manually_created_transcript(['en','en-US','en-GB','en-IN'])
except Exception as e:
# print(e)
transcript = None
manual = True
if not transcript:
try:
# or automatically generated ones
transcript = transcript_list.find_generated_transcript(['en'])
manual = False
except Exception as e:
# print(e)
transcript = None
if transcript:
if manual: file_name = os.path.join('transcripts', str(video_id) + "_cc_manual" + ".txt")
else: file_name = os.path.join('transcripts', str(video_id) + "_cc_auto" + ".txt")
with open(file_name, 'w') as file:
for line in transcript.fetch():
file.write(str(line).replace(r'\xa0', ' ').replace(r'\n', '') + '\n')
# print(f"CC downloaded in {file_name}")
return file_name
else:
#print("No transcript found")
return None
except Exception as e:
#print(e)
return None
def transcript_creator(filename,timestamp,end_pt):
#print(filename)
with open(filename, 'r') as f:
data = f.readlines()
#print("This is data: ", data)
transcripts = []
#print("this is ts: ",timestamp)
if not(timestamp) and not(end_pt):
#print("executing 1 ")
for line in data:
ccs = ast.literal_eval(line)
transcripts.append(ccs['text'])
return transcripts
elif not(timestamp) and end_pt :
timestamp = 0
start,lenlist = seg_getter(data, timestamp, end_pt)
for t in range(lenlist):
ccs = ast.literal_eval(data[start+t])
transcripts.append(ccs['text'])
return transcripts
else :
#print("executing 2")
start,lenlist = seg_getter(data,timestamp,end_pt)
#print(f""" This is the ts list{ts_len}""")
for t in range(lenlist):
ccs = ast.literal_eval(data[start+t])
transcripts.append(ccs['text'])
return transcripts
def transcript_collector(link,ts,es):
vid = id_ts_grabber(link)
print(f""" Fetching the transcript """)
filename = get_cc(vid)
return transcript_creator(filename, ts, es), vid
transcript = pd.DataFrame(columns=['text', 'video_id'])
transcript.loc[0,'text'],transcript.loc[0,'video_id'] = transcript_collector(link,start,end)
def segment(corpus):
text_data = [re.sub(r'\[.*?\]', '', x).strip() for x in corpus]
text_data = [x for x in text_data if x != '']
df = pd.DataFrame(text_data, columns=["utterance"])
# remove new line, tab, return
df["utterance"] = df["utterance"].apply(lambda x: x.replace("\n", " ").replace("\r", " ").replace("\t", " "))
# remove Nan
df.dropna(inplace=True)
sts = SemanticTextSegmentation(df)
texts = sts.get_segments()
return texts
sf = pd.DataFrame(columns=['Segmented_Text','video_id'])
text = segment(transcript.at[0,'text'])
for i in range(len(text)):
sf.loc[i, 'Segmented_Text'] = text[i]
sf.loc[i, 'video_id'] = transcript.at[0,'video_id']
def word_seg(text):
text = text.replace("\n", " ").replace("\r", " ").replace("\t", " ").replace("\xa0", " ")
results = sym_spell.word_segmentation(text, max_edit_distance=0)
texts = results.segmented_string
#result = re.sub(r'[^\w\s]', '',texts).lower()
return texts
for i in range(len(sf)):
#st.write(sf.at[i, 'Segmented_Text'])
sf.loc[i, 'Segmented_Text'] = word_seg(sf.at[i, 'Segmented_Text'])
sf.loc[i, 'Lengths'] = len(tokenizer(sf.at[i, 'Segmented_Text'])['input_ids'])
texts = pd.DataFrame(columns=['texts'])
def segment_loader(dataframe):
flag = 0
for i in range(len(dataframe)):
if flag > 0:
flag -= 1
continue
m = 512
iter = 0
texts.loc[i, 'texts'] = dataframe.at[i+iter, 'Segmented_Text']
length = dataframe.at[i+iter, 'Lengths']
texts.loc[i,'video_id'] = dataframe.at[i, 'video_id']
while i+iter < len(dataframe)-1 and dataframe.at[i, 'video_id'] == dataframe.at[i+iter+1, 'video_id']:
if length + dataframe.at[i + iter + 1, 'Lengths'] <= m :
texts.loc[i,'texts'] += " " + dataframe.at[i+iter+1, 'Segmented_Text']
length += dataframe.at[i+iter + 1,'Lengths']
iter += 1
else:
break
flag = iter
return texts
cleaned_text = segment_loader(sf)
cleaned_text.reset_index(drop=True, inplace=True)
return cleaned_text
def t5_summarizer(link,start, end):
input_text = clean_text(link,start,end)
lst_outputs = []
tokenizer1 = AutoTokenizer.from_pretrained("CareerNinja/t5_large_3e-4_on_v2_dataset")
model1 = AutoModelForSeq2SeqLM.from_pretrained("CareerNinja/t5_large_3e-4_on_v2_dataset")
summarizer1 = pipeline("summarization", model=model1, tokenizer=tokenizer1)
print(f""" Entered summarizer ! """)
st.write('Below is the summary of the given URL: ')
for i in range(len(input_text)):
summary = summarizer1(input_text.at[i,'texts'], min_length=64, max_length=128)
sumry = list(summary[0].values())
input_text.loc[i,'Generated Summary'] = sumry[0]
lst_outputs.append(sumry[0])
st.write(input_text.at[i,'Generated Summary'])
if i != len(input_text) - 1:
st.write('=====================================================================================')
return lst_outputs
def card_creator(path, text, y_value):
img = Image.open(path)
def text_wrap(text, font, max_width):
"""Wrap text base on specified width.
This is to enable text of width more than the image width to be display
nicely.
@params:
text: str
text to wrap
font: obj
font of the text
max_width: int
width to split the text with
@return
lines: list[str]
list of sub-strings
"""
lines = []
# If the text width is smaller than the image width, then no need to split
# just add it to the line list and return
if font.getsize(text)[0] <= max_width:
lines.append(text)
else:
#split the line by spaces to get words
words = text.split(' ')
i = 0
# append every word to a line while its width is shorter than the image width
while i < len(words):
line = ''
while i < len(words) and font.getsize(line + words[i])[0] <= max_width:
line = line + words[i]+ " "
i += 1
if not line:
line = words[i]
i += 1
lines.append(line)
return lines
font_path = 'Montserrat-Regular.ttf'
font = ImageFont.truetype(font=font_path, size=22)
lines = text_wrap(text, font, img.size[0] - 44)
line_height = font.getsize('hg')[1]
draw = ImageDraw.Draw(img)
#Draw text on image
color = 'rgb(255,255,255)' # white color
x = 22
y = y_value
for line in lines:
draw.text((x,y), line, fill=color, font=font)
y = y + line_height # update y-axis for new line
img.save("card.png")
st.image(img, caption="Summary Card")
def main():
if 'submitted' not in st.session_state:
st.session_state.submitted = False
if 'opt' not in st.session_state:
st.session_state.opt = []
def callback():
st.session_state.submitted = True
st.title('Video Summarizer')
url = st.text_input('Enter the Video Link')
start_pt = st.text_input('Enter the Start point in secs')
end_pt = st.text_input('Enter the end point in secs')
if (st.button("Submit URL", on_click=callback) and url) :
opt = t5_summarizer(url,start_pt,end_pt)
st.session_state.opt = opt
#st.write(st.session_state)
#text = st.text_input('Enter the Summary here to make a Summary Card.')
#text = st.selectbox('Select the summary you want to creat a card of ', opt, key="text")
#st.write('You selected:', option)
if st.session_state.submitted and st.session_state.opt:
text = st.selectbox('Select the summary you want to creat a card of ', st.session_state.opt)
option = st.selectbox('Which color template would you like to use ?',('Elf Green','Dark Pastel Green'))
if st.button("Generate Summary Card") and text and option:
if option == 'Elf Green':
if len(text) > 380 :
st.error('Summary is too long !')
else:
card_creator('iteration5_empty.png',text,335)
else :
if len(text) > 430 :
st.error('Summary is too long !')
else :
card_creator('X-93.png',text,285)
with open("card.png", "rb") as file:
btn = st.download_button(
label="Download card",
data=file,
file_name="card.png",
mime="image/png"
)
main()