import copy
import os
import subprocess

from pytube import YouTube
from scipy.signal import resample
import gradio as gr
import numpy as np
import pytsmod as tsm
from moviepy.audio.AudioClip import AudioArrayClip
from moviepy.editor import *
from moviepy.video.fx.speedx import speedx
from sentence_transformers import SentenceTransformer, util
from transformers import pipeline, BertTokenizer, BertForNextSentencePrediction
import torch
import whisper

# Install ImageMagick at startup (runtime dependency of moviepy).
subprocess.run(['apt-get', '-y', 'install', 'imagemagick'])

# Load the models used by the pipeline.
transcriber = whisper.load_model("medium")
sentence_transformer = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')
tokenizer = BertTokenizer.from_pretrained("bert-base-cased")
next_sentence_predict = BertForNextSentencePrediction.from_pretrained("bert-base-cased").eval()
summarizer = pipeline("summarization", model="philschmid/bart-large-cnn-samsum")

root_dir = '/home/user/app/video'


def get_youtube(video_url):
    # Download the video from YouTube
    print("Start download video")
    yt = YouTube(video_url)
    abs_video_path = yt.streams.filter(progressive=True, file_extension='mp4').order_by('resolution').desc().first().download(filename='download.mp4', output_path='movies/')
    print("Success download video")
    print(abs_video_path)
    return abs_video_path


def two_channel_to_one_channel(sample):
    # Convert the audio from 2 channels (stereo) to 1 channel (mono)
    left_channel = sample[:, 0]
    right_channel = sample[:, 1]
    mono_sample = (left_channel + right_channel) / 2
    return mono_sample


def convert_sample_rate(data, original_sr, target_sr):
    # Resample the audio data to the target sampling rate
    target_length = int(len(data) * target_sr / original_sr)
    return resample(data, target_length)
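
# Note: whisper's transcribe() accepts a raw waveform as a float32 NumPy array
# sampled at 16 kHz (mono), which is why the audio below is resampled, downmixed,
# and cast to float32 before transcription.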


def summarize_video(video_path, ratio_sum, playback_speed):
    print("Start summarize video")
    output_path = os.path.join(os.path.dirname(video_path), 'output.mp4')
    movie_clip = VideoFileClip(video_path)
    audio_sampling_rate = movie_clip.audio.fps
    clip_audio = np.array(movie_clip.audio.to_soundarray())

    # Transcribe the audio
    print("Start transcribing text")
    audio_fp32 = convert_sample_rate(clip_audio, audio_sampling_rate, 16000)
    audio_fp32 = two_channel_to_one_channel(audio_fp32).astype(np.float32)
    transcription_results = transcriber.transcribe(audio_fp32)
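    # transcription_results is whisper's result dict: 'text' holds the full transcript and
    # 'segments' holds per-segment dicts with 'start', 'end' and 'text' keys.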

    # Group the text and utterance times by sentence break
    print("Start summarizing text/speech time")
    periods = ('.', '!', '?')
    clip_sentences = []
    head_sentence = True
    for r in transcription_results['segments']:
        if head_sentence:
            start_time = r['start']
            clip_sentences.append({'sentence':'', 'sentences':[], 'duration':[r['start'], None], 'durations':[]})
            head_sentence = False
        clip_sentences[-1]['sentence'] += r['text']
        clip_sentences[-1]['sentences'].append(r['text'])
        clip_sentences[-1]['durations'].append([r['start'], r['end']])
        if r['text'].endswith(periods):
            clip_sentences[-1]['duration'][1] = r['end']
            head_sentence = True

    # Summarize the transcript
    print("Start summarizing sentences")
    transcription = transcription_results['text']
    summary_text = summarizer(transcription, max_length=int(len(transcription)*0.1), min_length=int(len(transcription)*0.05), do_sample=False)[0]['summary_text']
    print(summary_text)

    # Determine which transcript sentences match the summary
    print("Start deleting sentences that match the summary sentence")
    summary_embeddings = [sentence_transformer.encode(s, convert_to_tensor=True) for s in summary_text.split('.')]
    important_sentence_idxs = [False]*len(clip_sentences)
    for s, clip_sentence in enumerate(clip_sentences):
        embedding = sentence_transformer.encode(clip_sentence['sentence'], convert_to_tensor=True)
        for s_e in summary_embeddings:
            if util.pytorch_cos_sim(embedding, s_e) > ratio_sum:
                important_sentence_idxs[s] = True
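    # A sentence is marked important when the cosine similarity between its embedding
    # and any summary-sentence embedding exceeds the ratio_sum threshold.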

    # Determine which sentences connect to the following sentence
    print("Start identifying sentences that are connected to the sentence next to it")
    def next_prob(prompt, next_sentence, b=1.2):
        encoding = tokenizer(prompt, next_sentence, return_tensors="pt")
        logits = next_sentence_predict(**encoding, labels=torch.LongTensor([1])).logits
        pos = b ** logits[0, 0]
        neg = b ** logits[0, 1]
        return float(pos / (pos + neg))
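    # next_prob converts the two next-sentence-prediction logits into the probability
    # that next_sentence follows prompt, using base b instead of e to soften the softmax;
    # adjacent sentences whose probability exceeds the 0.88 threshold below are treated as connected.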
    connection_idxs = [False]*(len(clip_sentences)-1)
    for s in range(len(clip_sentences)-1):
        if next_prob(clip_sentences[s]['sentence'], clip_sentences[s+1]['sentence']) > 0.88:
            connection_idxs[s] = True

    # Keep only the sentences that survive summarization
    def combine_arrays(A, B):
        C = copy.deepcopy(A)
        for i in range(len(A)):
            if A[i]:
                j = i
                while j < len(B) and B[j]:
                    C[j+1] = True
                    j += 1
                j = i
                while j > 0 and B[j-1]:
                    C[j] = True
                    j -= 1
        return C
    important_idxs = combine_arrays(important_sentence_idxs, connection_idxs)
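    # At this point important_idxs marks every sentence that either matched the summary
    # directly or is chained to a matched sentence through the next-sentence connections.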

    # Visualize where the summarized sentences appear in the full transcription
    html_text = "<h1 class='title'>Full Transcription</h1>"
    for idx in range(len(important_sentence_idxs)):
        seconds = clip_sentences[idx]['duration'][0] * (1/playback_speed)
        minutes, seconds = divmod(seconds, 60)
        if important_idxs[idx]:
            html_text += '<p> <b>' + f"{int(minutes)}:{int(seconds):02} | {clip_sentences[idx]['sentence']} </b> </p>"
        else:
            html_text += '<p>' + f"{int(minutes)}:{int(seconds):02} | {clip_sentences[idx]['sentence']}</p>"
    print(html_text)

    # Extract and combine the clips for the selected sentences
    print("Start combine movies")
    clips = []
    for i in range(len(important_idxs)):
        if important_idxs[i]:
            tmp_clips = []
            for j in range(len(clip_sentences[i]['sentences'])):
                start_time, end_time = clip_sentences[i]['durations'][j][0], clip_sentences[i]['durations'][j][1]
                if end_time > movie_clip.duration:
                    end_time = movie_clip.duration
                if start_time > movie_clip.duration:
                    continue
                clip = movie_clip.subclip(start_time, end_time)
                clip = clip.set_pos("center").set_duration(end_time-start_time)
                tmp_clips.append(clip)
            clips.append(concatenate_videoclips(tmp_clips))

    # Combine the clips with a cross-dissolve (disabled)
    # for c in range(len(clips)-1):
    #     fade_duration = 2
    #     clips[c] = clips[c].crossfadeout(fade_duration).audio_fadeout(fade_duration)
    #     clips[c+1] = clips[c+1].crossfadein(fade_duration).audio_fadein(fade_duration)

    # Concatenate the clips and change the playback speed
    final_video = concatenate_videoclips(clips, method="chain")
    final_video_audio = np.array(final_video.audio.to_soundarray(fps=audio_sampling_rate))
    if playback_speed != 1:
        # pytsmod expects audio shaped (channels, samples), so transpose in and back out
        final_video_audio_fixed = tsm.wsola(final_video_audio.T, 1/playback_speed).T
    else:
        final_video_audio_fixed = final_video_audio
    final_video = speedx(final_video, factor=playback_speed)
    final_video = final_video.set_audio(AudioArrayClip(final_video_audio_fixed, fps=audio_sampling_rate))
    # if final_video.duration > 30:
    #     final_video = final_video.subclip(0, 30)
    final_video.write_videofile(output_path)
    print(output_path)
    print("Success summarize video")
    return output_path, summary_text, html_text


# ---- Gradio Layout -----
youtube_url_in = gr.Textbox(label="YouTube URL", lines=1, interactive=True)
video_in = gr.Video(label="Input Video", mirror_webcam=False, interactive=True)
video_out = gr.Video(label="Output Video")
summary_text = gr.Textbox(label="Video Transcription Summary")
transcription_text = gr.HTML(label="Full Transcription")
demo = gr.Blocks()
demo.encrypt = False
with demo:
    gr.Markdown('''
        <div style="text-align: center">
            <h1 style='text-align: center'>FastPerson: Video summarization applied with transcription and text summarization</h1>
            <img src="https://user-images.githubusercontent.com/33136532/215362410-97727904-e1ca-408d-967e-f5798671405e.png" alt="Video Summarization">
        </div>
        ''')
    with gr.Row():
        gr.Markdown('''
            ### Summarize video
            ##### Step 1a. Download a video from YouTube
            ##### Step 1b. You can also upload a video directly
            ##### Step 2. Set the summary ratio and playback speed
            ##### Step 3. Generate the summarized video
            ''')
    with gr.Row():
        gr.Markdown('''
            ### You can test with the following examples:
            ''')
    examples = gr.Examples(examples=
        ["https://www.youtube.com/watch?v=QghjaS0WQQU",
         "https://www.youtube.com/watch?v=cUS_22_lDiM",
         "https://www.youtube.com/watch?v=80yqL2KzBVw"],
        label="Examples", inputs=[youtube_url_in])
    with gr.Column():
        youtube_url_in.render()
        download_youtube_btn = gr.Button("Download YouTube video")
        download_youtube_btn.click(get_youtube, [youtube_url_in], [video_in])
        print(video_in)
    with gr.Row():
        ratio_sum = gr.Slider(label="Summarize Ratio", minimum=0.3, maximum=0.8, step=0.05, value=0.6)
        playback_speed = gr.Slider(label="Playback Speed", minimum=0.5, maximum=2.0, step=0.25, value=1.0)
    with gr.Row():
        upload_output_video_btn = gr.Button("Summarize Video")
        upload_output_video_btn.click(summarize_video, [video_in, ratio_sum, playback_speed], [video_out, summary_text, transcription_text])
    with gr.Row():
        video_in.render()
        video_out.render()
    with gr.Row():
        summary_text.render()
    with gr.Row():
        transcription_text.render()

demo.launch(debug=True)