conspectum / utils.py
macsunmood's picture
update app
f7ce4f6
raw
history blame contribute delete
15.9 kB
import streamlit as st
import os
import tempfile
import gdown
import uuid
import tomllib
from pathlib import Path
# import ffmpeg
from moviepy.video.io.VideoFileClip import VideoFileClip
import cv2
import numpy as np
from io import BytesIO
from pydub import AudioSegment
from pydub.silence import detect_leading_silence
import librosa
import librosa.display as lbd
import matplotlib.pyplot as plt
# Scratch directory created once, when this module is imported; the
# per-session upload folders are created underneath it.
TEMP_DIR = tempfile.mkdtemp()
# Name of the TOML configuration file; it is opened as a relative path.
CONFIG_FILE = 'app_config.toml'
def load_config():
    '''Read the TOML file named by CONFIG_FILE and return the parsed mapping.

    A missing file is not fatal: a message is printed and a built-in default
    configuration is returned instead. Any other failure is printed and then
    re-raised to the caller.
    '''
    # Defaults used only when the configuration file does not exist.
    fallback = {
        "paths": {"output_dir": "output", "temp_dir": "temp_processing"},
        "models": {"whisper_model": "base.en", "ocr_languages": ["en"], "summarization_model": "google/pegasus-xsum"},
        "settings": {"frame_extraction_interval_seconds": 10, "max_summary_length": 500, "min_summary_length": 100}
    }
    try:
        with open(CONFIG_FILE, 'rb') as config_file:
            parsed = tomllib.load(config_file)
    except FileNotFoundError:
        print(f'Error: {CONFIG_FILE} not found. Using default settings.')
        return fallback
    except Exception as e:
        print(f'Error loading config: {e}')
        raise  # Re-raise after printing
    return parsed
# Configuration is loaded once, at import time; the helpers below read it.
CONFIG = load_config()
def ensure_dir(directory_path):
    """Make sure `directory_path` exists, creating missing parents as needed.

    Nothing happens when the directory is already present.
    """
    target = Path(directory_path)
    target.mkdir(parents=True, exist_ok=True)
def get_secret_api():
    """Download the secret API file from Google Drive and return its text.

    The Drive file id is read from ``CONFIG['links']['secret_api_id']``.
    The payload is fetched into a temporary file which is removed before
    returning, whether or not the download succeeds.

    Returns:
        The file content decoded as UTF-8.
    """
    # Reserve a unique path and release our own handle immediately: gdown
    # writes to the path itself, so the content must be read back through a
    # fresh handle rather than through the one opened before the download.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp_path = tmp.name
    try:
        gdown.download(id=CONFIG['links']['secret_api_id'], output=tmp_path,
                       quiet=True, fuzzy=True, use_cookies=True)
        with open(tmp_path, 'rb') as downloaded:
            return downloaded.read().decode('utf-8')
    finally:
        # Never leave the secret on disk, even when the download raised.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def get_secret_prompt():
    """Download the secret prompt file from Google Drive and return its text.

    The Drive file id is read from ``CONFIG['links']['secret_prompt_id']``.
    The payload is fetched into a temporary file which is removed before
    returning, whether or not the download succeeds.

    Returns:
        The file content decoded as UTF-8.
    """
    # Reserve a unique path and release our own handle immediately: gdown
    # writes to the path itself, so the content must be read back through a
    # fresh handle rather than through the one opened before the download.
    with tempfile.NamedTemporaryFile(delete=False) as tmp:
        tmp_path = tmp.name
    try:
        gdown.download(id=CONFIG['links']['secret_prompt_id'], output=tmp_path, quiet=True)
        with open(tmp_path, 'rb') as downloaded:
            return downloaded.read().decode('utf-8')
    finally:
        # Never leave the secret on disk, even when the download raised.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
def save_uploaded_file(uploaded_file):
    """Write an uploaded file into the current session's temporary folder.

    Args:
        uploaded_file: Object exposing ``name`` and ``getbuffer()``, or
            ``None`` when nothing was uploaded.

    Returns:
        The path of the saved file, or ``None`` when `uploaded_file` is None.
    """
    if uploaded_file is None:
        return None

    # One sub-directory per session keeps uploads from different sessions apart.
    session_id = get_session_id()
    upload_dir = os.path.join(TEMP_DIR, session_id)
    os.makedirs(upload_dir, exist_ok=True)

    # The name comes from the client; keep only its last path component so
    # that separators in it cannot place the file outside `upload_dir`.
    safe_name = os.path.basename(uploaded_file.name)
    file_path = os.path.join(upload_dir, safe_name)
    with open(file_path, 'wb') as f:
        f.write(uploaded_file.getbuffer())
    print(f'File saved to: {file_path}')  # debugging
    return file_path
def get_session_id():
    """Return this session's id, creating and storing one on first use.

    The id is the first eight characters of a random UUID and is kept in
    Streamlit's session state under the key 'session_id'.
    """
    state = st.session_state
    if 'session_id' not in state:
        # The first eight characters of str(uuid) are its first eight hex digits.
        state['session_id'] = uuid.uuid4().hex[:8]
    return state['session_id']
def get_session_dir():
    """Return the path of the current session's folder under TEMP_DIR.

    The path is only computed here; the folder itself is not created.
    """
    return os.path.join(TEMP_DIR, get_session_id())
def get_temp_dir():
    """Ensure the configured processing directory exists and return its path.

    The location is read from CONFIG['paths']['temp_dir'].
    """
    processing_dir = Path(CONFIG['paths']['temp_dir'])
    processing_dir.mkdir(parents=True, exist_ok=True)
    # A per-run sub-directory (tempfile.mkdtemp(dir=...)) could be used here
    # if runs ever need to be isolated from each other.
    return str(processing_dir)  # plain string for wider compatibility
def extract_audio(video_path, audio_format='wav'):
    """Extract the audio track of a video into a file next to it, using moviepy.

    The output is named ``<video base name>_audio.<audio_format>`` and is
    placed in the directory that contains `video_path`. An already existing
    output file is reused without re-extracting.

    Args:
        video_path: Path of the source video.
        audio_format: Extension of the output file. 'wav' is written with the
            'pcm_s16le' codec, anything else with 'mp3'.

    Returns:
        Path of the audio file, or ``None`` when the video has no audio track
        or when extraction fails (the error is printed, not raised).
    """
    # Initialised up front so the error handler can always refer to it, even
    # when the failure happens before the output path has been computed.
    audio_path = None
    try:
        session_dir = os.path.dirname(video_path)  # assumes video is in session dir
        base_name = os.path.splitext(os.path.basename(video_path))[0]
        audio_path = os.path.join(session_dir, f"{base_name}_audio.{audio_format}")

        if os.path.exists(audio_path):
            print(f"Audio file already exists: {audio_path}")
            return audio_path

        print(f"Extracting audio from {video_path} to {audio_path}...")
        video_clip = None
        audio_clip = None
        try:
            video_clip = VideoFileClip(video_path)
            audio_clip = video_clip.audio
            if audio_clip is None:
                print("No audio track found in the video.")
                return None
            # WAV is often better for STT
            codec = 'pcm_s16le' if audio_format == 'wav' else 'mp3'
            audio_clip.write_audiofile(audio_path, codec=codec)
        finally:
            # Close the clips on every path, before any partial-file cleanup.
            if audio_clip is not None:
                audio_clip.close()
            if video_clip is not None:
                video_clip.close()

        print("Audio extraction complete.")
        return audio_path
    except Exception as e:
        print(f"Error extracting audio: {e}")
        # Attempt to remove a partial file if creation failed mid-way.
        if audio_path is not None and os.path.exists(audio_path):
            try:
                os.remove(audio_path)
            except OSError as rm_e:
                print(f"Could not remove partial audio file {audio_path}: {rm_e}")
        return None
from scenedetect import open_video, SceneManager
from scenedetect.detectors import ContentDetector
def extract_frames_pyscenedetect(video_path, output_dir, threshold=2.0):
    """Save the first frame of every scene found by PySceneDetect's ContentDetector.

    Args:
        video_path: Path of the video to analyse.
        output_dir: Directory that receives the JPEG frames (created if missing).
        threshold: Threshold passed to ``ContentDetector``.

    Returns:
        ``(output_dir, extracted_frame_paths)``, or ``None`` when OpenCV cannot
        open the video.
        NOTE(review): the failure value is a bare None, not a tuple, so callers
        that unpack the result must check for it first.
    """
    os.makedirs(output_dir, exist_ok=True)  # ensure the output dir exists

    # Detect scene boundaries.
    video = open_video(video_path)
    scene_manager = SceneManager()
    scene_manager.add_detector(ContentDetector(threshold=threshold))
    scene_manager.detect_scenes(video)

    scene_list = scene_manager.get_scene_list()
    print(scene_list)
    print(f'Обнаружено {len(scene_list)} смен сцен.')

    # Save the frame at the start of each scene.
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            print(f'Error: Could not open video file {video_path}')
            return None

        extracted_frame_paths = []
        for start_time, _ in scene_list:
            frame_num = start_time.get_frames()
            cap.set(cv2.CAP_PROP_POS_FRAMES, frame_num)
            success, frame = cap.read()
            if not success:
                # A single unreadable frame does not abort the remaining scenes.
                print(f'[!] Ошибка при чтении кадра {frame_num}')
                continue

            timestamp_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
            # Files are named by whole seconds, so scenes that start within the
            # same second map to the same file name.
            frame_filename = f'frame_{int(timestamp_ms / 1000):06d}.jpg'
            frame_path = os.path.join(output_dir, frame_filename)
            cv2.imwrite(frame_path, frame)
            print(f'[*] Сохранён кадр {frame_num} в {frame_path}')
            extracted_frame_paths.append(frame_path)
    finally:
        # Release the capture on every path, including unexpected errors.
        cap.release()

    print(f'Extracted {len(extracted_frame_paths)} frames to {output_dir}.')
    return output_dir, extracted_frame_paths
def extract_frames_interval(video_path, output_dir, interval_sec=5):
    '''Extract one frame every `interval_sec` seconds from a video using OpenCV.

    Frames are written as PNG files named by their timestamp in whole seconds.

    Args:
        video_path: Path of the video to read.
        output_dir: Directory that receives the frames (created if missing).
        interval_sec: Distance between saved frames, in seconds.

    Returns:
        ``(output_dir, extracted_frame_paths)`` on success, ``None`` when the
        video cannot be opened, and ``(None, [])`` when an exception occurs
        (the error is printed, not raised).
        NOTE(review): the two failure values differ in shape; callers that
        unpack the result must handle the bare None.
    '''
    cap = None
    try:
        os.makedirs(output_dir, exist_ok=True)  # ensure the output dir exists

        print(f'Extracting frames from {video_path} every {interval_sec}s..')
        cap = cv2.VideoCapture(video_path)
        if not cap.isOpened():
            print(f'Error: Could not open video file {video_path}')
            return None

        fps = cap.get(cv2.CAP_PROP_FPS)
        if fps == 0:
            print('Warning: Could not get FPS, defaulting to 30.')
            fps = 30  # provide a default if FPS is not available

        # At least one frame per step; a zero interval would otherwise make
        # the modulo below divide by zero.
        frame_interval = max(1, int(fps * interval_sec))

        extracted_frame_paths = []
        frame_count = 0
        # grab() advances without decoding; only the frames that are kept are
        # decoded with retrieve(), so saved frames sit exactly at indices
        # 0, frame_interval, 2 * frame_interval, ...
        while cap.grab():
            if frame_count % frame_interval == 0:
                success, frame = cap.retrieve()
                if success:
                    timestamp_ms = cap.get(cv2.CAP_PROP_POS_MSEC)
                    frame_filename = f'frame_{int(timestamp_ms / 1000):06d}.png'  # naming by seconds
                    frame_path = os.path.join(output_dir, frame_filename)
                    cv2.imwrite(frame_path, frame)
                    extracted_frame_paths.append(frame_path)
                else:
                    print(f'Warning: Failed to retrieve frame after grab at frame count {frame_count}')
            frame_count += 1

        print(f'Extracted {len(extracted_frame_paths)} frames to {output_dir}.')
        return output_dir, extracted_frame_paths
    except Exception as e:
        print(f'Error extracting frames: {e}')
        return None, []
    finally:
        # Release the capture on every path.
        if cap is not None:
            cap.release()
# --- Add other potential helpers: yt-dlp download, file cleanup etc. ---
def download_youtube(url, output_dir):
    """Fetch a YouTube video with yt-dlp and return the path of the saved file.

    The file is named after the video title inside `output_dir`. Any error is
    printed and reported to the caller as ``None``.
    """
    import yt_dlp

    def _print_status(progress):
        # basic progress
        print(progress['status'])

    output_template = os.path.join(output_dir, '%(title)s.%(ext)s')
    options = {
        'format': 'bestvideo[ext=mp4]+bestaudio[ext=m4a]/best[ext=mp4]/best',
        'outtmpl': output_template,
        'noplaylist': True,  # download only single video if URL is part of playlist
        'progress_hooks': [_print_status],
    }

    try:
        print(f'Attempting to download YouTube video: {url}')
        with yt_dlp.YoutubeDL(options) as downloader:
            video_info = downloader.extract_info(url, download=True)
            # Ask yt-dlp which file name it derives from the video info.
            saved_path = downloader.prepare_filename(video_info)
            print(f"YouTube video downloaded to: {saved_path}")
            return saved_path
    except Exception as e:
        print(f"Error downloading YouTube video: {e}")
        return None
def cleanup_session_files(session_id):
    """Delete the folder under TEMP_DIR that belongs to `session_id`.

    A missing folder is ignored; a failed removal is printed, not raised.
    """
    target = os.path.join(TEMP_DIR, session_id)
    if not os.path.exists(target):
        return

    import shutil
    try:
        shutil.rmtree(target)
    except Exception as e:
        print(f"Error cleaning up session files {target}: {e}")
    else:
        print(f"Cleaned up temporary files for session: {session_id}")
###
###=== Audio Loading and Processing
###
# Sample rate that proc_raw_audio resamples to; also the default `sr` of get_features.
SAMPLE_RATE = 22050
# presumably the clip length in seconds; not referenced by the code in this section (TODO confirm)
DURATION = 5
n_mfcc = 13 # number of MFCCs to extract from each sample
# The next three values are passed to librosa.feature.mfcc by obtain_features.
n_mels = 128
n_fft = 2048
hop_length = 512
delta_width = 9 # MFCC Delta parameter
def trim_silence(sound, s_thresh=-28.0):
    '''Cut leading and trailing silence off `sound`.

    `s_thresh` is the silence threshold handed to detect_leading_silence.
    '''
    total_len = len(sound)
    leading = detect_leading_silence(sound, s_thresh)
    trailing = detect_leading_silence(sound.reverse(), s_thresh)

    # When the detected silence spans the whole sound, that boundary is left
    # open (None) instead of trimming everything away.
    if leading == total_len:
        first = None
    else:
        first = leading
    if trailing == total_len:
        last = None
    else:
        last = total_len - trailing
    return sound[first:last]
def normalize_volume(sound, target_dBFS=-20.0):
    '''Normalize `sound`, then apply the gain that brings it to `target_dBFS`.'''
    normalized = sound.normalize()
    # Gain still needed after normalization to land on the target loudness.
    gain = target_dBFS - normalized.dBFS
    return normalized.apply_gain(gain)
def proc_raw_audio(audio_data, from_start=0, duration=None, before_end=0):
    '''Decode raw audio bytes, clean them up and return WAV bytes plus samples.

    The audio is resampled to SAMPLE_RATE, converted to mono, volume-normalized
    and stripped of leading/trailing silence. It is then cut to start
    `from_start` seconds in and to last `duration` seconds; without a
    `duration` it ends `before_end` seconds before the end.

    Returns a tuple (audio_wav, audio_np): the WAV-encoded bytes and the
    samples as a float numpy array.
    '''
    # Decode, then convert to mono at the target sample rate
    segment = AudioSegment.from_file(BytesIO(audio_data))
    segment = segment.set_frame_rate(SAMPLE_RATE)
    segment = segment.set_channels(1)

    # Loudness normalization, then silence trimming
    segment = trim_silence(normalize_volume(segment))

    # Slice positions are in milliseconds
    start = from_start * 1000
    end = start + duration * 1000 if duration else len(segment) - before_end * 1000
    segment = segment[start:end]

    # WAV-encoded bytes
    wav_buffer = BytesIO()
    segment.export(wav_buffer, format='wav')
    audio_wav = wav_buffer.getvalue()

    # Raw samples as a float array
    samples = segment.get_array_of_samples()
    audio_np = np.array(samples, dtype='float')

    return audio_wav, audio_np
###==============================================
def obtain_features(y, sr=22050, duration=5, delta_width=9):
    '''Compute the MFCC matrix of a signal with librosa.

    Args:
        y: Audio signal passed to librosa.
        sr: Sample rate of `y`.
        duration: Unused; kept so existing callers keep working.
        delta_width: Unused; kept so existing callers keep working.

    Returns:
        The array returned by ``librosa.feature.mfcc``
        (np.ndarray of shape (n_mfcc, t)).
    '''
    # `y` and `sr` are passed by keyword: recent librosa releases reject them
    # as positional arguments, and keywords are accepted by older ones too.
    mfcc = librosa.feature.mfcc(y=y, sr=sr,
                                n_mfcc=n_mfcc, n_mels=n_mels,
                                n_fft=n_fft, hop_length=hop_length)
    return mfcc
def create_features_array(mfcc):
    '''Summarize a feature matrix as per-row means and variances.

    Returns a list holding a single list in which every row of `mfcc`
    contributes its mean followed by its variance:
    [[mean_0, var_0, mean_1, var_1, ...]].
    '''
    row_means = mfcc.mean(axis=1)
    row_vars = mfcc.var(axis=1)

    # Interleave mean and variance, row by row
    interleaved = []
    for row_mean, row_var in zip(row_means, row_vars):
        interleaved.append(row_mean)
        interleaved.append(row_var)

    return [interleaved]
# def get_features(y, sr=22050, duration=5, delta_width=9):
# '''Returns numpy array of sound features obtained from signal'''
# return create_features_array(*obtain_features(y, sr, duration, delta_width))
def get_features(y, duration=5, sr=SAMPLE_RATE):
    '''Plot the signal in the Streamlit app and return its padded mel spectrogram.

    A wave plot and a mel spectrogram (in dB) are drawn side by side and
    rendered with ``st.pyplot``.

    Args:
        y: Audio signal passed to librosa.
        duration: Unused; kept so existing callers keep working.
        sr: Sample rate of `y`.

    Returns:
        The dB-scaled mel spectrogram with a leading axis of size 1. Its time
        axis is right-padded to at least 216 columns with the spectrogram's
        minimum value; longer inputs are not truncated.
    '''
    fig, axes = plt.subplots(1, 2, figsize=(24, 2))
    try:
        # WAVE PLOT
        axes[0].set_title(f'Wave Plot for audio sample at {sr} hz')
        axes[0].set_facecolor('#B4E8CF')
        lbd.waveshow(y, sr=sr, color='#4300FF', ax=axes[0])

        # MELSPEC
        melspec = librosa.feature.melspectrogram(y=y, sr=sr)
        melspec = librosa.power_to_db(np.abs(melspec), ref=np.max)
        axes[1].set_title(f'Mel Spectogram | shape: {melspec.shape}')
        lbd.specshow(melspec, cmap='viridis', y_axis='mel', x_axis='time', ax=axes[1])

        st.pyplot(fig)
    finally:
        # Close the figure so it does not stay registered with pyplot after
        # rendering; otherwise every call leaves another open figure behind.
        plt.close(fig)

    # Prepare melspec for use: pad the time axis up to 216 columns.
    # presumably 216 is the frame count of a 5 s clip at 22050 Hz with
    # librosa's default hop length (TODO confirm against the model input)
    missing_cols = max(0, 216 - melspec.shape[1])
    melspec = np.pad(
        melspec,
        [(0, 0), (0, missing_cols)],
        constant_values=melspec.min()
    )
    melspec = melspec.reshape(1, *melspec.shape)

    # An MFCC-based feature vector (obtain_features + create_features_array)
    # is a possible alternative return value; only the spectrogram is used.
    return melspec