Spaces:
Running
Running
import os | |
from loguru import logger | |
import boto3 | |
from tqdm import tqdm | |
from pydub import AudioSegment | |
from yt_dlp import YoutubeDL | |
from dotenv import load_dotenv | |
load_dotenv() | |
def filter_videos_by_keywords(candidates, keywords): | |
if not candidates: | |
return [] | |
filtered_videos = [] | |
for candidate in candidates: | |
if not isinstance(candidate, dict): | |
continue | |
title = str(candidate.get("title", "")).lower() | |
description = str(candidate.get("description", "")).lower() | |
if any(keyword.lower() in title or keyword.lower() in description for keyword in keywords): | |
filtered_videos.append(candidate) | |
logger.info(f"Filtrage terminé: {len(filtered_videos)}/{len(candidates)} vidéos correspondent aux mots-clés {keywords}") | |
return filtered_videos | |
def get_videos_from_channel(channel_url): | |
logger.info(f"Extraction des vidéos depuis la chaîne: {channel_url}") | |
ydl_opts = { | |
'extract_flat': True, | |
'quiet': True, | |
} | |
with YoutubeDL(ydl_opts) as ydl: | |
info = ydl.extract_info(channel_url, download=False) | |
if 'entries' in info: | |
videos = info['entries'] | |
videos_urls = [video for video in videos if not "Shorts" in video["title"]] | |
videos_urls = sum([videos_url["entries"] for videos_url in videos_urls], []) | |
logger.info(f"Nombre total de videos trouvées: {len(videos_urls)}") | |
return videos_urls | |
else: | |
logger.warning("Aucune vidéo trouvée sur cette chaîne") | |
return [] | |
def download_youtube_audios(videos, output_dir): | |
""" | |
Télécharge les fichiers audio des vidéos YouTube. | |
Args: | |
videos: Liste des vidéos à télécharger | |
output_dir: Répertoire de sortie (utilise INPUT_DIR par défaut) | |
""" | |
ydl_opts = { | |
'format': 'bestaudio/best', | |
'outtmpl': f'{output_dir}/%(title)s.%(ext)s', | |
'postprocessors': [{ | |
'key': 'FFmpegExtractAudio', | |
'preferredcodec': 'wav', | |
}], | |
'quiet': False, | |
} | |
logger.info(f"Début du téléchargement de {len(videos)} vidéos") | |
with YoutubeDL(ydl_opts) as ydl: | |
for video in tqdm(videos, desc="Téléchargement des vidéos"): | |
try: | |
url = f"https://www.youtube.com/watch?v={video['id']}" | |
logger.info(f"Téléchargement de l'audio (WAV) : {video['title']}") | |
ydl.download([url]) | |
except Exception as e: | |
logger.error(f"Erreur lors du téléchargement de {video.get('title', video.get('id', 'inconnu'))}: {str(e)}") | |
def segment_audio_files(input_dir, output_dir, segment_length): | |
""" | |
Découpe les fichiers audio en segments. | |
Args: | |
input_dir: Répertoire des fichiers audio source (utilise INPUT_DIR par défaut) | |
output_dir: Répertoire des segments audio (utilise OUTPUT_DIR par défaut) | |
segment_length: Durée de chaque segment en ms (utilise SEGMENT_LENGTH_MS par défaut) | |
Returns: | |
Nombre total de segments créés | |
""" | |
wav_files = [f for f in os.listdir(input_dir) if f.endswith(".wav")] | |
logger.info(f"Nombre de fichiers WAV à traiter: {len(wav_files)}") | |
total_segments = 0 | |
processed_segments = [] | |
for filename in tqdm(wav_files, desc="Traitement des fichiers audio"): | |
try: | |
filepath = os.path.join(input_dir, filename) | |
audio = AudioSegment.from_wav(filepath) | |
duration = len(audio) | |
base_name = os.path.splitext(filename)[0] | |
video_folder = os.path.join(output_dir, base_name) | |
os.makedirs(video_folder, exist_ok=True) | |
logger.info(f"Découpage de : {filename} → dossier [{video_folder}]") | |
num_segments = (duration + segment_length - 1) // segment_length | |
segments_created = 0 | |
for i in tqdm(range(0, duration, segment_length), | |
desc=f"Segments de {base_name}", | |
total=num_segments): | |
segment = audio[i:i + segment_length] | |
segment_name = f"part{i // segment_length + 1}.wav" | |
segment_path = os.path.join(video_folder, segment_name) | |
segment.export(segment_path, format="wav") | |
segments_created += 1 | |
processed_segments.append(segment_path) | |
logger.info(f"Fichier {filename}: {segments_created} segments créés") | |
total_segments += segments_created | |
except Exception as e: | |
logger.error(f"Erreur lors du traitement de {filename}: {str(e)}") | |
logger.info(f"Traitement terminé. Total des segments créés: {total_segments}") | |
return total_segments, processed_segments | |
def setup_s3_client(): | |
access_key = os.getenv("AWS_ACCESS_KEY_ID") | |
secret_key = os.getenv("AWS_SECRET_ACCESS_KEY") | |
endpoint_url = os.getenv("AWS_ENDPOINT_URL_S3") | |
if not all([access_key, secret_key]): | |
logger.warning("Variables d'environnement AWS manquantes (AWS_ACCESS_KEY_ID ou AWS_SECRET_ACCESS_KEY)") | |
return None | |
client_params = { | |
"aws_access_key_id": access_key, | |
"aws_secret_access_key": secret_key, | |
} | |
if endpoint_url: | |
client_params["endpoint_url"] = endpoint_url | |
try: | |
return boto3.client("s3", **client_params) | |
except Exception as e: | |
logger.error(f"Erreur lors de l'initialisation du client S3: {str(e)}") | |
return None | |
def upload_file_to_s3(s3_client, local_path, bucket_name, s3_key): | |
try: | |
s3_client.upload_file(local_path, bucket_name, s3_key) | |
logger.info(f"Uploadé {local_path} vers s3://{bucket_name}/{s3_key}") | |
except Exception as e: | |
logger.error(f"Erreur lors de l'upload de {local_path}: {str(e)}") | |
def upload_segments_to_s3(segments, bucket_name, prefix, segments_folder): | |
s3_client = setup_s3_client() | |
if not s3_client: | |
logger.error("Client S3 non disponible. Upload annulé.") | |
return 0 | |
uploaded_count = 0 | |
logger.info(f"Début de l'upload des segments vers S3 (bucket: {bucket_name}, préfixe: {prefix})") | |
for segment_path in tqdm(segments, desc="Upload des segments vers S3"): | |
try: | |
relative_path = os.path.relpath(segment_path, start=segments_folder) | |
s3_key = f"{prefix}/{relative_path.replace(os.sep, '/')}" | |
upload_file_to_s3(s3_client, segment_path, bucket_name, s3_key) | |
uploaded_count += 1 | |
except Exception as e: | |
logger.error(f"Erreur lors de l'upload de {segment_path}: {str(e)}") | |
logger.info(f"Upload terminé. {uploaded_count}/{len(segments)} fichiers envoyés vers S3.") | |
return uploaded_count | |
def main(): | |
# ====================== CHANGE ME - CONFIGURATION ====================== | |
# Mots-clés pour le filtrage des vidéos | |
FILTER_KEYWORDS = ["sid pa"] # | |
CHANNEL_URL = "https://www.youtube.com/@livenewsafrica/" | |
RAW_AUDIO_DIR = "audios_sidpa_wav" | |
SEGMENT_AUDIO_DIR = "audios_segments_wav" | |
# Durée des segments en millisecondes | |
SEGMENT_LENGTH_MS = 30 * 1000 # 30 secondes par défaut | |
# Configuration S3 | |
BUCKET_NAME = "moore-collection" | |
S3_PREFIX = "audios_wav" | |
USE_S3 = True # Mettre à True pour activer les opérations S3 | |
# ====================== FIN CHANGE ME ====================== | |
os.makedirs(RAW_AUDIO_DIR, exist_ok=True) | |
os.makedirs(SEGMENT_AUDIO_DIR, exist_ok=True) | |
logger.info("Démarrage du traitement des fichiers audio") | |
videos = get_videos_from_channel(CHANNEL_URL) | |
filtered_videos = filter_videos_by_keywords(videos, keywords=["sid pa"]) | |
download_youtube_audios(filtered_videos, RAW_AUDIO_DIR) | |
total_segments, processed_segments = segment_audio_files(RAW_AUDIO_DIR, SEGMENT_AUDIO_DIR, SEGMENT_LENGTH_MS) | |
if USE_S3: | |
upload_segments_to_s3(processed_segments, BUCKET_NAME, S3_PREFIX, SEGMENT_AUDIO_DIR) | |
logger.info("Traitement terminé avec succès") | |
if __name__ == "__main__": | |
main() | |