"""Gradio app for cutting audio fetched from a URL and running AudioJob."""

import json
import logging
import os
import tempfile
import traceback
from typing import Optional, Tuple
from urllib.parse import urlparse

import ffmpeg
import gradio as gr
import requests
from pydub import AudioSegment

# AudioJob integration
from audiojob import AudioJobRunner

# Set up logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Substrings looked for in the Content-Type header, checked in this order,
# and the file extension each one maps to.
_CONTENT_TYPE_EXTENSIONS = (
    (("audio/mpeg", "mp3"), ".mp3"),
    (("audio/wav", "wav"), ".wav"),
    (("audio/ogg", "ogg"), ".ogg"),
    (("audio/mp4", "m4a"), ".m4a"),
)
_DEFAULT_EXTENSION = ".mp3"


def _remove_quietly(path: str) -> None:
    """Delete *path* if it exists; log (but do not raise) on other OS errors."""
    try:
        os.unlink(path)
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.warning("Could not remove temporary file %s: %s", path, e)


def _guess_extension(content_type: str, url: str) -> str:
    """Pick a file extension from the Content-Type header, else the URL path."""
    lowered = content_type.lower()
    for markers, ext in _CONTENT_TYPE_EXTENSIONS:
        if any(marker in lowered for marker in markers):
            return ext

    # Only look at the path component: splitting the raw URL would treat the
    # host's TLD as an extension (e.g. "https://example.com" -> ".com") and
    # would keep any "#fragment".
    ext = os.path.splitext(urlparse(url).path)[1]
    return ext or _DEFAULT_EXTENSION


def download_audio_from_url(url: str) -> str:
    """Download audio from *url* and save it to a temporary file.

    Args:
        url: HTTP(S) URL of the audio file.

    Returns:
        Path to the downloaded temporary file. The caller is responsible for
        deleting it.

    Raises:
        gr.Error: If the request fails or the file cannot be written.
    """
    temp_path: Optional[str] = None
    try:
        # The context manager releases the streamed connection even when the
        # download is interrupted.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            ext = _guess_extension(
                response.headers.get("content-type", ""), url
            )

            with tempfile.NamedTemporaryFile(
                delete=False, suffix=ext
            ) as temp_file:
                temp_path = temp_file.name
                for chunk in response.iter_content(chunk_size=8192):
                    temp_file.write(chunk)

        return temp_path
    except Exception as e:
        # delete=False means nothing else will clean up a partial download.
        if temp_path is not None:
            _remove_quietly(temp_path)
        logger.error("Error downloading audio: %s", e)
        raise gr.Error(f"Failed to download audio from URL: {str(e)}") from e


def cut_audio(audio_url: str, start_time: float, duration: float) -> str:
    """Cut a segment out of the audio found at *audio_url*.

    The cut is first attempted with an ffmpeg stream copy (no re-encoding);
    if ffmpeg reports an error, the segment is re-encoded with pydub instead.
    If the requested range runs past the end of the audio, the cut stops at
    the end of the audio.

    Args:
        audio_url: URL of the audio file.
        start_time: Start time in seconds (must be >= 0).
        duration: Duration in seconds (must be > 0).

    Returns:
        Path to the cut audio file, created in the system temp directory.

    Raises:
        gr.Error: On invalid input, download failure, or processing failure.
    """
    try:
        # Validate inputs. Empty UI fields can arrive as None, which would
        # otherwise fail inside the comparisons below with an opaque message.
        if not audio_url or not audio_url.strip():
            raise gr.Error("Please provide a valid audio URL")
        if start_time is None or duration is None:
            raise gr.Error("Start time and duration are required")
        if start_time < 0:
            raise gr.Error("Start time must be non-negative")
        if duration <= 0:
            raise gr.Error("Duration must be positive")

        audio_url = audio_url.strip()

        # Download audio from URL
        logger.info("Downloading audio from: %s", audio_url)
        temp_input_path = download_audio_from_url(audio_url)

        try:
            # Load audio file
            logger.info("Loading audio file...")
            audio = AudioSegment.from_file(temp_input_path)
            audio_length_ms = len(audio)

            # Convert times to milliseconds
            start_ms = int(start_time * 1000)
            end_ms = start_ms + int(duration * 1000)

            # Check if start time is within audio duration
            if start_ms >= audio_length_ms:
                raise gr.Error(
                    f"Start time ({start_time}s) is beyond audio duration "
                    f"({audio_length_ms/1000:.2f}s)"
                )

            # Adjust end time if it exceeds audio length
            if end_ms > audio_length_ms:
                end_ms = audio_length_ms
                actual_duration = (end_ms - start_ms) / 1000
                logger.warning(
                    "Requested duration extends beyond audio. "
                    "Cutting until end. Actual duration: %.2fs",
                    actual_duration,
                )

            logger.info(
                "Cutting audio (stream copy) from %ss to %.2fs",
                start_time,
                end_ms / 1000,
            )

            # Keep original file extension when saving the result
            _, input_ext = os.path.splitext(temp_input_path)
            if not input_ext:
                input_ext = ".mp3"

            # Use the platform temp directory rather than a hard-coded "/tmp"
            # so this also works where "/tmp" does not exist.
            fd, output_path = tempfile.mkstemp(
                suffix=input_ext, dir=tempfile.gettempdir()
            )
            os.close(fd)

            # Duration for the cut in seconds
            cut_duration_seconds = (end_ms - start_ms) / 1000.0

            try:
                # Try fast cut using ffmpeg stream copy to avoid re-encoding
                try:
                    (
                        ffmpeg
                        .input(
                            temp_input_path,
                            ss=start_time,
                            t=cut_duration_seconds,
                        )
                        .output(output_path, acodec='copy')
                        .global_args('-loglevel', 'error', '-hide_banner')
                        .overwrite_output()
                        .run(capture_stdout=True, capture_stderr=True)
                    )
                except ffmpeg.Error as ff_err:
                    # Log detailed ffmpeg stderr and fall back to re-encoding
                    stderr = getattr(ff_err, 'stderr', None)
                    if isinstance(stderr, bytes):
                        ffmpeg_stderr = stderr.decode('utf-8', errors='ignore')
                    else:
                        ffmpeg_stderr = str(ff_err)
                    logger.warning(
                        "ffmpeg stream copy failed, falling back to "
                        "re-encode. Details: %s",
                        ffmpeg_stderr,
                    )

                    # Fallback: re-encode using pydub (slower but more
                    # compatible)
                    segment = audio[start_ms:end_ms]
                    export_format = input_ext.lstrip('.').lower()
                    # pydub/ffmpeg commonly expect 'mp4' as format for the
                    # m4a container
                    if export_format == 'm4a':
                        export_format = 'mp4'
                    segment.export(output_path, format=export_format)
            except Exception:
                # Nothing will be returned to the caller, so do not leave the
                # (possibly empty or partial) output file behind.
                _remove_quietly(output_path)
                raise

            logger.info("Cut audio saved to: %s", output_path)
            return output_path

        finally:
            # Clean up input file
            _remove_quietly(temp_input_path)

    except gr.Error:
        # Re-raise Gradio errors unchanged so their message reaches the UI
        raise
    except Exception as e:
        logger.error("Error cutting audio: %s", e)
        raise gr.Error(f"Failed to process audio: {str(e)}") from e


def process_audio_cut(
    audio_url: str, start_time: float, duration: float
) -> Tuple[Optional[str], str]:
    """Run :func:`cut_audio` and translate the outcome into UI outputs.

    Returns:
        Tuple of ``(audio_file_path, status_message)``. On failure the path
        is ``None`` and the message describes the error.
    """
    try:
        result_path = cut_audio(audio_url, start_time, duration)
    except Exception as e:
        # UI boundary: report the failure in the status box instead of raising.
        return None, f"❌ Error: {str(e)}"

    status_msg = (
        f"✅ Successfully cut audio: {duration}s segment "
        f"starting at {start_time}s"
    )
    return result_path, status_msg


def run_audiojob_ui(source_uri: str, manifest_json: str, s3_prefix: str) -> str:
    """Run AudioJobRunner up to the split stage and return the manifest.

    Args:
        source_uri: Source URI passed to the runner when no manifest is given.
        manifest_json: Optional JSON manifest text; when it parses to a
            non-empty value it is passed as ``manifest`` and ``source_uri``
            is passed as ``None``.
        s3_prefix: Prefix stored in the ``s3_prefix`` preset.

    Returns:
        The resulting manifest as pretty-printed JSON, or an
        ``"Error: ..."`` string followed by the traceback on failure.
    """
    try:
        manifest = None
        if manifest_json and manifest_json.strip():
            manifest = json.loads(manifest_json)

        # NOTE: the working directory is intentionally not removed here; it
        # is handed to the runner as its work root.
        work_root = tempfile.mkdtemp(prefix="audiojob_")

        # allow presets from top-level presets if desired; using defaults here
        runner = AudioJobRunner(
            manifest=manifest,
            source_uri=None if manifest else source_uri,
            work_root=work_root,
            presets={
                # Read bucket and endpoint from environment where possible
                "s3_bucket": os.environ.get("S3_BUCKET"),
                "s3_region": "auto",
                "s3_prefix": s3_prefix or "",
                "s3_endpoint": os.environ.get("S3_ENDPOINT", ""),
                "chunk_target_ms": 15 * 60000,  # 15 minutes
            },
        )

        out_manifest = runner.run_until_split()
        return json.dumps(out_manifest, ensure_ascii=False, indent=2)
    except Exception as e:
        # UI boundary: show the failure in the output box and keep a copy in
        # the server log.
        logger.exception("AudioJob run failed")
        tb = traceback.format_exc()
        return f"Error: {e}\n\n{tb}"


# Create Gradio interface
with gr.Blocks(title="Audio Editor", theme=gr.themes.Soft()) as demo:
    gr.Markdown(
        """
        # 🎵 Audio Editor

        Upload audio via URL and perform various editing operations.

        ## 🎯 Audio Cut
        Cut a specific segment from your audio file by providing start time and duration.
        """
    )

    with gr.Tab("Audio Cut"):
        gr.Markdown("### Cut Audio Segment")

        with gr.Row():
            with gr.Column():
                audio_url_input = gr.Textbox(
                    label="Audio URL",
                    placeholder="https://example.com/audio.mp3",
                    info="Enter the URL of the audio file you want to edit"
                )

                with gr.Row():
                    start_time_input = gr.Number(
                        label="Start Time (seconds)",
                        value=0,
                        minimum=0,
                        info="When to start cutting (in seconds)"
                    )
                    duration_input = gr.Number(
                        label="Duration (seconds)",
                        value=10,
                        minimum=0.1,
                        info="How long the cut should be (in seconds)"
                    )

                cut_button = gr.Button("🎵 Cut Audio", variant="primary")

            with gr.Column():
                status_output = gr.Textbox(
                    label="Status",
                    interactive=False,
                    info="Processing status and messages"
                )
                audio_output = gr.Audio(
                    label="Cut Audio Result",
                    type="filepath",
                )

        # Examples
        gr.Markdown("### 📝 Examples")
        gr.Examples(
            examples=[
                [
                    "https://www.soundjay.com/misc/sounds/bell-ringing-05.wav",
                    0,
                    5
                ],
                [
                    "https://file-examples.com/storage/fe68c9d70ede98d3b5f5f90/2017/11/file_example_MP3_700KB.mp3",
                    10,
                    15
                ]
            ],
            inputs=[audio_url_input, start_time_input, duration_input],
            label="Try these examples:"
        )

        # Set up event handler
        cut_button.click(
            fn=process_audio_cut,
            inputs=[audio_url_input, start_time_input, duration_input],
            outputs=[audio_output, status_output]
        )

    with gr.Tab("AudioJob Runner"):
        gr.Markdown("### AudioJob: preprocess -> split (inspect manifest)")

        with gr.Row():
            with gr.Column():
                aj_source_input = gr.Textbox(
                    label="Source URI",
                    placeholder="e.g. /abs/path/to/file.wav or s3://bucket/key",
                    info="Source URI for AudioJobRunner"
                )
                aj_manifest_input = gr.Textbox(
                    label="Manifest JSON (optional)",
                    placeholder="Paste existing manifest JSON to resume (optional)",
                    lines=10
                )
                aj_s3_prefix = gr.Textbox(
                    label="S3 Prefix",
                    placeholder="Optional prefix for uploaded working copies (e.g. jobs/)",
                    info="Uploaded keys will be prefixed with this value",
                )
                aj_run_button = gr.Button("Run AudioJob", variant="primary")

            with gr.Column():
                aj_output = gr.Textbox(
                    label="AudioJob Output (manifest)",
                    lines=30,
                    interactive=False
                )

        aj_run_button.click(
            fn=run_audiojob_ui,
            inputs=[aj_source_input, aj_manifest_input, aj_s3_prefix],
            outputs=[aj_output]
        )

# Launch the app
if __name__ == "__main__":
    demo.launch()