import os
import shutil
import zipfile

import gdown
import chromadb
from chromadb.utils.embedding_functions import OpenCLIPEmbeddingFunction
from chromadb.utils.data_loaders import ImageLoader
import cv2

# On-disk location of the persistent Chroma database used by this module.
path = "mm_vdb2"
client = chromadb.PersistentClient(path=path)

image_loader = ImageLoader()
CLIP = OpenCLIPEmbeddingFunction()

# Collection of extracted video frames: frames are added by URI (see
# add_frames_to_chromadb) and embedded with the OpenCLIP embedding function.
video_collection = client.get_or_create_collection(
    name='video_collection',
    embedding_function=CLIP,
    data_loader=image_loader,
)


def extract_frames(video_folder: str, output_folder: str, interval_seconds: float = 5) -> None:
    """
    Sample frames from every ``.mp4`` in a folder and save them as JPEG files.

    For each video a subfolder named after the video (without extension) is
    created under ``output_folder``. The first frame, one frame every
    ``interval_seconds`` of video, and the frame at index ``frame_count - 1``
    are written as ``frame_<second>.jpg``, where ``<second>`` is the frame's
    timestamp truncated to whole seconds. Two sampled frames that fall in the
    same whole second share a filename, so the later one overwrites the
    earlier one.

    Videos that OpenCV cannot open, or that report a non-positive frame rate,
    are skipped with a printed message instead of raising ZeroDivisionError.

    Args:
        video_folder (str): Directory containing the ``.mp4`` files.
        output_folder (str): Directory that receives one subfolder per video.
        interval_seconds (float): Seconds of video between sampled frames.
            Defaults to 5.

    Raises:
        ValueError: If ``interval_seconds`` is not positive.
    """
    if interval_seconds <= 0:
        raise ValueError("interval_seconds must be positive")

    os.makedirs(output_folder, exist_ok=True)

    for video_filename in os.listdir(video_folder):
        if not video_filename.endswith('.mp4'):
            continue

        video_path = os.path.join(video_folder, video_filename)
        video_capture = cv2.VideoCapture(video_path)
        try:
            fps = video_capture.get(cv2.CAP_PROP_FPS)
            # "not fps > 0" also rejects NaN, which "fps <= 0" would let through.
            if not video_capture.isOpened() or not fps > 0:
                print(f"Skipping unreadable video: {video_path}")
                continue

            frame_count = int(video_capture.get(cv2.CAP_PROP_FRAME_COUNT))
            # Clamp to 1 so a very low frame rate cannot cause a modulo by zero.
            frame_step = max(1, int(fps * interval_seconds))

            output_subfolder = os.path.join(output_folder, os.path.splitext(video_filename)[0])
            os.makedirs(output_subfolder, exist_ok=True)

            frame_number = 0
            success, image = video_capture.read()
            while success:
                # Frame 0 always satisfies the modulo test, so the first frame
                # is kept without a separate check.
                if frame_number % frame_step == 0 or frame_number == frame_count - 1:
                    frame_time = frame_number / fps
                    output_frame_filename = os.path.join(
                        output_subfolder, f'frame_{int(frame_time)}.jpg'
                    )
                    cv2.imwrite(output_frame_filename, image)

                success, image = video_capture.read()
                frame_number += 1
        finally:
            # Release the capture even when reading or writing a frame fails.
            video_capture.release()


def add_frames_to_chromadb(video_dir: str, frames_dir: str) -> None:
    """
    Register every extracted frame in ``video_collection``.

    For each ``.mp4`` in ``video_dir`` that has a matching frame directory
    ``frames_dir/<video title>``, every ``.jpg`` in that directory is added
    with id ``<frame name>_<video title>``, its file path as the URI, and
    metadata ``{'video_uri': <path to the source video>}``.

    If no frames are found, a message is printed and nothing is added.

    Args:
        video_dir (str): Directory containing the source ``.mp4`` files.
        frames_dir (str): Directory containing one frame subfolder per video.
    """
    ids = []
    uris = []
    metadatas = []

    for video_file in os.listdir(video_dir):
        if not video_file.endswith('.mp4'):
            continue

        video_title = video_file[:-4]
        frame_folder = os.path.join(frames_dir, video_title)
        # isdir (rather than exists) so a stray file with the video's title
        # does not make os.listdir fail.
        if not os.path.isdir(frame_folder):
            continue

        video_path = os.path.join(video_dir, video_file)
        for frame in os.listdir(frame_folder):
            if not frame.endswith('.jpg'):
                continue
            ids.append(f"{frame[:-4]}_{video_title}")
            uris.append(os.path.join(frame_folder, frame))
            metadatas.append({'video_uri': video_path})

    if not ids:
        # Avoid calling add() with empty lists when there is nothing to index.
        print(f"No frames found under {frames_dir}; nothing added to the collection")
        return

    video_collection.add(ids=ids, uris=uris, metadatas=metadatas)


def unzip_file(zip_path: str, extract_to: str) -> None:
    """
    Unzips a zip file to the specified directory and flattens the folder structure.

    Every file entry is written directly into ``extract_to`` under its base
    name; directory entries are skipped. Entries that share a base name
    overwrite one another, the last one in the archive winning.

    Errors are reported with a printed message rather than raised, so a
    failed extraction does not stop the caller.

    Args:
        zip_path (str): Path to the zip file.
        extract_to (str): Directory where the contents should be extracted.
    """
    try:
        # Ensure the destination directory exists
        os.makedirs(extract_to, exist_ok=True)

        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            for member in zip_ref.infolist():
                # Extract only files (not directories)
                if member.is_dir():
                    continue

                # Flatten by dropping the entry's directory components
                file_name = os.path.basename(member.filename)
                if not file_name:
                    continue

                extracted_path = os.path.join(extract_to, file_name)
                with zip_ref.open(member) as source, open(extracted_path, 'wb') as target:
                    # Stream in chunks so large videos are not held in memory.
                    shutil.copyfileobj(source, target)

        print(f"Successfully extracted and flattened {zip_path} to {extract_to}")
    except Exception as e:
        # Deliberately best-effort: report the problem and return.
        print(f"An error occurred during extraction: {e}")


def initiate_video():
    """
    Download the stock-video archive, extract frames, and index them.

    Steps: download the zip from Google Drive, flatten it into
    ``videos_flattened``, sample frames into ``extracted_frames``, then add
    the frames to ``video_collection``.

    Returns:
        The module-level ``video_collection``.

    Raises:
        RuntimeError: If ``gdown.download`` returns ``None``.
    """
    file_id = "1Nzy-ep9Zn15_mLAq8rUi-iKPGOpN8g9Q"
    output_file = r"StockVideos-CC01.zip"

    # Download the ZIP file
    downloaded = gdown.download(f"https://drive.google.com/uc?id={file_id}", output_file, quiet=False)
    # NOTE(review): gdown is expected to return the output path on success;
    # some versions return None on failure instead of raising - confirm
    # against the installed version.
    if downloaded is None:
        raise RuntimeError(f"Download failed for Google Drive file id {file_id}")
    print(f"File downloaded successfully: {output_file}")

    # Define paths
    zip_file_path = output_file
    flattened_video_folder = r"videos_flattened"
    frames_output_folder = r"extracted_frames"

    # Ensure directories exist
    os.makedirs(flattened_video_folder, exist_ok=True)
    os.makedirs(frames_output_folder, exist_ok=True)

    # Unzip and flatten the videos
    unzip_file(zip_file_path, flattened_video_folder)

    # Process the videos and extract frames
    extract_frames(flattened_video_folder, frames_output_folder)

    # Add frames to ChromaDB
    add_frames_to_chromadb(flattened_video_folder, frames_output_folder)

    return video_collection