# Source: app.py (38.1 kB), commit 38ee680 "Update app.py" by mylesai (verified).
import openai
from openai import OpenAI
import requests
import base64
import os
import ast
import cv2
from PIL import Image, ImageSequence
from tempfile import NamedTemporaryFile
import time
from zipfile import ZipFile
import gradio as gr
from docx import Document
from io import BytesIO
import pyheif
import numpy as np
from tenacity import (
retry,
stop_after_attempt,
wait_random_exponential,
)
import bf_trigger
# import change_ui as change
import offline_update
# for exponential backoff
# FUNCTIONS
# Brandfolder API key, read from the environment once at import time; a missing
# BRANDFOLDER_API_KEY raises KeyError here. get_asset_info and
# get_all_collection_dict send it verbatim as the Authorization header.
brandfolder_api = os.environ['BRANDFOLDER_API_KEY']
def get_asset_info(asset_id):
    '''
    Fetches a Brandfolder asset (with its section, collections, custom fields
    and attachments) and flattens the pieces this app needs into one dict.
    Input: asset_id
    Output: dict with keys section_id, collection_id, collection_name,
            asset_type, asset_name, access_key, image_url, client_name and
            project_name. Any value that cannot be found in the response is ''.
    '''
    headers = {
        'Content-Type': 'application/json',
        'Authorization': brandfolder_api
    }
    r = requests.get(f'https://brandfolder.com/api/v4/assets/{asset_id}?include=section,collections,custom_fields,attachments', params={}, headers=headers)

    # Parse the body once; a non-JSON body leaves every field at ''.
    try:
        body = r.json()
    except ValueError:
        body = {}

    # Errors that mean "this field is not present in the response".
    lookup_errors = (KeyError, IndexError, TypeError)

    def included_values(item_type, attribute, keep=lambda attrs: True):
        # Values of `attribute` for every included item of `item_type` whose attributes pass `keep`.
        return [
            item['attributes'][attribute]
            for item in body['included']
            if item['type'] == item_type and keep(item['attributes'])
        ]

    def first_included(item_type, attribute, keep=lambda attrs: True):
        # First matching value, or '' when there is none / the response is malformed.
        try:
            return included_values(item_type, attribute, keep)[0]
        except lookup_errors:
            return ''

    def custom_field(key):
        # Value of the custom field whose key is exactly `key`.
        return first_included('custom_field_values', 'value', lambda attrs: attrs['key'] == key)

    # gets section_id
    try:
        section_id = body['data']['relationships']['section']['data']['id']
    except lookup_errors:
        section_id = ''

    # gets collection_id and collection_name (both are blanked if either is missing)
    try:
        collection_id = body['data']['relationships']['collections']['data'][0]['id']
        collection_name = included_values('collections', 'name')[0]
    except lookup_errors:
        collection_id = ''
        collection_name = ''

    # asset_type is 'Photo' when any custom field value equals 'Photo', otherwise ''
    asset_type = first_included('custom_field_values', 'value', lambda attrs: attrs['value'] == 'Photo')

    try:
        asset_name = body['data']['attributes']['name']
    except lookup_errors:
        asset_name = ''

    return {
        "section_id": section_id,
        "collection_id": collection_id,
        "collection_name": collection_name,
        "asset_type": asset_type,
        "asset_name": asset_name,
        "access_key": custom_field('What is your Access Code?'),
        # URL of the first attachment
        "image_url": first_included('attachments', 'url'),
        "client_name": custom_field('Client Name'),
        "project_name": custom_field('List Project Name Photos Belong To')
    }
def get_all_collection_dict():
    '''
    Requests up to 300 collections of the hard-coded brandfolder and maps them by name.
    Output: dict of collection name -> collection id, in response order
    '''
    response = requests.get(
        'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=300',
        params={},
        headers={
            'Accept': 'application/json',
            'Authorization': brandfolder_api
        },
    )
    name_to_id = {}
    for entry in response.json()['data']:
        # a later collection with the same name replaces the earlier one
        name_to_id[entry['attributes']['name']] = entry['id']
    return name_to_id
def get_collection_names():
    '''
    Output: list of collection names, in the order get_collection_dict returns them
    '''
    # NOTE: an identical get_collection_names is defined again further down this
    # file; that later definition is the one bound after import.
    return [name for name in get_collection_dict()]
def rename(filename):
    '''
    Asks the gpt-4o chat model for a similar, hyphenated filename.
    Input: filename to find an alternative for
    Output: the text content of the model's first choice
    '''
    client = OpenAI()
    system_message = {"role": "system", "content": "You are a helpful assistant specializing in renaming files."}
    user_message = {"role": "user", "content": f"Provide a similar name for this filename: {filename}. Only return the filename and use hyphens in the filename."}
    reply = client.chat.completions.create(
        model="gpt-4o",
        messages=[system_message, user_message],
    )
    first_choice = reply.choices[0]
    return first_choice.message.content
def get_topical_map(path):
    '''
    Extracts the indented paragraphs of a docx file as an indented text outline.
    Input: path to the docx topical map
    Output: one line per paragraph that has an explicit left indent, prefixed
            with one space per 20 points of indent, joined with newlines.
            Paragraphs without an explicit left indent are skipped.
    '''
    document = Document(path)
    extracted_text = []
    for paragraph in document.paragraphs:
        # Left indentation of the current paragraph (None when not set on the paragraph)
        left_indent = paragraph.paragraph_format.left_indent
        if left_indent is None:
            continue
        # `.pt` is already the indent in points; every 20 points counts as one level
        indent_level = int(left_indent.pt / 20)
        # One space per level; adjust here if a different indentation marker is wanted
        indent_symbol = " " * indent_level
        extracted_text.append(f"{indent_symbol}{paragraph.text}")
    return "\n".join(extracted_text)
# gets the list of image members contained in a zip archive
def get_imgs_from_folder(image_files, zipfile):
    '''
    Appends the names of the image members of a zip archive to image_files.
    Input: image_files (list that is extended in place), zipfile (path or
           file-like object accepted by ZipFile)
    Output: the same image_files list
    A member counts as an image when the text after its last '.' is a known
    image extension (case-insensitive) and its name does not start with '_'
    (which skips e.g. '__MACOSX/...' entries).
    '''
    image_types = {'jpg', 'jpeg', 'gif', 'bmp', 'png', 'jpe', 'heic', 'tiff', 'webp', 'heif', 'svg', 'raw', 'psd'}
    # Only the member listing is needed, so close the archive straight away.
    with ZipFile(zipfile) as archive:
        member_names = archive.namelist()
    image_files.extend(
        name for name in member_names
        if not name.startswith('_') and name.split('.')[-1].lower() in image_types
    )
    return image_files
def get_seo_tags(image_path, topical_map, new_imgs, attempts=0, max_attempts=6):
    '''
    Gets the topic classification, description, caption, seo tags, alt tag and
    filename for an image by sending it to the OpenAI chat completions API (gpt-4o).
    Input: image_path of the desired file, topical_map text (one topic per line),
           new_imgs (accepted for compatibility; not used here),
           attempts (number of attempts already made), max_attempts (last
           attempt number that is still allowed)
    Output: dict with exactly the keys topic, description, caption, seo,
            alt_tag and filename, where topic is a line of the topical map or
            'irrelevant'; or {"error": ...} once the attempts are used up.
    Errors while reading/encoding the image or a missing OPENAI_API_KEY are raised.
    '''
    # Local import: json is not among this file's top-level imports.
    import json

    if attempts > max_attempts:
        print("Maximum number of retries exceeded.")
        return {"error": "Max retries exceeded, operation failed."}
    print('in seo_tags')

    # Query for the model
    topic_map_query = f"""
% You are an expert web designer that can only answer questions relevent to the following Topical Map.
% Goal: Output the topic, description, caption, seo tags, alt_tags, and filename for this image using the Topical Map provided.
% TOPCIAL MAP
```{topical_map}```
"""
    # Topics the model is allowed to answer with
    topic_list = [topic.strip() for topic in topical_map.split('\n')]
    topic_list.insert(0, "irrelevant")

    def encode_image(image_path):
        # Returns the image re-encoded as JPEG, base64 encoded.
        # Check the file size (in bytes)
        file_size = os.path.getsize(image_path)
        # Files above this size are saved with a lower JPEG quality (20 MB)
        max_size = 20 * 1024 * 1024
        if image_path.lower().endswith('.heic'):
            # Read the HEIC file and convert it to a PIL image
            heif_file = pyheif.read(image_path)
            image = Image.frombytes(
                heif_file.mode,
                heif_file.size,
                heif_file.data,
                "raw",
                heif_file.mode,
                heif_file.stride,
            )
        elif image_path.lower().endswith('.gif'):
            # Use the first frame of the GIF
            with Image.open(image_path) as img:
                image = img.convert('RGB')
        else:
            # Open other image types with PIL directly
            image = Image.open(image_path)
        # JPEG cannot store e.g. alpha or palette modes; L is grayscale
        if image.mode not in ['RGB', 'L']:
            image = image.convert('RGB')
        # Use in-memory buffer for processing
        with BytesIO() as img_buffer:
            quality = 75 if file_size > max_size else 85
            image.save(img_buffer, format='JPEG', quality=quality)
            img_buffer.seek(0)
            return base64.b64encode(img_buffer.read()).decode('utf-8')

    print(image_path)
    # Encoded once and reused for every retry
    base64_image = encode_image(image_path)
    api_key = os.environ['OPENAI_API_KEY']
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    payload = {
        "model": "gpt-4o",
        "response_format": {"type": "json_object"},
        "messages": [
            {'role': 'system', 'content': 'You are an expert web designer that can only answer questions relevent to the following topical map.'
             },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": topic_map_query +
                        """
% INSTRUCTIONS
Step 1 - Generate keywords to describe this image
Step 2 - Decide which topic in the Topicla Map this image fall under, using the keywords you generated and the image itself. You are only permitted to use the exact wording of the topic in the topical map.
Step 2 - Provide a topic-relevant 5 sentence description for the image. Describe the image only using context relevant to the topics in the topical map.
Adhere to the following guidelines when crafting your 5 sentence description:
- Mention only the contents of the image.
- Do not mention the quality of the image.
- Ignore all personal information within the image.
- Be as specific as possible when identifying tools/items in the image.
Step 3 - Using the description in Step 1, create a 160 character caption. Make sure the caption is less than 160 characters.
Step 4 - Using the description in Step 1, create 3 topic-relevant SEO tags for this image that will drive traffic to our website. The SEO tags must be two words or less. You must give 3 SEO tags.
Step 5 - Using the description in Step 1, provide a topic-relevant SEO alt tag for the image that will enhance how the website is ranked on search engines.
Step 6 - Using the description in Step 1, provide a new and unique filename for the image as well. Use hyphens for the filename. Do not include extension.
Step 7 - YOU ARE ONLY PERMITTED TO OUTPUT THE TOPIC, DESCRIPTION, CAPTION, SEO, ALT_TAG, AND FILENAME IN THE FOLLOWING JSON FORMAT:
% OUTPUT FORMAT:
{"topic": topic,
"description": description,
"caption": caption,
"seo": [seo],
"alt_tag": [alt tag],
"filename": filename
}
"""
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64, {base64_image}"
                        }
                    }
                ]
            }
        ],
        "max_tokens": 300
    }
    expected_keys = {'topic', 'description', 'caption', 'seo', 'alt_tag', 'filename'}

    # Retry in a loop (rather than by recursion) so max_attempts is honoured
    # and the payload above is built only once.
    while attempts <= max_attempts:
        try:
            response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
            response_data = response.json()
            print(response_data)
            if response.status_code == 200 and 'choices' in response_data and len(response_data['choices']) > 0:
                content = response_data['choices'][0]['message']['content']
                try:
                    # The reply is requested as a JSON object, so parse it as JSON
                    # (handles true/false/null, which a Python literal parser rejects).
                    json_dict = json.loads(content)
                except ValueError:
                    # Fall back to the previous parser for Python-literal style replies
                    json_dict = ast.literal_eval(content)
                # Accept only a known topic and exactly the expected keys; otherwise retry
                if json_dict['topic'] in topic_list and set(json_dict.keys()) == expected_keys:
                    return json_dict
            else:
                print("API call failed or bad data, retrying...")
        except Exception as e:
            # Back off a little longer after every failed attempt
            time.sleep(5 * attempts)
            print("Exception during API call:", str(e))
        attempts += 1

    print("Maximum number of retries exceeded.")
    return {"error": "Max retries exceeded, operation failed."}
def read_image(image_path):
    '''
    Loads an image file as an OpenCV (BGR) array.
    Input: path to the image; '.heic' and '.gif' (case-insensitive) are decoded
           through pyheif / PIL, everything else through cv2.imread
    Output: the image array (whatever cv2.imread returns for other formats)
    '''
    lowered_path = image_path.lower()

    if lowered_path.endswith('.heic'):
        # Decode the HEIC file into a PIL image first
        heif_file = pyheif.read(image_path)
        pil_image = Image.frombytes(
            heif_file.mode,
            heif_file.size,
            heif_file.data,
            "raw",
            heif_file.mode,
            heif_file.stride,
        )
        # PIL gives RGB channel order, OpenCV expects BGR
        return cv2.cvtColor(np.array(pil_image), cv2.COLOR_RGB2BGR)

    if lowered_path.endswith('.gif'):
        with Image.open(image_path) as img:
            # Only the first frame of the GIF is used
            first_frame = next(iter(ImageSequence.Iterator(img))).convert('RGB')
            return cv2.cvtColor(np.array(first_frame), cv2.COLOR_RGB2BGR)

    # Use OpenCV for other formats
    return cv2.imread(image_path)
def process_image(image_path):
    '''
    Reads an image and shrinks it to a fixed size when it is large.
    Input: path to the image
    Output: image array; wider-than-tall images with more than 667000 pixels
            are resized to 1000x667, all other images with more than 442236
            pixels are resized to 548x807, smaller images are returned as read
    '''
    image = read_image(image_path)
    height, width, _channels = image.shape
    pixel_count = width * height

    # (pixel threshold, (target width, target height)) per orientation
    if width > height:
        threshold, target_size = 667000, (1000, 667)    # landscape
    else:
        threshold, target_size = 442236, (548, 807)     # portrait (and square)

    if pixel_count > threshold:
        image = cv2.resize(image, target_size)
    return image
def convert_heic_to_jpeg(heic_path):
    '''
    Converts a HEIC file to JPEG in memory.
    Input: path to the HEIC file
    Output: BytesIO positioned at the start of the JPEG data
    '''
    # Read the HEIC file
    heif_file = pyheif.read(heic_path)
    # Convert to a PIL image
    image = Image.frombytes(
        heif_file.mode,
        heif_file.size,
        heif_file.data,
        "raw",
        heif_file.mode,
        heif_file.stride,
    )
    # JPEG cannot store an alpha channel; same mode guard as encode_image uses
    if image.mode not in ['RGB', 'L']:
        image = image.convert('RGB')
    # Convert image to JPEG in memory
    jpeg_buffer = BytesIO()
    image.save(jpeg_buffer, format="JPEG")
    jpeg_buffer.seek(0)
    return jpeg_buffer
def upload_image(image_path, upload_url):
    '''
    PUTs an image to an upload url; '.heic' files are converted to JPEG first.
    Input: path to the image, url to upload to
    Output: the requests response of the PUT
    '''
    print(image_path)
    if image_path.lower().endswith('.heic'):
        # HEIC is uploaded as an in-memory JPEG
        return requests.put(upload_url, data=convert_heic_to_jpeg(image_path))
    # The context manager closes the file even when the upload raises
    with open(image_path, 'rb') as data:
        return requests.put(upload_url, data=data)
# creates the asset in the client's brand folder
def create_asset(client_name, collection_id, image_path, topical_map, new_imgs, tags=True, project_bool=False):
    '''
    Creates an asset from an image path, with an AI generated topic,
    description, caption, seo tags, alt tag and filename. Both the original
    file and a reduced JPEG copy are attached to the asset.
    Input: name of client, id of the collection, path to image, topical map
           text, names already used in this run (new_imgs), tags (currently
           unused), project_bool ('Yes' to also store the image's parent
           folder name as its project name)
    Output: name given to the new asset
    Raises: RuntimeError when the AI step gave up; errors from the Brandfolder
            responses (e.g. KeyError) are not handled here.
    '''
    # get seo, topic, and sub-topic from OpenAI API
    json_dict = get_seo_tags(image_path, topical_map, new_imgs)
    if not json_dict:
        json_dict = get_seo_tags(image_path, topical_map, new_imgs)
    if 'error' in json_dict:
        # get_seo_tags reports exhausted retries as {"error": ...}
        raise RuntimeError(json_dict['error'])
    topic = json_dict['topic']
    description = json_dict['description']
    caption = json_dict['caption']
    seo_tags = json_dict['seo']
    alt_tag = json_dict['alt_tag']
    image_name = json_dict['filename']

    # make the name unique within this run: name, name_1, name_2, ...
    base_name = image_name
    counter = 1
    while image_name in new_imgs:
        image_name = f'{base_name}_{counter}'
        counter += 1

    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }

    # make the name unique against the assets the collection listing returns
    # NOTE(review): the listing is requested without paging parameters — confirm it covers the whole collection
    r = requests.get(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', params={}, headers=headers)
    asset_names = new_imgs + [item['attributes']['name'] for item in r.json()['data']]
    while image_name in asset_names:
        image_name = rename(image_name)

    # binary upload of the original image
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    upload_request = r.json()
    # container for the uploaded original, referenced by the asset post below
    og_object_url = upload_request['object_url']
    upload_image(image_path, upload_request['upload_url'])

    # binary upload of the reduced copy
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    upload_request = r.json()
    upload_url = upload_request['upload_url']
    object_url = upload_request['object_url']
    image = process_image(image_path)
    with NamedTemporaryFile(delete=True, suffix='.jpg') as temp_image:
        # cv2 writes through the file name; the still-open handle is then streamed to the upload url
        cv2.imwrite(temp_image.name, image, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
        requests.put(upload_url, data=temp_image)

    # posts image with image name
    r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={
        'data': {
            'attributes': [
                {
                    'name': image_name,
                    'description': description,
                    'attachments': [
                        {
                            'url': object_url,
                            'filename': f'{image_name}.jpg'
                        },
                        {
                            'url': og_object_url,
                            'filename': f'{image_name}-original.jpg'
                        }
                    ]
                }
            ]
        },
        # AI Processed section key
        'section_key': 'czpq4nwz78c3cwnp6h9n44z'
    }, params={}, headers=headers)
    # id of newly created asset
    asset_id = r.json()['data'][0]['id']

    def set_custom_field(key_id, value):
        # Stores `value` under the custom field key `key_id` for the new asset.
        payload = {'data':
                   [
                       {
                           'attributes': {
                               'value': value
                           },
                           'relationships': {
                               'asset': {
                                   'data': {'type': 'assets', 'id': asset_id}
                               }}
                       }]}
        return requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{key_id}/custom_field_values', json=payload, params={}, headers=headers)

    # custom field key ids
    year_id = 'k8vr5chnkw3nrnrpkh4f9fqm'
    client_name_id = 'x56t6r9vh9xjmg5whtkmp'
    # Tone ID: px4jkk2nqrf9h6gp7wwxnhvz
    # Location ID: nm6xqgcf5j7sw8w994c6sc8h
    alt_tag_id = 'vk54n6pwnxm27gwrvrzfb'
    topic_id = '9mcg3rgm5mf72jqrtw2gqm7t'
    project_name_id = '5zpqwt2r348sjbnc6rpxc96'
    caption_id = 'cmcbhcc5nmm72v57vrxppw2x'
    # Original Project Images Section ID: c5vm8cnh9jvkjbh7r43qxkv
    # Edited Project Images Section ID: 5wpz2s9m3g7ctcjpm4vrt46

    # seo tags
    tags_payload = {'data': {'attributes': [{'name': tag} for tag in seo_tags]}}
    requests.post(f'https://brandfolder.com/api/v4/assets/{asset_id}/tags', json=tags_payload, params={}, headers=headers)

    set_custom_field(topic_id, topic)
    set_custom_field(alt_tag_id, alt_tag)
    # NOTE: the year is hard-coded
    set_custom_field(year_id, 2024)
    set_custom_field(client_name_id, client_name)
    set_custom_field(caption_id, caption)

    # same case-insensitive 'yes' check as create_asset_no_ai; safe for the default False
    if str(project_bool).lower() == 'yes':
        # the folder that directly contains the image is taken as the project name
        project_name = str(image_path).split('/')[-2]
        set_custom_field(project_name_id, project_name)
    return image_name
def create_asset_no_ai(client_name, collection_id, image_path, project_bool=False):
    '''
    Creates an asset without going through the AI process. The asset is named
    after the image file (without its extension) and placed in the Original
    Project Assets section.
    Input: name of client, id of the collection, path to image, project_bool
           ('yes' in any casing to also store the image's parent folder name
           as its project name)
    Output: None
    '''
    # file name without its final extension (dots inside the name are kept)
    image_name = str(image_path).split('/')[-1].rsplit('.', 1)[0]
    headers = {
        'Accept': 'application/json',
        # Read the key from the environment like the other Brandfolder calls in
        # this file instead of embedding a token in the source.
        # NOTE(review): confirm BRANDFOLDER_API_KEY is the key intended here and
        # revoke the token that was previously hard-coded.
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    # binary upload of image_path
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    upload_request = r.json()
    # used to upload the image
    upload_url = upload_request['upload_url']
    # container for the uploaded image to be used by the post request
    object_url = upload_request['object_url']
    # uploads the image
    upload_image(image_path, upload_url)
    r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={
        'data': {
            'attributes': [
                {
                    'name': image_name,
                    'attachments': [
                        {
                            'url': object_url,
                            'filename': f'{image_name}.jpg'
                        }
                    ]
                }
            ]
        },
        # Original Project Assets
        'section_key': 'c5vm8cnh9jvkjbh7r43qxkv'
    }, params={}, headers=headers)
    # id of newly created asset
    asset_id = r.json()['data'][0]['id']

    def set_custom_field(key_id, value):
        # Stores `value` under the custom field key `key_id` for the new asset.
        payload = {'data':
                   [
                       {
                           'attributes': {
                               'value': value
                           },
                           'relationships': {
                               'asset': {
                                   'data': {'type': 'assets', 'id': asset_id}
                               }}
                       }]}
        return requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{key_id}/custom_field_values', json=payload, params={}, headers=headers)

    year_id = 'k8vr5chnkw3nrnrpkh4f9fqm'
    client_name_id = 'x56t6r9vh9xjmg5whtkmp'
    # NOTE: the year is hard-coded
    set_custom_field(year_id, 2024)
    set_custom_field(client_name_id, client_name)

    # str() keeps the default False (and None) from crashing on .lower()
    if str(project_bool).lower() == 'yes':
        project_name_id = '5zpqwt2r348sjbnc6rpxc96'
        # the folder that directly contains the image is taken as the project name
        project_name = str(image_path).split('/')[-2]
        set_custom_field(project_name_id, project_name)
    return
def create_collection(collection_name):
    '''
    Creates a collection named collection_name in the hard-coded brandfolder
    Input: collection name
    Output: id of the collection taken from the response
    '''
    auth_headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    request_body = {'data': {'attributes': {'name': collection_name}}}
    response = requests.post(
        'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections',
        json=request_body,
        params={},
        headers=auth_headers,
    )
    return response.json()['data']['id']
def get_collection_id(collection_name):
    '''
    Returns the id of the collection named collection_name. An existing
    collection (as listed by get_collection_dict) is reused; only when the name
    is not listed is a new collection created, which is what this function
    previously did unconditionally.
    Input: collection name
    Output: id of the collection
    '''
    existing = get_collection_dict()
    if collection_name in existing:
        return existing[collection_name]

    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    r = requests.post('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections', json={
        'data': {
            'attributes': {
                'name': collection_name
            }
        }
    }, params={}, headers=headers)
    collection_id = r.json()['data']['id']
    return collection_id
# get ids of existing collections
def get_collection_dict():
    '''
    Requests up to 200 collections of the hard-coded brandfolder.
    Output: dict of collection name -> collection id, ordered by name
    '''
    auth_headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    response = requests.get(
        'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=200',
        params={},
        headers=auth_headers,
    )
    id_by_name = {}
    for collection in response.json()['data']:
        # a later collection with the same name replaces the earlier one
        id_by_name[collection['attributes']['name']] = collection['id']
    return {name: id_by_name[name] for name in sorted(id_by_name)}
def import_client_data(client_name, zipfile, topical_map, password, project_bool, ai_bool, progress=gr.Progress(), create=False):
    '''
    Imports every image found in the uploaded zipfile(s) into the Brandfolder
    collection named after the client.
    Input: client name (str), zipfile (one upload or a list of uploads; each is
           a path string or an object with a .name path), topical_map (docx
           upload, required when the AI is on), password, project_bool
           ('Yes'/'No'), ai_bool ('On'/'Off'), progress (gradio progress
           tracker), create (create the collection when it does not exist)
    Output: ("Images Uploaded", text listing the images that failed)
    Raises: gr.Error for invalid input or an unknown client
    '''
    def path_of(upload):
        # Uploads arrive either as path strings or as objects exposing the path as .name
        return str(getattr(upload, 'name', upload))

    print(zipfile)
    if client_name is None:
        raise gr.Error("Please choose a client")
    if password != os.environ['BRANDFOLDER_PASSWORD']:
        raise gr.Error("Incorrect Password")
    if not zipfile:
        raise gr.Error("Please upload a zipfile")
    # The File component is configured for multiple files, so normally this is a list
    uploads = list(zipfile) if isinstance(zipfile, (list, tuple)) else [zipfile]
    for upload in uploads:
        if path_of(upload).split('.')[-1].lower() != 'zip':
            raise gr.Error("Client Photos must be in a zipfile")
    if ai_bool is None:
        raise gr.Error("Please choose whether to use the AI Algorithm")
    ai_mode = ai_bool.lower()
    if ai_mode == 'on':
        if topical_map is None:
            raise gr.Error("Please upload a topical map")
        if path_of(topical_map).split('.')[-1] != 'docx':
            raise gr.Error("Topical Map must be a docx file")
        topical_map = get_topical_map(path_of(topical_map))
    # An unset radio button is passed on as 'No' so the asset helpers get a string
    project_choice = project_bool if project_bool is not None else 'No'

    # get all collection ID names
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    r = requests.get('https://brandfolder.com/api/v4/collections?per=200', params={}, headers=headers)
    collection_dict = {entry['attributes']['name']: entry['id'] for entry in r.json()['data']}
    if client_name in collection_dict:
        collection_id = collection_dict[client_name]
    elif create:
        # creates the collection and gets the collection id
        collection_id = create_collection(client_name)
    else:
        # Previously the error object was built but never raised, leaving collection_id unset
        raise gr.Error(f'Client Name: {client_name} does not exist in this Brandfolder')

    new_imgs = []
    error_imgs = []
    error_imgs_text = 'No errors detected.'
    archives = []
    try:
        # (archive, member name) pairs; keeping the pair avoids mixing up
        # members that have the same name in different zipfiles
        members = []
        for upload in uploads:
            archive = ZipFile(path_of(upload))
            archives.append(archive)
            members.extend((archive, name) for name in get_imgs_from_folder([], path_of(upload)))

        # iterates all images and puts them into brandfolder with AI elements
        for archive, member in progress.tqdm(members, desc="Uploading..."):
            img = member
            print(client_name)
            try:
                # extracts into the current working directory and returns the extracted path
                img = archive.extract(member)
                if ai_mode == 'on':
                    # pause between AI uploads
                    time.sleep(15)
                    new_img = create_asset(client_name, collection_id, img, topical_map, new_imgs, project_bool=project_choice)
                    new_imgs.append(new_img)
                elif ai_mode == 'off':
                    create_asset_no_ai(client_name, collection_id, img, project_bool=project_choice)
            except Exception as e:
                # one failing image must not stop the rest; it is reported at the end
                error_imgs.append(f'{str(img)}; error: {e}\n')
                print(f'An unexpected error occured processing {img}: {e}')
    finally:
        for archive in archives:
            archive.close()

    gr.Info('Images have been uploaded!')
    if error_imgs:
        error_imgs_text = '\n'.join(error_imgs)
    return "Images Uploaded", error_imgs_text
def get_collection_names():
    '''
    Output: list of the collection names known to get_collection_dict
    '''
    names = list(get_collection_dict())
    return names
def upload_file(files):
    '''
    Turns the uploaded files into a list of file paths.
    Input: list of uploads; each is an object with a .name path or a plain path
    Output: list of paths (empty when nothing was uploaded)
    '''
    if not files:
        return []
    # objects expose their path as .name; plain paths are passed through
    return [getattr(file, 'name', file) for file in files]
# Collection names are fetched once at import time and shared by all three dropdowns.
collection_names = get_collection_names()

# NOTE(review): the nesting of the layout containers below is reconstructed — confirm it against the deployed UI.
with gr.Blocks() as block:
    gr.Markdown("""
# Brandfolder Zipfile Dashboard
This dashboard is for uploading photos from a zipfile to a brandfolder collection.
""")

    # --- password ---
    with gr.Column(visible=True, elem_id='login') as login:
        # masked input; the value is checked by import_client_data
        password = gr.Textbox(label='Enter Password', type='password')

    # --- zipfile upload ---
    with gr.Column(visible=True, elem_id='zipfile') as zipfile_column:
        with gr.Row():
            with gr.Column():
                options = list(collection_names)
                selection = gr.Dropdown(options, label='Choose Existing Collection', info='If creating a new section, select Create a Collection')
                gr.Markdown('## Upload zipfile containing client photos below')
                # read-only display; it is filled through upload_btn below
                zipfile = gr.File(label='Client Photos (must be zipfile)', file_count='multiple', file_types=['.zip'], interactive=False)
                upload_btn = gr.UploadButton("Upload Zipfile(s)", file_count='multiple')
                ai_bool = gr.Radio(choices=['On', 'Off'], label='AI Algorithm?', info = 'Would you like to use the AI Algorithm to upload these images?')
                project_bool = gr.Radio(choices=['Yes', 'No'], label='Project Names?', info='Would you like to include project names for these images?')
                gr.Markdown('## Upload topical map document for the client below')
                topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
                algorithm = gr.Button('Run Algorithm')
                upload = gr.Label(label='Uploader')
                err_imgs = gr.Textbox(label="Images Not Processed")
                stop = gr.Button("Stop Run")

    # --- run the AI over images already in Brandfolder ---
    with gr.Column(visible=True, elem_id='trigger') as trigger:
        gr.Markdown('''
# Run AI in Brandfolder
This button runs the AI algorithm using all the images stored in the Pre-Processed Images section in Brandfolder.
The algorithm will move the new processed images to the AI Processed Images.
ALL COPIES OF THE IMAGES IN THE PRE-PROCESSED SECTION WILL BE DELETED AFTER PUSHING THIS BUTTON
''')
        bf_options = list(collection_names)
        bf_selection = gr.Dropdown(bf_options, label='Choose Existing Collection')
        section = gr.Radio(choices=['Pre-Processed Images', 'Original Project Assets'], label='Which Sections is the data in?')
        bf_topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
        bf_button = gr.Button('Run AI algorithm for Pre-Processed Images')
        bf_upload = gr.Label(label='Uploader')
        stop_bf = gr.Button('Stop Run')

    # --- offline update (column is created hidden) ---
    with gr.Column(visible=False, elem_id='offline') as offline:
        gr.Markdown('''
# AI Processed Images Algorithm
Runs the AI algorithm over the images in the AI Processed Images Section.
Use this only when the Brandfolder API is not uploading images properly.
The Images will not be reduced but the tags, descriptions, etc. for the images will be populated.
''')
        offline_options = list(collection_names)
        offline_selection = gr.Dropdown(offline_options, label='Choose Existing Collection')
        offline_topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
        offline_button = gr.Button('Run AI algorithm for AI Processed Images Section')
        offline_upload = gr.Label(label='Uploader')
        stop_offline = gr.Button("Stop Run")

    # --- event wiring ---
    algo_event = algorithm.click(fn=import_client_data, inputs=[selection, zipfile, topical_map, password, project_bool, ai_bool], outputs=[upload, err_imgs])
    bf_event = bf_button.click(fn=bf_trigger.run_preprocess_ai, inputs=[bf_topical_map, bf_selection, section], outputs=[bf_upload])
    offline_event = offline_button.click(fn=offline_update.run_preprocess_ai, inputs=[offline_topical_map, offline_selection], outputs=[offline_upload])
    # the stop buttons cancel the matching running event
    stop.click(fn=None, inputs=None, outputs=None, cancels=[algo_event])
    stop_bf.click(fn=None, inputs=None, outputs=None, cancels=[bf_event])
    upload_btn.upload(upload_file, upload_btn, zipfile)
    # NOTE: stop_offline currently has no handler; its cancel wiring is disabled:
    # stop_offline.click(fn=None, inputs=None, outputs=None, cancels=[offline_event])
    # NOTE: the page-switching buttons and their click handlers were removed here;
    # they relied on the `change_ui` module, whose import is commented out at the top of this file.

block.queue(default_concurrency_limit=5)
block.launch()