import copy from pytube import YouTube from scipy.signal import resample import gradio as gr import numpy as np import pytsmod as tsm from moviepy.audio.AudioClip import AudioArrayClip from moviepy.editor import * from moviepy.video.fx.speedx import speedx from sentence_transformers import SentenceTransformer, util from transformers import pipeline, BertTokenizer, BertForNextSentencePrediction import torch import whisper transcriber = whisper.load_model("medium") sentence_transformer = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2') tokenizer = BertTokenizer.from_pretrained("bert-base-cased") next_sentence_predict = BertForNextSentencePrediction.from_pretrained("bert-base-cased").eval() summarizer = pipeline("summarization", model="philschmid/bart-large-cnn-samsum") def get_youtube(video_url): # YouTubeの動画をダウンロード print("Start download video") yt = YouTube(video_url) abs_video_path = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first().download(filename='download.mp4', output_path='./movies/') print("Success download video") print(abs_video_path) return abs_video_path def two_chnnel_to_one_channel(sample): # 音声を2チャンネルから1チャンネルに変換 left_channel = sample[:, 0] right_channel = sample[:, 1] mono_sample = (left_channel + right_channel) / 2 return mono_sample def convert_sample_rate(data, original_sr, target_sr): # 音声データのサンプリング周波数を変更 target_length = int(len(data) * target_sr / original_sr) return resample(data, target_length) def summarize_video(video_path, ratio_sum, playback_speed): print("Start summarize video") output_path = "./movies/output.mp4" movie_clip = VideoFileClip(video_path) audio_sampling_rate = movie_clip.audio.fps clip_audio = np.array(movie_clip.audio.to_soundarray()) # 文字の書き起こし audio_fp32 = convert_sample_rate(clip_audio, audio_sampling_rate, 16000) audio_fp32 = two_chnnel_to_one_channel(audio_fp32).astype(np.float32) transcription_results = transcriber.transcribe(audio_fp32) # 文の句切れごとにテキスト/発話時間をまとめる periods = ('.', '!', '?') clip_sentences = [] head_sentence = True for r in transcription_results['segments']: if head_sentence: start_time = r['start'] clip_sentences.append({'sentence':'', 'sentences':[], 'duration':[r['start'], None], 'durations':[]}) head_sentence = False clip_sentences[-1]['sentence'] += r['text'] clip_sentences[-1]['sentences'].append(r['text']) clip_sentences[-1]['durations'].append([r['start'], r['end']]) if r['text'].endswith(periods): clip_sentences[-1]['duration'][1] = r['end'] head_sentence = True # 文字の要約 transcription = transcription_results['text'] summary_text = summarizer(transcription, max_length=int(len(transcription)*0.1), min_length=int(len(transcription)*0.05), do_sample=False)[0]['summary_text'] print(summary_text) # 要約文と一致する文を判別 summary_embedings = [sentence_transformer.encode(s, convert_to_tensor=True) for s in summary_text.split('.')] important_sentence_idxs = [False]*len(clip_sentences) for s, clip_sentence in enumerate(clip_sentences): embedding = sentence_transformer.encode(clip_sentence['sentence'], convert_to_tensor=True) for s_e in summary_embedings: if util.pytorch_cos_sim(embedding, s_e) > ratio_sum: important_sentence_idxs[s] = True # となりの文と接続する文を判別 def next_prob(prompt, next_sentence, b=1.2): encoding = tokenizer(prompt, next_sentence, return_tensors="pt") logits = next_sentence_predict(**encoding, labels=torch.LongTensor([1])).logits pos = b ** logits[0, 0] neg = b ** logits[0, 1] return float(pos / (pos + neg)) connection_idxs = [False]*(len(clip_sentences)-1) for s in range(len(clip_sentences)-1): if next_prob(clip_sentences[s]['sentence'], clip_sentences[s+1]['sentence']) > 0.88: connection_idxs[s] = True # 要約後の文章のみ残す def combine_arrays(A, B): C = copy.deepcopy(A) for i in range(len(A)): if A[i]: j = i while j < len(B) and B[j]: C[j+1] = True j += 1 j = i while j > 0 and B[j-1]: C[j] = True j -= 1 return C important_idxs = combine_arrays(important_sentence_idxs, connection_idxs) # 要約後の文章がどこかを可視化 html_text = "

Full Transcription

" for idx in range(len(important_sentence_idxs)): seconds = clip_sentences[idx]['duration'][0] * (1/playback_speed) minutes = int(seconds // 60) remaining_seconds = str(seconds % 60) if important_idxs[idx]: html_text += '

' + f"{minutes}:{remaining_seconds[0]} | {clip_sentences[idx]['sentence']}

" else: html_text += f"

{minutes}:{remaining_seconds[0]} | {clip_sentences[idx]['sentence']}

" # 動画を結合 clips = [] for i in range(len(important_idxs)): if important_idxs[i]: tmp_clips = [] for j in range(len(clip_sentences[i]['sentences'])): start_time, end_time = clip_sentences[i]['durations'][j][0], clip_sentences[i]['durations'][j][1] if end_time > movie_clip.duration: end_time = movie_clip.duration clip = movie_clip.subclip(start_time, end_time) clip = clip.set_pos("center").set_duration(end_time-start_time) txt_clip = TextClip(clip_sentences[i]['sentences'][j], fontsize=int(movie_clip.w/40), color='white', bg_color='black', font='./fonts/Muller-Trial-Medium.ttf') txt_clip = txt_clip.set_duration(end_time-start_time).set_position(("center", "bottom")) clip = CompositeVideoClip([clip, txt_clip]) tmp_clips.append(clip) clips.append(concatenate_videoclips(tmp_clips)) # クリップをクロスディゾルブで結合 # for c in range(len(clips)-1): # fade_duration = 2 # clips[c] = clips[c].crossfadeout(fade_duration).audio_fadeout(fade_duration) # clips[c+1] = clips[c+1].crossfadein(fade_duration).audio_fadein(fade_duration) # 動画を結合し再生速度を変化させる final_video = concatenate_videoclips(clips, method="chain") final_video_audio = np.array(final_video.audio.to_soundarray(fps=audio_sampling_rate)) if playback_speed != 1: final_video_audio_fixed = tsm.wsola(final_video_audio, 1/playback_speed).T else: final_video_audio_fixed = final_video_audio final_video = speedx(final_video, factor=playback_speed) final_video = final_video.set_audio(AudioArrayClip(final_video_audio_fixed, fps=audio_sampling_rate)) # if final_video.duration > 30: # final_video = final_video.subclip(0, 30) final_video.write_videofile(output_path) print(output_path) print("Success summarize video") return output_path, summary_text, html_text # ---- Gradio Layout ----- youtube_url_in = gr.Textbox(label="Youtube url", lines=1, interactive=True) video_in = gr.Video(label="Input Video", mirror_webcam=False, interactive=True) video_out = gr.Video(label="Output Video") summary_text = gr.Textbox(label="Video Transcription Summary") transcription_text = gr.HTML(label="Full Transcription") demo = gr.Blocks() demo.encrypt = False with demo: gr.Markdown('''

FastPerson: Video summarization applied with transcription and text summarization

Video Summarization
''') with gr.Row(): gr.Markdown(''' ### Summarize video ##### Step 1a. Download video from youtube ##### Step 1b. You also can upload video directly ##### Step 2. Enter summary rate and playback speed ##### Step 3. Generating summarized video. ''') with gr.Row(): gr.Markdown(''' ### You can test by following examples: ''') examples = gr.Examples(examples= [ "https://www.youtube.com/watch?v=QghjaS0WQQU", "https://www.youtube.com/watch?v=cUS_22_lDiM", "https://www.youtube.com/watch?v=80yqL2KzBVw"], label="Examples", inputs=[youtube_url_in]) with gr.Column(): youtube_url_in.render() download_youtube_btn = gr.Button("Download Youtube video") download_youtube_btn.click(get_youtube, [youtube_url_in], [video_in]) print(video_in) with gr.Row(): ratio_sum = gr.Slider(label="Summarize Ratio", minimum=0.3, maximum=0.8, step=0.05, value=0.6) playback_speed = gr.Slider(label="Playback Speed", minimum=0.5, maximum=2.0, step=0.25, value=1.0) with gr.Row(): upload_output_video_btn = gr.Button("Summarize Video") upload_output_video_btn.click(summarize_video, [video_in, ratio_sum, playback_speed], [video_out, summary_text, transcription_text]) with gr.Row(): video_in.render() video_out.render() with gr.Row(): summary_text.render() with gr.Row(): transcription_text.render() demo.launch(debug=True, share=True)