import copy
import os
import subprocess

import gradio as gr
import numpy as np
import pytsmod as tsm
import torch
import whisper
from moviepy.audio.AudioClip import AudioArrayClip
from moviepy.editor import *
from moviepy.video.fx.speedx import speedx
from pytube import YouTube
from scipy.signal import resample
from sentence_transformers import SentenceTransformer, util
from transformers import pipeline, BertTokenizer, BertForNextSentencePrediction

subprocess.run(['apt-get', '-y', 'install', 'imagemagick'])

transcriber = whisper.load_model("medium")
sentence_transformer = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
tokenizer = BertTokenizer.from_pretrained("bert-base-cased")
next_sentence_predict = BertForNextSentencePrediction.from_pretrained("bert-base-cased").eval()
summarizer = pipeline("summarization", model="philschmid/bart-large-cnn-samsum")

root_dir = '/home/user/app/video'


def get_youtube(video_url):
    # Download the video from YouTube
    print("Start download video")
    yt = YouTube(video_url)
    abs_video_path = yt.streams.filter(progressive=True, file_extension='mp4').order_by(
        'resolution').desc().first().download(filename='download.mp4', output_path='movies/')
    print("Success download video")
    print(abs_video_path)
    return abs_video_path


def two_channel_to_one_channel(sample):
    # Convert stereo (2-channel) audio to mono by averaging the channels
    left_channel = sample[:, 0]
    right_channel = sample[:, 1]
    mono_sample = (left_channel + right_channel) / 2
    return mono_sample


def convert_sample_rate(data, original_sr, target_sr):
    # Resample the audio data to the target sampling rate
    target_length = int(len(data) * target_sr / original_sr)
    return resample(data, target_length)


def summarize_video(video_path, ratio_sum, playback_speed):
    print("Start summarize video")
    output_path = os.path.join(os.path.dirname(video_path), 'output.mp4')
    movie_clip = VideoFileClip(video_path)
    audio_sampling_rate = movie_clip.audio.fps
    clip_audio = np.array(movie_clip.audio.to_soundarray())

    # Transcribe the audio
    print("Start transcribing text")
    audio_fp32 = convert_sample_rate(clip_audio, audio_sampling_rate, 16000)
    audio_fp32 = two_channel_to_one_channel(audio_fp32).astype(np.float32)
    transcription_results = transcriber.transcribe(audio_fp32)

    # Group the transcribed text and speech times by sentence boundary
    print("Start summarizing text/speech time")
    periods = ('.', '!', '?')
    clip_sentences = []
    head_sentence = True
    for r in transcription_results['segments']:
        if head_sentence:
            start_time = r['start']
            clip_sentences.append({'sentence': '', 'sentences': [], 'duration': [r['start'], None], 'durations': []})
            head_sentence = False
        clip_sentences[-1]['sentence'] += r['text']
        clip_sentences[-1]['sentences'].append(r['text'])
        clip_sentences[-1]['durations'].append([r['start'], r['end']])
        if r['text'].endswith(periods):
            clip_sentences[-1]['duration'][1] = r['end']
            head_sentence = True

    # Summarize the transcription
    print("Start summarizing sentences")
    transcription = transcription_results['text']
    summary_text = summarizer(transcription, max_length=int(len(transcription) * 0.1),
                              min_length=int(len(transcription) * 0.05), do_sample=False)[0]['summary_text']
    print(summary_text)

    # Identify the transcript sentences that match the summary
    print("Start deleting sentences that match the summary sentence")
    summary_embeddings = [sentence_transformer.encode(s, convert_to_tensor=True) for s in summary_text.split('.')]
    important_sentence_idxs = [False] * len(clip_sentences)
    for s, clip_sentence in enumerate(clip_sentences):
        embedding = sentence_transformer.encode(clip_sentence['sentence'], convert_to_tensor=True)
        for s_e in summary_embeddings:
            if util.pytorch_cos_sim(embedding, s_e) > ratio_sum:
                important_sentence_idxs[s] = True

    # Identify sentences that connect to the neighboring sentence
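    # (Descriptive note: next_prob below turns BERT's two next-sentence-prediction
    # logits into a pseudo-probability. Using base b=1.2 instead of e is equivalent
    # to a temperature-scaled softmax, so the score is flattened before being
    # compared against the 0.88 threshold.)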
print("Start identifying sentences that are connected to the sentence next to it") def next_prob(prompt, next_sentence, b=1.2): encoding = tokenizer(prompt, next_sentence, return_tensors="pt") logits = next_sentence_predict(**encoding, labels=torch.LongTensor([1])).logits pos = b ** logits[0, 0] neg = b ** logits[0, 1] return float(pos / (pos + neg)) connection_idxs = [False]*(len(clip_sentences)-1) for s in range(len(clip_sentences)-1): if next_prob(clip_sentences[s]['sentence'], clip_sentences[s+1]['sentence']) > 0.88: connection_idxs[s] = True # 要約後の文章のみ残す def combine_arrays(A, B): C = copy.deepcopy(A) for i in range(len(A)): if A[i]: j = i while j < len(B) and B[j]: C[j+1] = True j += 1 j = i while j > 0 and B[j-1]: C[j] = True j -= 1 return C important_idxs = combine_arrays(important_sentence_idxs, connection_idxs) # 要約後の文章がどこかを可視化 html_text = "

Full Transcription

" for idx in range(len(important_sentence_idxs)): seconds = clip_sentences[idx]['duration'][0] * (1/playback_speed) minutes, seconds = divmod(seconds, 60) if important_idxs[idx]: html_text += '

' + f"{int(minutes)}:{int(seconds):02} | {clip_sentences[idx]['sentence']}

" else: html_text += f"{int(minutes)}:{int(seconds):02} | {clip_sentences[idx]['sentence']}

" print(html_text) # 動画を結合 print("Start combine movies") clips = [] for i in range(len(important_idxs)): if important_idxs[i]: tmp_clips = [] for j in range(len(clip_sentences[i]['sentences'])): start_time, end_time = clip_sentences[i]['durations'][j][0], clip_sentences[i]['durations'][j][1] if end_time > movie_clip.duration: end_time = movie_clip.duration if start_time > movie_clip.duration: continue clip = movie_clip.subclip(start_time, end_time) clip = clip.set_pos("center").set_duration(end_time-start_time) tmp_clips.append(clip) clips.append(concatenate_videoclips(tmp_clips)) # クリップをクロスディゾルブで結合 # for c in range(len(clips)-1): # fade_duration = 2 # clips[c] = clips[c].crossfadeout(fade_duration).audio_fadeout(fade_duration) # clips[c+1] = clips[c+1].crossfadein(fade_duration).audio_fadein(fade_duration) # 動画を結合し再生速度を変化させる final_video = concatenate_videoclips(clips, method="chain") final_video_audio = np.array(final_video.audio.to_soundarray(fps=audio_sampling_rate)) if playback_speed != 1: final_video_audio_fixed = tsm.wsola(final_video_audio, 1/playback_speed).T else: final_video_audio_fixed = final_video_audio final_video = speedx(final_video, factor=playback_speed) final_video = final_video.set_audio(AudioArrayClip(final_video_audio_fixed, fps=audio_sampling_rate)) # if final_video.duration > 30: # final_video = final_video.subclip(0, 30) final_video.write_videofile(output_path) print(output_path) print("Success summarize video") return output_path, summary_text, html_text # ---- Gradio Layout ----- youtube_url_in = gr.Textbox(label="Youtube url", lines=1, interactive=True) video_in = gr.Video(label="Input Video", mirror_webcam=False, interactive=True) video_out = gr.Video(label="Output Video") summary_text = gr.Textbox(label="Video Transcription Summary") transcription_text = gr.HTML(label="Full Transcription") demo = gr.Blocks() demo.encrypt = False with demo: gr.Markdown('''

        # FastPerson: Video summarization applied with transcription and text summarization

        ### Video Summarization
    ''')
    with gr.Row():
        gr.Markdown('''
            ### Summarize video
            ##### Step 1a. Download a video from YouTube
            ##### Step 1b. You can also upload a video directly
            ##### Step 2. Set the summarize ratio and playback speed
            ##### Step 3. Generate the summarized video
        ''')
    with gr.Row():
        gr.Markdown('''
            ### You can test with the following examples:
        ''')
        examples = gr.Examples(examples=[
            "https://www.youtube.com/watch?v=QghjaS0WQQU",
            "https://www.youtube.com/watch?v=cUS_22_lDiM",
            "https://www.youtube.com/watch?v=80yqL2KzBVw"],
            label="Examples", inputs=[youtube_url_in])
    with gr.Column():
        youtube_url_in.render()
        download_youtube_btn = gr.Button("Download Youtube video")
        download_youtube_btn.click(get_youtube, [youtube_url_in], [video_in])
        print(video_in)
    with gr.Row():
        ratio_sum = gr.Slider(label="Summarize Ratio", minimum=0.3, maximum=0.8, step=0.05, value=0.6)
        playback_speed = gr.Slider(label="Playback Speed", minimum=0.5, maximum=2.0, step=0.25, value=1.0)
    with gr.Row():
        upload_output_video_btn = gr.Button("Summarize Video")
        upload_output_video_btn.click(summarize_video, [video_in, ratio_sum, playback_speed],
                                      [video_out, summary_text, transcription_text])
    with gr.Row():
        video_in.render()
        video_out.render()
    with gr.Row():
        summary_text.render()
    with gr.Row():
        transcription_text.render()

demo.launch(debug=True)
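# Example (illustrative sketch, not executed by the Gradio app): the pipeline can also
# be driven directly from Python. The URL is one of the demo examples above; the ratio
# and speed values are placeholders.
#
#   video_path = get_youtube("https://www.youtube.com/watch?v=QghjaS0WQQU")
#   output_path, summary, transcript_html = summarize_video(video_path, ratio_sum=0.6, playback_speed=1.5)
#   print(summary)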