sawadogosalif's picture
end
fbe0b46
import os
from loguru import logger
import boto3
from tqdm import tqdm
from pydub import AudioSegment
from yt_dlp import YoutubeDL
from dotenv import load_dotenv
load_dotenv()
def filter_videos_by_keywords(candidates, keywords):
if not candidates:
return []
filtered_videos = []
for candidate in candidates:
if not isinstance(candidate, dict):
continue
title = str(candidate.get("title", "")).lower()
description = str(candidate.get("description", "")).lower()
if any(keyword.lower() in title or keyword.lower() in description for keyword in keywords):
filtered_videos.append(candidate)
logger.info(f"Filtrage terminé: {len(filtered_videos)}/{len(candidates)} vidéos correspondent aux mots-clés {keywords}")
return filtered_videos
def get_videos_from_channel(channel_url):
logger.info(f"Extraction des vidéos depuis la chaîne: {channel_url}")
ydl_opts = {
'extract_flat': True,
'quiet': True,
}
with YoutubeDL(ydl_opts) as ydl:
info = ydl.extract_info(channel_url, download=False)
if 'entries' in info:
videos = info['entries']
videos_urls = [video for video in videos if not "Shorts" in video["title"]]
videos_urls = sum([videos_url["entries"] for videos_url in videos_urls], [])
logger.info(f"Nombre total de videos trouvées: {len(videos_urls)}")
return videos_urls
else:
logger.warning("Aucune vidéo trouvée sur cette chaîne")
return []
def download_youtube_audios(videos, output_dir):
"""
Télécharge les fichiers audio des vidéos YouTube.
Args:
videos: Liste des vidéos à télécharger
output_dir: Répertoire de sortie (utilise INPUT_DIR par défaut)
"""
ydl_opts = {
'format': 'bestaudio/best',
'outtmpl': f'{output_dir}/%(title)s.%(ext)s',
'postprocessors': [{
'key': 'FFmpegExtractAudio',
'preferredcodec': 'wav',
}],
'quiet': False,
}
logger.info(f"Début du téléchargement de {len(videos)} vidéos")
with YoutubeDL(ydl_opts) as ydl:
for video in tqdm(videos, desc="Téléchargement des vidéos"):
try:
url = f"https://www.youtube.com/watch?v={video['id']}"
logger.info(f"Téléchargement de l'audio (WAV) : {video['title']}")
ydl.download([url])
except Exception as e:
logger.error(f"Erreur lors du téléchargement de {video.get('title', video.get('id', 'inconnu'))}: {str(e)}")
def segment_audio_files(input_dir, output_dir, segment_length):
"""
Découpe les fichiers audio en segments.
Args:
input_dir: Répertoire des fichiers audio source (utilise INPUT_DIR par défaut)
output_dir: Répertoire des segments audio (utilise OUTPUT_DIR par défaut)
segment_length: Durée de chaque segment en ms (utilise SEGMENT_LENGTH_MS par défaut)
Returns:
Nombre total de segments créés
"""
wav_files = [f for f in os.listdir(input_dir) if f.endswith(".wav")]
logger.info(f"Nombre de fichiers WAV à traiter: {len(wav_files)}")
total_segments = 0
processed_segments = []
for filename in tqdm(wav_files, desc="Traitement des fichiers audio"):
try:
filepath = os.path.join(input_dir, filename)
audio = AudioSegment.from_wav(filepath)
duration = len(audio)
base_name = os.path.splitext(filename)[0]
video_folder = os.path.join(output_dir, base_name)
os.makedirs(video_folder, exist_ok=True)
logger.info(f"Découpage de : {filename} → dossier [{video_folder}]")
num_segments = (duration + segment_length - 1) // segment_length
segments_created = 0
for i in tqdm(range(0, duration, segment_length),
desc=f"Segments de {base_name}",
total=num_segments):
segment = audio[i:i + segment_length]
segment_name = f"part{i // segment_length + 1}.wav"
segment_path = os.path.join(video_folder, segment_name)
segment.export(segment_path, format="wav")
segments_created += 1
processed_segments.append(segment_path)
logger.info(f"Fichier {filename}: {segments_created} segments créés")
total_segments += segments_created
except Exception as e:
logger.error(f"Erreur lors du traitement de {filename}: {str(e)}")
logger.info(f"Traitement terminé. Total des segments créés: {total_segments}")
return total_segments, processed_segments
def setup_s3_client():
access_key = os.getenv("AWS_ACCESS_KEY_ID")
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
endpoint_url = os.getenv("AWS_ENDPOINT_URL_S3")
if not all([access_key, secret_key]):
logger.warning("Variables d'environnement AWS manquantes (AWS_ACCESS_KEY_ID ou AWS_SECRET_ACCESS_KEY)")
return None
client_params = {
"aws_access_key_id": access_key,
"aws_secret_access_key": secret_key,
}
if endpoint_url:
client_params["endpoint_url"] = endpoint_url
try:
return boto3.client("s3", **client_params)
except Exception as e:
logger.error(f"Erreur lors de l'initialisation du client S3: {str(e)}")
return None
def upload_file_to_s3(s3_client, local_path, bucket_name, s3_key):
try:
s3_client.upload_file(local_path, bucket_name, s3_key)
logger.info(f"Uploadé {local_path} vers s3://{bucket_name}/{s3_key}")
except Exception as e:
logger.error(f"Erreur lors de l'upload de {local_path}: {str(e)}")
def upload_segments_to_s3(segments, bucket_name, prefix, segments_folder):
s3_client = setup_s3_client()
if not s3_client:
logger.error("Client S3 non disponible. Upload annulé.")
return 0
uploaded_count = 0
logger.info(f"Début de l'upload des segments vers S3 (bucket: {bucket_name}, préfixe: {prefix})")
for segment_path in tqdm(segments, desc="Upload des segments vers S3"):
try:
relative_path = os.path.relpath(segment_path, start=segments_folder)
s3_key = f"{prefix}/{relative_path.replace(os.sep, '/')}"
upload_file_to_s3(s3_client, segment_path, bucket_name, s3_key)
uploaded_count += 1
except Exception as e:
logger.error(f"Erreur lors de l'upload de {segment_path}: {str(e)}")
logger.info(f"Upload terminé. {uploaded_count}/{len(segments)} fichiers envoyés vers S3.")
return uploaded_count
def main():
# ====================== CHANGE ME - CONFIGURATION ======================
# Mots-clés pour le filtrage des vidéos
FILTER_KEYWORDS = ["sid pa"] #
CHANNEL_URL = "https://www.youtube.com/@livenewsafrica/"
RAW_AUDIO_DIR = "audios_sidpa_wav"
SEGMENT_AUDIO_DIR = "audios_segments_wav"
# Durée des segments en millisecondes
SEGMENT_LENGTH_MS = 30 * 1000 # 30 secondes par défaut
# Configuration S3
BUCKET_NAME = "moore-collection"
S3_PREFIX = "audios_wav"
USE_S3 = True # Mettre à True pour activer les opérations S3
# ====================== FIN CHANGE ME ======================
os.makedirs(RAW_AUDIO_DIR, exist_ok=True)
os.makedirs(SEGMENT_AUDIO_DIR, exist_ok=True)
logger.info("Démarrage du traitement des fichiers audio")
videos = get_videos_from_channel(CHANNEL_URL)
filtered_videos = filter_videos_by_keywords(videos, keywords=["sid pa"])
download_youtube_audios(filtered_videos, RAW_AUDIO_DIR)
total_segments, processed_segments = segment_audio_files(RAW_AUDIO_DIR, SEGMENT_AUDIO_DIR, SEGMENT_LENGTH_MS)
if USE_S3:
upload_segments_to_s3(processed_segments, BUCKET_NAME, S3_PREFIX, SEGMENT_AUDIO_DIR)
logger.info("Traitement terminé avec succès")
if __name__ == "__main__":
main()