Spaces:
Paused
Paused
0x07CB
commited on
refactor: Amélioration de la robustesse de la concaténation audio avec gestion des erreurs et nettoyage des fichiers temporaires
Browse files- core/audio_files.py +69 -26
core/audio_files.py
CHANGED
@@ -11,57 +11,100 @@ from typing import Tuple
|
|
11 |
from typing import Union
|
12 |
import base64
|
13 |
import io
|
|
|
|
|
|
|
14 |
|
15 |
def concatenate_audio_files(audio_list: List[Tuple[Union[bytes, str], float]]) -> Optional[bytes]:
|
16 |
"""
|
17 |
-
Concatène
|
18 |
|
19 |
Args:
|
20 |
-
audio_list (List[Tuple[Union[bytes, str], float]]):
|
21 |
-
|
22 |
|
23 |
Returns:
|
24 |
-
Optional[bytes]:
|
|
|
|
|
|
|
|
|
25 |
"""
|
26 |
-
|
|
|
|
|
|
|
27 |
final_audio = AudioSegment.empty()
|
28 |
-
|
|
|
29 |
try:
|
30 |
# Charger les effets sonores
|
31 |
-
|
32 |
-
|
33 |
-
|
34 |
-
|
35 |
-
|
36 |
-
|
|
|
|
|
|
|
|
|
37 |
|
38 |
-
# 5 secondes de silence
|
39 |
-
silence = AudioSegment.silent(duration=1500)
|
40 |
|
41 |
for audio_data, _ in audio_list:
|
42 |
-
|
43 |
-
|
44 |
-
|
45 |
-
|
46 |
-
|
|
|
|
|
|
|
|
|
|
|
47 |
|
48 |
-
|
49 |
-
|
50 |
-
|
51 |
-
|
52 |
-
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
53 |
|
|
|
|
|
54 |
|
55 |
# Convertir le segment audio final en octets
|
56 |
buffer = io.BytesIO()
|
57 |
final_audio.export(buffer, format="mp3")
|
58 |
return buffer.getvalue()
|
|
|
|
|
|
|
|
|
59 |
except IOError as e:
|
60 |
-
|
61 |
return None
|
62 |
except Exception as e:
|
63 |
-
|
64 |
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
65 |
|
66 |
|
67 |
def split_audio(audio_file, max_size_mb: int = 25) -> List[bytes]:
|
|
|
11 |
from typing import Union
|
12 |
import base64
|
13 |
import io
|
14 |
+
import tempfile
|
15 |
+
import os
|
16 |
+
import streamlit as st
|
17 |
|
18 |
def concatenate_audio_files(audio_list: List[Tuple[Union[bytes, str], float]]) -> Optional[bytes]:
|
19 |
"""
|
20 |
+
Concatène une liste de fichiers audio avec des effets sonores.
|
21 |
|
22 |
Args:
|
23 |
+
audio_list (List[Tuple[Union[bytes, str], float]]): Liste de tuples contenant les données audio
|
24 |
+
(en bytes ou base64) et leur durée.
|
25 |
|
26 |
Returns:
|
27 |
+
Optional[bytes]: Données audio concaténées ou None en cas d'erreur.
|
28 |
+
|
29 |
+
Raises:
|
30 |
+
ValueError: Si la liste d'audio est vide ou invalide.
|
31 |
+
IOError: Si une erreur se produit lors de la lecture/écriture des fichiers.
|
32 |
"""
|
33 |
+
if not audio_list:
|
34 |
+
st.error("Erreur : Aucun fichier audio à concaténer")
|
35 |
+
return None
|
36 |
+
|
37 |
final_audio = AudioSegment.empty()
|
38 |
+
temp_files = [] # Pour le nettoyage des fichiers temporaires
|
39 |
+
|
40 |
try:
|
41 |
# Charger les effets sonores
|
42 |
+
try:
|
43 |
+
begin_sound = AudioSegment.from_mp3(
|
44 |
+
"sound-effects/voice-message-play-begin/voice-message-play-begin-1.mp3"
|
45 |
+
)
|
46 |
+
end_sound = AudioSegment.from_mp3(
|
47 |
+
"sound-effects/voice-message-play-ending/voice-message-play-ending-1.mp3"
|
48 |
+
)
|
49 |
+
except IOError as e:
|
50 |
+
st.warning("Impossible de charger les effets sonores, continuation sans effets")
|
51 |
+
begin_sound = end_sound = AudioSegment.empty()
|
52 |
|
53 |
+
# 1.5 secondes de silence
|
54 |
+
silence = AudioSegment.silent(duration=1500)
|
55 |
|
56 |
for audio_data, _ in audio_list:
|
57 |
+
try:
|
58 |
+
# Convertir en bytes si c'est une chaîne base64
|
59 |
+
if isinstance(audio_data, str):
|
60 |
+
try:
|
61 |
+
audio_bytes = base64.b64decode(audio_data)
|
62 |
+
except Exception as e:
|
63 |
+
st.error(f"Erreur de décodage base64 : {e}")
|
64 |
+
continue
|
65 |
+
else:
|
66 |
+
audio_bytes = audio_data
|
67 |
|
68 |
+
# Créer un fichier temporaire pour l'audio
|
69 |
+
temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.mp3')
|
70 |
+
temp_files.append(temp_file.name)
|
71 |
+
temp_file.write(audio_bytes)
|
72 |
+
temp_file.close()
|
73 |
+
|
74 |
+
# Convertir les octets en un segment audio
|
75 |
+
segment = AudioSegment.from_mp3(temp_file.name)
|
76 |
+
|
77 |
+
# Ajouter le son de début, le segment TTS, le son de fin et le silence
|
78 |
+
final_audio += begin_sound + segment + end_sound + silence
|
79 |
+
|
80 |
+
except Exception as e:
|
81 |
+
st.warning(f"Erreur lors du traitement d'un segment audio : {e}")
|
82 |
+
continue
|
83 |
|
84 |
+
if len(final_audio) == 0:
|
85 |
+
raise ValueError("Aucun segment audio n'a pu être traité correctement")
|
86 |
|
87 |
# Convertir le segment audio final en octets
|
88 |
buffer = io.BytesIO()
|
89 |
final_audio.export(buffer, format="mp3")
|
90 |
return buffer.getvalue()
|
91 |
+
|
92 |
+
except ValueError as e:
|
93 |
+
st.error(f"Erreur de validation : {e}")
|
94 |
+
return None
|
95 |
except IOError as e:
|
96 |
+
st.error(f"Erreur lors de la lecture ou de l'écriture des fichiers audio : {e}")
|
97 |
return None
|
98 |
except Exception as e:
|
99 |
+
st.error(f"Une erreur inattendue s'est produite : {e}")
|
100 |
return None
|
101 |
+
finally:
|
102 |
+
# Nettoyage des fichiers temporaires
|
103 |
+
for temp_file in temp_files:
|
104 |
+
try:
|
105 |
+
os.remove(temp_file)
|
106 |
+
except Exception:
|
107 |
+
pass
|
108 |
|
109 |
|
110 |
def split_audio(audio_file, max_size_mb: int = 25) -> List[bytes]:
|