from flask import Flask, render_template, jsonify, request, send_from_directory, send_file, redirect, url_for, session |
import os, json, threading, time |
from datetime import datetime |
from extract_signed_segments_from_annotations import ClipExtractor, VideoClip |
import logging |
from dotenv import load_dotenv |
load_dotenv() |
app = Flask(__name__) |
app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing') |
logging.basicConfig(level=logging.INFO) |
VIDEO_DIR = os.path.abspath("data/videos") |
ANNOTATIONS_DIR = os.path.abspath("data/annotations") |
TEMP_DIR = os.path.abspath("data/temp") |
WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps") |
ALIGNMENTS_DIR = os.path.abspath("data/alignments") |
TRANSCRIPTS_DIR = os.path.abspath("data/transcripts") |
os.makedirs(directory, exist_ok=True) |
clip_extraction_status = {} |
transcription_progress_status = {} |
is_hf_space = os.getenv('SPACE_ID') is not None |
def login_required(f): |
from functools import wraps |
@wraps(f) |
def decorated_function(*args, **kwargs): |
if 'user' not in session: |
return redirect(url_for('login')) |
return f(*args, **kwargs) |
return decorated_function |
def is_allowed_user(username): |
allowed_users = ['Perilon'] |
return username in allowed_users or not is_hf_space |
def update_extraction_progress(video_id, current, total): |
percent = int((current / total) * 100) |
clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent} |
def run_clip_extraction(video_id): |
try: |
base_dir = app.root_path |
extractor = ClipExtractor(base_dir) |
extractor.extract_clips_from_annotations( |
video_id, |
progress_callback=lambda current, total: update_extraction_progress(video_id, current, total) |
) |
if video_id in clip_extraction_status: |
status = clip_extraction_status[video_id] |
if status.get("percent", 0) < 100: |
update_extraction_progress(video_id, status["total"], status["total"]) |
else: |
update_extraction_progress(video_id, 1, 1) |
except Exception as e: |
logging.error(f"Error during clip extraction for {video_id}: {str(e)}") |
clip_extraction_status[video_id] = {"error": str(e)} |
def run_transcription(video_id): |
try: |
base_dir = app.root_path |
output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") |
if os.path.exists(output_path) and os.path.getsize(output_path) > 0: |
app.logger.info(f"Using cached transcription for video {video_id}.") |
transcription_progress_status[video_id] = {"status": "completed", "percent": 100} |
return |
video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4") |
transcription_progress_status[video_id] = {"status": "started", "percent": 10} |
from get_transcription_with_amazon import get_word_timestamps |
word_timestamps = get_word_timestamps(video_path) |
with open(output_path, "w") as f: |
json.dump(word_timestamps, f, indent=4) |
transcription_progress_status[video_id] = {"status": "completed", "percent": 100} |
except Exception as e: |
app.logger.error(f"Error during transcription for {video_id}: {str(e)}") |
transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)} |
@app.route('/login') |
def login(): |
if is_hf_space: |
return redirect('/auth/login') |
else: |
session['user'] = {'name': 'LocalDeveloper', 'is_mock': True} |
return redirect(url_for('index')) |
@app.route('/auth/callback') |
def auth_callback(): |
if is_hf_space: |
username = request.headers.get('X-Spaces-Username') |
if username: |
session['user'] = {'name': username, 'is_hf': True} |
return redirect(url_for('index')) |
else: |
return render_template('error.html', message="Authentication failed. No username provided.") |
return redirect(url_for('login')) |
@app.route('/auth') |
def auth(): |
if not is_hf_space: |
session['user'] = {'name': 'Perilon', 'is_mock': True} |
return redirect(url_for('index')) |
@app.before_request |
def check_auth(): |
if request.path in ['/login', '/logout', '/auth/callback'] or request.path.startswith('/static/'): |
return |
if is_hf_space: |
username = request.headers.get('X-Spaces-Username') |
if username and is_allowed_user(username): |
if 'user' not in session or session['user'].get('name') != username: |
session['user'] = {'name': username, 'is_hf': True} |
elif 'user' not in session: |
return redirect(url_for('login')) |
elif 'user' not in session: |
return redirect(url_for('login')) |
@app.route('/logout') |
def logout(): |
session.clear() |
if is_hf_space: |
return redirect('/auth/logout') |
return redirect(url_for('login')) |
@app.route('/') |
@login_required |
def index(): |
return redirect(url_for('select_video')) |
@app.route('/select_video') |
@login_required |
def select_video(): |
if not os.path.exists(VIDEO_DIR): |
return render_template('error.html', message="Video directory not found.") |
videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith('.mp4')] |
video_ids = [os.path.splitext(v)[0] for v in videos] |
return render_template('select_video.html', video_ids=video_ids, user=session.get('user')) |
@app.route('/player/<video_id>') |
@login_required |
def player(video_id): |
return render_template('player.html', video_id=video_id, user=session.get('user')) |
@app.route('/videos') |
@login_required |
def get_videos(): |
if not os.path.exists(VIDEO_DIR): |
return jsonify({'error': 'Video directory not found'}), 404 |
videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith(('.mp4', '.avi', '.mov'))] |
if not videos: |
return jsonify({'error': 'No videos found'}), 404 |
return jsonify(videos) |
@app.route('/video/<path:filename>') |
@login_required |
def serve_video(filename): |
if not os.path.exists(os.path.join(VIDEO_DIR, filename)): |
return jsonify({'error': 'Video not found'}), 404 |
return send_from_directory(VIDEO_DIR, filename) |
@app.route('/save_annotations', methods=['POST']) |
@login_required |
def save_annotations(): |
data = request.json |
if not data or 'video' not in data or 'timestamps' not in data: |
return jsonify({'success': False, 'message': 'Invalid data'}), 400 |
annotation_file = os.path.join(ANNOTATIONS_DIR, f"{data['video']}_annotations.json") |
annotation_data = { |
"video_name": data['video'] + ".mp4", |
"timestamps": sorted(data['timestamps']), |
"annotation_date": datetime.now().isoformat(), |
"annotated_by": session.get('user', {}).get('name', 'unknown') |
} |
with open(annotation_file, 'w') as f: |
json.dump(annotation_data, f, indent=4) |
return jsonify({'success': True, 'message': 'Annotations saved successfully'}) |
@app.route('/get_annotations/<path:video_name>') |
@login_required |
def get_annotations(video_name): |
annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json") |
if not os.path.exists(annotation_file): |
return jsonify({'error': 'No annotations found'}), 404 |
with open(annotation_file, 'r') as f: |
annotations = json.load(f) |
return jsonify(annotations) |
@app.route("/alignment/<video_id>") |
@login_required |
def alignment_mode(video_id): |
annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json") |
if not os.path.exists(annotation_file): |
return render_template("error.html", message="No annotations found for this video. Please annotate the video first.") |
with open(annotation_file, 'r') as f: |
annotations = json.load(f) |
return render_template( |
"alignment.html", |
video_id=video_id, |
total_clips=len(annotations['timestamps']) - 1, |
user=session.get('user') |
) |
@app.route("/api/transcript/<video_id>") |
@login_required |
def get_transcript(video_id): |
timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") |
app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}") |
if not os.path.exists(timestamps_file): |
app.logger.warning(f"Word timestamps file not found: {timestamps_file}") |
return jsonify({ |
"status": "error", |
"message": "No word timestamps found for this video" |
}), 404 |
try: |
with open(timestamps_file, 'r') as f: |
word_data = json.load(f) |
full_text = " ".join(item["punctuated_word"] for item in word_data) |
words_with_times = [{ |
"word": item["punctuated_word"], |
"start": float(item["start_time"]), |
"end": float(item["end_time"]) |
} for item in word_data] |
app.logger.info(f"Successfully created transcript ({len(full_text)} characters)") |
return jsonify({ |
"status": "success", |
"text": full_text, |
"words": words_with_times |
}) |
except Exception as e: |
app.logger.error(f"Error processing word timestamps: {str(e)}") |
return jsonify({ |
"status": "error", |
"message": f"Error processing word timestamps: {str(e)}" |
}), 500 |
@app.route("/api/word_timestamps/<video_id>") |
@login_required |
def get_word_timestamps(video_id): |
timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") |
app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}") |
if not os.path.exists(timestamps_file): |
app.logger.warning(f"Word timestamps file not found: {timestamps_file}") |
return jsonify({ |
"status": "error", |
"message": "No word timestamps found for this video" |
}), 404 |
try: |
with open(timestamps_file, 'r') as f: |
word_data = json.load(f) |
app.logger.info(f"Successfully loaded {len(word_data)} word timestamps") |
return jsonify({ |
"status": "success", |
"words": word_data |
}) |
except Exception as e: |
app.logger.error(f"Error processing word timestamps: {str(e)}") |
return jsonify({ |
"status": "error", |
"message": f"Error processing word timestamps: {str(e)}" |
}), 500 |
@app.route("/api/clips/<video_id>") |
@login_required |
def get_video_clips(video_id): |
try: |
annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json") |
if not os.path.exists(annotation_file): |
raise FileNotFoundError("Annotations not found") |
with open(annotation_file, 'r') as f: |
annotations = json.load(f) |
timestamps = annotations['timestamps'] |
clips = [] |
for i in range(len(timestamps)-1): |
clips.append({ |
"index": i, |
"start": timestamps[i], |
"end": timestamps[i+1], |
"path": f"/clip/{video_id}/{i}" |
}) |
return jsonify({ |
"status": "success", |
"clips": clips |
}) |
except Exception as e: |
app.logger.error(f"Error getting clips: {str(e)}") |
return jsonify({ |
"status": "error", |
"message": str(e) |
}), 500 |
@app.route("/clip/<video_id>/<int:clip_index>") |
@login_required |
def serve_clip(video_id, clip_index): |
clip_path = os.path.join( |
f"{video_id}_clip_{clip_index:03d}.mp4" |
) |
app.logger.info(f"Attempting to serve clip: {clip_path}") |
if not os.path.exists(clip_path): |
app.logger.error(f"Clip not found: {clip_path}") |
return jsonify({ |
"status": "error", |
"message": "Clip not found" |
}), 404 |
return send_file(clip_path, mimetype="video/mp4") |
@app.route("/api/save_alignments", methods=["POST"]) |
@login_required |
def save_alignments(): |
try: |
data = request.json |
if not data or 'video_id' not in data or 'alignments' not in data: |
return jsonify({'success': False, 'message': 'Invalid data'}), 400 |
for alignment in data['alignments']: |
if alignment: |
alignment['aligned_by'] = session.get('user', {}).get('name', 'unknown') |
output_path = os.path.join(ALIGNMENTS_DIR, f"{data['video_id']}.json") |
with open(output_path, "w") as f: |
json.dump(data['alignments'], f, indent=2) |
return jsonify({ |
"success": True, |
"message": "Alignments saved successfully" |
}) |
except Exception as e: |
app.logger.error(f"Error saving alignments: {str(e)}") |
return jsonify({ |
"success": False, |
"message": str(e) |
}), 500 |
@app.route("/api/extract_clips/<video_id>") |
@login_required |
def extract_clips_for_video(video_id): |
status = clip_extraction_status.get(video_id, {}) |
if status.get("percent", 0) < 100: |
thread = threading.Thread(target=run_clip_extraction, args=(video_id,)) |
thread.start() |
if video_id not in transcription_progress_status or transcription_progress_status.get(video_id, {}).get("percent", 0) < 100: |
thread_trans = threading.Thread(target=run_transcription, args=(video_id,)) |
thread_trans.start() |
return jsonify({"status": "started"}) |
@app.route("/api/clip_progress/<video_id>") |
@login_required |
def clip_progress(video_id): |
progress = clip_extraction_status.get(video_id, {"current": 0, "total": 0, "percent": 0}) |
return jsonify(progress) |
@app.route("/api/transcription_progress/<video_id>") |
@login_required |
def transcription_progress(video_id): |
progress = transcription_progress_status.get(video_id, {"status": "not started", "percent": 0}) |
return jsonify(progress) |
if __name__ == '__main__': |
port = int(os.getenv('PORT', 5000)) |
app.run(host='', port=port, debug=True) |