Spaces:
Runtime error
Runtime error
| import os | |
| import json | |
| import time | |
| from datetime import datetime | |
| from utils.video_processor import VideoProcessor | |
| from utils.mediapipe_utils import MediaPipeProcessor | |
| from utils.huggingface_utils import HuggingFaceUploader | |
class LibrasProcessingPipeline:
    """Normalize a video, extract keypoints, and package the results.

    The pipeline is driven by a configuration mapping. The keys read directly
    by this class are ``input_dir``, ``normalized_dir``, ``keypoints_dir``,
    ``huggingface_dir`` and the optional ``upload_to_hf`` flag. The mapping is
    also passed unchanged to ``VideoProcessor``, ``MediaPipeProcessor`` and
    ``HuggingFaceUploader``.
    """

    def __init__(self, config):
        """Store *config* and create the working directories it names."""
        self.config = config
        self.setup_directories()

    def setup_directories(self):
        """Create every directory the pipeline writes to, if missing."""
        directory_keys = (
            'input_dir',
            'normalized_dir',
            'keypoints_dir',
            'huggingface_dir',
        )
        for key in directory_keys:
            directory = self.config[key]
            os.makedirs(directory, exist_ok=True)
            print(f"Diretório criado/verificado: {directory}")

    def _upload_enabled(self):
        """Return True when the configuration asks for the upload step.

        The flag is accepted both at the top level of the configuration and
        inside the nested ``huggingface`` section, because the configuration
        shipped with this script defines it in the nested section.
        """
        if self.config.get('upload_to_hf', False):
            return True
        hf_section = self.config.get('huggingface')
        return isinstance(hf_section, dict) and bool(
            hf_section.get('upload_to_hf', False)
        )

    def run_pipeline(self, video_filename):
        """Run every pipeline stage for one video.

        Args:
            video_filename: File name of the video inside ``input_dir``.

        Returns:
            A dict with the keys ``normalized_video``, ``keypoints_json`` and
            ``hf_ready`` on success, or ``None`` when any stage raised (the
            error and its traceback are printed).
        """
        print(f"Iniciando processamento do vídeo: {video_filename}")
        try:
            # 1. Video normalization
            print("1. Normalizando vídeo...")
            normalized_path = self.normalize_video(video_filename)

            # 2. Keypoint extraction
            print("2. Extraindo keypoints...")
            keypoints_data = self.extract_keypoints(normalized_path)

            # 3. Persist the keypoints as JSON
            print("3. Salvando keypoints...")
            json_path = self.save_keypoints(keypoints_data, video_filename)

            # 4. Assemble the Hugging Face bundle
            print("4. Preparando para Hugging Face...")
            hf_ready_path = self.prepare_for_huggingface(
                normalized_path, json_path, video_filename
            )

            # 5. Optional upload
            if self._upload_enabled():
                print("5. Upload para Hugging Face...")
                self.upload_to_huggingface(hf_ready_path)

            print("Pipeline concluído com sucesso!")
            return {
                'normalized_video': normalized_path,
                'keypoints_json': json_path,
                'hf_ready': hf_ready_path
            }
        except Exception as e:
            # Top-level boundary of the pipeline: report the failure and let
            # the caller decide what to do with a None result.
            print(f"Erro no pipeline: {e}")
            import traceback
            traceback.print_exc()
            return None

    def normalize_video(self, video_filename):
        """Normalize one input video by delegating to ``VideoProcessor``.

        Args:
            video_filename: File name of the video inside ``input_dir``.

        Returns:
            Whatever ``VideoProcessor.normalize_video`` returns; the rest of
            the pipeline treats it as the path of the normalized video.

        Raises:
            FileNotFoundError: If the video is not present in ``input_dir``.
        """
        processor = VideoProcessor(self.config)
        input_path = os.path.join(self.config['input_dir'], video_filename)

        if not os.path.exists(input_path):
            raise FileNotFoundError(f"Vídeo não encontrado: {input_path}")

        output_filename = f"normalized_{video_filename}"
        output_path = os.path.join(self.config['normalized_dir'], output_filename)
        return processor.normalize_video(input_path, output_path)

    def extract_keypoints(self, video_path):
        """Return the result of ``MediaPipeProcessor.process_video``."""
        processor = MediaPipeProcessor(self.config)
        return processor.process_video(video_path)

    @staticmethod
    def _json_default(value):
        """Convert array-like objects nested inside a frame for ``json.dump``."""
        if hasattr(value, 'tolist'):
            return value.tolist()
        raise TypeError(
            f"Object of type {type(value).__name__} is not JSON serializable"
        )

    def save_keypoints(self, keypoints_data, original_filename):
        """Write the per-frame keypoints to ``<base>_keypoints.json``.

        Args:
            keypoints_data: Iterable of per-frame mappings. Values exposing a
                ``tolist`` method (e.g. NumPy arrays) are converted to lists,
                at any nesting depth.
            original_filename: Name of the source video; its stem names the
                JSON file.

        Returns:
            Path of the JSON file written inside ``keypoints_dir``.
        """
        base_name = os.path.splitext(original_filename)[0]
        output_filename = f"{base_name}_keypoints.json"
        output_path = os.path.join(self.config['keypoints_dir'], output_filename)

        serializable_data = [
            {
                key: value.tolist() if hasattr(value, 'tolist') else value
                for key, value in frame_data.items()
            }
            for frame_data in keypoints_data
        ]

        with open(output_path, 'w', encoding='utf-8') as f:
            # ``default`` catches array-like values nested below the first
            # level of each frame, which the comprehension above cannot see.
            json.dump(
                serializable_data,
                f,
                indent=2,
                ensure_ascii=False,
                default=self._json_default,
            )

        print(f"Keypoints salvos em: {output_path}")
        return output_path

    def prepare_for_huggingface(self, video_path, json_path, original_filename):
        """Assemble the per-video folder that gets uploaded.

        Copies the normalized video and the keypoints JSON (each only if it
        exists) into ``huggingface_dir/<base>`` and writes ``metadata.json``
        beside them.

        Args:
            video_path: Path of the normalized video.
            json_path: Path of the keypoints JSON.
            original_filename: Name of the source video; its stem names the
                folder and the copied files.

        Returns:
            Path of the folder that was prepared.
        """
        import shutil  # local import: shutil is not among the module imports

        base_name = os.path.splitext(original_filename)[0]
        hf_dir = os.path.join(self.config['huggingface_dir'], base_name)
        os.makedirs(hf_dir, exist_ok=True)

        # NOTE(review): the copy is always named ``.mp4`` even when the input
        # was ``.avi``/``.mov`` -- confirm VideoProcessor always emits MP4.
        video_exists = os.path.exists(video_path)
        video_dest = os.path.join(hf_dir, f"{base_name}.mp4")
        if video_exists:
            shutil.copy2(video_path, video_dest)
            print(f"Vídeo copiado para: {video_dest}")

        json_dest = os.path.join(hf_dir, f"{base_name}_keypoints.json")
        if os.path.exists(json_path):
            shutil.copy2(json_path, json_dest)
            print(f"JSON copiado para: {json_dest}")

        metadata = {
            'processing_date': datetime.now().isoformat(),
            'original_video': original_filename,
            'keypoints_format': 'MediaPipe Holistic',
            # load_json already yields [] for a missing file, so this is 0
            # when the keypoints JSON does not exist.
            'frame_count': len(self.load_json(json_path)),
            'video_resolution': self.get_video_info(video_path) if video_exists else {}
        }

        metadata_path = os.path.join(hf_dir, 'metadata.json')
        with open(metadata_path, 'w', encoding='utf-8') as f:
            json.dump(metadata, f, indent=2, ensure_ascii=False)

        print(f"Metadata criado em: {metadata_path}")
        return hf_dir

    def upload_to_huggingface(self, directory_path):
        """Return the result of ``HuggingFaceUploader.upload_directory``."""
        uploader = HuggingFaceUploader(self.config)
        return uploader.upload_directory(directory_path)

    def load_json(self, json_path):
        """Return the parsed content of *json_path*, or ``[]`` if it is missing."""
        if not os.path.exists(json_path):
            return []
        with open(json_path, 'r', encoding='utf-8') as f:
            return json.load(f)

    def get_video_info(self, video_path):
        """Return ``{'width', 'height', 'fps'}`` as reported by OpenCV.

        Args:
            video_path: Path of the video to inspect.

        Returns:
            Dict with integer ``width`` and ``height`` and the ``fps`` value
            exactly as returned by ``cv2.VideoCapture.get``.
        """
        import cv2

        cap = cv2.VideoCapture(video_path)
        try:
            width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
            height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
            fps = cap.get(cv2.CAP_PROP_FPS)
        finally:
            # Always free the capture handle, even if a property read fails.
            cap.release()
        return {'width': width, 'height': height, 'fps': fps}
# Default configuration consumed by LibrasProcessingPipeline and the helpers
# it instantiates.
config = {
    # Working directories; the pipeline creates them on start-up.
    'input_dir': 'input',
    'normalized_dir': 'output/normalized',
    'keypoints_dir': 'output/keypoints',
    'huggingface_dir': 'output/huggingface',
    # Presumably read by VideoProcessor -- confirm against utils.video_processor.
    'video_normalization': {
        'target_resolution': (640, 480),
        'target_fps': 30,
        'normalize_brightness': True,
        'enhance_contrast': True
    },
    # Presumably read by MediaPipeProcessor -- confirm against utils.mediapipe_utils.
    'mediapipe_config': {
        'static_image_mode': False,
        'model_complexity': 1,
        'smooth_landmarks': True,
        'min_detection_confidence': 0.5,
        'min_tracking_confidence': 0.5
    },
    'huggingface': {
        'repo_id': 'your-username/your-repo-name',
        # Take the access token from the HF_TOKEN environment variable when it
        # is set, so a real secret never has to be written into this file; the
        # placeholder is kept as the fallback.
        'token': os.environ.get('HF_TOKEN', 'your-hf-token'),
        'upload_to_hf': False
    }
}
# File extensions (lower-case) treated as input videos.
VIDEO_EXTENSIONS = ('.mp4', '.avi', '.mov')


def list_video_files(input_dir, extensions=VIDEO_EXTENSIONS):
    """Return the names of the video files directly inside *input_dir*.

    Matching is case-insensitive (``clip.MP4`` counts), sub-directories are
    ignored, and the result is sorted so that "the first video" is the same
    on every run regardless of the order ``os.listdir`` reports.

    Args:
        input_dir: Directory to scan (not recursive).
        extensions: Tuple of lower-case suffixes to accept.

    Returns:
        Sorted list of matching file names (without the directory part).
    """
    return sorted(
        name
        for name in os.listdir(input_dir)
        if name.lower().endswith(extensions)
        and os.path.isfile(os.path.join(input_dir, name))
    )


def main():
    """Process the first video found in the configured input directory."""
    # Instantiating the pipeline also creates the working directories, so the
    # input directory is guaranteed to exist before it is listed.
    pipeline = LibrasProcessingPipeline(config)

    input_dir = config['input_dir']
    video_files = list_video_files(input_dir)

    if not video_files:
        print(f"Nenhum vídeo encontrado na pasta '{input_dir}'")
        print("Por favor, coloque um vídeo na pasta input/")
        return

    # Only the first video found is processed.
    video_filename = video_files[0]
    print(f"Processando: {video_filename}")

    start_time = time.time()
    result = pipeline.run_pipeline(video_filename)
    end_time = time.time()

    if result:
        print(f"\n✅ Processamento concluído em {end_time - start_time:.2f} segundos")
        print("📁 Resultados:")
        for key, path in result.items():
            print(f" {key}: {path}")
    else:
        print("❌ Falha no processamento")


if __name__ == "__main__":
    main()