import streamlit as st
import os
import tempfile
import gdown
import uuid
import tomllib
from pathlib import Path
# import ffmpeg
from moviepy.video.io.VideoFileClip import VideoFileClip
import cv2
import numpy as np
from io import BytesIO
from pydub import AudioSegment
from pydub.silence import detect_leading_silence
import librosa
import librosa.display as lbd
import matplotlib.pyplot as plt
TEMP_DIR = tempfile.mkdtemp()
CONFIG_FILE = 'app_config.toml'
def load_config():
    '''Loads configuration from app_config.toml'''
    try:
        with open(CONFIG_FILE, 'rb') as f:
            return tomllib.load(f)
    except FileNotFoundError:
        print(f'Error: {CONFIG_FILE} not found. Using default settings.')
        # Provide a default fallback config if needed
        return {
            "paths": {"output_dir": "output", "temp_dir": "temp_processing"},
            "models": {"whisper_model": "base.en", "ocr_languages": ["en"], "summarization_model": "google/pegasus-xsum"},
            "settings": {"frame_extraction_interval_seconds": 10, "max_summary_length": 500, "min_summary_length": 100}
        }
    except Exception as e:
        print(f'Error loading config: {e}')
        raise  # Re-raise after printing
CONFIG = load_config()
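# For reference, a minimal app_config.toml covering the keys accessed in this module
# might look as follows (illustrative sketch; the values mirror the fallback defaults
# above, and the 'links' IDs are placeholders, not real Google Drive file IDs):
#
#     [paths]
#     output_dir = "output"
#     temp_dir = "temp_processing"
#
#     [models]
#     whisper_model = "base.en"
#     ocr_languages = ["en"]
#     summarization_model = "google/pegasus-xsum"
#
#     [settings]
#     frame_extraction_interval_seconds = 10
#     max_summary_length = 500
#     min_summary_length = 100
#
#     [links]
#     secret_api_id = "<google-drive-file-id>"
#     secret_prompt_id = "<google-drive-file-id>"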
def ensure_dir(directory_path):
    """Creates a directory if it doesn't exist."""
    Path(directory_path).mkdir(parents=True, exist_ok=True)
def get_secret_api():
    """Downloads the secret API key from Google Drive and returns it as a string."""
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        gdown.download(id=CONFIG['links']['secret_api_id'], output=tmp.name, quiet=True, fuzzy=True, use_cookies=True)
        tmp.seek(0)
        secret_api = tmp.read().decode('utf-8')
        tmp_path = tmp.name
    os.remove(tmp_path)
    return secret_api
def get_secret_prompt():
    """Downloads the secret prompt text from Google Drive and returns it as a string."""
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        gdown.download(id=CONFIG['links']['secret_prompt_id'], output=tmp.name, quiet=True)
        tmp.seek(0)
        secret_prompt = tmp.read().decode('utf-8')
        tmp_path = tmp.name
    os.remove(tmp_path)
    return secret_prompt
def save_uploaded_file(uploaded_file):
    """Saves an uploaded file to a temporary directory."""
    if uploaded_file is not None:
        # Generate a unique sub-directory for this upload
        session_id = get_session_id()  # simple way to group files per session/upload
        upload_dir = os.path.join(TEMP_DIR, session_id)
        os.makedirs(upload_dir, exist_ok=True)
        file_path = os.path.join(upload_dir, uploaded_file.name)
        with open(file_path, 'wb') as f:
            f.write(uploaded_file.getbuffer())
        print(f'File saved to: {file_path}')  # debugging
        return file_path
    return None
def get_session_id():
    """Generates or retrieves a unique session ID."""
    if 'session_id' not in st.session_state:
        st.session_state['session_id'] = str(uuid.uuid4())[:8]
    return st.session_state['session_id']
def get_session_dir():
    """Gets the temporary directory path for the current session."""
    session_id = get_session_id()
    return os.path.join(TEMP_DIR, session_id)
def get_temp_dir():
    """Creates and returns the path to a temporary directory for processing."""
    temp_dir = Path(CONFIG['paths']['temp_dir'])
    ensure_dir(temp_dir)
    # Consider using unique subdirs per run if needed
    # processing_subdir = tempfile.mkdtemp(dir=temp_dir)
    # return processing_subdir
    return str(temp_dir)  # Return as string for wider compatibility
def extract_audio(video_path, audio_format='wav'):
    """Extracts audio from video using moviepy."""
    audio_path = None
    try:
        session_dir = os.path.dirname(video_path)  # assumes video is in session dir
        base_name = os.path.splitext(os.path.basename(video_path))[0]
        audio_filename = f"{base_name}_audio.{audio_format}"
        audio_path = os.path.join(session_dir, audio_filename)
        if os.path.exists(audio_path):
            print(f"Audio file already exists: {audio_path}")
            return audio_path
        print(f"Extracting audio from {video_path} to {audio_path}...")
        video_clip = VideoFileClip(video_path)
        audio_clip = video_clip.audio
        if audio_clip is None:
            print("No audio track found in the video.")
            video_clip.close()
            return None
        audio_clip.write_audiofile(audio_path, codec='pcm_s16le' if audio_format == 'wav' else 'mp3')  # WAV is often better for STT
        audio_clip.close()
        video_clip.close()
        print("Audio extraction complete.")
        return audio_path
    except Exception as e:
        print(f"Error extracting audio: {e}")
        # Clean up potentially corrupted file
        if 'audio_clip' in locals() and audio_clip:
            audio_clip.close()
        if 'video_clip' in locals() and video_clip:
            video_clip.close()
        # Attempt to remove partial file if creation failed mid-way
        if audio_path and os.path.exists(audio_path):
            try:
                os.remove(audio_path)
            except OSError as rm_e:
                print(f"Could not remove partial audio file {audio_path}: {rm_e}")
        return None
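# Illustrative usage (sketch; `uploaded` stands for a Streamlit st.file_uploader result):
#     video_path = save_uploaded_file(uploaded)
#     audio_path = extract_audio(video_path)   # writes <name>_audio.wav next to the video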
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector
def extract_frames_pyscenedetect(video_path, output_dir, threshold=2.0):
    """Detects scene changes with PySceneDetect and saves the first frame of each scene."""
    # session_dir = os.path.dirname(video_path)
    # frames_dir = os.path.join(session_dir, 'frames_pyscenedetect')
    # os.makedirs(frames_dir, exist_ok=True)
    os.makedirs(output_dir, exist_ok=True)  # ensure the output dir exists
    # Init video and scene managers
    # video_manager = VideoManager([video_path])
    video = open_video(video_path)
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=threshold))
    # Start analysis
    # video_manager.set_downscale_factor()
    # video_manager.start()
    # scene_manager.detect_scenes(frame_source=video_manager)
    scene_manager.detect_scenes(video)
    # print(scene_manager.get_scene_list())  # raw debug output
    # Get the scene list
    scene_list = scene_manager.get_scene_list()
    print(f'Detected {len(scene_list)} scene changes.')
    # Save the frame at each scene change
    cap = cv2.VideoCapture(video_path)
    if not cap.isOpened():
        print(f'Error: Could not open video file {video_path}')
        return None, []
    extracted_frame_paths = []
    for i, (start_time, _) in enumerate(scene_list):
        frame_num = start_time.get_frames()
        cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
        success, frame = cap.read()
        if success:
            timestamp_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            # frame_filename = f'scene_{i + 1:03d}.jpg'
            # frame_filename = f'frame_{int(timestamp_ms / 1000):06d}.png'  # naming by seconds
            frame_filename = f'frame_{int(timestamp_ms / 1000):06d}.jpg'  # naming by seconds
            frame_path = os.path.join(output_dir, frame_filename)
            cv2.imwrite(frame_path, frame)
            print(f'[*] Saved frame {frame_num} to {frame_path}')
            extracted_frame_paths.append(frame_path)
        else:
            print(f'[!] Error reading frame {frame_num}')
    cap.release()
    print(f'Extracted {len(extracted_frame_paths)} frames to {output_dir}.')
    return output_dir, extracted_frame_paths
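# Illustrative usage (sketch; paths are placeholders):
#     frames_dir, frame_paths = extract_frames_pyscenedetect(video_path, os.path.join(get_session_dir(), 'frames'))
# Lowering `threshold` makes ContentDetector more sensitive, so more scene cuts are reported.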
def extract_frames_interval(video_path, output_dir, interval_sec=5):
    '''Extracts frames from video at specified intervals using OpenCV.'''
    try:
        # session_dir = os.path.dirname(video_path)
        # frames_dir = os.path.join(session_dir, 'frames_interval')
        # os.makedirs(frames_dir, exist_ok=True)
        os.makedirs(output_dir, exist_ok=True)  # ensure the output dir exists
        print(f'Extracting frames from {video_path} every {interval_sec}s..')
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f'Error: Could not open video file {video_path}')
            return None, []
        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps == 0:
            print('Warning: Could not get FPS, defaulting to 30.')
            fps = 30  # provide a default if FPS is not available
        frame_interval = int(fps * interval_sec)
        frame_count = 0
        extracted_frame_paths = []
        def extract_frame():
            timestamp_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            frame_filename = f'frame_{int(timestamp_ms / 1000):06d}.png'  # naming by seconds
            frame_path = os.path.join(output_dir, frame_filename)
            cv2.imwrite(frame_path, frame)
            extracted_frame_paths.append(frame_path)
        success = True
        while success:
            if frame_count % frame_interval == 0:
                success, frame = cap.read()
                if success:
                    extract_frame()
            else:
                # Skip frames efficiently without decoding
                for _ in range(frame_interval - 1):
                    success = cap.grab()
                    if not success:
                        break
                    frame_count += 1
                # Now read the desired frame if grab was successful
                if success:
                    success, frame = cap.retrieve()
                    if success:
                        extract_frame()
                    else:
                        # Handle case where retrieve fails after grab
                        print(f'Warning: Failed to retrieve frame after grab at frame count {frame_count}')
            frame_count += 1
        cap.release()
        print(f'Extracted {len(extracted_frame_paths)} frames to {output_dir}.')
        return output_dir, extracted_frame_paths
    except Exception as e:
        print(f'Error extracting frames: {e}')
        if 'cap' in locals() and cap.isOpened():
            cap.release()
        return None, []
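# Illustrative usage (sketch): extract a frame every N seconds, where N could come from
# CONFIG['settings']['frame_extraction_interval_seconds'] (see the defaults above):
#     interval = CONFIG['settings']['frame_extraction_interval_seconds']
#     frames_dir, frame_paths = extract_frames_interval(video_path, os.path.join(get_session_dir(), 'frames'), interval_sec=interval)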
# --- Add other potential helpers: yt-dlp download, file cleanup etc. ---
def download_youtube(url, output_dir):
    """Downloads YouTube video using yt-dlp."""
    import yt_dlp
    ydl_opts = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': os.path.join(output_dir, '%(title)s.%(ext)s'),
        'noplaylist': True,  # download only a single video if the URL is part of a playlist
        'progress_hooks': [lambda d: print(d['status'])]  # basic progress
    }
    try:
        print(f'Attempting to download YouTube video: {url}')
        with yt_dlp.YoutubeDL(ydl_opts) as ydl:
            info = ydl.extract_info(url, download=True)
            # Try to get the downloaded filename
            filename = ydl.prepare_filename(info)
        print(f"YouTube video downloaded to: {filename}")
        return filename
    except Exception as e:
        print(f"Error downloading YouTube video: {e}")
        return None
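# Illustrative usage (sketch; the URL is a placeholder):
#     video_path = download_youtube('https://www.youtube.com/watch?v=...', get_session_dir())
#     if video_path:
#         audio_path = extract_audio(video_path)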
def cleanup_session_files(session_id):
    """Removes the temporary directory for a given session."""
    session_dir = os.path.join(TEMP_DIR, session_id)
    if os.path.exists(session_dir):
        import shutil
        try:
            shutil.rmtree(session_dir)
            print(f"Cleaned up temporary files for session: {session_id}")
        except Exception as e:
            print(f"Error cleaning up session files {session_dir}: {e}")
###
###=== Audio Loading and Processing
###
SAMPLE_RATE = 22050
DURATION = 5
n_mfcc = 13  # number of MFCCs to extract from each sample
n_mels = 128
n_fft = 2048
hop_length = 512
delta_width = 9  # MFCC delta parameter
def trim_silence(sound, s_thresh=-28.0):
    '''Trims silent chunks from the beginning and end of the sound'''
    duration = len(sound)
    start_trim = detect_leading_silence(sound, s_thresh)
    end_trim = detect_leading_silence(sound.reverse(), s_thresh)
    start = start_trim if start_trim != duration else None
    end = duration - end_trim if end_trim != duration else None
    return sound[start:end]
def normalize_volume(sound, target_dBFS=-20.0):
    '''Normalizes sound and shifts it to the specified loudness'''
    sound = sound.normalize()
    difference = target_dBFS - sound.dBFS
    return sound.apply_gain(difference)
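# Worked example (illustrative): after peak-normalization a clip sitting at -14.0 dBFS
# receives a gain of target_dBFS - dBFS = -20.0 - (-14.0) = -6.0 dB, landing at -20.0 dBFS.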
def proc_raw_audio(audio_data, from_start=0, duration=None, before_end=0):
    '''Processes raw audio data and returns wav bytes and a numpy array'''
    # Instantiate a pydub AudioSegment object from the raw audio
    audioObj = AudioSegment.from_file(BytesIO(audio_data))
    # Convert to mono with the desired sample rate
    audioObj = audioObj.set_frame_rate(SAMPLE_RATE).set_channels(1)
    # Normalize audio volume
    audioObj = normalize_volume(audioObj)
    # Trim silence from the beginning and end of the sound
    audioObj = trim_silence(audioObj)
    # Cut to the desired duration
    start = from_start * 1000
    if duration:
        end = start + duration * 1000
    else:
        end = len(audioObj) - before_end * 1000
    audioObj = audioObj[start:end]
    # Convert the AudioSegment to wav-format bytes
    buf = BytesIO()
    audioObj.export(buf, format='wav')
    audio_wav = buf.getvalue()
    # Convert the AudioSegment to a signal in the form of a numpy array
    arr = audioObj.get_array_of_samples()
    audio_np = np.array(arr, dtype='float')
    # Normalize if specified
    # if normalized:
    #     audio_np = np.array(arr) / np.iinfo(arr.typecode).max
    #     y /= np.linalg.norm(y)
    #     return y, sample_rate
    return audio_wav, audio_np
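# Illustrative usage (sketch; `audio_file` stands for a Streamlit st.file_uploader result):
#     audio_wav, audio_np = proc_raw_audio(audio_file.getvalue(), from_start=0, duration=DURATION)
#     st.audio(audio_wav, format='audio/wav')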
###==============================================
def obtain_features(y, sr=22050, duration=5, delta_width=9):
    '''Extracts sound features from the given signal and returns them as a numpy array'''
    # --- MFCC (returns M: np.ndarray [shape=(n_mfcc, t)])
    mfcc = librosa.feature.mfcc(y=y, sr=sr,
                                n_mfcc=n_mfcc, n_mels=n_mels,
                                n_fft=n_fft, hop_length=hop_length)
    return mfcc
def create_features_array(mfcc):  # , mfcc_delta1, mfcc_delta2, spectr_c, spectr_r):
    '''Creates a holistic numpy array of means and variances out of the given features'''
    make_meanvar = lambda mean, var: [item for mv in zip(mean, var) for item in mv]
    mean_var_ops = [
        (mfcc.mean(axis=1), mfcc.var(axis=1))
    ]
    mfcc_meanvars = sum([make_meanvar(mean, var)
                         for mean, var in mean_var_ops], [])
    # features_array = mfcc_meanvars + spectr_meanvars
    features_array = [mfcc_meanvars]
    return features_array
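# With the defaults above (n_mfcc=13), create_features_array() yields a single row of
# 26 interleaved values: [mean_1, var_1, mean_2, var_2, ..., mean_13, var_13].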
# def get_features(y, sr=22050, duration=5, delta_width=9):
#     '''Returns numpy array of sound features obtained from signal'''
#     return create_features_array(*obtain_features(y, sr, duration, delta_width))
def get_features(y, duration=5, sr=SAMPLE_RATE):
    '''Returns a numpy array of sound features obtained from the signal'''
    fig, axes = plt.subplots(1, 2, figsize=(24, 2))
    # WAVE PLOT
    axes[0].set_title(f'Wave Plot for audio sample at {sr} hz')
    axes[0].set_facecolor('#B4E8CF')
    lbd.waveshow(y, sr=sr, color='#4300FF', ax=axes[0])
    # MEL SPECTROGRAM
    melspec = librosa.feature.melspectrogram(y=y, sr=sr)
    melspec = librosa.power_to_db(np.abs(melspec), ref=np.max)
    axes[1].set_title(f'Mel Spectrogram | shape: {melspec.shape}')
    lbd.specshow(melspec, cmap='viridis', y_axis='mel', x_axis='time', ax=axes[1])
    st.pyplot(fig)
    # Pad along the time axis up to at least 216 frames (no-op for longer signals)
    pad_signal = lambda s, v: np.pad(
        s,
        [(0, 0), (0, max(0, 216 - s.shape[1]))],
        constant_values=v
    )
    # Prepare melspec for use
    melspec = pad_signal(melspec, melspec.min())
    melspec = melspec.reshape(1, *melspec.shape)
    # MFCC
    # mfcc = create_features_array(obtain_features(y, sr, duration, delta_width))
    # mfcc = np.array(mfcc).reshape(1, -1)
    return melspec
    # return mfcc
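# Illustrative usage (sketch; the file path is a placeholder):
#     y, sr = librosa.load('sample.wav', sr=SAMPLE_RATE, duration=DURATION)
#     melspec = get_features(y)   # shape (1, 128, T), with T padded to at least 216 frames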