saylss / app-v1.py
Taino's picture
Rename app.py to app-v1.py
ca02250
import torch
import pytube as pt
from transformers import pipeline
import json
import whisper_timestamped as whispertime
from pydub import AudioSegment
from spleeter.separator import Separator
import os
from profanity_check import predict
import sys
import tempfile
import uuid
import shutil
import json
import streamlit as st
# CORE #
MODEL_NAME = "openai/whisper-large-v2"
PROFANE_WORDS = ["Fuck", "fuck","f***","s***", "b****", "c***", "h**","n*****","f*****","p****", "dick", "slit", "slut", "pussy", "ass", "fucking", "fuckin", "pussy."]
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
def create_tmp_copy_of_file(file, dir=None):
"""
Creates a temporary copy of the file and returns the path to the copy.
:param file: the path to the file
:param dir: optional directory to place the copy in
:return: path to the temporary copy
"""
if isinstance(file, dict):
file_path = file["path"]
else:
file_path = file
if dir is None:
dir = tempfile.gettempdir()
file_name = os.path.basename(file_path)
tmp_path = os.path.join(dir, f"{str(uuid.uuid4())}_{file_name}")
shutil.copy2(file_path, tmp_path)
return json.dumps(tmp_path).strip('"')
def source_separation(input_file, output_folder="separated_audio"):
separator = Separator('spleeter:2stems')
separator.separate_to_file(input_file, output_folder)
return f"{output_folder}/{os.path.splitext(os.path.basename(input_file))[0]}"
def process_audio(input_file, model_size='tiny', verbose=False, play_output=False):
if not os.path.isfile(input_file):
print('Error: input file not found')
sys.exit()
stems_dir = source_separation(input_file)
vocal_stem = os.path.join(stems_dir, 'vocals.wav')
instr_stem = os.path.join(stems_dir, 'accompaniment.wav')
model = whispertime.load_model(model_size, device=device)
result = whispertime.transcribe(model, vocal_stem, language="en")
if verbose:
print('\nTranscribed text:')
print(result['text']+'\n')
print(result["text"])
profane_indices = predict(result["text"].split())
profanities = [word for word, is_profane in zip(result["text"].split(), profane_indices) if is_profane]
if not profanities:
print(f'No profanities detected found in {input_file} - exiting')
# sys.exit()
if verbose:
print('Profanities found in text:')
print(profanities)
vocals = AudioSegment.from_wav(vocal_stem)
segments = result["segments"]
for segment in segments:
words = segment["words"]
for word in words:
if word["text"].lower() in PROFANE_WORDS:
start_time = int(word["start"] * 1000)
end_time = int(word["end"] * 1000)
silence = AudioSegment.silent(duration=(end_time - start_time))
vocals = vocals[:start_time] + silence + vocals[end_time:]
mix = AudioSegment.from_wav(instr_stem).overlay(vocals)
print("#### \n\n" + input_file)
outpath = input_file.replace('.mp3', '_masked.mp3').replace('.wav', '_masked.wav')
print("#### \n\n" + outpath)
# if input_file.endswith('.wav'):
# mix.export(outpath, format="wav")
# elif input_file.endswith('.mp3'):
final_mix = mix.export(outpath, format="wav")
print(f'Mixed file written to: {outpath}')
# out = create_tmp_copy_of_file(outpath)
print('\n Returning final mix: ', final_mix)
return outpath
# try getting it to work just returning the transcribed text
# return result["text"]
def transcribe(microphone=None, file_upload=None):
if (microphone is not None) and (file_upload is not None):
warn_output = (
"WARNING: You've uploaded an audio file and used the microphone. "
"The recorded file from the microphone will be used and the uploaded audio will be discarded.\n"
)
elif (microphone is None) and (file_upload is None):
return "ERROR: You have to e~ither use the microphone or upload an audio file"
file = microphone if microphone is not None else file_upload
processed_file = process_audio(file)
print('File sucessfully processed:, ', processed_file)
# audio = AudioSegment.from_file(processed_file, format="wav").export()
audio = processed_file
return str(audio)
def _return_yt_html_embed(yt_url):
video_id = yt_url.split("?v=")[-1]
HTML_str = (
f'<center> <iframe width="500" height="320" src="https://www.youtube.com/embed/{video_id}"> </iframe>'
" </center>"
)
return HTML_str
def yt_transcribe(yt_url):
yt = pt.YouTube(yt_url)
html_embed_str = _return_yt_html_embed(yt_url)
stream = yt.streams.filter(only_audio=True)[0]
stream.download(filename="audio.mp3")
processed_file = process_audio("audio.mp3")
audio = AudioSegment.from_file(processed_file, format="mp3")
return html_embed_str, audio
# STREAMLIT #
import streamlit as st
st.title("Saylss - remove profane audio from uploaded content")
f"""
Saylss censors profane audio inputs with the click of a button! Saylss uses a custom Whisper model to timestamp at the word level and then several audio manipulation libraries to process and return a clean version.
by olmec.ai & batman.
"""
tab1, tab2 = st.tabs(["Transcribe Audio", "Transcribe YouTube"])
with tab1: # file upload
uploaded_files = st.file_uploader("Upload your audio file here", type=["mp3", "wav"], help="Drag and drop or click to choose file")
if uploaded_files is not None:
bytes_data = uploaded_files.read()
st.write("Your uploaded file")
st.audio(bytes_data)
# format can be specified, default is wav
# st.audio(bytes_data, format="audio/mp3")
st.markdown("---")
st.write("## Your processed file")
with st.spinner("...is being processed"):
# uploaded file is stored in RAM, so save it to a file to pass into `transcribe`
with open(uploaded_files.name, "wb") as f:
f.write((uploaded_files).getbuffer())
processed_audio = transcribe(microphone=None, file_upload=uploaded_files.name)
audio_file = open(processed_audio, 'rb')
audio_bytes2 = audio_file.read()
st.audio(audio_bytes2)
with tab2: # youtube
link = st.text_input("Paste your YouTube link", placeholder="https://www.youtube.com/watch?v=EuEe3WKpbCo")
if link != "":
try:
st.video(link)
except:
st.warning("Not a video")
st.stop()
with st.spinner("YouTube link is being processed"):
html_embed_str, audio = yt_transcribe(link)
audio_file = open(audio, 'rb')
audio_bytes_yt = audio_file.read()
st.audio(audio_bytes_yt)