# Spaces: Sleeping
# (page-capture residue kept as a comment; it is not part of the program)
import glob
import logging
import os
import shutil
import subprocess
import uuid

import ffmpeg
import streamlit as st
# Configure the root logger so INFO records and above are emitted.
logging.basicConfig(level=logging.INFO)


def log_info(message):
    """Record *message* on the root logger at INFO level."""
    logging.log(logging.INFO, message)


def log_error(error_message):
    """Record *error_message* on the root logger at ERROR level."""
    logging.log(logging.ERROR, error_message)
def convert_audio(input_file):
    """Transcode *input_file* to a single-channel 48 kHz WAV file with ffmpeg.

    The result is written next to the input as ``<input_file>.wav``.

    Returns:
        The path of the WAV file.
    """
    wav_path = "{}.wav".format(input_file)
    source = ffmpeg.input(input_file)
    job = source.output(wav_path, ar=48000, ac=1)
    job.run()
    return wav_path
def split_audio(audio_file):
    """Cut *audio_file* into 300-second pieces using ffmpeg's ``segment`` muxer.

    Returns:
        The printf-style output pattern (``<stem>.segmented_%03d.wav``) that
        was handed to ffmpeg, not a list of the files produced.
    """
    stem, _extension = os.path.splitext(audio_file)
    pattern = f"{stem}.segmented_%03d.wav"
    source = ffmpeg.input(audio_file)
    source.output(pattern, f="segment", segment_time=300).run()
    return pattern
def filter_audio(segments):
    """Run the ``deepFilter`` CLI over every audio segment.

    Args:
        segments: printf-style pattern of the segment files, as returned by
            ``split_audio`` (for example ``clip.segmented_%03d.wav``).

    Returns:
        The directory (``"temp-out"``) that holds the filtered segments.

    Raises:
        FileNotFoundError: no segment file matches the pattern, or the
            ``deepFilter`` executable is not installed.
        subprocess.CalledProcessError: ``deepFilter`` exited with an error.
    """
    filtered_dir = "temp-out"
    prefix = segments.replace("%03d.wav", "")
    # Expand the wildcard in Python instead of a shell so unusual characters
    # in the file name can never be interpreted as shell syntax.
    segment_files = sorted(glob.glob(f"{glob.escape(prefix)}*.wav"))
    if not segment_files:
        raise FileNotFoundError(f"No audio segments match {prefix}*.wav")

    os.makedirs(filtered_dir, exist_ok=True)
    try:
        # "-a 25" and "-o <dir>" are forwarded unchanged from the original
        # command line (presumably the attenuation limit and output directory
        # of the deepFilter CLI -- confirm against its --help).
        subprocess.run(
            ["deepFilter", "-a", "25", "-o", filtered_dir, *segment_files],
            check=True,
        )
    except Exception:
        # Do not leave half-filtered output behind: the directory name is
        # fixed, so stale files would be merged into the next run.
        shutil.rmtree(filtered_dir, ignore_errors=True)
        raise
    finally:
        # The unfiltered segments are no longer needed, success or not.
        for segment in segment_files:
            if os.path.exists(segment):
                os.remove(segment)
    return filtered_dir
def combine_segments(filtered_dir, output_file):
    """Concatenate every ``*.wav`` in *filtered_dir* into *output_file* with sox.

    Args:
        filtered_dir: directory containing the filtered segment files.
        output_file: path of the combined WAV file to create.

    Returns:
        *output_file*.

    Raises:
        FileNotFoundError: the directory holds no ``.wav`` file, or ``sox``
            is not installed.
        subprocess.CalledProcessError: ``sox`` exited with an error.
    """
    # Sorted so the segments are joined in numeric order, matching the order
    # a shell wildcard would have produced for zero-padded names.
    parts = sorted(glob.glob(os.path.join(glob.escape(filtered_dir), "*.wav")))
    if not parts:
        raise FileNotFoundError(f"No .wav segments found in {filtered_dir}")
    # An argv list (no shell) keeps odd characters in paths from being parsed.
    subprocess.run(["sox", *parts, output_file], check=True)
    return output_file
def is_video(input_file):
    """Report whether ffprobe lists a video stream in *input_file*.

    Args:
        input_file: path of the media file to inspect.

    Returns:
        True when the ``codec_type`` values printed by ffprobe contain
        ``"video"``; False otherwise, including when ffprobe cannot be run
        or cannot read the file.
    """
    # NOTE(review): embedded cover art may also be reported as a video
    # stream by ffprobe -- confirm before relying on this for audio files.
    command = [
        "ffprobe", "-v", "error",
        "-show_entries", "stream=codec_type",
        "-of", "default=noprint_wrappers=1:nokey=1",
        "-i", input_file,
    ]
    try:
        # No shell: the path is passed as a single argument, whatever
        # characters it contains.
        probe = subprocess.run(command, stdout=subprocess.PIPE, text=True, check=False)
    except OSError:
        # ffprobe missing or not executable: same answer the shell-based
        # version gave (empty output, hence False).
        return False
    return "video" in probe.stdout.lower()
def get_format_info(input_file):
    """Return the codec names ffprobe reports for the streams of *input_file*.

    Args:
        input_file: path of the media file to inspect.

    Returns:
        ffprobe's standard output as text (the ``codec_name`` value of each
        stream), or an empty string when ffprobe cannot be run or prints
        nothing.
    """
    command = [
        "ffprobe", "-v", "error",
        "-show_entries", "stream=codec_name",
        "-of", "default=noprint_wrappers=1:nokey=1",
        "-i", input_file,
    ]
    try:
        # No shell: the path is passed as a single argument.
        probe = subprocess.run(command, stdout=subprocess.PIPE, text=True, check=False)
    except OSError:
        # ffprobe missing: the shell-based version returned "" here as well.
        return ""
    return probe.stdout
def increase_volume(input_file):
    """Amplify *input_file* in place by a factor of 1.25 using sox.

    sox writes to ``<input_file>.vol.wav`` first; that file then replaces the
    original.

    Args:
        input_file: path of the audio file to amplify.

    Returns:
        *input_file*.

    Raises:
        FileNotFoundError: ``sox`` is not installed.
        subprocess.CalledProcessError: ``sox`` exited with an error.
    """
    boosted_file = f"{input_file}.vol.wav"
    try:
        # An argv list (no shell) so the path is never parsed as shell syntax.
        subprocess.run(["sox", "-v", "1.25", input_file, boosted_file], check=True)
        os.replace(boosted_file, input_file)
    except Exception:
        # Drop a partially written result so nothing is left behind.
        if os.path.exists(boosted_file):
            os.remove(boosted_file)
        raise
    return input_file
def restore_metadata(input, output):
    """Copy selected metadata tags from *input* onto *output*, in place.

    The metadata of *input* is dumped to ``<input>.txt`` in ffmpeg's
    ffmetadata format, reduced to the header line plus every line that
    mentions title, artist, album, track, date or comment, and then muxed
    into *output* without re-encoding.

    Args:
        input: path of the file whose metadata is read.
        output: path of the file that receives the metadata.

    Returns:
        *output*.

    Raises:
        FileNotFoundError: ``ffmpeg`` is not installed.
        subprocess.CalledProcessError: an ffmpeg invocation failed.
    """
    kept_tags = ("title", "artist", "album", "track", "date", "comment")
    metadata_file = f"{input}.txt"
    # Prefix only the file name so the temporary copy sits next to *output*
    # even when *output* includes a directory.
    output_dir, output_name = os.path.split(output)
    fixed_file = os.path.join(output_dir, f"fixed.{output_name}")
    try:
        subprocess.run(
            ["ffmpeg", "-y", "-i", input, "-c", "copy", "-map_metadata", "0",
             "-f", "ffmetadata", metadata_file],
            check=True,
        )
        # surrogateescape round-trips any bytes that are not valid UTF-8.
        with open(metadata_file, encoding="utf-8", errors="surrogateescape") as handle:
            lines = handle.readlines()
        if len(lines) > 1:
            # Keep the ";FFMETADATA1" header plus the wanted tags only.
            lines = [lines[0]] + [
                line for line in lines[1:] if any(tag in line for tag in kept_tags)
            ]
        with open(metadata_file, "w", encoding="utf-8", errors="surrogateescape") as handle:
            handle.writelines(lines)
        subprocess.run(
            ["ffmpeg", "-y", "-i", output, "-f", "ffmetadata", "-i", metadata_file,
             "-c", "copy", "-map_metadata", "1", fixed_file],
            check=True,
        )
        os.replace(fixed_file, output)
    finally:
        # Remove the scratch files whether or not ffmpeg succeeded.
        for leftover in (metadata_file, fixed_file):
            if os.path.exists(leftover):
                os.remove(leftover)
    return output
def _escape_ffmetadata(value):
    """Backslash-escape the characters the ffmetadata format treats as syntax."""
    table = str.maketrans({char: "\\" + char for char in "\\=;#\n"})
    return str(value).translate(table)


def override_metadata(output, title="", artist="", album=""):
    """Replace all metadata of *output* with the given title, artist and album.

    Existing metadata is stripped into ``temp.<output>``; the three tags are
    then muxed back into *output* without re-encoding.

    Args:
        output: path of the media file to rewrite in place.
        title: new title tag (written even when empty).
        artist: new artist tag (written even when empty).
        album: new album tag (written even when empty).

    Returns:
        *output*.

    Raises:
        FileNotFoundError: ``ffmpeg`` is not installed.
        subprocess.CalledProcessError: an ffmpeg invocation failed.
    """
    metadata_file = f"{output}.txt"
    # Prefix only the file name so the scratch copy sits next to *output*
    # even when *output* includes a directory.
    output_dir, output_name = os.path.split(output)
    stripped_file = os.path.join(output_dir, f"temp.{output_name}")
    # Values come from free-text user input; unescaped "=", ";", "#", "\" or
    # newlines would corrupt the ffmetadata file.
    lines = [
        ";FFMETADATA1",
        f"title={_escape_ffmetadata(title)}",
        f"artist={_escape_ffmetadata(artist)}",
        f"album={_escape_ffmetadata(album)}",
    ]
    try:
        with open(metadata_file, "w", encoding="utf-8") as handle:
            handle.write("\n".join(lines) + "\n")
        # strip existing metadata
        subprocess.run(
            ["ffmpeg", "-y", "-i", output, "-map_metadata", "-1",
             "-c:v", "copy", "-c:a", "copy", stripped_file],
            check=True,
        )
        # add new metadata
        subprocess.run(
            ["ffmpeg", "-y", "-i", stripped_file, "-f", "ffmetadata", "-i", metadata_file,
             "-c", "copy", "-map_metadata", "1", output],
            check=True,
        )
    finally:
        # Remove the scratch files whether or not ffmpeg succeeded.
        for leftover in (stripped_file, metadata_file):
            if os.path.exists(leftover):
                os.remove(leftover)
    return output
def get_samplerate(input_file):
    """Return the sample rate (Hz) of the first audio stream of *input_file*.

    Args:
        input_file: path of the media file to inspect.

    Returns:
        The sample rate reported by ffprobe, as an int.

    Raises:
        ValueError: ffprobe could not be run or reported no sample rate.
    """
    command = [
        "ffprobe", "-v", "error",
        # Restrict to the first audio stream: without this, a file with
        # several audio streams yields several lines and int() fails.
        "-select_streams", "a:0",
        "-show_entries", "stream=sample_rate",
        "-of", "default=noprint_wrappers=1:nokey=1",
        "-i", input_file,
    ]
    try:
        # No shell: the path is passed as a single argument.
        probe = subprocess.run(command, stdout=subprocess.PIPE, text=True, check=False)
        report = probe.stdout
    except OSError:
        report = ""
    for line in report.splitlines():
        candidate = line.strip()
        if candidate.isdigit():
            return int(candidate)
    # Same exception type int("") raised before, with a useful message.
    raise ValueError(f"Could not determine the sample rate of {input_file!r}")
def process_input(input_file, convert_to_mp3=False):
    """Denoise the audio of a video or audio file and write the result.

    Steps: convert to WAV, split into segments, filter each segment, join
    them, raise the volume, encode to the output format (muxing the original
    video stream back in for video containers), then restore metadata.

    Args:
        input_file: path of the uploaded media file.
        convert_to_mp3: when True the result is an MP3 regardless of the
            input format.

    Returns:
        The path of the processed file (``filtered-<input_file>.<ext>``), or
        None when any step fails; the error is logged.
    """
    video_formats = {"mp4", "mkv", "webm", "avi", "mov", "flv", "wmv", "m4v"}
    # Everything registered here is deleted in the finally clause, so a
    # failure half-way through no longer leaves scratch files behind.
    intermediates = []
    filtered_dir = None
    processed_file = None
    try:
        # extract audio from video and convert to wav
        audio_file = convert_audio(input_file)
        intermediates.append(audio_file)
        # split audio into smaller segments
        segments = split_audio(audio_file)
        # filter each segment
        filtered_dir = filter_audio(segments)
        # combine filtered segments
        output_file = f"{input_file}.filtered.wav"
        intermediates.append(output_file)
        output_file = combine_segments(filtered_dir, output_file)
        shutil.rmtree(filtered_dir)  # delete temp directory
        filtered_dir = None
        # increase volume
        output_file = increase_volume(output_file)

        output_format = "mp3" if convert_to_mp3 else input_file.split(".")[-1]
        processed_file = f"filtered-{input_file}.{output_format}"
        # Compare case-insensitively: an upload named "clip.MP4" must keep
        # its video track instead of being treated as audio-only.
        if output_format.lower() in video_formats:
            audio_aac_file = f"filtered-{input_file}.aac"
            intermediates.append(audio_aac_file)
            ffmpeg.input(output_file).output(audio_aac_file, ar=44100, codec="aac").run()
            video_stream = ffmpeg.input(input_file).video
            audio_aac_stream = ffmpeg.input(audio_aac_file).audio
            ffmpeg.output(
                video_stream, audio_aac_stream, processed_file,
                vcodec="copy", acodec="copy",
            ).run()
        else:
            sample_rate = 16000 if convert_to_mp3 else get_samplerate(input_file)
            # Never encode below 16 kHz.
            ffmpeg.input(output_file).output(
                processed_file, ac=1, ar=max(sample_rate, 16000)
            ).run()

        restore_metadata(input_file, processed_file)
        return processed_file
    except Exception as e:
        log_error(f"Error occurred during processing: {str(e)}")
        # A partially written result is useless to the caller.
        if processed_file and os.path.exists(processed_file):
            os.remove(processed_file)
        return None
    finally:
        if filtered_dir is not None:
            shutil.rmtree(filtered_dir, ignore_errors=True)
        for leftover in intermediates:
            if os.path.exists(leftover):
                os.remove(leftover)
# Streamlit UI: upload a file, optionally set tags, process it and offer the
# result for download.
st.title("Audio Enhancement with Deep Filter")
uploaded_file = st.file_uploader(
    "Upload a file",
    type=["mp4", "wav", "mp3", "aac", "m4a", "flac", "ogg", "opus", "wma", "webm", "mkv"],
)
if uploaded_file is not None:
    try:
        file_details = {
            "FileName": uploaded_file.name,
            "FileType": uploaded_file.type,
            "FileSize": uploaded_file.size,
        }
        st.write(file_details)
        convert_to_mp3 = st.toggle("Convert to MP3", False)
        title = st.text_input("Title", "")
        artist = st.text_input("Artist", "")
        album = st.text_input("Album", "")
        args = {
            "title": title,
            "artist": artist,
            "album": album,
        }
        if st.button("Process File"):
            # generate a random file name to save the uploaded file
            file_path = f"temp-{str(uuid.uuid4())[:8]}.{uploaded_file.name.split('.')[-1]}"
            output_file_path = None
            try:
                with open(file_path, "wb") as f:
                    f.write(uploaded_file.getbuffer())
                output_file_path = process_input(file_path, convert_to_mp3=convert_to_mp3)
                if output_file_path and os.path.exists(output_file_path):
                    # Override metadata if any -- only after processing has
                    # succeeded, so a failed run is never passed on as None.
                    if title or artist or album:
                        override_metadata(output_file_path, **args)
                    with open(output_file_path, "rb") as processed:
                        data = processed.read()
                    # uploaded file name without extension (splitext keeps
                    # dots that are part of the name, e.g. "my.song.mp3")
                    original_stem = os.path.splitext(uploaded_file.name)[0]
                    download_name = (
                        "filtered-" + original_stem + "." + output_file_path.split(".")[-1]
                    )
                    st.success("File processed successfully!")
                    st.download_button(label="Download Processed File",
                                       data=data,
                                       file_name=download_name)
                else:
                    st.error("File processing failed.")
            finally:
                # The bytes to download are already in memory; remove the
                # on-disk copies whether processing succeeded or not.
                for leftover in (file_path, output_file_path):
                    if leftover and os.path.exists(leftover):
                        os.remove(leftover)
    except Exception as e:
        log_error(f"Error occurred during file processing: {str(e)}")
        # Tell the user as well; previously the failure was only logged.
        st.error("File processing failed.")