mafqoud / modules /cloudservice.py
Dev-mohamed's picture
init
677c57e
raw
history blame
No virus
5.13 kB
# from flask import Flask, jsonify
# import cloudinary
# import cloudinary.api
# import cloudinary.uploader
# import os
# import glob
# import requests
# from dotenv import load_dotenv
# from deepface.commons.logger import Logger
# logger = Logger(module="modules/cloudservice.py")
# load_dotenv()
# # Configure Cloudinary
# cloudinary.config(
# cloud_name=os.getenv('CLOUDINARY_CLOUD_NAME'),
# api_key=os.getenv('CLOUDINARY_API_KEY'),
# api_secret=os.getenv('CLOUDINARY_API_SECRET')
# )
# def fetch_cloudinary_images(folder_name):
# resources = []
# res = cloudinary.api.resources(type='upload', resource_type='image', prefix=f'mafqoud/images/{folder_name}')
# resources.extend(res.get('resources', []))
# return resources
# def download_image(url, local_path):
# response = requests.get(url, stream=True)
# if response.status_code == 200:
# with open(local_path, 'wb') as file:
# for chunk in response:
# file.write(chunk)
# def sync_folder(folder_name, local_dir):
# cloudinary_images = fetch_cloudinary_images(folder_name)
# cloudinary_urls = {img['secure_url']: img['public_id'] for img in cloudinary_images}
# # Download new images and track downloaded image paths
# downloaded_paths = []
# for img in cloudinary_images:
# url = img['secure_url']
# public_id = img['public_id']
# file_name = url.split('/')[-1] # Get the actual file name
# local_path = os.path.join(local_dir, file_name)
# print(local_path)
# if not os.path.exists(local_path):
# download_image(url, local_path)
# downloaded_paths.append(local_path)
# # Remove old images
# local_images = [os.path.join(local_dir, f) for f in os.listdir(local_dir) if os.path.isfile(os.path.join(local_dir, f))]
# for local_path in local_images:
# if local_path not in downloaded_paths and (local_path.endswith('.jpg') or local_path.endswith('.jpeg') or local_path.endswith('.png')):
# os.remove(local_path)
# def delete_pkl_files(directory):
# """Delete all .pkl files in the specified directory."""
# pkl_files = glob.glob(os.path.join(directory, '*.pkl'))
# for pkl_file in pkl_files:
# os.remove(pkl_file)
from flask import Flask, jsonify
import cloudinary
import cloudinary.api
import cloudinary.uploader
import os
import glob
import requests
from dotenv import load_dotenv
from deepface.commons.logger import Logger
from deepface import DeepFace # Assuming this is the correct import for training
logger = Logger(module="modules/cloudservice.py")
load_dotenv()
# Configure Cloudinary
cloudinary.config(
cloud_name=os.getenv('CLOUDINARY_CLOUD_NAME'),
api_key=os.getenv('CLOUDINARY_API_KEY'),
api_secret=os.getenv('CLOUDINARY_API_SECRET')
)
def fetch_cloudinary_images(folder_name):
resources = []
next_cursor = None
while True:
if next_cursor:
res = cloudinary.api.resources(
type='upload',
resource_type='image',
prefix=f'mafqoud/images/{folder_name}',
max_results=500,
next_cursor=next_cursor
)
else:
res = cloudinary.api.resources(
type='upload',
resource_type='image',
prefix=f'mafqoud/images/{folder_name}',
max_results=500
)
resources.extend(res.get('resources', []))
next_cursor = res.get('next_cursor')
if not next_cursor:
break
return resources
def download_image(url, local_path):
response = requests.get(url, stream=True)
if response.status_code == 200:
with open(local_path, 'wb') as file:
for chunk in response:
file.write(chunk)
def sync_folder(folder_name, local_dir):
cloudinary_images = fetch_cloudinary_images(folder_name)
cloudinary_urls = {img['secure_url']: img['public_id'] for img in cloudinary_images}
# Download new images and track downloaded image paths
downloaded_paths = []
for img in cloudinary_images:
url = img['secure_url']
public_id = img['public_id']
file_name = url.split('/')[-1] # Get the actual file name
local_path = os.path.join(local_dir, file_name)
print(local_path)
if not os.path.exists(local_path):
download_image(url, local_path)
downloaded_paths.append(local_path)
# Remove old images
local_images = [os.path.join(local_dir, f) for f in os.listdir(local_dir) if os.path.isfile(os.path.join(local_dir, f))]
for local_path in local_images:
if local_path not in downloaded_paths and (local_path.endswith('.jpg') or local_path.endswith('.jpeg') or local_path.endswith('.png')):
os.remove(local_path)
def delete_pkl_files(directory):
"""Delete all .pkl files in the specified directory."""
pkl_files = glob.glob(os.path.join(directory, '*.pkl'))
for pkl_file in pkl_files:
os.remove(pkl_file)