import os
import asyncio

import gradio as gr
import librosa
import numpy as np
import torch
import torchaudio
from gradio_webrtc import (
    AsyncAudioVideoStreamHandler,
    WebRTC,
    get_twilio_turn_credentials,
)

# Create directories
os.makedirs("voice_samples", exist_ok=True)

# Voice presets (simple pitch and speed modifications)
VOICE_PRESETS = {
    "Deep Male": {"pitch_shift": -4, "speed_factor": 0.9},
    "Standard Male": {"pitch_shift": -2, "speed_factor": 0.95},
    "Standard Female": {"pitch_shift": 2, "speed_factor": 1.05},
    "High Female": {"pitch_shift": 4, "speed_factor": 1.1},
}
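
# Preset semantics: "pitch_shift" is in semitones (negative lowers the voice,
# positive raises it) and "speed_factor" is a playback-rate multiplier.
# A hypothetical custom preset could be added the same way, for example:
#   VOICE_PRESETS["Cartoon"] = {"pitch_shift": 7, "speed_factor": 1.2}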

# Audio preprocessing: normalize dtype, downmix, resample, and cap length
def process_audio(waveform, sampling_rate=16000):
    # Convert from int16 to float32 in [-1, 1] if needed
    if waveform.dtype == np.int16:
        waveform = waveform.astype(np.float32) / 32768.0
    # Downmix to mono (librosa.to_mono expects channels-first audio)
    if len(waveform.shape) > 1:
        waveform = librosa.to_mono(waveform.T)
    # Resample to 16 kHz if needed
    if sampling_rate != 16000:
        waveform = librosa.resample(waveform, orig_sr=sampling_rate, target_sr=16000)
    # Cap at 15 seconds to avoid memory issues
    max_length = 16000 * 15
    if len(waveform) > max_length:
        waveform = waveform[:max_length]
    return waveform
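
# Example: a 2-second stereo int16 clip recorded at 44.1 kHz (shape
# (88200, 2)) comes back from process_audio as a mono float array of
# 32000 samples at 16 kHz.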

# Simple voice conversion using torchaudio effects
def convert_voice_simple(waveform, preset):
    try:
        # Convert to a float tensor
        if not torch.is_tensor(waveform):
            waveform_tensor = torch.tensor(waveform).float()
        else:
            waveform_tensor = waveform
        # Ensure shape is (channels, samples)
        if waveform_tensor.dim() == 1:
            waveform_tensor = waveform_tensor.unsqueeze(0)
        # Apply pitch shift (n_steps is in semitones)
        pitch_shift = preset.get("pitch_shift", 0)
        if pitch_shift != 0:
            waveform_tensor = torchaudio.functional.pitch_shift(
                waveform_tensor,
                sample_rate=16000,
                n_steps=pitch_shift,
            )
        # Apply speed change; torchaudio.functional.speed returns a
        # (waveform, lengths) tuple, so unpack and keep the waveform
        speed_factor = preset.get("speed_factor", 1.0)
        if speed_factor != 1.0:
            waveform_tensor, _ = torchaudio.functional.speed(
                waveform_tensor,
                orig_freq=16000,
                factor=speed_factor,
            )
        # Add light reverb for a more natural sound. torchaudio.functional has
        # no reverb, so use the SoX "reverb" effect (reverberance 20,
        # HF-damping 50, room-scale 50) and skip it if the SoX backend is
        # unavailable.
        try:
            waveform_tensor, _ = torchaudio.sox_effects.apply_effects_tensor(
                waveform_tensor, 16000, [["reverb", "20", "50", "50"]]
            )
        except Exception:
            pass
        return waveform_tensor.squeeze().numpy()
    except Exception as e:
        print(f"Error in voice conversion: {e}")
        return waveform
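
# A minimal offline sanity check, not used by the running app: load a clip,
# normalize it with process_audio, convert it with a preset, and write the
# result. The input path and the soundfile dependency are assumptions for
# illustration, not part of the app.
def offline_demo(path="input.wav", preset_name="Deep Male"):
    import soundfile as sf  # assumed available: pip install soundfile
    audio, sr = librosa.load(path, sr=None, mono=True)
    audio = process_audio(audio, sampling_rate=sr)
    converted = convert_voice_simple(audio, VOICE_PRESETS[preset_name])
    sf.write("converted.wav", converted, 16000)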

class VoiceConversionHandler(AsyncAudioVideoStreamHandler):
    def __init__(
        self, expected_layout="mono", output_sample_rate=16000, output_frame_size=1024
    ) -> None:
        super().__init__(
            expected_layout,
            output_sample_rate,
            output_frame_size,
            input_sample_rate=16000,
        )
        self.audio_queue = asyncio.Queue()
        self.quit = asyncio.Event()
        self.voice_preset = None
        self.buffer = np.array([])
        self.buffer_size = 4096  # ~256 ms at 16 kHz; larger chunks sound better but add latency

    def copy(self) -> "VoiceConversionHandler":
        return VoiceConversionHandler(
            expected_layout=self.expected_layout,
            output_sample_rate=self.output_sample_rate,
            output_frame_size=self.output_frame_size,
        )

    async def receive(self, frame: tuple[int, np.ndarray]) -> None:
        sample_rate, array = frame
        array = array.squeeze()
        # Append the normalized incoming audio to the buffer
        self.buffer = np.append(self.buffer, process_audio(array, sample_rate))
        # Process one chunk whenever the buffer is large enough
        if len(self.buffer) >= self.buffer_size:
            if self.voice_preset:
                preset = VOICE_PRESETS.get(self.voice_preset, VOICE_PRESETS["Standard Male"])
                processed_audio = convert_voice_simple(self.buffer[:self.buffer_size], preset)
                # Clip before converting back to int16 to avoid wrap-around
                result = (np.clip(processed_audio, -1.0, 1.0) * 32767).astype(np.int16)
            else:
                # Pass the original audio through if no voice preset is selected
                result = (self.buffer[:self.buffer_size] * 32767).astype(np.int16)
            self.audio_queue.put_nowait((16000, result))
            # Keep the remainder for the next chunk
            self.buffer = self.buffer[self.buffer_size:]

    async def emit(self):
        if not self.args_set.is_set():
            await self.wait_for_args()
        # latest_args[0] is the WebRTC component; [1] is the selected preset
        if self.latest_args and len(self.latest_args) > 1:
            self.voice_preset = self.latest_args[1]
        # If the queue is empty, return silence to keep the stream alive
        if self.audio_queue.empty():
            return (16000, np.zeros(self.output_frame_size, dtype=np.int16))
        return await self.audio_queue.get()

    def shutdown(self) -> None:
        self.quit.set()
        self.args_set.clear()
        self.quit.clear()
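
# A rough local smoke test for the handler (illustrative only; in the app,
# gradio_webrtc drives receive() and emit() itself). It feeds one second of a
# 440 Hz tone through receive() and prints the queued converted chunk.
# Run standalone with: asyncio.run(handler_smoke_test())
async def handler_smoke_test():
    handler = VoiceConversionHandler()
    handler.voice_preset = "Deep Male"  # set directly to bypass wait_for_args()
    tone = np.sin(2 * np.pi * 440 * np.arange(16000) / 16000)
    await handler.receive((16000, (tone * 32767).astype(np.int16)))
    while not handler.audio_queue.empty():
        sample_rate, chunk = handler.audio_queue.get_nowait()
        print(sample_rate, chunk.shape)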

# CSS for styling
css = """
.container {
    max-width: 800px;
    margin: 0 auto;
    padding: 20px;
}
.header {
    text-align: center;
    margin-bottom: 20px;
}
.voice-controls {
    padding: 15px;
    border-radius: 8px;
    background-color: #f5f5f5;
    margin-bottom: 20px;
}
"""


# Main application
def main():
    with gr.Blocks(css=css) as demo:
        gr.Markdown(
            """
            <div class="header">
                <h1>Real-time Voice Conversion</h1>
                <p>Speak into your microphone to convert your voice in real time using audio effects.</p>
            </div>
            """
        )
        with gr.Row(equal_height=True):
            with gr.Column():
                webrtc = WebRTC(
                    label="Voice Chat",
                    modality="audio",
                    mode="send-receive",
                    rtc_configuration=get_twilio_turn_credentials(),
                    pulse_color="rgb(35, 157, 225)",
                )
            with gr.Column(elem_classes="voice-controls"):
                voice_preset = gr.Radio(
                    choices=list(VOICE_PRESETS.keys()),
                    value="Standard Male",
                    label="Target Voice",
                )
                gr.Markdown(
                    """
                    ### How to use:
                    1. Allow microphone access
                    2. Select your target voice style
                    3. Click the microphone button and start speaking
                    4. Your voice will be converted in real time

                    Note: This version uses basic audio effects without SentencePiece.
                    """
                )
        webrtc.stream(
            VoiceConversionHandler(),
            inputs=[webrtc, voice_preset],
            outputs=[webrtc],
            concurrency_limit=2,
        )
    return demo


if __name__ == "__main__":
    demo = main()
    demo.launch()