Spaces:
Running
Running
import gradio as gr | |
import requests | |
import tempfile | |
import os | |
import json | |
import traceback | |
# AudioJob integration | |
from audiojob import AudioJobRunner | |
from pydub import AudioSegment | |
from typing import Optional, Tuple | |
import logging | |
import ffmpeg | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
def download_audio_from_url(url: str) -> str: | |
"""Download audio from URL and save to temporary file.""" | |
try: | |
response = requests.get(url, stream=True, timeout=30) | |
response.raise_for_status() | |
# Get content type to determine file extension | |
content_type = response.headers.get('content-type', '') | |
if 'audio/mpeg' in content_type or 'mp3' in content_type: | |
ext = '.mp3' | |
elif 'audio/wav' in content_type or 'wav' in content_type: | |
ext = '.wav' | |
elif 'audio/ogg' in content_type or 'ogg' in content_type: | |
ext = '.ogg' | |
elif 'audio/mp4' in content_type or 'm4a' in content_type: | |
ext = '.m4a' | |
else: | |
# Try to get extension from URL | |
ext = os.path.splitext(url.split('?')[0])[1] | |
if not ext: | |
ext = '.mp3' # Default fallback | |
# Create temporary file | |
with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file: | |
for chunk in response.iter_content(chunk_size=8192): | |
temp_file.write(chunk) | |
return temp_file.name | |
except Exception as e: | |
logger.error(f"Error downloading audio: {str(e)}") | |
raise gr.Error(f"Failed to download audio from URL: {str(e)}") | |
def cut_audio(audio_url: str, start_time: float, duration: float) -> str: | |
""" | |
Cut audio from the given URL based on start time and duration. | |
Args: | |
audio_url: URL of the audio file | |
start_time: Start time in seconds | |
duration: Duration in seconds | |
Returns: | |
Path to the cut audio file | |
""" | |
try: | |
# Validate inputs | |
if not audio_url.strip(): | |
raise gr.Error("Please provide a valid audio URL") | |
if start_time < 0: | |
raise gr.Error("Start time must be non-negative") | |
if duration <= 0: | |
raise gr.Error("Duration must be positive") | |
# Download audio from URL | |
logger.info(f"Downloading audio from: {audio_url}") | |
temp_input_path = download_audio_from_url(audio_url) | |
try: | |
# Load audio file | |
logger.info("Loading audio file...") | |
audio = AudioSegment.from_file(temp_input_path) | |
# Convert times to milliseconds | |
start_ms = int(start_time * 1000) | |
duration_ms = int(duration * 1000) | |
end_ms = start_ms + duration_ms | |
# Check if start time is within audio duration | |
if start_ms >= len(audio): | |
raise gr.Error(f"Start time ({start_time}s) is beyond audio duration ({len(audio)/1000:.2f}s)") | |
# Adjust end time if it exceeds audio length | |
if end_ms > len(audio): | |
end_ms = len(audio) | |
actual_duration = (end_ms - start_ms) / 1000 | |
logger.warning(f"Requested duration extends beyond audio. Cutting until end. Actual duration: {actual_duration:.2f}s") | |
# Cut the audio with ffmpeg using stream copy to preserve original codec/bitrate | |
logger.info(f"Cutting audio (stream copy) from {start_time}s to {end_ms/1000:.2f}s") | |
# Keep original file extension when saving to /tmp | |
_, input_ext = os.path.splitext(temp_input_path) | |
if not input_ext: | |
input_ext = ".mp3" | |
# Create an output path in /tmp | |
fd, output_path = tempfile.mkstemp(suffix=input_ext, dir="/tmp") | |
os.close(fd) | |
# Duration for the cut in seconds | |
cut_duration_seconds = (end_ms - start_ms) / 1000.0 | |
# Try fast cut using ffmpeg stream copy to avoid re-encoding | |
try: | |
( | |
ffmpeg | |
.input(temp_input_path, ss=start_time, t=cut_duration_seconds) | |
.output(output_path, acodec='copy') | |
.global_args('-loglevel', 'error', '-hide_banner') | |
.overwrite_output() | |
.run(capture_stdout=True, capture_stderr=True) | |
) | |
except ffmpeg.Error as ff_err: | |
# Log detailed ffmpeg stderr and fall back to re-encoding | |
try: | |
ffmpeg_stderr = ff_err.stderr.decode('utf-8', errors='ignore') if hasattr(ff_err, 'stderr') else str(ff_err) | |
except Exception: | |
ffmpeg_stderr = str(ff_err) | |
logger.warning("ffmpeg stream copy failed, falling back to re-encode. Details: %s", ffmpeg_stderr) | |
# Fallback: re-encode using pydub (slower but more compatible) | |
segment = audio[start_ms:end_ms] | |
export_format = input_ext[1:].lower() if input_ext.startswith('.') else input_ext.lower() | |
# pydub/ffmpeg commonly expect 'mp4' as format for m4a container | |
if export_format == 'm4a': | |
export_format = 'mp4' | |
segment.export(output_path, format=export_format) | |
logger.info(f"Cut audio saved to: {output_path}") | |
return output_path | |
finally: | |
# Clean up input file | |
if os.path.exists(temp_input_path): | |
os.unlink(temp_input_path) | |
except gr.Error: | |
# Re-raise Gradio errors | |
raise | |
except Exception as e: | |
logger.error(f"Error cutting audio: {str(e)}") | |
raise gr.Error(f"Failed to process audio: {str(e)}") | |
def process_audio_cut(audio_url: str, start_time: float, duration: float) -> Tuple[str, str]: | |
""" | |
Process audio cutting and return both the audio file and status message. | |
Returns: | |
Tuple of (audio_file_path, status_message) | |
""" | |
try: | |
result_path = cut_audio(audio_url, start_time, duration) | |
status_msg = f"✅ Successfully cut audio: {duration}s segment starting at {start_time}s" | |
return result_path, status_msg | |
except Exception as e: | |
error_msg = f"❌ Error: {str(e)}" | |
return None, error_msg | |
# Create Gradio interface | |
with gr.Blocks(title="Audio Editor", theme=gr.themes.Soft()) as demo: | |
gr.Markdown( | |
""" | |
# 🎵 Audio Editor | |
Upload audio via URL and perform various editing operations. | |
## 🎯 Audio Cut | |
Cut a specific segment from your audio file by providing start time and duration. | |
""" | |
) | |
with gr.Tab("Audio Cut"): | |
gr.Markdown("### Cut Audio Segment") | |
with gr.Row(): | |
with gr.Column(): | |
audio_url_input = gr.Textbox( | |
label="Audio URL", | |
placeholder="https://example.com/audio.mp3", | |
info="Enter the URL of the audio file you want to edit" | |
) | |
with gr.Row(): | |
start_time_input = gr.Number( | |
label="Start Time (seconds)", | |
value=0, | |
minimum=0, | |
info="When to start cutting (in seconds)" | |
) | |
duration_input = gr.Number( | |
label="Duration (seconds)", | |
value=10, | |
minimum=0.1, | |
info="How long the cut should be (in seconds)" | |
) | |
cut_button = gr.Button("🎵 Cut Audio", variant="primary") | |
with gr.Column(): | |
status_output = gr.Textbox( | |
label="Status", | |
interactive=False, | |
info="Processing status and messages" | |
) | |
audio_output = gr.Audio( | |
label="Cut Audio Result", | |
type="filepath", | |
) | |
# Examples | |
gr.Markdown("### 📝 Examples") | |
gr.Examples( | |
examples=[ | |
[ | |
"https://www.soundjay.com/misc/sounds/bell-ringing-05.wav", | |
0, | |
5 | |
], | |
[ | |
"https://file-examples.com/storage/fe68c9d70ede98d3b5f5f90/2017/11/file_example_MP3_700KB.mp3", | |
10, | |
15 | |
] | |
], | |
inputs=[audio_url_input, start_time_input, duration_input], | |
label="Try these examples:" | |
) | |
# Set up event handler | |
cut_button.click( | |
fn=process_audio_cut, | |
inputs=[audio_url_input, start_time_input, duration_input], | |
outputs=[audio_output, status_output] | |
) | |
with gr.Tab("AudioJob Runner"): | |
gr.Markdown("### AudioJob: preprocess -> split (inspect manifest)") | |
with gr.Row(): | |
with gr.Column(): | |
aj_source_input = gr.Textbox( | |
label="Source URI", | |
placeholder="e.g. /abs/path/to/file.wav or s3://bucket/key", | |
info="Source URI for AudioJobRunner" | |
) | |
aj_manifest_input = gr.Textbox( | |
label="Manifest JSON (optional)", | |
placeholder="Paste existing manifest JSON to resume (optional)", | |
lines=10 | |
) | |
aj_s3_prefix = gr.Textbox( | |
label="S3 Prefix", | |
placeholder="Optional prefix for uploaded working copies (e.g. jobs/)", | |
info="Uploaded keys will be prefixed with this value", | |
) | |
aj_run_button = gr.Button("Run AudioJob", variant="primary") | |
with gr.Column(): | |
aj_output = gr.Textbox(label="AudioJob Output (manifest)", lines=30, interactive=False) | |
def run_audiojob_ui(source_uri: str, manifest_json: str, s3_prefix: str) -> str: | |
try: | |
manifest = None | |
if manifest_json and manifest_json.strip(): | |
manifest = json.loads(manifest_json) | |
work_root = tempfile.mkdtemp(prefix="audiojob_") | |
# allow presets from top-level presets if desired; using defaults here | |
runner = AudioJobRunner( | |
manifest=manifest, | |
source_uri=None if manifest else source_uri, | |
work_root=work_root, | |
presets={ | |
# Read bucket and endpoint from environment where possible | |
"s3_bucket": os.environ.get("S3_BUCKET"), | |
"s3_region": "auto", | |
"s3_prefix": s3_prefix or "", | |
"s3_endpoint": os.environ.get("S3_ENDPOINT", ""), | |
"chunk_target_ms": 15 * 60000, | |
} | |
) | |
out_manifest = runner.run_until_split() | |
return json.dumps(out_manifest, ensure_ascii=False, indent=2) | |
except Exception as e: | |
tb = traceback.format_exc() | |
return f"Error: {e}\n\n{tb}" | |
aj_run_button.click(fn=run_audiojob_ui, inputs=[aj_source_input, aj_manifest_input, aj_s3_prefix], outputs=[aj_output]) | |
# Launch the app | |
if __name__ == "__main__": | |
demo.launch() | |