|
import base64
import json
import re

from PIL import Image
from youtube_transcript_api import YouTubeTranscriptApi
|
|
def get_id_from_link(link):
    """Extract the 11-character YouTube video id from a watch URL or a bare id."""
    if "v=" in link:
        # Standard watch URL, e.g. https://www.youtube.com/watch?v=<id>&...
        return link.split("v=")[1].split("&")[0]
    elif len(link) == 11:
        # The caller already passed a bare video id.
        return link
    else:
        return "Error: Invalid Link."
|
|
def get_json_transcript(video_id, rpunkt_switch):
    """Fetch a transcript for the given video id.

    Returns the raw transcript and a [language, origin] tag. The helpers below
    assume a youtube_transcript_api version whose fetch() returns a list of
    dicts with 'text', 'start' and 'duration' keys.
    """
    transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)

    raw_transcript = []
    type_transcript = []

    if rpunkt_switch:
        try:
            # Prefer the auto-generated English transcript.
            transcript = transcript_list.find_generated_transcript(['en'])
            raw_transcript = transcript.fetch()
            type_transcript = ['en', 'generated']
        except Exception:
            # Fall back to a German transcript translated into English.
            transcript = transcript_list.find_transcript(['de'])
            raw_transcript = transcript.translate('en').fetch()
            type_transcript = ['en', 'translated']
    else:
        # Take the first available English or German transcript.
        transcript = transcript_list.find_transcript(['en', 'de'])
        raw_transcript = transcript.fetch()
        type_transcript = ['den', 'manual']

    return raw_transcript, type_transcript
|
|
def get_timestamps(transcript_raw):
    # Start time (in seconds) of each transcript entry, as strings.
    return [str(entry['start']) for entry in transcript_raw]
|
|
def get_caption(transcript_raw):
    # One caption line per transcript entry; embedded newlines are flattened.
    transcript_text = '\n'.join([entry['text'].replace('\n', ' ') for entry in transcript_raw])
    return transcript_text
|
|
|
def replacePunctuatedText(raw_transcript, caption):
    # Write the punctuated caption lines back into the transcript entries.
    # Note: the entries of raw_transcript are modified in place.
    list_caption = caption.split('\n')
    pnct_raw_transcript = raw_transcript

    for idx, line in enumerate(pnct_raw_transcript):
        line['text'] = list_caption[idx]

    return pnct_raw_transcript
|
|
|
def getSentences(raw_transcript):
    """Split the transcript into sentences, keyed by the transcript entry
    (frame index) the sentence belongs to."""
    # Prefix every transcript entry with a '#<index>#' marker so the frame
    # number survives the sentence split below.
    frm_cap = ''
    for idx, line in enumerate(raw_transcript, start=1):
        frm_cap = frm_cap + ' #' + str(idx) + '# ' + line['text'].replace('\n', ' ')

    dict_sentences = {}
    sentences = frm_cap.strip().split('. ')

    for item in sentences:
        m = re.search(r"#[^#]*#", item)
        if m is not None:
            match = m.group(0)
            frm = match.replace('#', '')
            # Strip all frame markers from the sentence and restore the period.
            clean_match = re.sub(r'\s*#[^#]*#\s*', ' ', item) + '.'
            # Skip very short fragments.
            if len(clean_match) > 20:
                dict_sentences[frm] = clean_match.strip()

    return dict_sentences
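
# Illustrative example (made-up transcript entries): for
#   [{'text': 'This is the first sentence of the talk.', 'start': 0.0},
#    {'text': 'And here is the second sentence of the talk', 'start': 2.5}]
# getSentences returns
#   {'1': 'This is the first sentence of the talk.',
#    '2': 'And here is the second sentence of the talk.'}
# i.e. each sentence is keyed by the first entry marker found inside it
# (roughly, the entry the sentence starts in); fragments of 20 characters
# or less are dropped.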
|
|
def convertToJSON(dsl):
    # Build a JSON array of {image_id, timestamp, image, caption} records,
    # pairing the n-th entry of dsl with workdir/frame_000n.jpg.
    workdir = './workdir/'
    rows = []
    for cnt, (key, val) in enumerate(dsl.items(), start=1):
        image = 'frame_' + f"{cnt:04d}" + '.jpg'

        with open(workdir + image, 'rb') as open_file:
            byte_content = open_file.read()
        base64_string = base64.b64encode(byte_content).decode('utf-8')

        rows.append({
            "image_id": str(cnt),
            "timestamp": key,
            "image": base64_string,
            "caption": val,
        })

    # json.dumps also takes care of escaping quotes inside the captions.
    return json.dumps(rows)
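
# Illustrative output of convertToJSON (values shortened): a JSON array such as
#   [{"image_id": "1", "timestamp": "12.34",
#     "image": "<base64 of frame_0001.jpg>",
#     "caption": "First summarized sentence."}, ...]
# where dsl is presumably the timestamp-to-sentence dict returned by
# getTimestampAtFrameFromSummary further below.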
|
|
def convertToHTML(dsl):
    # Relative path from ./workdir/output.html to the extracted frames.
    workdir = '../workdir/'
    cnt = 1
    html_rows = '<table border=1>'
    html_rows = html_rows + '<tr><td>Image Nr.</td><td>Timestamp [sec]</td><td>Image</td><td>Caption</td></tr>'
    for key, val in dsl.items():
        image = 'frame_' + f"{cnt:04d}" + '.jpg'
        sentence = val
        row = '<tr><td>' + str(cnt) + '</td>'
        row = row + '<td>' + key + '</td>'
        row = row + '<td><a href="' + workdir + image + '"><img src="' + workdir + image + '" width=500></a></td>'
        row = row + '<td>' + sentence + '</td></tr>\n'
        html_rows = html_rows + row
        cnt = cnt + 1
    html_rows = html_rows + '</table>'

    filename = './workdir/output.html'
    with open(filename, 'w') as the_file:
        the_file.write(html_rows)

    return html_rows
|
|
|
def getImages(dsl):
    # Load one PIL image per entry of dsl, expecting frame_0001.jpg,
    # frame_0002.jpg, ... in workdir.
    images = []
    workdir = 'workdir/'
    cnt = 1
    for key, val in dsl.items():
        image = 'frame_' + f"{cnt:04d}" + '.jpg'
        image_path = workdir + image
        pil_im = Image.open(image_path)
        images.append(pil_im)
        cnt = cnt + 1

    return images
|
|
def getTimestampAtFrameFromSummary(raw_transcript, dict_sentences, list_summary):
    """Map each summarized sentence to the start timestamp of its frame."""
    # Find, for every summary sentence, the full sentence (and frame index)
    # it was taken from.
    dict_summary = {}
    for key, value in dict_sentences.items():
        for sentence in list_summary:
            if str(sentence) in value:
                dict_summary[key] = value

    if len(list_summary) != len(dict_summary):
        err_msg = ('Error: Number of summarized sentences ' + str(len(list_summary))
                   + ' is not equal to the identified sentences ' + str(len(dict_summary)) + '.')
        print(err_msg)
        return err_msg

    # Frame index -> start timestamp of the corresponding transcript entry.
    dict_frame_timestamp = {}
    for idx, line in enumerate(raw_transcript, start=1):
        dict_frame_timestamp[str(idx)] = str(line['start'])

    sum_timestamps = []
    for key in dict_summary.keys():
        sum_timestamps.append(dict_frame_timestamp.get(key))

    # Pair timestamps with the summary sentences; this assumes the summary
    # sentences appear in the same order as in the transcript.
    dict_timestamp_summary = {}
    for idx, value in enumerate(list_summary):
        dict_timestamp_summary[sum_timestamps[idx]] = str(value)

    return dict_timestamp_summary
|
|
def restore_cr(input_text, output_text):
    """Copy the line breaks of the original transcript onto the punctuated text.

    Both texts must contain the same number of whitespace-separated tokens.
    """
    srt_file = input_text
    punctuated = output_text

    # Mark the original line ends with '#' so they can be restored later.
    srt_file_strip = srt_file.strip()
    srt_file_sub = re.sub(r'\s*\n\s*', '# ', srt_file_strip)
    srt_file_array = srt_file_sub.split(' ')
    pcnt_file_array = punctuated.split(' ')

    if len(srt_file_array) != len(pcnt_file_array):
        return ('AssertError: The length of the transcript and the punctuated file '
                'should be the same: ' + str(len(srt_file_array)) + ' vs. '
                + str(len(pcnt_file_array)))

    # Carry the end-of-line markers over to the punctuated tokens.
    pcnt_file_array_hash = []
    for idx, item in enumerate(srt_file_array):
        if item.endswith('#'):
            pcnt_file_array_hash.append(pcnt_file_array[idx] + '#')
        else:
            pcnt_file_array_hash.append(pcnt_file_array[idx])

    pcnt_file_cr = ' '.join(pcnt_file_array_hash).replace('#', '\n')

    return pcnt_file_cr
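

# A minimal end-to-end sketch of how these helpers fit together. It assumes
# network access to YouTube, a video that has an English or German transcript,
# and a youtube_transcript_api version whose fetch() returns a list of dicts.
# The URL is a placeholder; the frame-based helpers (convertToJSON,
# convertToHTML, getImages) additionally expect extracted frames named
# frame_0001.jpg, ... in ./workdir/ and are therefore not exercised here.
if __name__ == '__main__':
    example_link = 'https://www.youtube.com/watch?v=XXXXXXXXXXX'  # replace with a real video URL
    vid = get_id_from_link(example_link)
    transcript_raw, transcript_type = get_json_transcript(vid, rpunkt_switch=False)
    print('Transcript type:', transcript_type)

    sentences = getSentences(transcript_raw)
    for frame, sentence in list(sentences.items())[:5]:
        print(frame, sentence)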
|
|