"""Flask application for annotating sign-language videos and aligning clips with transcripts."""

import json
import logging
import os
import signal
import sys
import threading
import time
from datetime import datetime

from dotenv import load_dotenv
from flask import (
    Flask,
    jsonify,
    redirect,
    render_template,
    request,
    send_file,
    send_from_directory,
    session,
    url_for,
)

from extract_signed_segments_from_annotations import ClipExtractor, VideoClip

# Load environment variables
load_dotenv()

# Configure logging first
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
)
logger = logging.getLogger(__name__)

# Hugging Face specific configuration: SPACE_ID being set is used as the
# marker for "running inside a Hugging Face Space".
is_hf_space = os.getenv('SPACE_ID') is not None
if is_hf_space:
    logger.info("Running in Hugging Face Spaces environment")
    # Allow insecure transport for development in HF
    os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
    # Ensure port is set correctly
    os.environ['PORT'] = '7860'

app = Flask(__name__)
app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing')
if is_hf_space and not os.getenv('SECRET_KEY'):
    # The fallback key is public knowledge; anyone could forge a session cookie.
    logger.warning(
        "SECRET_KEY is not set; falling back to the insecure development key. "
        "Set SECRET_KEY in the Space secrets."
    )

# Configure session for HF
if is_hf_space:
    app.config['SESSION_COOKIE_SECURE'] = False
    app.config['SESSION_COOKIE_HTTPONLY'] = True
    app.config['PERMANENT_SESSION_LIFETIME'] = 86400  # 24 hours

# Directory paths (resolved against the current working directory)
VIDEO_DIR = os.path.abspath("data/videos")
ANNOTATIONS_DIR = os.path.abspath("data/annotations")
TEMP_DIR = os.path.abspath("data/temp")
WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps")
ALIGNMENTS_DIR = os.path.abspath("data/alignments")
TRANSCRIPTS_DIR = os.path.abspath("data/transcripts")

# Ensure all required directories exist
for directory in [
    VIDEO_DIR,
    ANNOTATIONS_DIR,
    TEMP_DIR,
    WORD_TIMESTAMPS_DIR,
    ALIGNMENTS_DIR,
    TRANSCRIPTS_DIR,
]:
    os.makedirs(directory, exist_ok=True)

# Global dictionaries for progress tracking, keyed by video id
clip_extraction_status = {}
transcription_progress_status = {}


# Graceful shutdown handler
def graceful_shutdown(signum, frame):
    """Handle graceful shutdown on signals.

    Args:
        signum: Number of the signal that was received.
        frame: Current stack frame supplied by the signal machinery (unused).

    Raises:
        SystemExit: Always, with exit status 0.
    """
    logger.info("Received signal %s, shutting down gracefully...", signum)
    # Clean up as needed here
    sys.exit(0)


# Register signal handlers
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)


# Login required decorator
def login_required(f):
    """Wrap a view so that requests without a session user go to the login page.

    Args:
        f: The view function to protect.

    Returns:
        A wrapper that redirects to the ``login`` endpoint when ``'user'`` is
        missing from the session, and otherwise calls ``f`` unchanged.
    """
    from functools import wraps

    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'user' not in session:
            logger.info("User not in session, redirecting to login")
            return redirect(url_for('login'))
        return f(*args, **kwargs)

    return decorated_function


# Allow specific users (for testing)
def is_allowed_user(username):
    """Return True when ``username`` may use the app.

    The allow-list is read from the comma-separated ``ALLOWED_USERS``
    environment variable. Outside a Hugging Face Space every user is allowed.
    """
    allowed_users_env = os.getenv('ALLOWED_USERS', 'Perilon')  # Default to your username
    allowed_users = [user.strip() for user in allowed_users_env.split(',')]
    return username in allowed_users or not is_hf_space  # Allow all users in local dev


def _is_safe_name(name):
    """Return True when ``name`` is a plain file stem without path components."""
    return (
        isinstance(name, str)
        and name not in ("", ".", "..")
        and "/" not in name
        and "\\" not in name
        and "\x00" not in name
    )


def update_extraction_progress(video_id, current, total):
    """Record clip-extraction progress for ``video_id``.

    Args:
        video_id: Identifier of the video being processed.
        current: Number of clips processed so far.
        total: Total number of clips; a zero total is recorded as 0 percent
            instead of raising ``ZeroDivisionError``.
    """
    percent = int((current / total) * 100) if total else 0
    clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent}


def run_clip_extraction(video_id):
    """Extract all clips for ``video_id``; intended to run in a worker thread.

    Progress is published through ``clip_extraction_status``. On failure the
    status entry is replaced by ``{"error": <message>}``.
    """
    try:
        extractor = ClipExtractor(app.root_path)
        extractor.extract_clips_from_annotations(
            video_id,
            progress_callback=lambda current, total: update_extraction_progress(video_id, current, total),
        )
        # Make sure the final state reads 100% even if the extractor never
        # reported a last step (or reported nothing at all).
        status = clip_extraction_status.get(video_id, {})
        if status.get("percent", 0) < 100:
            # A stale error entry or a zero total has no usable "total".
            total = status.get("total") or 1
            update_extraction_progress(video_id, total, total)
    except Exception as e:
        logger.error("Error during clip extraction for %s: %s", video_id, e)
        clip_extraction_status[video_id] = {"error": str(e)}


def run_transcription(video_id):
    """Produce the word-timestamp file for ``video_id``; intended for a worker thread.

    An existing non-empty output file is reused. Status is published through
    ``transcription_progress_status``.
    """
    try:
        base_dir = app.root_path
        output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")

        # Check if transcription already exists and is valid.
        if os.path.exists(output_path) and os.path.getsize(output_path) > 0:
            logger.info("Using cached transcription for video %s.", video_id)
            transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
            return

        video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4")
        transcription_progress_status[video_id] = {"status": "started", "percent": 10}

        # Check if AWS credentials are available
        if not os.environ.get('AWS_ACCESS_KEY_ID') or not os.environ.get('AWS_SECRET_ACCESS_KEY'):
            logger.warning("AWS credentials not found. Transcription will not work properly.")
            transcription_progress_status[video_id] = {
                "status": "error",
                "percent": 0,
                "message": "AWS credentials missing",
            }
            return

        # Run transcription via the imported function from get_transcription_with_amazon.py
        from get_transcription_with_amazon import get_word_timestamps

        word_timestamps = get_word_timestamps(video_path)

        # Write to a temporary file first so a failed dump can never leave a
        # partial file that the cache check above would accept later.
        tmp_path = f"{output_path}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(word_timestamps, f, indent=4)
        os.replace(tmp_path, output_path)

        transcription_progress_status[video_id] = {"status": "completed", "percent": 100}
    except Exception as e:
        logger.error("Error during transcription for %s: %s", video_id, e)
        transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)}


# Authentication routes
@app.route('/login')
def login():
    """Handle login for both local and HF environments."""
    logger.info("Login route called. Headers: %s", dict(request.headers))

    if not is_hf_space:
        # For local development
        session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
        return redirect(url_for('index'))

    username = request.headers.get('X-Spaces-Username')
    logger.info("Username from headers in login: %s", username)
    if username and is_allowed_user(username):
        session['user'] = {'name': username, 'is_hf': True}
        return redirect(url_for('index'))

    # Redirect to the HF auth endpoint
    return redirect('/auth')


@app.route('/auth/callback')
def auth_callback():
    """This route will be called by Hugging Face after successful authentication."""
    logger.info("Auth callback called. Headers: %s", dict(request.headers))

    if not is_hf_space:
        return redirect(url_for('login'))

    # In Hugging Face Spaces, the user info is available in the request headers
    username = request.headers.get('X-Spaces-Username')
    if not username:
        return render_template('error.html', message="Authentication failed. No username provided.")

    # Enforce the same allow-list as /login, /auth and check_auth.
    if not is_allowed_user(username):
        return render_template('error.html', message="Authentication failed. User is not allowed.")

    session['user'] = {'name': username, 'is_hf': True}
    return redirect(url_for('index'))


@app.route('/auth')
def auth():
    """This route handles HF authentication."""
    logger.info("Auth route called. Headers: %s", dict(request.headers))

    # Check for the username in headers before proceeding
    username = request.headers.get('X-Spaces-Username')
    logger.info("Username from headers in auth: %s", username)

    if not is_hf_space:
        # For local development
        session['user'] = {'name': 'LocalDeveloper', 'is_mock': True}
        return redirect(url_for('index'))

    if username and is_allowed_user(username):
        logger.info("Setting user in session: %s", username)
        session['user'] = {'name': username, 'is_hf': True}
        return redirect(url_for('index'))

    # For HF with no valid username yet, render a simple page with auth information
    return render_template(
        'error.html',
        message="Waiting for Hugging Face authentication. If you continue to see this message, "
                "please make sure you're logged into Hugging Face and your username is allowed.",
    )


@app.before_request
def check_auth():
    """Check authentication before processing requests.

    Returns:
        ``None`` to let the request continue, or a redirect response when the
        visitor is not authenticated.
    """
    # Skip authentication for certain routes and static files
    if request.path in ['/login', '/logout', '/auth', '/auth/callback', '/debug'] or request.path.startswith('/static/'):
        return None

    # Log all request paths to help troubleshoot
    logger.debug("Request path: %s, User in session: %s", request.path, 'user' in session)

    if is_hf_space:
        # Check for HF username header
        username = request.headers.get('X-Spaces-Username')

        if 'user' in session:
            logger.debug("User in session: %s", session['user'])
            return None

        if username and is_allowed_user(username):
            logger.info("Setting user from headers: %s", username)
            session['user'] = {'name': username, 'is_hf': True}
            return None

        # No valid user in session or headers
        logger.info("No authenticated user, redirecting to /auth")
        return redirect('/auth')

    if 'user' not in session:
        return redirect(url_for('login'))
    return None


@app.route('/logout')
def logout():
    """Clear session and redirect to login."""
    session.clear()  # Clear the entire session
    if is_hf_space:
        return redirect('/auth/logout')
    return redirect(url_for('login'))


@app.route('/debug')
def debug_info():
    """Return debug information as JSON.

    This endpoint is exempt from authentication, so configuration entries
    whose name suggests a credential are redacted rather than echoed.
    """
    sensitive_markers = ("SECRET", "PASSWORD", "TOKEN")

    def _config_value(key, value):
        # Never leak signing keys or credentials through the open debug page.
        if value is not None and any(marker in str(key).upper() for marker in sensitive_markers):
            return "<redacted>"
        return str(value)

    info = {
        "session": dict(session) if session else None,
        "headers": dict(request.headers),
        "is_hf_space": is_hf_space,
        "allowed_users": os.getenv('ALLOWED_USERS', 'Perilon'),
        "app_config": {k: _config_value(k, v) for k, v in app.config.items()},
        "env_vars": {
            "SPACE_ID": os.getenv('SPACE_ID'),
            "PORT": os.getenv('PORT'),
            "DEBUG": os.getenv('DEBUG'),
            "AWS_KEYS_SET": bool(os.getenv('AWS_ACCESS_KEY_ID')) and bool(os.getenv('AWS_SECRET_ACCESS_KEY')),
        },
    }
    return jsonify(info)


# Main application routes
@app.route('/')
@login_required
def index():
    """Main entry point, redirects to video selection."""
    return redirect(url_for('select_video'))


@app.route('/select_video')
@login_required
def select_video():
    """Page to select a video for annotation."""
    if not os.path.exists(VIDEO_DIR):
        return render_template('error.html', message="Video directory not found.")

    video_ids = [os.path.splitext(name)[0] for name in os.listdir(VIDEO_DIR) if name.endswith('.mp4')]
    return render_template('select_video.html', video_ids=video_ids, user=session.get('user'))


# The URL variable is required: the view takes ``video_id`` as an argument.
@app.route('/player/<video_id>')
@login_required
def player(video_id):
    """Video player page for annotation."""
    return render_template('player.html', video_id=video_id, user=session.get('user'))


@app.route('/videos')
@login_required
def get_videos():
    """API endpoint to get available videos."""
    if not os.path.exists(VIDEO_DIR):
        return jsonify({'error': 'Video directory not found'}), 404

    videos = [name for name in os.listdir(VIDEO_DIR) if name.endswith(('.mp4', '.avi', '.mov'))]
    if not videos:
        return jsonify({'error': 'No videos found'}), 404
    return jsonify(videos)


# The default string converter rejects "/" so ``filename`` cannot leave VIDEO_DIR.
@app.route('/video/<filename>')
@login_required
def serve_video(filename):
    """Serve a video file."""
    if not os.path.exists(os.path.join(VIDEO_DIR, filename)):
        return jsonify({'error': 'Video not found'}), 404
    return send_from_directory(VIDEO_DIR, filename)


@app.route('/save_annotations', methods=['POST'])
@login_required
def save_annotations():
    """Save annotation data.

    Expects a JSON body with ``video`` (video id without extension) and
    ``timestamps`` (list of cut points). Responds with 400 on malformed input.
    """
    data = request.json
    if not data or 'video' not in data or 'timestamps' not in data:
        return jsonify({'success': False, 'message': 'Invalid data'}), 400

    video = data['video']
    # ``video`` comes from the client and ends up in a file path.
    if not _is_safe_name(video):
        return jsonify({'success': False, 'message': 'Invalid video name'}), 400

    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video}_annotations.json")
    annotation_data = {
        "video_name": video + ".mp4",
        "timestamps": sorted(data['timestamps']),
        "annotation_date": datetime.now().isoformat(),
        "annotated_by": session.get('user', {}).get('name', 'unknown'),
    }
    with open(annotation_file, 'w', encoding="utf-8") as f:
        json.dump(annotation_data, f, indent=4)
    return jsonify({'success': True, 'message': 'Annotations saved successfully'})


@app.route('/get_annotations/<video_name>')
@login_required
def get_annotations(video_name):
    """Get annotations for a video."""
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json")
    if not os.path.exists(annotation_file):
        return jsonify({'error': 'No annotations found'}), 404

    with open(annotation_file, 'r', encoding="utf-8") as f:
        annotations = json.load(f)
    return jsonify(annotations)


@app.route("/alignment/<video_id>")
@login_required
def alignment_mode(video_id):
    """Page for aligning sign language with transcribed text."""
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if not os.path.exists(annotation_file):
        return render_template(
            "error.html",
            message="No annotations found for this video. Please annotate the video first.",
        )

    with open(annotation_file, 'r', encoding="utf-8") as f:
        annotations = json.load(f)

    # N timestamps delimit N-1 clips; never report a negative count.
    total_clips = max(len(annotations['timestamps']) - 1, 0)
    return render_template(
        "alignment.html",
        video_id=video_id,
        total_clips=total_clips,
        user=session.get('user'),
    )


@app.route("/api/transcript/<video_id>")
@login_required
def get_transcript(video_id):
    """Get transcript for a video.

    Returns JSON with the full text and a per-word list of start/end times
    built from the stored word-timestamp file, 404 when that file is missing
    and 500 when it cannot be processed.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    logger.info("Attempting to load word timestamps from: %s", timestamps_file)

    if not os.path.exists(timestamps_file):
        logger.warning("Word timestamps file not found: %s", timestamps_file)
        return jsonify({
            "status": "error",
            "message": "No word timestamps found for this video",
        }), 404

    try:
        with open(timestamps_file, 'r', encoding="utf-8") as f:
            word_data = json.load(f)

        full_text = " ".join(item["punctuated_word"] for item in word_data)
        words_with_times = [
            {
                "word": item["punctuated_word"],
                "start": float(item["start_time"]),
                "end": float(item["end_time"]),
            }
            for item in word_data
        ]

        logger.info("Successfully created transcript (%d characters)", len(full_text))
        return jsonify({
            "status": "success",
            "text": full_text,
            "words": words_with_times,
        })
    except Exception as e:
        logger.error("Error processing word timestamps: %s", e)
        return jsonify({
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}",
        }), 500
# The URL variable is required: the view takes ``video_id`` as an argument.
@app.route("/api/word_timestamps/<video_id>")
@login_required
def get_word_timestamps(video_id):
    """Get word-level timestamps for a video.

    Returns the stored word-timestamp JSON unchanged under ``words``, 404 when
    the file is missing and 500 when it cannot be read or parsed.
    """
    timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json")
    logger.info("Attempting to load word timestamps from: %s", timestamps_file)

    if not os.path.exists(timestamps_file):
        logger.warning("Word timestamps file not found: %s", timestamps_file)
        return jsonify({
            "status": "error",
            "message": "No word timestamps found for this video",
        }), 404

    try:
        with open(timestamps_file, 'r', encoding="utf-8") as f:
            word_data = json.load(f)

        logger.info("Successfully loaded %d word timestamps", len(word_data))
        return jsonify({
            "status": "success",
            "words": word_data,
        })
    except Exception as e:
        logger.error("Error processing word timestamps: %s", e)
        return jsonify({
            "status": "error",
            "message": f"Error processing word timestamps: {str(e)}",
        }), 500


@app.route("/api/clips/<video_id>")
@login_required
def get_video_clips(video_id):
    """Get clips for a video.

    Each pair of consecutive annotation timestamps becomes one clip entry with
    its index, start, end and the URL path it is served from.
    """
    annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json")
    if not os.path.exists(annotation_file):
        # Missing annotations is a "not found" condition, as in the sibling
        # annotation and transcript endpoints, not a server error.
        logger.error("Error getting clips: Annotations not found")
        return jsonify({
            "status": "error",
            "message": "Annotations not found",
        }), 404

    try:
        with open(annotation_file, 'r', encoding="utf-8") as f:
            annotations = json.load(f)

        timestamps = annotations['timestamps']
        clips = [
            {
                "index": i,
                "start": start,
                "end": end,
                "path": f"/clip/{video_id}/{i}",
            }
            for i, (start, end) in enumerate(zip(timestamps, timestamps[1:]))
        ]
        return jsonify({
            "status": "success",
            "clips": clips,
        })
    except Exception as e:
        logger.error("Error getting clips: %s", e)
        return jsonify({
            "status": "error",
            "message": str(e),
        }), 500


# ``clip_index`` must be an int because it is formatted with ``:03d`` below.
@app.route("/clip/<video_id>/<int:clip_index>")
@login_required
def serve_clip(video_id, clip_index):
    """Serve a specific clip."""
    clip_path = os.path.join(TEMP_DIR, f"{video_id}_clip_{clip_index:03d}.mp4")
    logger.info("Attempting to serve clip: %s", clip_path)

    if not os.path.exists(clip_path):
        logger.error("Clip not found: %s", clip_path)
        return jsonify({
            "status": "error",
            "message": "Clip not found",
        }), 404

    return send_file(clip_path, mimetype="video/mp4")


@app.route("/api/save_alignments", methods=["POST"])
@login_required
def save_alignments():
    """Save alignment data.

    Expects a JSON body with ``video_id`` and ``alignments`` (a list). Each
    non-empty alignment object is stamped with the current user's name before
    the list is written to ``ALIGNMENTS_DIR``.
    """
    try:
        data = request.json
        if not data or 'video_id' not in data or 'alignments' not in data:
            return jsonify({'success': False, 'message': 'Invalid data'}), 400

        video_id = data['video_id']
        # ``video_id`` comes from the client and ends up in a file path, so
        # reject anything that is not a plain file stem.
        if (
            not isinstance(video_id, str)
            or video_id in ("", ".", "..")
            or "/" in video_id
            or "\\" in video_id
            or "\x00" in video_id
        ):
            return jsonify({'success': False, 'message': 'Invalid video id'}), 400

        # Add user information to the alignments
        user_name = session.get('user', {}).get('name', 'unknown')
        for alignment in data['alignments']:
            # Skip empty placeholders and anything that is not a JSON object.
            if alignment and isinstance(alignment, dict):
                alignment['aligned_by'] = user_name

        output_path = os.path.join(ALIGNMENTS_DIR, f"{video_id}.json")
        with open(output_path, "w", encoding="utf-8") as f:
            json.dump(data['alignments'], f, indent=2)

        return jsonify({
            "success": True,
            "message": "Alignments saved successfully",
        })
    except Exception as e:
        logger.error("Error saving alignments: %s", e)
        return jsonify({
            "success": False,
            "message": str(e),
        }), 500


@app.route("/api/extract_clips/<video_id>")
@login_required
def extract_clips_for_video(video_id):
    """Extract clips and start transcription for a video.

    Each job is started in its own thread unless its recorded progress has
    already reached 100 percent.
    """
    if clip_extraction_status.get(video_id, {}).get("percent", 0) < 100:
        threading.Thread(target=run_clip_extraction, args=(video_id,)).start()

    # A missing entry reads as 0 percent, so one lookup covers both cases.
    if transcription_progress_status.get(video_id, {}).get("percent", 0) < 100:
        threading.Thread(target=run_transcription, args=(video_id,)).start()

    return jsonify({"status": "started"})


@app.route("/api/clip_progress/<video_id>")
@login_required
def clip_progress(video_id):
    """Get clip extraction progress."""
    progress = clip_extraction_status.get(video_id, {"current": 0, "total": 0, "percent": 0})
    return jsonify(progress)


@app.route("/api/transcription_progress/<video_id>")
@login_required
def transcription_progress(video_id):
    """Get transcription progress."""
    progress = transcription_progress_status.get(video_id, {"status": "not started", "percent": 0})
    return jsonify(progress)


if __name__ == '__main__':
    try:
        port = int(os.getenv('PORT', 5000))

        # The interactive debugger allows arbitrary code execution, so it must
        # never be on by default in the public Space. DEBUG overrides the
        # default; local development keeps debug mode enabled.
        debug_env = os.getenv('DEBUG')
        if debug_env is None:
            debug = not is_hf_space
        else:
            debug = debug_env.strip().lower() in ('1', 'true', 'yes', 'on')

        print(f"Starting app on port {port}, debug mode: {debug}")

        # Explicitly create the session directory if needed
        os.makedirs('flask_session', exist_ok=True)

        app.run(host='0.0.0.0', port=port, debug=debug)
    except Exception as e:
        print(f"Error starting the application: {e}")
        import traceback
        traceback.print_exc()