File size: 8,119 Bytes
837fdb6 8b34532 837fdb6 ee90b6b 837fdb6 d0ccaa2 ee90b6b 77eaf9b 837fdb6 d0ccaa2 ee90b6b d0ccaa2 ee90b6b 837fdb6 6045325 23c56a3 6045325 23c56a3 c05de66 23c56a3 6045325 161984b 23c56a3 161984b 6045325 837fdb6 67a102e 8c0bdb2 837fdb6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 |
import base64
import html
import json
import re

from PIL import Image

from youtube_transcript_api import YouTubeTranscriptApi
#transcript_list = YouTubeTranscriptApi.list_transcripts('ReHGSGwV4-A')
#transcript = transcript_list.find_transcript(['en','de'])
def get_id_from_link(link):
    """Extract the 11-character YouTube video id from *link*.

    Accepts a full watch URL ("...v=<id>&...") or a bare 11-character id.
    Returns the id, or the string 'Error: Invalid Link.' for anything else.
    """
    if "v=" in link:
        # everything between 'v=' and the next '&' query parameter
        return link.split("v=")[1].split("&")[0]
    if len(link) == 11:
        # already a bare video id
        return link
    # NOTE(review): callers must check for this sentinel string
    return "Error: Invalid Link."
# step 1: download the json transcript for the youtube video
def get_json_transcript(video_id, rpunkt_switch):
    """Fetch the transcript of *video_id* via YouTubeTranscriptApi.

    rpunkt_switch=True: prefer the auto-generated English transcript,
    falling back to the German transcript translated to English.
    rpunkt_switch=False: take a manually created English or German one.
    Returns (raw_transcript, type_transcript) where type_transcript is a
    [language, origin] pair describing what was fetched.
    """
    transcript_list = YouTubeTranscriptApi.list_transcripts(video_id)
    if rpunkt_switch:
        try:
            transcript = transcript_list.find_generated_transcript(['en'])
            raw_transcript = transcript.fetch()
            type_transcript = ['en', 'generated']
        # was a bare `except:`, which also swallowed KeyboardInterrupt
        except Exception:
            # no generated English transcript: translate the German one
            transcript = transcript_list.find_transcript(['de'])
            raw_transcript = transcript.translate('en').fetch()
            type_transcript = ['en', 'translated']
    else:
        transcript = transcript_list.find_transcript(['en', 'de'])
        raw_transcript = transcript.fetch()
        # NOTE(review): 'den' looks like a typo for 'de'/'en' but is kept
        # byte-for-byte for compatibility with downstream consumers
        type_transcript = ['den', 'manual']
    return raw_transcript, type_transcript
# step 2: extract timestamps from the json transcript
def get_timestamps(transcript_raw):
    """Return the 'start' timestamp of each transcript entry, as strings.

    Fix: the previous join-then-split round trip returned [''] for an
    empty transcript; an empty list now yields [].
    """
    return [str(entry['start']) for entry in transcript_raw]
# step 3: extract the caption text from the transcript
def get_caption(transcript_raw):
    """Return one caption line per transcript entry, newline-separated.

    Newlines inside a single caption are flattened to spaces first so
    that each output line corresponds to exactly one transcript entry.
    """
    flattened = (entry['text'].replace('\n', ' ') for entry in transcript_raw)
    return '\n'.join(flattened)
def replacePunctuatedText(raw_transcript, caption):
    """Write the (punctuated) caption lines back into the transcript.

    Line i of *caption* replaces the 'text' field of entry i.  The
    entries of *raw_transcript* are modified in place and the same list
    object is returned.  Raises IndexError when *caption* has fewer
    lines than the transcript has entries.
    """
    punctuated_lines = caption.split('\n')
    for position, entry in enumerate(raw_transcript):
        entry['text'] = punctuated_lines[position]
    return raw_transcript
def getSentences(raw_transcript):
    """Split the transcript into sentences keyed by their frame number.

    Each frame's text is concatenated into one stream with its 1-based
    index wrapped in hash signs (' #idx# '); the stream is then split on
    '. ' sentence boundaries.  Sentences without a frame tag of their
    own, and sentences of 20 characters or fewer (after tag removal),
    are dropped so that short fragments do not dominate later ranking.
    Returns a dict mapping frame number (str) -> sentence text.
    """
    frm_cap = ''
    for idx, line in enumerate(raw_transcript, start=1):
        # flatten caption-internal newlines so they cannot end a sentence
        # (the original chained the same .replace twice — a no-op)
        frm_cap += ' #' + str(idx) + '# ' + line['text'].replace('\n', ' ')
    dict_sentences = {}
    for item in frm_cap.strip().split('. '):
        m = re.search(r"#[^#]*#", item)
        if m is None:
            # small sentence without its own frame tag: drop it
            continue
        frm = m.group(0).replace('#', '')
        # strip every frame tag from the sentence and restore the
        # full stop that the split removed
        clean_match = re.sub(r'\s*#[^#]*#\s*', ' ', item) + '.'
        if len(clean_match) > 20:
            dict_sentences[frm] = clean_match.strip()
    return dict_sentences
# split all sentences into an array
# remove all timestamps in the middle of the sentences
# leave only the timestamps at the beginning of each sentence
# restore the full-stop sign at the end of each sentence, that was removed in the split step
#chops = ''
#for item in sl.strip().split('. '):
# chops = chops + re.sub('\s*#[^#]*#\s*',' ',item) + '. '
#chops
# remove all remaining hash tags
#dsl={}
#for item in chops.split('. #'):
# elem= item.split('# ')
# idx = elem[0].replace('#','')
# sentence = elem[1]+'.'
# dsl[idx] = sentence
#return dsl
def convertToJSON(dsl):
    """Serialize {timestamp: caption} pairs plus their frame images to JSON.

    For the n-th entry the image './workdir/frame_<n:04d>.jpg' is read
    from disk and base64-encoded.  Returns a JSON array string of
    objects with the keys image_id, timestamp, image and caption.

    Fix: the previous hand-built string produced invalid JSON whenever a
    caption contained a double quote or backslash, and returned ']' for
    an empty input; json.dumps escapes correctly and yields '[]'.
    """
    workdir = './workdir/'
    rows = []
    for cnt, (timestamp, caption) in enumerate(dsl.items(), start=1):
        image_path = workdir + 'frame_' + f"{cnt:04d}" + '.jpg'
        # read the frame image and convert it to a base64 string
        with open(image_path, 'rb') as open_file:
            base64_string = base64.b64encode(open_file.read()).decode('utf-8')
        rows.append({
            "image_id": str(cnt),
            "timestamp": timestamp,
            "image": base64_string,
            "caption": caption,
        })
    return json.dumps(rows)
def convertToHTML(dsl):
    """Render {timestamp: caption} pairs as an HTML table of frames.

    Entry n links the image '../workdir/frame_<n:04d>.jpg' next to its
    timestamp and caption.  The table is written to
    './workdir/output.html' and also returned as a string.

    Fixes: the header row now closes its '</tr>'; href/src attribute
    values are quoted; timestamp and caption text is HTML-escaped so
    characters like '<' or '&' cannot break the markup.
    """
    #workdir = 'file/workdir/'
    workdir = '../workdir/'
    rows = [
        '<table border=1>',
        '<tr><td>Image Nr.</td><td>Timestamp [sec]</td><td>Image</td><td>Caption</td></tr>',
    ]
    for cnt, (timestamp, caption) in enumerate(dsl.items(), start=1):
        image = workdir + 'frame_' + f"{cnt:04d}" + '.jpg'
        rows.append(
            '<tr><td>' + str(cnt) + '</td>'
            + '<td>' + html.escape(timestamp) + '</td>'
            + '<td><a href="' + image + '"><img src="' + image + '" width=500></a></td>'
            + '<td>' + html.escape(caption) + '</td></tr>\n'
        )
    rows.append('</table>')
    html_rows = ''.join(rows)
    filename = './workdir/output.html'
    with open(filename, 'w') as the_file:
        the_file.write(html_rows)
    return html_rows
def getImages(dsl):
    """Open and return the PIL image for every entry in *dsl*.

    Entry n (1-based) corresponds to the file 'workdir/frame_<n:04d>.jpg';
    the keys and values of *dsl* are only used for their count.
    """
    workdir = 'workdir/'
    return [Image.open(workdir + f"frame_{num:04d}.jpg")
            for num in range(1, len(dsl) + 1)]
# 1. dict_sentences maps frame-nr -> sentence and list_summary holds the
#    summarized sentences; find the frame-nr of every summarized sentence
# 2. dict_frame_timestamp maps each frame-nr to its timestamp
# 3. both are combined into a timestamp -> summary-sentence mapping
def getTimestampAtFrameFromSummary(raw_transcript, dict_sentences, list_summary):
    """Map each summarized sentence to the timestamp of its video frame.

    Returns {timestamp: sentence}, or an error-message string when not
    every summarized sentence could be located in dict_sentences.
    """
    # keep every frame whose sentence contains one of the summary sentences
    dict_summary = {}
    for frame, full_sentence in dict_sentences.items():
        if any(str(candidate) in full_sentence for candidate in list_summary):
            dict_summary[frame] = full_sentence
    # sanity check: every summarized sentence must have been found
    if len(list_summary) != len(dict_summary):
        err_msg = ('Error: Number of summarized sentences ' + str(len(list_summary))
                   + ' is not equal to the identified sentences ' + str(len(dict_summary)) + '.')
        print(err_msg)
        return err_msg
    # frame number (1-based, as string) -> timestamp of that frame
    dict_frame_timestamp = {
        str(frame_nr): str(entry['start'])
        for frame_nr, entry in enumerate(raw_transcript, start=1)
    }
    sum_timestamps = [dict_frame_timestamp.get(frame) for frame in dict_summary]
    # pair the timestamps with the summarized sentences, in order
    return {
        timestamp: str(sentence)
        for timestamp, sentence in zip(sum_timestamps, list_summary)
    }
def restore_cr(input_text, output_text):
    """Restore the line breaks of *input_text* onto *output_text*.

    input_text is the original transcript whose newlines mark frame
    boundaries; output_text is the punctuated text, which must contain
    the same number of space-separated words.  Returns the punctuated
    text with the original breaks restored or, on a word-count mismatch,
    a tuple starting with an error message.

    Fix: the regex is now a raw string ('\\s' in a plain string is an
    invalid escape and warns on Python 3.12+).
    """
    srt_file_strip = input_text.strip()
    # mark each original line break with a '#' on the preceding word
    srt_file_sub = re.sub(r'\s*\n\s*', '# ', srt_file_strip)
    srt_file_array = srt_file_sub.split(' ')
    pcnt_file_array = output_text.split(' ')
    # goal: restore the break points, i.e. the same number of lines as the
    # srt file — each srt line corresponds to one frame of the video
    if len(srt_file_array) != len(pcnt_file_array):
        # NOTE(review): this returns a tuple, not a single string; kept
        # as-is for backward compatibility — callers must check for it
        return "AssertError: The length of the transcript and the punctuated file should be the same: ", len(srt_file_array), len(pcnt_file_array)
    # copy the '#' break markers from the srt words onto the punctuated words
    pcnt_file_array_hash = [
        word + '#' if marker.endswith('#') else word
        for marker, word in zip(srt_file_array, pcnt_file_array)
    ]
    # reassemble and turn the '#' markers back into carriage returns
    return ' '.join(pcnt_file_array_hash).replace('#', '\n')
|