# audio-edit / app.py
# liuyang
# fix value
# f69d26d
import gradio as gr
import requests
import tempfile
import os
import json
import traceback
# AudioJob integration
from audiojob import AudioJobRunner
from pydub import AudioSegment
from typing import Optional, Tuple
import logging
import ffmpeg
# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def download_audio_from_url(url: str) -> str:
    """Download audio from a URL into a temporary file and return its path.

    The file extension is chosen from the response's Content-Type header when
    it names a known audio type; otherwise it is taken from the path component
    of the URL, falling back to ``.mp3``.

    Args:
        url: URL of the audio file to download.

    Returns:
        Path of the temporary file holding the downloaded bytes. The file is
        created with ``delete=False``, so the caller must remove it.

    Raises:
        gr.Error: If the request fails, returns an error status, or the body
            cannot be written to disk.
    """
    # Imported locally so the block does not depend on a new file-level import.
    from urllib.parse import urlparse

    temp_path = None
    try:
        # The context manager releases the streamed connection even when the
        # download is interrupted part-way through.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()

            # Get content type to determine file extension
            content_type = response.headers.get('content-type', '').lower()
            if 'audio/mpeg' in content_type or 'mp3' in content_type:
                ext = '.mp3'
            elif 'audio/wav' in content_type or 'wav' in content_type:
                ext = '.wav'
            elif 'audio/ogg' in content_type or 'ogg' in content_type:
                ext = '.ogg'
            elif 'audio/mp4' in content_type or 'm4a' in content_type:
                ext = '.m4a'
            else:
                # Look only at the URL path so that a host name such as
                # "example.com", a query string, or a fragment is never
                # mistaken for a file extension.
                ext = os.path.splitext(urlparse(url).path)[1]
                if not ext:
                    ext = '.mp3'  # Default fallback

            # Create temporary file
            with tempfile.NamedTemporaryFile(delete=False, suffix=ext) as temp_file:
                temp_path = temp_file.name
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)

        return temp_path
    except Exception as e:
        # Do not leave a partially written file behind on failure.
        if temp_path is not None and os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError:
                pass
        logger.error(f"Error downloading audio: {str(e)}")
        raise gr.Error(f"Failed to download audio from URL: {str(e)}") from e
def cut_audio(audio_url: str, start_time: float, duration: float) -> str:
    """
    Cut audio from the given URL based on start time and duration.

    The audio is downloaded to a temporary file, the requested window is
    clamped to the end of the audio, and the segment is written with an ffmpeg
    stream copy. If the stream copy fails, the segment is re-encoded with
    pydub instead. The downloaded input file is always removed; the output
    file is removed as well when the cut does not complete.

    Args:
        audio_url: URL of the audio file
        start_time: Start time in seconds (must be >= 0)
        duration: Duration in seconds (must be > 0)

    Returns:
        Path to the cut audio file (created under /tmp with the same
        extension as the downloaded file)

    Raises:
        gr.Error: On invalid input, a start time at or beyond the end of the
            audio, or any download/processing failure.
    """
    try:
        # Validate inputs. None is rejected explicitly so that a missing value
        # produces the same clear message instead of an AttributeError or
        # TypeError from the checks below.
        if audio_url is None or not audio_url.strip():
            raise gr.Error("Please provide a valid audio URL")
        if start_time is None or start_time < 0:
            raise gr.Error("Start time must be non-negative")
        if duration is None or duration <= 0:
            raise gr.Error("Duration must be positive")

        # Surrounding whitespace is never part of a URL.
        audio_url = audio_url.strip()

        # Download audio from URL
        logger.info(f"Downloading audio from: {audio_url}")
        temp_input_path = download_audio_from_url(audio_url)

        output_path = None
        completed = False
        try:
            # Load audio file (len() of the segment is its length in ms)
            logger.info("Loading audio file...")
            audio = AudioSegment.from_file(temp_input_path)
            total_ms = len(audio)

            # Convert times to milliseconds
            start_ms = int(start_time * 1000)
            duration_ms = int(duration * 1000)
            end_ms = start_ms + duration_ms

            # Check if start time is within audio duration
            if start_ms >= total_ms:
                raise gr.Error(f"Start time ({start_time}s) is beyond audio duration ({total_ms/1000:.2f}s)")

            # Adjust end time if it exceeds audio length
            if end_ms > total_ms:
                end_ms = total_ms
                actual_duration = (end_ms - start_ms) / 1000
                logger.warning(f"Requested duration extends beyond audio. Cutting until end. Actual duration: {actual_duration:.2f}s")

            # Cut the audio with ffmpeg using stream copy to preserve original codec/bitrate
            logger.info(f"Cutting audio (stream copy) from {start_time}s to {end_ms/1000:.2f}s")

            # Keep original file extension when saving to /tmp
            _, input_ext = os.path.splitext(temp_input_path)
            if not input_ext:
                input_ext = ".mp3"

            # Create an output path in /tmp; only the path is needed, so the
            # descriptor returned by mkstemp is closed right away.
            fd, output_path = tempfile.mkstemp(suffix=input_ext, dir="/tmp")
            os.close(fd)

            # Duration for the cut in seconds (after clamping to the audio end)
            cut_duration_seconds = (end_ms - start_ms) / 1000.0

            # Try fast cut using ffmpeg stream copy to avoid re-encoding
            try:
                (
                    ffmpeg
                    .input(temp_input_path, ss=start_time, t=cut_duration_seconds)
                    .output(output_path, acodec='copy')
                    .global_args('-loglevel', 'error', '-hide_banner')
                    .overwrite_output()
                    .run(capture_stdout=True, capture_stderr=True)
                )
            except ffmpeg.Error as ff_err:
                # Log detailed ffmpeg stderr and fall back to re-encoding
                raw_stderr = getattr(ff_err, 'stderr', None)
                if isinstance(raw_stderr, (bytes, bytearray)):
                    ffmpeg_stderr = raw_stderr.decode('utf-8', errors='ignore')
                else:
                    ffmpeg_stderr = str(ff_err)
                logger.warning("ffmpeg stream copy failed, falling back to re-encode. Details: %s", ffmpeg_stderr)

                # Fallback: re-encode using pydub (slower but more compatible)
                segment = audio[start_ms:end_ms]
                export_format = input_ext[1:].lower() if input_ext.startswith('.') else input_ext.lower()
                # pydub/ffmpeg commonly expect 'mp4' as format for m4a container
                if export_format == 'm4a':
                    export_format = 'mp4'
                segment.export(output_path, format=export_format)

            logger.info(f"Cut audio saved to: {output_path}")
            completed = True
            return output_path

        finally:
            # Clean up input file
            if os.path.exists(temp_input_path):
                os.unlink(temp_input_path)
            # Remove the output file too when the cut did not finish, so a
            # failed request does not leave an empty/partial file in /tmp.
            if not completed and output_path is not None and os.path.exists(output_path):
                os.unlink(output_path)

    except gr.Error:
        # Re-raise Gradio errors
        raise
    except Exception as e:
        logger.error(f"Error cutting audio: {str(e)}")
        raise gr.Error(f"Failed to process audio: {str(e)}") from e
def process_audio_cut(audio_url: str, start_time: float, duration: float) -> Tuple[str, str]:
    """
    Run the audio cut and package the outcome as (file path, status text).

    Any exception raised while cutting is converted into an error status
    instead of propagating.

    Returns:
        Tuple of (audio_file_path, status_message). On failure the path is
        None and the message carries the error text.
    """
    try:
        # Tuple items are evaluated left to right: the cut runs first, the
        # success message is only built once it has returned.
        outcome = (
            cut_audio(audio_url, start_time, duration),
            f"✅ Successfully cut audio: {duration}s segment starting at {start_time}s",
        )
    except Exception as exc:
        outcome = (None, f"❌ Error: {str(exc)}")
    return outcome
# Create Gradio interface: one tab for URL-based audio cutting and one tab
# that drives AudioJobRunner up to the split stage.
with gr.Blocks(title="Audio Editor", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        "\n"
        "# 🎵 Audio Editor\n"
        "Upload audio via URL and perform various editing operations.\n"
        "## 🎯 Audio Cut\n"
        "Cut a specific segment from your audio file by providing start time and duration.\n"
    )

    with gr.Tab("Audio Cut"):
        gr.Markdown("### Cut Audio Segment")

        with gr.Row():
            # Left column: inputs
            with gr.Column():
                audio_url_input = gr.Textbox(
                    label="Audio URL",
                    placeholder="https://example.com/audio.mp3",
                    info="Enter the URL of the audio file you want to edit"
                )
                with gr.Row():
                    start_time_input = gr.Number(
                        label="Start Time (seconds)",
                        value=0,
                        minimum=0,
                        info="When to start cutting (in seconds)"
                    )
                    duration_input = gr.Number(
                        label="Duration (seconds)",
                        value=10,
                        minimum=0.1,
                        info="How long the cut should be (in seconds)"
                    )
                cut_button = gr.Button("🎵 Cut Audio", variant="primary")

            # Right column: results
            with gr.Column():
                status_output = gr.Textbox(
                    label="Status",
                    interactive=False,
                    info="Processing status and messages"
                )
                audio_output = gr.Audio(
                    label="Cut Audio Result",
                    type="filepath",
                )

        # Examples
        gr.Markdown("### 📝 Examples")
        gr.Examples(
            examples=[
                [
                    "https://www.soundjay.com/misc/sounds/bell-ringing-05.wav",
                    0,
                    5
                ],
                [
                    "https://file-examples.com/storage/fe68c9d70ede98d3b5f5f90/2017/11/file_example_MP3_700KB.mp3",
                    10,
                    15
                ]
            ],
            inputs=[audio_url_input, start_time_input, duration_input],
            label="Try these examples:"
        )

        # Set up event handler
        cut_button.click(
            fn=process_audio_cut,
            inputs=[audio_url_input, start_time_input, duration_input],
            outputs=[audio_output, status_output]
        )

    with gr.Tab("AudioJob Runner"):
        gr.Markdown("### AudioJob: preprocess -> split (inspect manifest)")

        with gr.Row():
            with gr.Column():
                aj_source_input = gr.Textbox(
                    label="Source URI",
                    placeholder="e.g. /abs/path/to/file.wav or s3://bucket/key",
                    info="Source URI for AudioJobRunner"
                )
                aj_manifest_input = gr.Textbox(
                    label="Manifest JSON (optional)",
                    placeholder="Paste existing manifest JSON to resume (optional)",
                    lines=10
                )
                aj_s3_prefix = gr.Textbox(
                    label="S3 Prefix",
                    placeholder="Optional prefix for uploaded working copies (e.g. jobs/)",
                    info="Uploaded keys will be prefixed with this value",
                )
                aj_run_button = gr.Button("Run AudioJob", variant="primary")

            with gr.Column():
                aj_output = gr.Textbox(label="AudioJob Output (manifest)", lines=30, interactive=False)

        def run_audiojob_ui(source_uri: str, manifest_json: str, s3_prefix: str) -> str:
            """Run AudioJobRunner until the split stage and return the manifest as text.

            Args:
                source_uri: Source URI handed to the runner; ignored when a
                    non-empty manifest is supplied.
                manifest_json: Optional JSON text of an existing manifest.
                s3_prefix: Optional prefix passed to the runner as the
                    ``s3_prefix`` preset.

            Returns:
                The manifest returned by ``run_until_split()`` as indented
                JSON, or a message starting with ``"Error:"`` on failure.
            """
            manifest = None
            if manifest_json and manifest_json.strip():
                try:
                    manifest = json.loads(manifest_json)
                except json.JSONDecodeError as e:
                    # A pasted-text problem: report it plainly, no traceback.
                    return f"Error: Manifest is not valid JSON: {e}"

            # Without a manifest the runner needs a source; fail early with a
            # clear message instead of handing it an empty URI.
            if not manifest and not (source_uri and source_uri.strip()):
                return "Error: Provide a Source URI or a manifest JSON to resume from."

            try:
                work_root = tempfile.mkdtemp(prefix="audiojob_")
                # allow presets from top-level presets if desired; using defaults here
                runner = AudioJobRunner(
                    manifest=manifest,
                    source_uri=None if manifest else source_uri,
                    work_root=work_root,
                    presets={
                        # Read bucket and endpoint from environment where possible
                        "s3_bucket": os.environ.get("S3_BUCKET"),
                        "s3_region": "auto",
                        "s3_prefix": s3_prefix or "",
                        "s3_endpoint": os.environ.get("S3_ENDPOINT", ""),
                        "chunk_target_ms": 15 * 60000,
                    }
                )
                out_manifest = runner.run_until_split()
                return json.dumps(out_manifest, ensure_ascii=False, indent=2)
            except Exception as e:
                # Keep the traceback in the output box (this tab is an
                # inspection tool) and record it in the server log as well.
                logger.exception("AudioJob run failed")
                tb = traceback.format_exc()
                return f"Error: {e}\n\n{tb}"

        aj_run_button.click(fn=run_audiojob_ui, inputs=[aj_source_input, aj_manifest_input, aj_s3_prefix], outputs=[aj_output])
# Launch the app only when this file is run as a script, not when it is imported.
if __name__ == "__main__":
    demo.launch()