mimosa-ai / utils /utils.py
vivekk3's picture
Upload folder using huggingface_hub
9c4b01e verified
import os
import requests
from dotenv import load_dotenv
import boto3
import supabase
import cv2
R2_ACCESS_KEY = os.getenv('R2_ACCESS_KEY')
R2_SECRET_KEY = os.getenv('R2_SECRET_KEY')
R2_BUCKET_NAME = os.getenv('R2_BUCKET_NAME')
R2_ENDPOINT_URL = os.getenv('R2_ENDPOINT_URL')
def download_video(video_url):
if not os.path.exists('./input'):
os.makedirs('./input')
print(f'Downloading video from {video_url}')
response = requests.get(video_url, stream=True)
if response.status_code == 200:
video_name = video_url.split('/')[-1]
print(video_name)
video_path = f'./input/{video_name}.mp4'
print(video_path)
with open(video_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return video_path
else:
raise Exception(f"Failed to download video: {response.status_code}")
def download_file(url, path):
if not os.path.exists(path):
os.makedirs(path)
print(f'Downloading file from {url} to {path}')
response = requests.get(url, stream=True)
if response.status_code == 200:
file_name = url.split('/')[-1]
file_path = f'./{path}/{file_name}.mp4'
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
return file_path
else:
raise Exception(f"Failed to download file: {response.status_code}")
def upload_file(file_path, bucket_name, object_name, endpoint_url, access_key, secret_key):
s3 = boto3.client('s3', endpoint_url=endpoint_url, aws_access_key_id=access_key, aws_secret_access_key=secret_key)
try:
response =s3.upload_file(file_path, bucket_name, object_name)
print(f'{file_path} uploaded to {bucket_name}/{object_name}')
return response
except Exception as e:
print(f'Error uploading file: {e}')
def detect_faces_frames(video_url):
video_name = video_url.split('/')[-1]
print(video_name)
video_path = download_video(video_url)
frames =[]
face_cascade = cv2.CascadeClassifier('./utils/face_detection.xml')
# Open the video file
cap = cv2.VideoCapture(video_path)
# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
duration = total_frames / fps
frame_count = 0
time_count = 0
while True:
ret, frame = cap.read()
if not ret:
break
# Process frame every 5 seconds
if frame_count % int(fps * 5) == 0:
# Convert frame to grayscale
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
# Detect faces
faces = face_cascade.detectMultiScale(gray, scaleFactor=1.1, minNeighbors=5, minSize=(30, 30))
# Draw rectangles around the faces
for (x, y, w, h) in faces:
cv2.rectangle(frame, (x, y), (x+w, y+h), (255, 0, 0), 2)
# Save the frame with detected faces
frame_name = f"./output/{video_name}_{time_count}.jpg"
print(frame_name)
frames.append(frame_name)
cv2.imwrite(f"./output/{video_name}_{time_count}.jpg", frame)
time_count += 1
frame_count += 1
cap.release()
cv2.destroyAllWindows()
print(f"Total video duration: {duration:.2f} seconds")
print(f"Total frames processed: {time_count // 5}")
res = []
for frame in frames:
upload_file(f'{frame}', 'outputs', frame.split('/')[-1] , 'https://c98643a1da5e9aa06b27b8bb7eb9227a.r2.cloudflarestorage.com/warden-ai', R2_ACCESS_KEY, R2_SECRET_KEY)
res.append(f'https://pub-08a118f4cb7c4b208b55e6877b0bacca.r2.dev/outputs/{frame.split("/")[-1]}')
return res