Spaces:
Runtime error
Runtime error
import ast
import base64
import json
import os
import time
from io import BytesIO
from tempfile import NamedTemporaryFile
from zipfile import ZipFile

import cv2
import gradio as gr
import numpy as np
import openai
import pandas as pd
import pyheif
import requests
from docx import Document
from openai import OpenAI
from PIL import Image, ImageSequence
from tenacity import (
    retry,
    stop_after_attempt,
    wait_random_exponential,
)

import bf_trigger
import chat_engine as chat_gen
import content_generator as con_gen
# (the tenacity helpers imported above are there for exponential backoff)
# FUNCTIONS
# Brandfolder API key, read from the environment once at import time.
# A missing BRANDFOLDER_API_KEY raises KeyError here, before anything else runs.
brandfolder_api = os.environ['BRANDFOLDER_API_KEY']
def get_asset_info(asset_id):
    '''
    Fetches a Brandfolder asset and flattens the fields this app uses.

    Every field is looked up independently and falls back to '' when it is
    missing from the response (collection_id and collection_name fall back
    together). A response body that is not JSON yields '' for every field.

    Input: asset_id
    Output: dict with the keys section_id, collection_id, collection_name,
        asset_type, asset_name, access_key, image_url, client_name,
        project_name
    '''
    headers = {
        'Content-Type': 'application/json',
        'Authorization': brandfolder_api
    }
    r = requests.get(f'https://brandfolder.com/api/v4/assets/{asset_id}?include=section,collections,custom_fields,attachments', params={}, headers=headers)

    # Parse the body once instead of once per field.
    try:
        payload = r.json()
    except ValueError:
        payload = {}

    # Errors that mean "this field is not present in the response".
    # KeyError and IndexError are both LookupError subclasses.
    missing = (LookupError, TypeError, AttributeError)

    def dig(*path):
        # Walks nested dict/list levels; raises when a level is absent.
        node = payload
        for step in path:
            node = node[step]
        return node

    def first_included(item_type, attribute, **required):
        # Returns `attribute` of the first included item of `item_type` whose
        # attributes equal every entry of `required`; raises when none match.
        for item in payload['included']:
            if item['type'] != item_type:
                continue
            attrs = item['attributes']
            if all(attrs[name] == wanted for name, wanted in required.items()):
                return attrs[attribute]
        raise LookupError(f'no included {item_type} matched')

    def or_blank(getter, *args, **kwargs):
        # Runs one lookup, mapping "not present" to the '' default.
        try:
            return getter(*args, **kwargs)
        except missing:
            return ''

    # gets section_id
    section_id = or_blank(dig, 'data', 'relationships', 'section', 'data', 'id')

    # gets collection_id and collection_name (both blank if either is missing)
    try:
        collection_id = dig('data', 'relationships', 'collections', 'data', 0, 'id')
        collection_name = first_included('collections', 'name')
    except missing:
        collection_id = ''
        collection_name = ''

    # asset_type is only ever 'Photo' (when such a custom field value exists) or ''
    asset_type = or_blank(first_included, 'custom_field_values', 'value', value='Photo')
    asset_name = or_blank(dig, 'data', 'attributes', 'name')
    access_key = or_blank(first_included, 'custom_field_values', 'value', key='What is your Access Code?')
    asset_url = or_blank(first_included, 'attachments', 'url')
    client_name = or_blank(first_included, 'custom_field_values', 'value', key='Client Name')
    project_name = or_blank(first_included, 'custom_field_values', 'value', key='List Project Name Photos Belong To')

    return {
        "section_id": section_id,
        "collection_id": collection_id,
        "collection_name": collection_name,
        "asset_type": asset_type,
        "asset_name": asset_name,
        "access_key": access_key,
        "image_url": asset_url,
        "client_name": client_name,
        "project_name": project_name
    }
def get_all_collection_dict():
    '''
    Lists the collections of the Brandfolder (up to 300 per the `per` query
    parameter) and maps each collection name to its id.

    Output: dict of {collection name: collection id}
    '''
    auth_headers = {
        'Accept': 'application/json',
        'Authorization': brandfolder_api
    }
    response = requests.get(
        'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=300',
        params={},
        headers=auth_headers,
    )
    name_to_id = {}
    for entry in response.json()['data']:
        name_to_id[entry['attributes']['name']] = entry['id']
    return name_to_id
def get_collection_names():
    '''
    Returns the collection names from get_collection_dict() as a list.
    '''
    # NOTE: a function with this same name is defined again further down the
    # file; at import time that later definition replaces this one.
    names_to_ids = get_collection_dict()
    return [name for name in names_to_ids]
def rename(filename):
    '''
    Asks the OpenAI chat API (model "gpt-4o") for a similar, hyphenated
    filename.

    Input: filename (str)
    Output: the text content of the first choice in the model's reply
    '''
    system_message = {
        "role": "system",
        "content": "You are a helpful assistant specializing in renaming files.",
    }
    user_message = {
        "role": "user",
        "content": f"Provide a similar name for this filename: {filename}. Only return the filename and use hyphens in the filename.",
    }
    reply = OpenAI().chat.completions.create(
        model="gpt-4o",
        messages=[system_message, user_message],
    )
    first_choice = reply.choices[0]
    return first_choice.message.content
def get_topical_map(path):
    '''
    Extracts the indented paragraphs of a docx file as plain text.

    Paragraphs whose paragraph format has no left indent set are skipped.
    Each remaining paragraph is prefixed with int(left_indent.pt / 20)
    spaces, and the results are joined with newlines.

    Input: path of the docx document
    Output: str with one line per kept paragraph
    '''
    lines = []
    for para in Document(path).paragraphs:
        indent = para.paragraph_format.left_indent
        if indent is not None:
            # Coarse nesting depth derived from the indent's .pt value
            depth = int(indent.pt / 20)
            lines.append(" " * depth + para.text)
    return "\n".join(lines)
# gets the list of image members contained in a zipfile
def get_imgs_from_folder(image_files, zipfile):
    '''
    Appends the image members of a zip archive to image_files.

    A member counts as an image when the text after its last '.' is (case
    insensitively) one of the known image extensions and its archive path
    does not start with '_' (e.g. the "__MACOSX/" resource folder).

    Input: image_files (list, extended in place), zipfile (path or file-like
        object accepted by ZipFile)
    Output: the same image_files list
    '''
    # image file types
    IMAGE_TYPES = {'jpg', 'jpeg', 'gif', 'bmp', 'png', 'jpe', 'heic', 'tiff', 'webp', 'heif', 'svg', 'raw', 'psd'}
    # The context manager closes the archive handle even if listing fails.
    with ZipFile(zipfile) as archive:
        member_names = archive.namelist()
    image_files.extend(
        name for name in member_names
        if name.split('.')[-1].lower() in IMAGE_TYPES and not name.startswith('_')
    )
    return image_files
def get_seo_tags(image_path, topical_map, new_imgs, attempts=0, max_attempts=6):
    '''
    Gets the topic classification and SEO metadata for an image using the
    OpenAI chat completions API (model "gpt-4o").

    The image is re-encoded as a base64 JPEG and sent together with the
    prompt. A reply is accepted only when it parses as JSON, has exactly the
    keys topic, description, caption, seo, alt_tag and filename, and its
    topic is "irrelevant" or one of the stripped lines of topical_map.
    Anything else is retried until attempts exceeds max_attempts.

    Input: image_path (path of the image file), topical_map (str, one topic
        per line), new_imgs (not used here; kept for caller compatibility),
        attempts (attempts already made), max_attempts (retry budget)
    Output: dict with the six keys above, or
        {"error": "Max retries exceeded, operation failed."} when no attempt
        produced a valid reply
    '''
    if attempts > max_attempts:
        print("Maximum number of retries exceeded.")
        return {"error": "Max retries exceeded, operation failed."}
    print('in seo_tags')
    # Query for GPT-4
    topic_map_query = f"""
% You are an expert web designer that can only answer questions relevent to the following Topical Map.
% Goal: Output the topic, description, caption, seo tags, alt_tags, and filename for this image using the Topical Map provided.
% TOPCIAL MAP
```{topical_map}```
"""
    # Topics the model is allowed to answer with
    topic_list = ["irrelevant"] + [topic.strip() for topic in topical_map.split('\n')]
    expected_keys = {'topic', 'description', 'caption', 'seo', 'alt_tag', 'filename'}

    def encode_image(image_path):
        # Re-encodes the image as JPEG and returns it as a base64 string.
        file_size = os.path.getsize(image_path)
        # Files above this size are saved at a lower JPEG quality (20 MB)
        max_size = 20 * 1024 * 1024
        lowered = image_path.lower()
        if lowered.endswith('.heic'):
            # Read the HEIC file and convert it to a PIL image
            heif_file = pyheif.read(image_path)
            image = Image.frombytes(
                heif_file.mode,
                heif_file.size,
                heif_file.data,
                "raw",
                heif_file.mode,
                heif_file.stride,
            )
        elif lowered.endswith('.gif'):
            # Only the first frame of a GIF is used
            with Image.open(image_path) as img:
                image = img.convert('RGB')
        else:
            # copy() loads the pixel data so the file handle can be closed
            with Image.open(image_path) as img:
                image = img.copy()
        # JPEG cannot store every mode; keep RGB and grayscale, convert the rest
        if image.mode not in ['RGB', 'L']:
            image = image.convert('RGB')
        quality = 75 if file_size > max_size else 85
        with BytesIO() as img_buffer:
            image.save(img_buffer, format='JPEG', quality=quality)
            return base64.b64encode(img_buffer.getvalue()).decode('utf-8')

    print(image_path)
    # Encoded once and reused by every retry
    base64_image = encode_image(image_path)
    api_key = os.environ['OPENAI_API_KEY']
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    payload = {
        "model": "gpt-4o",
        "response_format": {"type": "json_object"},
        "messages": [
            {'role': 'system', 'content': 'You are an expert web designer that can only answer questions relevent to the following topical map.'
             },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": topic_map_query +
                        """
% INSTRUCTIONS
Step 1 - Generate keywords to describe this image
Step 2 - Decide which topic in the Topicla Map this image fall under, using the keywords you generated and the image itself. You are only permitted to use the exact wording of the topic in the topical map.
Step 2 - Provide a topic-relevant 5 sentence description for the image. Describe the image only using context relevant to the topics in the topical map.
Adhere to the following guidelines when crafting your 5 sentence description:
- Mention only the contents of the image.
- Do not mention the quality of the image.
- Ignore all personal information within the image.
- Be as specific as possible when identifying tools/items in the image.
Step 3 - Using the description in Step 1, create a 160 character caption. Make sure the caption is less than 160 characters.
Step 4 - Using the description in Step 1, create 3 topic-relevant SEO tags for this image that will drive traffic to our website. The SEO tags must be two words or less. You must give 3 SEO tags.
Step 5 - Using the description in Step 1, provide a topic-relevant SEO alt tag for the image that will enhance how the website is ranked on search engines.
Step 6 - Using the description in Step 1, provide a new and unique filename for the image as well. Use hyphens for the filename. Do not include extension.
Step 7 - YOU ARE ONLY PERMITTED TO OUTPUT THE TOPIC, DESCRIPTION, CAPTION, SEO, ALT_TAG, AND FILENAME IN THE FOLLOWING JSON FORMAT:
% OUTPUT FORMAT:
{"topic": topic,
"description": description,
"caption": caption,
"seo": [seo],
"alt_tag": [alt tag],
"filename": filename
}
"""
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            # Standard data URL: no space after the comma
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        "max_tokens": 300
    }
    # Iterative retry loop: honours a caller-supplied max_attempts and avoids
    # re-encoding the image on every attempt.
    while attempts <= max_attempts:
        try:
            response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
            response_data = response.json()
            print(response_data)
            if response.status_code == 200 and 'choices' in response_data and len(response_data['choices']) > 0:
                # The reply is JSON (response_format json_object), so parse it
                # as JSON; true/false/null are not Python literals.
                json_dict = json.loads(response_data['choices'][0]['message']['content'])
                if json_dict['topic'] in topic_list and set(json_dict.keys()) == expected_keys:
                    return json_dict
            else:
                print("API call failed or bad data, retrying...")
        except Exception as e:
            # Back off a little longer after each failed attempt
            time.sleep(5 * attempts)
            print("Exception during API call:", str(e))
        attempts += 1
    print("Maximum number of retries exceeded.")
    return {"error": "Max retries exceeded, operation failed."}
def read_image(image_path):
    '''
    Loads an image file as an OpenCV (BGR) array.

    HEIC files are decoded with pyheif and GIF files with PIL (first frame
    only); both are converted to 3-channel RGB before the RGB->BGR swap.
    Every other file type is read with cv2.imread, whose return value is
    passed through unchanged.

    Input: image_path (str)
    Output: the image array (or whatever cv2.imread returned)
    '''
    lowered = image_path.lower()
    if lowered.endswith('.heic'):
        # Read and convert HEIC file
        heif_file = pyheif.read(image_path)
        pil_image = Image.frombytes(
            heif_file.mode,
            heif_file.size,
            heif_file.data,
            "raw",
            heif_file.mode,
            heif_file.stride,
        )
        # COLOR_RGB2BGR expects 3 channels; the decoded mode is whatever
        # pyheif reports (it may include alpha), so normalise to RGB first.
        rgb_image = pil_image.convert('RGB')
        return cv2.cvtColor(np.array(rgb_image), cv2.COLOR_RGB2BGR)
    if lowered.endswith('.gif'):
        # A freshly opened GIF is positioned on its first frame
        with Image.open(image_path) as img:
            first_frame = img.convert('RGB')
        return cv2.cvtColor(np.array(first_frame), cv2.COLOR_RGB2BGR)
    # Use OpenCV for other formats
    return cv2.imread(image_path)
def process_image(image_path):
    '''
    Reads an image and downsizes it when it is large.

    Landscape images (width > height) with more than 667000 pixels are
    resized to 1000x667; all other images with more than 442236 pixels are
    resized to 548x807. Smaller images are returned unchanged.
    NOTE(review): the target sizes are fixed, so the aspect ratio is not
    preserved - confirm this is intended.

    Input: image_path (str)
    Output: the (possibly resized) image array
    Raises: ValueError when read_image() could not load the file
    '''
    image = read_image(image_path)
    if image is None:
        # Fail with a clear message instead of an attribute error on None
        raise ValueError(f'Could not read image: {image_path}')
    # Only the first two dimensions are needed; this also accepts 2-D arrays
    height, width = image.shape[:2]
    area = width * height
    if width > height:
        # Landscape image
        if area > 667000:
            image = cv2.resize(image, (1000, 667))
    else:
        # Portrait (or square) image
        if area > 442236:
            image = cv2.resize(image, (548, 807))
    return image
def convert_heic_to_jpeg(heic_path):
    '''
    Converts a HEIC file to JPEG in memory.

    Input: heic_path (path of the HEIC file)
    Output: BytesIO holding the JPEG data, positioned at the start
    '''
    # Read the HEIC file
    heif_file = pyheif.read(heic_path)
    # Convert to a PIL image
    image = Image.frombytes(
        heif_file.mode,
        heif_file.size,
        heif_file.data,
        "raw",
        heif_file.mode,
        heif_file.stride,
    )
    # JPEG cannot store every mode (e.g. ones with alpha); keep RGB and
    # grayscale as they are and convert anything else.
    if image.mode not in ('RGB', 'L'):
        image = image.convert('RGB')
    # Convert image to JPEG in memory
    jpeg_buffer = BytesIO()
    image.save(jpeg_buffer, format="JPEG")
    jpeg_buffer.seek(0)
    return jpeg_buffer
def upload_image(image_path, upload_url):
    '''
    PUTs an image file to upload_url.

    HEIC files are converted to JPEG in memory first; every other file is
    streamed from disk as-is.

    Input: image_path (str), upload_url (str)
    Output: the requests response of the PUT
    '''
    print(image_path)
    if image_path.lower().endswith('.heic'):
        # Convert HEIC to JPEG and upload the in-memory buffer
        return requests.put(upload_url, data=convert_heic_to_jpeg(image_path))
    # The context manager closes the file even when the upload raises
    with open(image_path, 'rb') as data:
        return requests.put(upload_url, data=data)
# creates the asset in the client's brand folder
def create_asset(client_name, collection_id, image_path, topical_map, new_imgs, tags=True, project_bool=False):
    '''
    Creates a Brandfolder asset from an image path, with AI generated topic,
    description, caption, seo tags, alt tag and filename.

    Two attachments are uploaded: the original file and a reduced JPEG
    (quality 70) produced by process_image(). The asset is created in the
    "AI Processed" section of the collection and then tagged.

    Input: client_name (str), collection_id (str), image_path (str),
        topical_map (str), new_imgs (names already used in this run),
        tags (not used; kept for caller compatibility),
        project_bool ('Yes' to also store the image's parent folder name as
        the project name)
    Output: name given to the new asset
    Raises: RuntimeError when get_seo_tags() could not produce metadata
    '''
    # get topic, description, caption, seo tags, alt tag and filename from OpenAI API
    json_dict = get_seo_tags(image_path, topical_map, new_imgs)
    if not json_dict:
        json_dict = get_seo_tags(image_path, topical_map, new_imgs)
    if not json_dict or 'error' in json_dict:
        # get_seo_tags reports exhausted retries as {"error": ...}; fail with
        # a clear message instead of a KeyError on 'topic' below.
        raise RuntimeError(f'Could not generate SEO metadata for {image_path}')
    topic = json_dict['topic']
    description = json_dict['description']
    caption = json_dict['caption']
    seo_tags = json_dict['seo']
    alt_tag = json_dict['alt_tag']
    base_name = json_dict['filename']

    # Make the name unique within this run; suffix the base name so the
    # result is name_1, name_2, ... rather than name_1_2.
    image_name = base_name
    counter = 1
    while image_name in new_imgs:
        image_name = f'{base_name}_{counter}'
        counter += 1

    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    # Names already present in the collection (as returned by this request)
    r = requests.get(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', params={}, headers=headers)
    taken_names = set(new_imgs)
    taken_names.update(item['attributes']['name'] for item in r.json()['data'])
    while image_name in taken_names:
        image_name = rename(image_name)

    # binary upload of the original image
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    upload_request = r.json()
    # container for the uploaded original, referenced by the post request
    og_object_url = upload_request['object_url']
    upload_image(image_path, upload_request['upload_url'])

    # binary upload of the reduced image
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    upload_request = r.json()
    upload_url = upload_request['upload_url']
    object_url = upload_request['object_url']
    image = process_image(image_path)
    with NamedTemporaryFile(delete=True, suffix='.jpg') as temp_image:
        # cv2 writes through the file name; the open handle is then uploaded
        cv2.imwrite(temp_image.name, image, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
        requests.put(upload_url, data=temp_image)

    # posts image with image name
    r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={
        'data': {
            'attributes': [
                {
                    'name': image_name,
                    'description': description,
                    'attachments': [
                        {
                            'url': object_url,
                            'filename': f'{image_name}.jpg'
                        },
                        {
                            'url': og_object_url,
                            'filename': f'{image_name}-original.jpg'
                        }
                    ]
                }
            ]
        },
        # AI Processed section key
        'section_key': 'czpq4nwz78c3cwnp6h9n44z'
    }, params={}, headers=headers)
    # id of newly created asset
    asset_id = r.json()['data'][0]['id']

    def post_custom_field(field_key_id, value):
        # Stores one custom field value on the new asset.
        field_payload = {'data': [
            {
                'attributes': {
                    'value': value
                },
                'relationships': {
                    'asset': {
                        'data': {'type': 'assets', 'id': asset_id}
                    }}
            }]}
        return requests.post(
            f'https://brandfolder.com/api/v4/custom_field_keys/{field_key_id}/custom_field_values',
            json=field_payload, params={}, headers=headers)

    # Custom field key ids
    year_id = 'k8vr5chnkw3nrnrpkh4f9fqm'
    client_name_id = 'x56t6r9vh9xjmg5whtkmp'
    # Tone ID: px4jkk2nqrf9h6gp7wwxnhvz
    # Location ID: nm6xqgcf5j7sw8w994c6sc8h
    alt_tag_id = 'vk54n6pwnxm27gwrvrzfb'
    topic_id = '9mcg3rgm5mf72jqrtw2gqm7t'
    project_name_id = '5zpqwt2r348sjbnc6rpxc96'
    caption_id = 'cmcbhcc5nmm72v57vrxppw2x'
    # Original Project Images Section ID: c5vm8cnh9jvkjbh7r43qxkv
    # Edited Project Images Section ID: 5wpz2s9m3g7ctcjpm4vrt46

    # seo tags
    tags_payload = {'data': {'attributes': [{'name': tag} for tag in seo_tags]}}
    requests.post(f'https://brandfolder.com/api/v4/assets/{asset_id}/tags', json=tags_payload, params={}, headers=headers)

    post_custom_field(topic_id, topic)
    post_custom_field(alt_tag_id, alt_tag)
    # NOTE(review): the year is hard-coded - confirm whether it should follow the current year
    post_custom_field(year_id, 2024)
    post_custom_field(client_name_id, client_name)
    post_custom_field(caption_id, caption)

    # Same case-insensitive check as create_asset_no_ai; False/None mean "no"
    if str(project_bool).lower() == 'yes':
        # The project name is the folder that directly contains the image
        project_name = os.path.basename(os.path.dirname(str(image_path)))
        post_custom_field(project_name_id, project_name)
    return image_name
def create_asset_no_ai(client_name, collection_id, image_path, project_bool=False):
    '''
    Creates an asset without going through the AI process.

    The image is uploaded unchanged, the asset is created in the "Original
    Project Assets" section under the image's own file name, and the year
    and client name custom fields are set.

    Input: client_name (str), collection_id (str), image_path (str),
        project_bool ('Yes' to also store the image's parent folder name as
        the project name)
    Output: None
    '''
    # File name without directory or extension; splitext only strips the last
    # suffix, so a name such as "a.b.jpg" keeps "a.b".
    image_name = os.path.splitext(os.path.basename(str(image_path)))[0]
    headers = {
        'Accept': 'application/json',
        # Read the key from the environment like the rest of this file; an
        # API token must not be hard-coded in source.
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    # binary upload of image_path
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    upload_request = r.json()
    # used to upload the image
    upload_url = upload_request['upload_url']
    # container for the uploaded image to be used by the post request
    object_url = upload_request['object_url']
    # uploads the image
    upload_image(image_path, upload_url)
    r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={
        'data': {
            'attributes': [
                {
                    'name': image_name,
                    'attachments': [
                        {
                            'url': object_url,
                            'filename': f'{image_name}.jpg'
                        }
                    ]
                }
            ]
        },
        # Original Project Assets
        'section_key': 'c5vm8cnh9jvkjbh7r43qxkv'
    }, params={}, headers=headers)
    # id of newly created asset
    asset_id = r.json()['data'][0]['id']

    def post_custom_field(field_key_id, value):
        # Stores one custom field value on the new asset.
        field_payload = {'data': [
            {
                'attributes': {
                    'value': value
                },
                'relationships': {
                    'asset': {
                        'data': {'type': 'assets', 'id': asset_id}
                    }}
            }]}
        return requests.post(
            f'https://brandfolder.com/api/v4/custom_field_keys/{field_key_id}/custom_field_values',
            json=field_payload, params={}, headers=headers)

    year_id = 'k8vr5chnkw3nrnrpkh4f9fqm'
    client_name_id = 'x56t6r9vh9xjmg5whtkmp'
    # NOTE(review): the year is hard-coded - confirm whether it should follow the current year
    post_custom_field(year_id, 2024)
    post_custom_field(client_name_id, client_name)

    # str() keeps the default False (and None) from crashing on .lower()
    if str(project_bool).lower() == 'yes':
        project_name_id = '5zpqwt2r348sjbnc6rpxc96'
        # The project name is the folder that directly contains the image
        project_name = os.path.basename(os.path.dirname(str(image_path)))
        post_custom_field(project_name_id, project_name)
    return
def create_collection(collection_name):
    '''
    Creates a collection called collection_name in the Brandfolder.

    Input: collection name
    Output: id of the collection taken from the response
    '''
    auth_headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    endpoint = 'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections'
    body = {'data': {'attributes': {'name': collection_name}}}
    created = requests.post(endpoint, json=body, params={}, headers=auth_headers)
    return created.json()['data']['id']
def get_collection_id(collection_name):
    '''
    POSTs collection_name to the Brandfolder collections endpoint and
    returns the id found in the response.

    NOTE(review): despite the "get" in its name, this sends the same POST
    request as create_collection() - confirm whether a lookup of an existing
    collection was intended.

    Input: collection name
    Output: id of the collection taken from the response
    '''
    request_headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    collections_url = 'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections'
    request_body = {
        'data': {
            'attributes': {
                'name': collection_name
            }
        }
    }
    reply = requests.post(collections_url, json=request_body, params={}, headers=request_headers)
    reply_data = reply.json()['data']
    return reply_data['id']
# maps the names of existing collections to their ids
def get_collection_dict():
    '''
    Lists the collections of the Brandfolder (up to 200 per the `per` query
    parameter) and returns {collection name: collection id}, ordered by name.
    '''
    auth_headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    response = requests.get(
        'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=200',
        params={},
        headers=auth_headers,
    )
    ids_by_name = {}
    for entry in response.json()['data']:
        ids_by_name[entry['attributes']['name']] = entry['id']
    # Rebuild the dict so that iteration follows sorted name order
    return {name: ids_by_name[name] for name in sorted(ids_by_name)}
def import_client_data(client_name, zipfile, topical_map, password, project_bool, ai_bool, progress=gr.Progress(), create=False):
    '''
    Imports every image found in the uploaded zipfile(s) into the Brandfolder
    collection named after the client.

    Input: client_name (str), zipfile (one upload or a list of uploads, each
        a path or an object with a .name path), topical_map (docx upload,
        required when the AI algorithm is on), password (str), project_bool
        ('Yes'/'No'), ai_bool ('On'/'Off'), progress (gradio progress
        tracker), create (create the collection when it does not exist)
    Output: ("Images Uploaded", text listing the images that failed)
    Raises: gr.Error when an input is missing or invalid, or when the
        collection does not exist and create is False
    '''
    def as_path(upload):
        # Uploads arrive either as path strings or as objects carrying the
        # path in .name; normalise both to a str path.
        return str(getattr(upload, 'name', upload))

    print(zipfile)
    if client_name is None:
        raise gr.Error("Please choose a client")
    if password != os.environ['BRANDFOLDER_PASSWORD']:
        raise gr.Error("Incorrect Password")
    if not zipfile:
        raise gr.Error("Please upload a zipfile")
    # The file input allows multiple uploads, so validate each one
    uploads = zipfile if isinstance(zipfile, (list, tuple)) else [zipfile]
    zip_paths = [as_path(upload) for upload in uploads]
    if any(path.split('.')[-1].lower() != 'zip' for path in zip_paths):
        raise gr.Error("Client Photos must be in a zipfile")
    # str() keeps an unselected radio (None) from crashing on .lower()
    ai_mode = str(ai_bool).lower()
    if ai_mode not in ('on', 'off'):
        raise gr.Error("Please choose whether to use the AI Algorithm")
    if ai_mode == 'on':
        if topical_map is None:
            raise gr.Error("Please upload a topical map")
        topical_map_path = as_path(topical_map)
        if topical_map_path.split('.')[-1] != 'docx':
            raise gr.Error("Topical Map must be a docx file")
        topical_map = get_topical_map(topical_map_path)

    # get all collection ID names
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    r = requests.get('https://brandfolder.com/api/v4/collections?per=200', params={}, headers=headers)
    collection_dict = {entry['attributes']['name']: entry['id'] for entry in r.json()['data']}
    if client_name in collection_dict:
        collection_id = collection_dict[client_name]
    elif create:
        # creates the collection and gets the collection id
        collection_id = create_collection(client_name)
    else:
        # Actually raise: the exception used to be built but never raised,
        # which left collection_id unbound further down.
        raise gr.Error(f'Client Name: {client_name} does not exist in this Brandfolder')

    new_imgs = []
    error_imgs = []
    archives = []
    try:
        # (member, archive) pairs, so equal member names in different
        # zipfiles are each extracted from their own archive
        pending = []
        for zip_path in zip_paths:
            archive = ZipFile(zip_path)
            archives.append(archive)
            pending.extend((member, archive) for member in get_imgs_from_folder([], zip_path))

        # iterates all images and puts them into brandfolder
        for member, archive in progress.tqdm(pending, desc="Uploading..."):
            img = member
            print(client_name)
            try:
                # Extracts into the current working directory
                img = archive.extract(member)
                if ai_mode == 'on':
                    # Pause between AI uploads
                    time.sleep(15)
                    new_img = create_asset(client_name, collection_id, img, topical_map, new_imgs, project_bool=project_bool)
                    new_imgs.append(new_img)
                else:
                    create_asset_no_ai(client_name, collection_id, img, project_bool=project_bool)
            except Exception as e:
                # One bad image must not abort the rest of the run
                error_imgs.append(f'{str(img)}; error: {e}\n')
                print(f'An unexpected error occured processing {img}: {e}')
    finally:
        for archive in archives:
            archive.close()

    gr.Info('Images have been uploaded!')
    error_imgs_text = '\n'.join(error_imgs) if error_imgs else 'No errors detected.'
    return "Images Uploaded", error_imgs_text
def get_collection_names():
    '''
    Returns the names of the existing collections, in the order in which
    get_collection_dict() yields them.
    '''
    return [*get_collection_dict()]
def upload_file(files):
    '''
    Turns the files handed over by the upload button into a list of paths.

    Each entry may be an object carrying its path in .name or a plain path
    string; an empty or missing upload gives an empty list.

    Input: files (list of uploads, or None)
    Output: list of file paths
    '''
    if not files:
        return []
    return [getattr(file, 'name', file) for file in files]
def chatbot_response(message, history, chat_engine):
    '''
    Forwards message to chat_engine.stream_chat(), passing history as the
    chat_history keyword, and returns whatever that call returns.
    '''
    return chat_engine.stream_chat(message, chat_history=history)
def generate_content(csv_file, query_engine):
    '''
    Generates website content for the uploaded spreadsheet using the most
    recently created query engine and writes the result to ./output.csv.

    Input: csv_file (uploaded spreadsheet), query_engine (list of query
        engines; the last one is used)
    Output: (completion status, first 10 rows of the result, a visible
        download button pointing at the written file)
    Raises: gr.Error when no query engine has been created yet
    '''
    print(csv_file)
    if not query_engine:
        # The engine list stays empty until the DNA LLM has been generated;
        # report that instead of failing with an IndexError.
        raise gr.Error("Please generate the DNA LLM before generating content")
    df = con_gen.get_content_csv(csv_file, query_engine[-1])
    data_preview = df.head(10)
    # NOTE(review): every run writes to this one path - confirm that is
    # acceptable when several users generate content at the same time.
    file_name = './output.csv'
    df.to_csv(file_name)
    completion_status = "Done"
    return completion_status, data_preview, gr.DownloadButton(label='Download AI Content', value=file_name, visible=True)
# Collection names fetched at import time (get_collection_names() goes
# through get_collection_dict(), which calls the Brandfolder API).
# NOTE(review): this variable is not referenced again in the visible code.
collection_names = get_collection_names()
# ---- Gradio UI ----
# Builds the dashboard: a shared login/DNA-document area followed by four
# tabs, then the event wiring, then queue() and launch().
# NOTE(review): the nesting of the layout containers below should be
# confirmed against the deployed app.
with gr.Blocks() as block:
    gr.Markdown("""
# Brandfolder Zipfile Dashboard
This dashboard is for uploading photos from a zipfile to a brandfolder collection.
""")
    # Per-session lists; generate_chat_engine() appends to them and the
    # handlers read the last entry.
    chat_engine = gr.State([])
    query_engine = gr.State([])
    def generate_chat_engine(dna_documents, chat_engine, query_engine):
        # Builds a chat engine and a query engine from the uploaded DNA
        # documents and stores both in the session state lists.
        chat, query, response = chat_gen.get_chat_engine(dna_documents)
        chat_engine.append(chat)
        query_engine.append(query)
        return chat_engine, query_engine, response
    with gr.Column(visible=True, elem_id='login') as login:
        # The password is checked inside import_client_data()
        password = gr.Textbox(label='Enter Password')
        dna_documents = gr.File(label='Upload DNA Documents', file_count='multiple')
        chat_gen_btn = gr.Button("Generate DNA LLM")
        chat_gen_progress = gr.Label(label='LLM Created')
    with gr.Tab("Zipfile Upload"):
        # NOTE: the name `zipfile` bound by this Column is rebound to the
        # gr.File component a few lines below.
        with gr.Column(visible=True, elem_id='zipfile') as zipfile:
            with gr.Row():
                with gr.Column():
                    options = get_collection_names()
                    selection = gr.Dropdown(options, label='Choose Existing Collection', info='If creating a new section, select Create a Collection')
                    gr.Markdown('## Upload zipfile containing client photos below')
                    # Display-only (interactive=False); filled by upload_btn via upload_file()
                    zipfile = gr.File(label='Client Photos (must be zipfile)', file_count='multiple', file_types=['.zip'], interactive=False)
                    upload_btn = gr.UploadButton("Upload Zipfile(s)", file_count='multiple')
                    ai_bool = gr.Radio(choices=['On', 'Off'], label='AI Algorithm?', info = 'Would you like to use the AI Algorithm to upload these images?')
                    project_bool = gr.Radio(choices=['Yes', 'No'], label='Project Names?', info='Would you like to include project names for these images?')
                    gr.Markdown('## Upload topical map document for the client below')
                    topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
                    algorithm = gr.Button('Run Algorithm')
                    upload = gr.Label(label='Uploader')
                    err_imgs = gr.Textbox(label="Images Not Processed")
                    stop = gr.Button("Stop Run")
    with gr.Tab("Brandfolder AI Trigger"):
        with gr.Column(visible=True, elem_id='trigger') as trigger:
            gr.Markdown('''
# Run AI in Brandfolder
This button runs the AI algorithm using all the images stored in the Pre-Processed Images section in Brandfolder.
The algorithm will move the new processed images to the AI Processed Images.
ALL COPIES OF THE IMAGES IN THE PRE-PROCESSED SECTION WILL BE DELETED AFTER PUSHING THIS BUTTON
''')
            bf_options = get_collection_names()
            bf_selection = gr.Dropdown(bf_options, label='Choose Existing Collection')
            section = gr.Radio(choices=['Pre-Processed Images', 'Original Project Assets'], label='Which Sections is the data in?')
            bf_topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
            bf_button = gr.Button('Run AI algorithm for Pre-Processed Images')
            bf_upload = gr.Label(label='Uploader')
            stop_bf = gr.Button('Stop Run')
    with gr.Tab("DNA LLM"):
        with gr.Column(visible=True):
            gr.Markdown('''
# DNA LLM
This DNA chatbot uses the uploaded dna documents to answer questions
''')
            chatbot = gr.Chatbot()
            msg = gr.Textbox()
            clear = gr.ClearButton([msg, chatbot])
            def user(user_message, history):
                # Clears the textbox and appends the new message to the
                # history with an empty (None) reply slot.
                return "", history + [[user_message, None]]
            def bot(history, chat_engine):
                # Generator: streams the reply for the last user message
                # into the last history entry, yielding after each piece.
                print(history)
                user_message = history[-1][0]
                # chat_history = [(ChatMessage(role=message[1],content=message['content'])) for message in history]
                # NOTE(review): history[-1][1] (the still-empty reply slot) is
                # what gets passed as the chat history here, and
                # chat_engine[-1] fails on an empty list, i.e. before
                # "Generate DNA LLM" has been used - confirm both.
                bot_message = chatbot_response(user_message, history[-1][1], chat_engine[-1])
                history[-1][1] = ""
                for character in bot_message.response_gen:
                    history[-1][1] += character
                    # Short pause between streamed pieces
                    time.sleep(0.1)
                    yield history
    with gr.Tab("Website Content Spreadsheet"):
        with gr.Column(visible=True):
            gr.Markdown('''
# Website Content Spreadsheet
Upload a spreadsheet with descriptions of website content
''')
            website_layout_file = gr.File(label='Website Layout File')
            con_gen_btn = gr.Button('Generate Content')
            data_preview = gr.DataFrame(label='Processed DataFrame Preview')
            status = gr.Textbox(label='Completion Status')
            # Hidden placeholder; generate_content() returns a visible replacement
            download_btn = gr.DownloadButton(label='Download Content', visible=False)
    # with gr.Column(visible=False, elem_id='offline') as offline:
    # gr.Markdown('''
    # # AI Processed Images Algorithm
    # Runs the AI algorithm over the images in the AI Processed Images Section.
    # Use this only when the Brandfolder API is not uploading images properly.
    # The Images will not be reduced but the tags, descriptions, etc. for the images will be populated.
    # ''')
    # offline_options = get_collection_names()
    # offline_selection = gr.Dropdown(offline_options, label='Choose Existing Collection')
    # offline_topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
    # offline_button = gr.Button('Run AI algorithm for AI Processed Images Section')
    # offline_upload = gr.Label(label='Uploader')
    # stop_offline = gr.Button("Stop Run")
    # selection.select(fn=get_collection_names, outputs=[selection])
    # download_btn.click(download_file)
    # ---- Event wiring ----
    # Chat: record the user message first, then stream the bot reply.
    msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then(
        bot, [chatbot, chat_engine], chatbot)
    clear.click(lambda: None, None, chatbot, queue=False)
    con_gen_btn.click(generate_content, inputs=[website_layout_file, query_engine], outputs=[status, data_preview, download_btn])
    # The returned events are kept so the stop buttons can cancel them.
    algo_event = algorithm.click(fn=import_client_data, inputs=[selection, zipfile, topical_map, password, project_bool, ai_bool], outputs=[upload, err_imgs])
    bf_event = bf_button.click(fn=bf_trigger.run_preprocess_ai, inputs=[bf_topical_map, bf_selection, section], outputs=[bf_upload])
    # offline_event = offline_button.click(fn=offline_update.run_preprocess_ai, inputs=[offline_topical_map, offline_selection], outputs=[offline_upload])
    stop.click(fn=None, inputs=None, outputs=None, cancels=[algo_event])
    stop_bf.click(fn=None, inputs=None, outputs=None, cancels=[bf_event])
    upload_btn.upload(upload_file, upload_btn, zipfile)
    chat_gen_btn.click(generate_chat_engine, inputs=[dna_documents, chat_engine, query_engine], outputs=[chat_engine, query_engine, chat_gen_progress])
    # stop_offline.click(fn=None, inputs=None, outputs=None, cancels=[offline_event])
block.queue(default_concurrency_limit=5)
block.launch()