|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from flask import Flask, render_template, jsonify, request, send_from_directory, send_file, redirect, url_for, session |
|
import os, json, threading, time |
|
from datetime import datetime |
|
from extract_signed_segments_from_annotations import ClipExtractor, VideoClip |
|
import logging |
|
from dotenv import load_dotenv |
|
|
|
|
|
load_dotenv() |
|
|
|
app = Flask(__name__) |
|
app.secret_key = os.getenv('SECRET_KEY', 'dev_key_for_testing') |
|
logging.basicConfig(level=logging.INFO) |
|
|
|
|
|
VIDEO_DIR = os.path.abspath("data/videos") |
|
ANNOTATIONS_DIR = os.path.abspath("data/annotations") |
|
TEMP_DIR = os.path.abspath("data/temp") |
|
WORD_TIMESTAMPS_DIR = os.path.abspath("data/word_timestamps") |
|
ALIGNMENTS_DIR = os.path.abspath("data/alignments") |
|
TRANSCRIPTS_DIR = os.path.abspath("data/transcripts") |
|
|
|
|
|
for directory in [VIDEO_DIR, ANNOTATIONS_DIR, TEMP_DIR, WORD_TIMESTAMPS_DIR, ALIGNMENTS_DIR, TRANSCRIPTS_DIR]: |
|
os.makedirs(directory, exist_ok=True) |
|
|
|
|
|
clip_extraction_status = {} |
|
transcription_progress_status = {} |
|
|
|
|
|
is_hf_space = os.getenv('SPACE_ID') is not None |
|
|
|
|
|
def login_required(f): |
|
from functools import wraps |
|
@wraps(f) |
|
def decorated_function(*args, **kwargs): |
|
if 'user' not in session: |
|
return redirect(url_for('login')) |
|
return f(*args, **kwargs) |
|
return decorated_function |
|
|
|
|
|
def is_allowed_user(username): |
|
allowed_users = ['Perilon'] |
|
return username in allowed_users or not is_hf_space |
|
|
|
def update_extraction_progress(video_id, current, total): |
|
percent = int((current / total) * 100) |
|
clip_extraction_status[video_id] = {"current": current, "total": total, "percent": percent} |
|
|
|
def run_clip_extraction(video_id): |
|
try: |
|
base_dir = app.root_path |
|
extractor = ClipExtractor(base_dir) |
|
extractor.extract_clips_from_annotations( |
|
video_id, |
|
progress_callback=lambda current, total: update_extraction_progress(video_id, current, total) |
|
) |
|
if video_id in clip_extraction_status: |
|
status = clip_extraction_status[video_id] |
|
if status.get("percent", 0) < 100: |
|
update_extraction_progress(video_id, status["total"], status["total"]) |
|
else: |
|
update_extraction_progress(video_id, 1, 1) |
|
except Exception as e: |
|
logging.error(f"Error during clip extraction for {video_id}: {str(e)}") |
|
clip_extraction_status[video_id] = {"error": str(e)} |
|
|
|
def run_transcription(video_id): |
|
try: |
|
base_dir = app.root_path |
|
output_path = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") |
|
|
|
|
|
if os.path.exists(output_path) and os.path.getsize(output_path) > 0: |
|
app.logger.info(f"Using cached transcription for video {video_id}.") |
|
transcription_progress_status[video_id] = {"status": "completed", "percent": 100} |
|
return |
|
|
|
video_path = os.path.join(base_dir, "data", "videos", f"{video_id}.mp4") |
|
transcription_progress_status[video_id] = {"status": "started", "percent": 10} |
|
|
|
|
|
from get_transcription_with_amazon import get_word_timestamps |
|
word_timestamps = get_word_timestamps(video_path) |
|
|
|
with open(output_path, "w") as f: |
|
json.dump(word_timestamps, f, indent=4) |
|
|
|
transcription_progress_status[video_id] = {"status": "completed", "percent": 100} |
|
except Exception as e: |
|
app.logger.error(f"Error during transcription for {video_id}: {str(e)}") |
|
transcription_progress_status[video_id] = {"status": "error", "percent": 0, "message": str(e)} |
|
|
|
|
|
@app.route('/login') |
|
def login(): |
|
if is_hf_space: |
|
|
|
return redirect('/auth/login') |
|
else: |
|
|
|
session['user'] = {'name': 'LocalDeveloper', 'is_mock': True} |
|
return redirect(url_for('index')) |
|
|
|
@app.route('/auth/callback') |
|
def auth_callback(): |
|
|
|
if is_hf_space: |
|
|
|
username = request.headers.get('X-Spaces-Username') |
|
if username: |
|
session['user'] = {'name': username, 'is_hf': True} |
|
return redirect(url_for('index')) |
|
else: |
|
return render_template('error.html', message="Authentication failed. No username provided.") |
|
return redirect(url_for('login')) |
|
|
|
@app.route('/auth') |
|
def auth(): |
|
|
|
|
|
if not is_hf_space: |
|
session['user'] = {'name': 'Perilon', 'is_mock': True} |
|
return redirect(url_for('index')) |
|
|
|
@app.before_request |
|
def check_auth(): |
|
|
|
if request.path in ['/login', '/logout', '/auth/callback'] or request.path.startswith('/static/'): |
|
return |
|
|
|
|
|
if is_hf_space: |
|
username = request.headers.get('X-Spaces-Username') |
|
if username and is_allowed_user(username): |
|
|
|
if 'user' not in session or session['user'].get('name') != username: |
|
session['user'] = {'name': username, 'is_hf': True} |
|
elif 'user' not in session: |
|
return redirect(url_for('login')) |
|
|
|
elif 'user' not in session: |
|
return redirect(url_for('login')) |
|
|
|
@app.route('/logout') |
|
def logout(): |
|
session.clear() |
|
if is_hf_space: |
|
return redirect('/auth/logout') |
|
return redirect(url_for('login')) |
|
|
|
|
|
@app.route('/') |
|
@login_required |
|
def index(): |
|
return redirect(url_for('select_video')) |
|
|
|
@app.route('/select_video') |
|
@login_required |
|
def select_video(): |
|
if not os.path.exists(VIDEO_DIR): |
|
return render_template('error.html', message="Video directory not found.") |
|
videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith('.mp4')] |
|
video_ids = [os.path.splitext(v)[0] for v in videos] |
|
return render_template('select_video.html', video_ids=video_ids, user=session.get('user')) |
|
|
|
@app.route('/player/<video_id>') |
|
@login_required |
|
def player(video_id): |
|
return render_template('player.html', video_id=video_id, user=session.get('user')) |
|
|
|
@app.route('/videos') |
|
@login_required |
|
def get_videos(): |
|
if not os.path.exists(VIDEO_DIR): |
|
return jsonify({'error': 'Video directory not found'}), 404 |
|
videos = [f for f in os.listdir(VIDEO_DIR) if f.endswith(('.mp4', '.avi', '.mov'))] |
|
if not videos: |
|
return jsonify({'error': 'No videos found'}), 404 |
|
return jsonify(videos) |
|
|
|
@app.route('/video/<path:filename>') |
|
@login_required |
|
def serve_video(filename): |
|
if not os.path.exists(os.path.join(VIDEO_DIR, filename)): |
|
return jsonify({'error': 'Video not found'}), 404 |
|
return send_from_directory(VIDEO_DIR, filename) |
|
|
|
@app.route('/save_annotations', methods=['POST']) |
|
@login_required |
|
def save_annotations(): |
|
data = request.json |
|
if not data or 'video' not in data or 'timestamps' not in data: |
|
return jsonify({'success': False, 'message': 'Invalid data'}), 400 |
|
|
|
annotation_file = os.path.join(ANNOTATIONS_DIR, f"{data['video']}_annotations.json") |
|
annotation_data = { |
|
"video_name": data['video'] + ".mp4", |
|
"timestamps": sorted(data['timestamps']), |
|
"annotation_date": datetime.now().isoformat(), |
|
"annotated_by": session.get('user', {}).get('name', 'unknown') |
|
} |
|
with open(annotation_file, 'w') as f: |
|
json.dump(annotation_data, f, indent=4) |
|
return jsonify({'success': True, 'message': 'Annotations saved successfully'}) |
|
|
|
@app.route('/get_annotations/<path:video_name>') |
|
@login_required |
|
def get_annotations(video_name): |
|
annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_name}_annotations.json") |
|
if not os.path.exists(annotation_file): |
|
return jsonify({'error': 'No annotations found'}), 404 |
|
with open(annotation_file, 'r') as f: |
|
annotations = json.load(f) |
|
return jsonify(annotations) |
|
|
|
@app.route("/alignment/<video_id>") |
|
@login_required |
|
def alignment_mode(video_id): |
|
annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json") |
|
if not os.path.exists(annotation_file): |
|
return render_template("error.html", message="No annotations found for this video. Please annotate the video first.") |
|
with open(annotation_file, 'r') as f: |
|
annotations = json.load(f) |
|
return render_template( |
|
"alignment.html", |
|
video_id=video_id, |
|
total_clips=len(annotations['timestamps']) - 1, |
|
user=session.get('user') |
|
) |
|
|
|
@app.route("/api/transcript/<video_id>") |
|
@login_required |
|
def get_transcript(video_id): |
|
timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") |
|
app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}") |
|
if not os.path.exists(timestamps_file): |
|
app.logger.warning(f"Word timestamps file not found: {timestamps_file}") |
|
return jsonify({ |
|
"status": "error", |
|
"message": "No word timestamps found for this video" |
|
}), 404 |
|
try: |
|
with open(timestamps_file, 'r') as f: |
|
word_data = json.load(f) |
|
full_text = " ".join(item["punctuated_word"] for item in word_data) |
|
words_with_times = [{ |
|
"word": item["punctuated_word"], |
|
"start": float(item["start_time"]), |
|
"end": float(item["end_time"]) |
|
} for item in word_data] |
|
app.logger.info(f"Successfully created transcript ({len(full_text)} characters)") |
|
return jsonify({ |
|
"status": "success", |
|
"text": full_text, |
|
"words": words_with_times |
|
}) |
|
except Exception as e: |
|
app.logger.error(f"Error processing word timestamps: {str(e)}") |
|
return jsonify({ |
|
"status": "error", |
|
"message": f"Error processing word timestamps: {str(e)}" |
|
}), 500 |
|
|
|
@app.route("/api/word_timestamps/<video_id>") |
|
@login_required |
|
def get_word_timestamps(video_id): |
|
timestamps_file = os.path.join(WORD_TIMESTAMPS_DIR, f"{video_id}_word_timestamps.json") |
|
app.logger.info(f"Attempting to load word timestamps from: {timestamps_file}") |
|
if not os.path.exists(timestamps_file): |
|
app.logger.warning(f"Word timestamps file not found: {timestamps_file}") |
|
return jsonify({ |
|
"status": "error", |
|
"message": "No word timestamps found for this video" |
|
}), 404 |
|
try: |
|
with open(timestamps_file, 'r') as f: |
|
word_data = json.load(f) |
|
app.logger.info(f"Successfully loaded {len(word_data)} word timestamps") |
|
return jsonify({ |
|
"status": "success", |
|
"words": word_data |
|
}) |
|
except Exception as e: |
|
app.logger.error(f"Error processing word timestamps: {str(e)}") |
|
return jsonify({ |
|
"status": "error", |
|
"message": f"Error processing word timestamps: {str(e)}" |
|
}), 500 |
|
|
|
@app.route("/api/clips/<video_id>") |
|
@login_required |
|
def get_video_clips(video_id): |
|
try: |
|
annotation_file = os.path.join(ANNOTATIONS_DIR, f"{video_id}_annotations.json") |
|
if not os.path.exists(annotation_file): |
|
raise FileNotFoundError("Annotations not found") |
|
with open(annotation_file, 'r') as f: |
|
annotations = json.load(f) |
|
timestamps = annotations['timestamps'] |
|
clips = [] |
|
for i in range(len(timestamps)-1): |
|
clips.append({ |
|
"index": i, |
|
"start": timestamps[i], |
|
"end": timestamps[i+1], |
|
"path": f"/clip/{video_id}/{i}" |
|
}) |
|
return jsonify({ |
|
"status": "success", |
|
"clips": clips |
|
}) |
|
except Exception as e: |
|
app.logger.error(f"Error getting clips: {str(e)}") |
|
return jsonify({ |
|
"status": "error", |
|
"message": str(e) |
|
}), 500 |
|
|
|
@app.route("/clip/<video_id>/<int:clip_index>") |
|
@login_required |
|
def serve_clip(video_id, clip_index): |
|
clip_path = os.path.join( |
|
TEMP_DIR, |
|
f"{video_id}_clip_{clip_index:03d}.mp4" |
|
) |
|
app.logger.info(f"Attempting to serve clip: {clip_path}") |
|
if not os.path.exists(clip_path): |
|
app.logger.error(f"Clip not found: {clip_path}") |
|
return jsonify({ |
|
"status": "error", |
|
"message": "Clip not found" |
|
}), 404 |
|
return send_file(clip_path, mimetype="video/mp4") |
|
|
|
@app.route("/api/save_alignments", methods=["POST"]) |
|
@login_required |
|
def save_alignments(): |
|
try: |
|
data = request.json |
|
if not data or 'video_id' not in data or 'alignments' not in data: |
|
return jsonify({'success': False, 'message': 'Invalid data'}), 400 |
|
|
|
|
|
for alignment in data['alignments']: |
|
if alignment: |
|
alignment['aligned_by'] = session.get('user', {}).get('name', 'unknown') |
|
|
|
output_path = os.path.join(ALIGNMENTS_DIR, f"{data['video_id']}.json") |
|
with open(output_path, "w") as f: |
|
json.dump(data['alignments'], f, indent=2) |
|
return jsonify({ |
|
"success": True, |
|
"message": "Alignments saved successfully" |
|
}) |
|
except Exception as e: |
|
app.logger.error(f"Error saving alignments: {str(e)}") |
|
return jsonify({ |
|
"success": False, |
|
"message": str(e) |
|
}), 500 |
|
|
|
@app.route("/api/extract_clips/<video_id>") |
|
@login_required |
|
def extract_clips_for_video(video_id): |
|
status = clip_extraction_status.get(video_id, {}) |
|
if status.get("percent", 0) < 100: |
|
thread = threading.Thread(target=run_clip_extraction, args=(video_id,)) |
|
thread.start() |
|
if video_id not in transcription_progress_status or transcription_progress_status.get(video_id, {}).get("percent", 0) < 100: |
|
thread_trans = threading.Thread(target=run_transcription, args=(video_id,)) |
|
thread_trans.start() |
|
return jsonify({"status": "started"}) |
|
|
|
@app.route("/api/clip_progress/<video_id>") |
|
@login_required |
|
def clip_progress(video_id): |
|
progress = clip_extraction_status.get(video_id, {"current": 0, "total": 0, "percent": 0}) |
|
return jsonify(progress) |
|
|
|
@app.route("/api/transcription_progress/<video_id>") |
|
@login_required |
|
def transcription_progress(video_id): |
|
progress = transcription_progress_status.get(video_id, {"status": "not started", "percent": 0}) |
|
return jsonify(progress) |
|
|
|
if __name__ == '__main__': |
|
port = int(os.getenv('PORT', 5000)) |
|
app.run(host='0.0.0.0', port=port, debug=True) |