# Spaces: Sleeping  (web-page header text captured along with this file; not part of the program)
import copy | |
from pytube import YouTube | |
from scipy.signal import resample | |
import gradio as gr | |
import numpy as np | |
import pytsmod as tsm | |
from moviepy.audio.AudioClip import AudioArrayClip | |
from moviepy.editor import * | |
from moviepy.video.fx.speedx import speedx | |
from sentence_transformers import SentenceTransformer, util | |
from transformers import pipeline, BertTokenizer, BertForNextSentencePrediction | |
import torch | |
import whisper | |
# ---- Models -----
# Every model below is instantiated once, at import time, and then reused by
# the functions in this file.

# Speech-to-text model used to transcribe the video's audio track.
transcriber = whisper.load_model("medium")

# Sentence encoder used to compare transcript sentences with summary sentences.
sentence_transformer = SentenceTransformer(
    'sentence-transformers/all-MiniLM-L6-v2'
)

# Tokenizer and next-sentence-prediction model used to decide whether two
# neighbouring sentences belong together; eval() puts the model in inference mode.
tokenizer = BertTokenizer.from_pretrained("bert-base-cased")
next_sentence_predict = BertForNextSentencePrediction.from_pretrained(
    "bert-base-cased"
).eval()

# Abstractive summarizer applied to the full transcription.
summarizer = pipeline(
    "summarization",
    model="philschmid/bart-large-cnn-samsum",
)
def get_youtube(video_url):
    """Download a YouTube video as an MP4 file and return its local path.

    Among the streams matching ``progressive=True`` and ``file_extension='mp4'``,
    the one with the highest resolution is saved as ``./movies/download.mp4``.

    Args:
        video_url: URL of the YouTube video to download.

    Returns:
        The path of the downloaded file, as returned by pytube's ``download``.

    Raises:
        ValueError: If ``video_url`` is empty, or if the video offers no
            progressive MP4 stream.
    """
    if not video_url:
        raise ValueError("A YouTube URL is required")

    print("Start download video")
    yt = YouTube(video_url)
    stream = (
        yt.streams
        .filter(progressive=True, file_extension='mp4')
        .order_by('resolution')
        .desc()
        .first()
    )
    if stream is None:
        # first() yields None when nothing matches the filter; fail with a
        # clear message instead of an AttributeError on None.download(...).
        raise ValueError(
            f"No progressive MP4 stream is available for {video_url!r}"
        )

    abs_video_path = stream.download(
        filename='download.mp4',
        output_path='./movies/',
    )
    print("Success download video")
    print(abs_video_path)
    return abs_video_path
def two_chnnel_to_one_channel(sample):
    """Downmix an audio sample array to a single channel.

    Args:
        sample: Array-like of audio samples, either 1-D (already mono) or
            2-D with one column per channel.

    Returns:
        A 1-D array of samples. Mono input (1-D, or 2-D with a single
        column) is passed through without averaging; otherwise the first
        two channels are averaged.
    """
    sample = np.asarray(sample)

    # Already mono: there is no second channel to index.
    if sample.ndim == 1:
        return sample
    if sample.shape[1] == 1:
        return sample[:, 0]

    left_channel = sample[:, 0]
    right_channel = sample[:, 1]
    return (left_channel + right_channel) / 2
def convert_sample_rate(data, original_sr, target_sr):
    """Resample audio data from one sampling rate to another.

    Args:
        data: Audio samples; its length (first dimension) is the number of
            input samples.
        original_sr: Sampling rate of ``data``.
        target_sr: Desired sampling rate.

    Returns:
        ``data`` resampled with ``scipy.signal.resample`` to
        ``int(len(data) * target_sr / original_sr)`` samples.
    """
    n_input_samples = len(data)
    n_output_samples = int(n_input_samples * target_sr / original_sr)
    resampled = resample(data, n_output_samples)
    return resampled
def _group_segments_into_sentences(segments, periods=('.', '!', '?')):
    """Merge consecutive transcription segments into sentence-level entries."""
    clip_sentences = []
    head_sentence = True
    for r in segments:
        if head_sentence:
            clip_sentences.append({
                'sentence': '',
                'sentences': [],
                'duration': [r['start'], None],
                'durations': [],
            })
            head_sentence = False
        current = clip_sentences[-1]
        current['sentence'] += r['text']
        current['sentences'].append(r['text'])
        current['durations'].append([r['start'], r['end']])
        # Track the latest end on every segment so that a trailing sentence
        # without closing punctuation still gets an end time.
        current['duration'][1] = r['end']
        if r['text'].rstrip().endswith(periods):
            head_sentence = True
    return clip_sentences


def _spread_importance(important, connected):
    """Extend importance flags over runs of connected neighbouring sentences.

    ``connected[k]`` states that sentence ``k`` and sentence ``k + 1`` belong
    together, so ``connected`` is one element shorter than ``important``.
    """
    result = list(important)
    for i, is_important in enumerate(important):
        if not is_important:
            continue
        # Walk forward while each sentence is connected to its successor.
        j = i
        while j < len(connected) and connected[j]:
            result[j + 1] = True
            j += 1
        # Walk backward while each sentence is connected to its predecessor;
        # the predecessor (j - 1) is the one that must be marked.
        j = i
        while j > 0 and connected[j - 1]:
            result[j - 1] = True
            j -= 1
    return result


def _format_timestamp(seconds):
    """Format a duration in seconds as ``M:SS``."""
    whole_seconds = int(seconds)
    return f"{whole_seconds // 60}:{whole_seconds % 60:02d}"


def summarize_video(video_path, ratio_sum, playback_speed):
    """Create a shortened, subtitled video containing only the key sentences.

    The audio is transcribed, the transcription is summarized, and every
    transcript sentence that is similar enough to a summary sentence (or is
    linked to such a sentence by next-sentence prediction) is kept. The kept
    segments are concatenated, subtitled, re-timed to ``playback_speed`` and
    written to ``./movies/output.mp4``.

    Args:
        video_path: Path of the input video file.
        ratio_sum: Cosine-similarity threshold; a transcript sentence is
            marked important when its similarity to any summary sentence
            exceeds this value.
        playback_speed: Speed factor applied to the resulting video; the
            audio is time-stretched by ``1 / playback_speed``.

    Returns:
        A tuple ``(output_path, summary_text, html_text)``: the path of the
        written video, the text summary, and an HTML rendering of the full
        transcription in which the kept sentences are highlighted.

    Raises:
        ValueError: If no video is given, the video has no audio track,
            nothing is transcribed, or no sentence is selected.
    """
    # Function-scope imports keep this block self-contained.
    import html
    import os

    if not video_path:
        raise ValueError("No input video was provided")

    print("Start summarize video")
    output_path = "./movies/output.mp4"
    os.makedirs(os.path.dirname(output_path), exist_ok=True)

    movie_clip = VideoFileClip(video_path)
    try:
        if movie_clip.audio is None:
            raise ValueError("The input video has no audio track to transcribe")
        audio_sampling_rate = movie_clip.audio.fps
        clip_audio = np.array(movie_clip.audio.to_soundarray())

        # Transcription: resample to 16 kHz, downmix to mono, run the transcriber.
        audio_fp32 = convert_sample_rate(clip_audio, audio_sampling_rate, 16000)
        audio_fp32 = two_chnnel_to_one_channel(audio_fp32).astype(np.float32)
        transcription_results = transcriber.transcribe(audio_fp32)

        # Group text and speaking times by sentence boundary.
        clip_sentences = _group_segments_into_sentences(
            transcription_results['segments']
        )
        if not clip_sentences:
            raise ValueError("No speech was transcribed from the video")

        # Text summarization. truncation=True keeps over-long transcripts
        # within the summarizer's input limit instead of failing.
        transcription = transcription_results['text']
        summary_text = summarizer(
            transcription,
            max_length=int(len(transcription)*0.1),
            min_length=int(len(transcription)*0.05),
            do_sample=False,
            truncation=True,
        )[0]['summary_text']
        print(summary_text)

        # Find the transcript sentences that match a summary sentence.
        # Blank fragments produced by split('.') are not compared.
        summary_embedings = [
            sentence_transformer.encode(s, convert_to_tensor=True)
            for s in summary_text.split('.')
            if s.strip()
        ]
        important_sentence_idxs = []
        for clip_sentence in clip_sentences:
            embedding = sentence_transformer.encode(
                clip_sentence['sentence'], convert_to_tensor=True
            )
            important_sentence_idxs.append(any(
                float(util.pytorch_cos_sim(embedding, s_e)) > ratio_sum
                for s_e in summary_embedings
            ))

        # Find the sentences that connect to their neighbour.
        def next_prob(prompt, next_sentence, b=1.2):
            """Return a base-``b`` softmax score that ``next_sentence`` follows ``prompt``."""
            encoding = tokenizer(
                prompt, next_sentence, return_tensors="pt", truncation=True
            )
            # Inference only: no labels and no gradient tracking are needed.
            with torch.no_grad():
                logits = next_sentence_predict(**encoding).logits
            pos = b ** logits[0, 0]
            neg = b ** logits[0, 1]
            return float(pos / (pos + neg))

        connection_idxs = [
            next_prob(current['sentence'], following['sentence']) > 0.88
            for current, following in zip(clip_sentences, clip_sentences[1:])
        ]

        # Keep only the sentences that survive summarization.
        important_idxs = _spread_importance(
            important_sentence_idxs, connection_idxs
        )

        # Visualize which sentences were kept.
        html_parts = ["<h1 class='title'>Full Transcription</h1>"]
        for is_important, clip_sentence in zip(important_idxs, clip_sentences):
            timestamp = _format_timestamp(
                clip_sentence['duration'][0] * (1/playback_speed)
            )
            # Transcribed text is inserted into markup, so escape it.
            sentence_html = html.escape(clip_sentence['sentence'], quote=False)
            if is_important:
                html_parts.append(
                    '<p> <font color="#dc974e">'
                    + f"{timestamp} | {sentence_html}</font> </p>"
                )
            else:
                html_parts.append(f"<p>{timestamp} | {sentence_html}</p>")
        html_text = "".join(html_parts)

        # Cut one subtitled clip per segment of every kept sentence.
        clips = []
        for is_important, clip_sentence in zip(important_idxs, clip_sentences):
            if not is_important:
                continue
            tmp_clips = []
            for text, (start_time, end_time) in zip(
                clip_sentence['sentences'], clip_sentence['durations']
            ):
                end_time = min(end_time, movie_clip.duration)
                if end_time <= start_time:
                    # Nothing left of this segment inside the video.
                    continue
                clip = movie_clip.subclip(start_time, end_time)
                clip = clip.set_pos("center").set_duration(end_time-start_time)
                txt_clip = TextClip(
                    text,
                    fontsize=int(movie_clip.w/40),
                    color='white',
                    bg_color='black',
                    font='./fonts/Muller-Trial-Medium.ttf',
                )
                txt_clip = txt_clip.set_duration(end_time-start_time).set_position(("center", "bottom"))
                tmp_clips.append(CompositeVideoClip([clip, txt_clip]))
            if tmp_clips:
                clips.append(concatenate_videoclips(tmp_clips))
        if not clips:
            raise ValueError(
                "No sentence matched the summary; try a lower summarize ratio"
            )

        # Join the clips and change the playback speed.
        final_video = concatenate_videoclips(clips, method="chain")
        final_video_audio = np.array(
            final_video.audio.to_soundarray(fps=audio_sampling_rate)
        )
        if playback_speed != 1:
            # The sound array is (samples, channels) while pytsmod works on
            # (channels, samples): transpose going in and coming out.
            # atleast_2d covers a squeezed single-channel result.
            stretched = tsm.wsola(final_video_audio.T, 1/playback_speed)
            final_video_audio_fixed = np.atleast_2d(stretched).T
        else:
            final_video_audio_fixed = final_video_audio
        final_video = speedx(final_video, factor=playback_speed)
        final_video = final_video.set_audio(
            AudioArrayClip(final_video_audio_fixed, fps=audio_sampling_rate)
        )
        final_video.write_videofile(output_path)
    finally:
        # Release the reader resources held by the source clip.
        movie_clip.close()

    print(output_path)
    print("Success summarize video")
    return output_path, summary_text, html_text
# ---- Gradio Layout -----
# Components are created first and placed into the page later with .render(),
# so they can be referenced by the examples and the button events below.
youtube_url_in = gr.Textbox(label="Youtube url", lines=1, interactive=True)
video_in = gr.Video(label="Input Video", mirror_webcam=False, interactive=True)
video_out = gr.Video(label="Output Video")
summary_text = gr.Textbox(label="Video Transcription Summary")
transcription_text = gr.HTML(label="Full Transcription")

demo = gr.Blocks()
demo.encrypt = False

with demo:
    # Page header.
    gr.Markdown('''
<div style="text-align: center">
<h1 style='text-align: center'>FastPerson: Video summarization applied with transcription and text summarization</h1>
<img src="https://user-images.githubusercontent.com/33136532/215362410-97727904-e1ca-408d-967e-f5798671405e.png" alt="Video Summarization">
</div>
''')

    # Usage instructions.
    with gr.Row():
        gr.Markdown('''
### Summarize video
##### Step 1a. Download video from youtube
##### Step 1b. You also can upload video directly
##### Step 2. Enter summary rate and playback speed
##### Step 3. Generating summarized video.
''')

    # Example URLs that fill the YouTube URL textbox.
    with gr.Row():
        gr.Markdown('''
### You can test by following examples:
''')
    examples = gr.Examples(
        examples=[
            "https://www.youtube.com/watch?v=QghjaS0WQQU",
            "https://www.youtube.com/watch?v=cUS_22_lDiM",
            "https://www.youtube.com/watch?v=80yqL2KzBVw",
        ],
        label="Examples",
        inputs=[youtube_url_in],
    )

    # Step 1a: download from YouTube into the input video component.
    with gr.Column():
        youtube_url_in.render()
        download_youtube_btn = gr.Button(value="Download Youtube video")
        download_youtube_btn.click(
            fn=get_youtube,
            inputs=[youtube_url_in],
            outputs=[video_in],
        )
        print(video_in)

    # Step 2: summarization threshold and playback speed.
    with gr.Row():
        ratio_sum = gr.Slider(
            label="Summarize Ratio",
            minimum=0.3,
            maximum=0.8,
            step=0.05,
            value=0.6,
        )
        playback_speed = gr.Slider(
            label="Playback Speed",
            minimum=0.5,
            maximum=2.0,
            step=0.25,
            value=1.0,
        )

    # Step 3: run the summarization.
    with gr.Row():
        upload_output_video_btn = gr.Button(value="Summarize Video")
        upload_output_video_btn.click(
            fn=summarize_video,
            inputs=[video_in, ratio_sum, playback_speed],
            outputs=[video_out, summary_text, transcription_text],
        )

    # Input and output videos side by side, then the text results.
    with gr.Row():
        video_in.render()
        video_out.render()
    with gr.Row():
        summary_text.render()
    with gr.Row():
        transcription_text.render()

demo.launch(debug=True, share=True)