Spaces:
Running
Running
import streamlit as st | |
import ffmpeg | |
import os | |
import time | |
from PIL import Image | |
import re | |
import tempfile | |
import shutil | |
import threading | |
def delete_temp_dir(directory, delay=900): # 15 minutes = 900 seconds | |
timer = threading.Timer(delay, shutil.rmtree, [directory]) | |
timer.start() | |
st.set_page_config(layout="wide", page_title="Video Conversion Tool") | |
# Supported formats | |
supported_formats = [data.upper() for data in (sorted(['3GP', 'ASF', 'AVI', 'DIVX', 'FLV', 'M2TS', 'M4V', 'MKV', 'MOV', 'MP4', 'MPEG', 'MPG', 'MTS', 'TS', 'VOB', 'WEBM', 'WMV', 'XVID'])) if data not in ['3GP', 'DIVX', 'XVID']] | |
audio_formats = [data.upper() for data in (sorted(['MP3', 'WAV', 'AAC', 'FLAC', 'OGG', 'M4A', 'ALAC', 'WMA', 'AIFF', 'OPUS', 'APE', 'CAF', 'PCM', 'DTS', 'TTA', 'AMR', 'MID', 'SPX', 'WV', 'RA', 'TAK'])) if data not in ['ALC', 'AMR', 'APE', 'DTS', 'MID', 'PCM', 'RA', 'TAK']] | |
gif_formats = ['GIF'] | |
image_formats = [data.upper() for data in sorted(Image.SAVE.keys() or ['BLP', 'BMP', 'BUFR', 'DDS', 'DIB', 'EPS', 'GIF', 'GRIB', 'HDF5', 'ICNS', 'ICO', 'IM', | |
'JPEG', 'JPEG2000', 'MPO', 'MSP', 'PALM', 'PCX', 'PDF', 'PNG', 'PPM', 'SGI', 'SPIDER', | |
'TGA', 'TIFF', 'WEBP', 'WMX', 'XBM'])] | |
CACHE_DIR = tempfile.mkdtemp() | |
delete_temp_dir(CACHE_DIR, delay=900) # Clean up cache after 15 minutes | |
def sanitize_filename(filename): | |
"""Sanitize filename by removing special characters and spaces.""" | |
return re.sub(r'[^a-zA-Z0-9_.-]', '_', filename) | |
def clean_cache(): | |
"""Remove files in the cache directory older than 15 minutes.""" | |
now = time.time() | |
for file in os.listdir(CACHE_DIR): | |
file_path = os.path.join(CACHE_DIR, file) | |
if os.path.isfile(file_path) and now - os.path.getmtime(file_path) > 900: # 15 minutes | |
os.remove(file_path) | |
def get_video_duration(video_path): | |
"""Get video duration in seconds using ffmpeg.""" | |
probe = ffmpeg.probe(video_path, v='error', select_streams='v:0', show_entries='stream=duration') | |
return float(probe['streams'][0]['duration']) | |
def convert_video(video, target_format, conversion_type, time_in_seconds=None): | |
try: | |
# Create a temporary directory for the uploaded file | |
temp_dir = tempfile.mkdtemp() | |
# Sanitize the filename and save the uploaded video to the temp directory | |
base_name = os.path.splitext(os.path.basename(video.name))[0] | |
sanitized_base_name = sanitize_filename(base_name) | |
video_path = os.path.join(temp_dir, f"{sanitized_base_name}.mp4") # Saving as mp4 by default for now | |
with open(video_path, "wb") as f: | |
f.write(video.getbuffer()) # Save the uploaded video to a local file | |
if conversion_type == 'Video to Video': | |
output_file = f"flowly_ai_video_converter_{sanitized_base_name}.{target_format.lower()}" | |
ffmpeg.input(video_path).output(output_file).overwrite_output().run() # Add .overwrite_output() | |
return output_file | |
elif conversion_type == 'Video to Audio': | |
audio_output_file = f"flowly_ai_video_to_audio_{sanitized_base_name}.{target_format.lower()}" | |
ffmpeg.input(video_path).output(audio_output_file).overwrite_output().run() # Add .overwrite_output() | |
return audio_output_file | |
elif conversion_type == 'Video to GIF': | |
gif_output_file = f"flowly_ai_video_to_gif_{sanitized_base_name}.gif" | |
ffmpeg.input(video_path).output(gif_output_file, vf="fps=10,scale=320:-1:flags=lanczos").overwrite_output().run() # Add .overwrite_output() | |
return gif_output_file | |
elif conversion_type == 'Video to Image': | |
if time_in_seconds is None: | |
return "Please specify a valid time in seconds for image extraction." | |
image_output_file = f"flowly_ai_video_to_image_{sanitized_base_name}_{time_in_seconds}.png" | |
ffmpeg.input(video_path, ss=time_in_seconds).output(image_output_file, vframes=1).overwrite_output().run() # Add .overwrite_output() | |
return image_output_file | |
except Exception as e: | |
return f"Error: {e}" | |
def update_format_choices(conversion_type): | |
"""Update available format choices based on the conversion type.""" | |
if conversion_type == 'Video to Video': | |
return supported_formats | |
elif conversion_type == 'Video to Audio': | |
return audio_formats | |
elif conversion_type == 'Video to GIF': | |
return gif_formats | |
elif conversion_type == 'Video to Image': | |
return image_formats | |
return [] | |
def main(): | |
st.title("Video Conversion Tool") | |
st.write("Convert videos to audio, GIFs, images, or other formats easily with this powerful tool.") | |
# Create two columns | |
col1, col2 = st.columns([1, 1]) | |
with col1: | |
# Upload video file | |
video_file = st.file_uploader("Upload a Video", type=supported_formats) | |
if video_file: | |
st.video(video_file) | |
with col2: | |
if video_file: | |
# Save uploaded video to cache | |
temp_video_path = os.path.join(CACHE_DIR, video_file.name) | |
with open(temp_video_path, "wb") as f: | |
f.write(video_file.getbuffer()) | |
# Get video duration | |
video_duration = get_video_duration(temp_video_path) | |
# Select conversion type | |
conversion_type = st.selectbox( | |
"Select Conversion Type", | |
['Video to Video', 'Video to Audio', 'Video to GIF', 'Video to Image'] | |
) | |
# Update format choices based on conversion type | |
target_format_choices = update_format_choices(conversion_type) | |
target_format = st.selectbox("Select Target Format", target_format_choices) | |
# If 'Video to Image' conversion, ask for time in seconds | |
if conversion_type == 'Video to Image': | |
time_in_seconds = st.slider( | |
label="Time (in seconds) for image extraction", | |
min_value=0, | |
max_value=int(video_duration), | |
value=0, | |
step=1 | |
) | |
else: | |
time_in_seconds = None | |
if st.button("Convert"): | |
with st.spinner("Converting..."): | |
output_file = convert_video(video_file, target_format, conversion_type, time_in_seconds) | |
if "Error" in output_file: | |
st.error(output_file) | |
else: | |
st.success("Conversion Successful! Download the file:") | |
st.download_button( | |
label="Download Converted File", | |
data=open(output_file, 'rb').read(), | |
file_name=os.path.basename(output_file) | |
) | |
if __name__ == "__main__": | |
main() | |