| import os
|
| import logging
|
| from typing import List, Optional
|
| from pydub import AudioSegment
|
| from pydub.effects import normalize
|
| import uuid
|
|
|
| logger = logging.getLogger(__name__)
|
|
|
| class AudioProcessor:
|
| def __init__(self):
|
| self.supported_formats = ['mp3', 'wav', 'm4a', 'aac', 'ogg']
|
|
|
| async def combine_audios(
|
| self,
|
| audio_paths: List[str],
|
| background_music_path: Optional[str] = None,
|
| output_dir: str = "temp",
|
| background_volume: float = 0.3,
|
| fade_duration: int = 1000
|
| ) -> str:
|
| """
|
| Tổng hợp các file audio thành một file duy nhất
|
|
|
| Args:
|
| audio_paths: Danh sách đường dẫn audio các cảnh
|
| background_music_path: Đường dẫn nhạc nền (optional)
|
| output_dir: Thư mục output
|
| background_volume: Âm lượng nhạc nền (0.0 - 1.0)
|
| fade_duration: Thời gian fade in/out (ms)
|
|
|
| Returns:
|
| Đường dẫn file audio đã tổng hợp
|
| """
|
| try:
|
| logger.info(f"Bắt đầu tổng hợp {len(audio_paths)} file audio")
|
|
|
|
|
| valid_audio_paths = []
|
| for path in audio_paths:
|
| if os.path.exists(path):
|
| valid_audio_paths.append(path)
|
| logger.info(f"Audio hợp lệ: {path}")
|
| else:
|
| logger.warning(f"Audio không tồn tại: {path}")
|
|
|
| if not valid_audio_paths:
|
| raise ValueError("Không có file audio hợp lệ nào")
|
|
|
|
|
| combined_audio = None
|
| total_duration = 0
|
|
|
| for i, audio_path in enumerate(valid_audio_paths):
|
| logger.info(f"Xử lý audio {i+1}/{len(valid_audio_paths)}: {audio_path}")
|
|
|
|
|
| audio_segment = AudioSegment.from_file(audio_path)
|
|
|
|
|
| audio_segment = normalize(audio_segment)
|
|
|
|
|
| if i == 0:
|
| audio_segment = audio_segment.fade_in(fade_duration)
|
| if i == len(valid_audio_paths) - 1:
|
| audio_segment = audio_segment.fade_out(fade_duration)
|
|
|
|
|
| if combined_audio is None:
|
| combined_audio = audio_segment
|
| else:
|
| combined_audio = combined_audio + audio_segment
|
|
|
| total_duration += len(audio_segment)
|
| logger.info(f"Đã thêm audio {i+1}, tổng thời lượng: {total_duration/1000:.2f}s")
|
|
|
| logger.info(f"Hoàn thành ghép audio cảnh, tổng thời lượng: {total_duration/1000:.2f}s")
|
|
|
|
|
| if background_music_path and os.path.exists(background_music_path):
|
| logger.info("Đang thêm nhạc nền...")
|
|
|
|
|
| background_music = AudioSegment.from_file(background_music_path)
|
|
|
|
|
| background_music = background_music - (20 - int(background_volume * 20))
|
|
|
|
|
| if len(background_music) < len(combined_audio):
|
|
|
| repeat_times = (len(combined_audio) // len(background_music)) + 1
|
| background_music = background_music * repeat_times
|
|
|
|
|
| background_music = background_music[:len(combined_audio)]
|
|
|
|
|
| background_music = background_music.fade_in(fade_duration * 2).fade_out(fade_duration * 2)
|
|
|
|
|
| combined_audio = combined_audio.overlay(background_music)
|
| logger.info("Đã thêm nhạc nền thành công")
|
|
|
|
|
| output_filename = f"combined_audio_{uuid.uuid4().hex[:8]}.wav"
|
| output_path = os.path.join(output_dir, output_filename)
|
|
|
|
|
| combined_audio.export(
|
| output_path,
|
| format="wav",
|
| parameters=["-ac", "2", "-ar", "44100"]
|
| )
|
|
|
| logger.info(f"Đã xuất file audio tổng hợp: {output_path}")
|
| logger.info(f"Thời lượng cuối cùng: {len(combined_audio)/1000:.2f}s")
|
|
|
| return output_path
|
|
|
| except Exception as e:
|
| logger.error(f"Lỗi khi tổng hợp audio: {str(e)}")
|
| raise
|
|
|
| def get_audio_info(self, audio_path: str) -> dict:
|
| """
|
| Lấy thông tin về file audio
|
| """
|
| try:
|
| audio = AudioSegment.from_file(audio_path)
|
| return {
|
| "duration_seconds": len(audio) / 1000,
|
| "channels": audio.channels,
|
| "frame_rate": audio.frame_rate,
|
| "sample_width": audio.sample_width,
|
| "file_size_mb": os.path.getsize(audio_path) / (1024 * 1024)
|
| }
|
| except Exception as e:
|
| logger.error(f"Lỗi khi lấy thông tin audio {audio_path}: {str(e)}")
|
| return {}
|
|
|
| def validate_audio_file(self, audio_path: str) -> bool:
|
| """
|
| Kiểm tra tính hợp lệ của file audio
|
| """
|
| try:
|
| if not os.path.exists(audio_path):
|
| return False
|
|
|
|
|
| ext = audio_path.lower().split('.')[-1]
|
| if ext not in self.supported_formats:
|
| return False
|
|
|
|
|
| AudioSegment.from_file(audio_path)
|
| return True
|
|
|
| except Exception:
|
| return False |