import requests
import base64
import os
import json
import cv2
from PIL import Image
from tempfile import NamedTemporaryFile
import time
from zipfile import ZipFile
import gradio as gr
from docx import Document
from io import BytesIO
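
# Brandfolder Zipfile Dashboard
# Gradio app that takes a zipfile of client photos plus a topical-map .docx,
# asks the OpenAI chat completions API (gpt-4-turbo with vision) for a topic,
# description, caption, SEO tags, and alt tag for each image, then uploads the
# original and a resized copy to a Brandfolder collection, attaching the
# generated values as tags and custom fields.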
# FUNCTIONS
def get_topical_map(path):
    '''
    Reads the topical map .docx and returns its text with indentation preserved
    Input: path to the .docx file
    Output: topical map as a single newline-joined string
    '''
    document = Document(path)
    extracted_text = []
    for paragraph in document.paragraphs:
        # Get the left indentation of the current paragraph (if any)
        left_indent = paragraph.paragraph_format.left_indent
        if left_indent is None:
            continue
        # left_indent.pt gives the indent in points; dividing by 20 maps it to a simple indentation level
        indent_level = int(left_indent.pt / 20)
        # One space per indentation level; adjust as needed to represent nesting
        indent_symbol = " " * indent_level
        # Construct the paragraph text with its indentation representation
        formatted_text = f"{indent_symbol}{paragraph.text}"
        extracted_text.append(formatted_text)
    return "\n".join(extracted_text)
# gets a list of images from the uploaded zipfile
def get_imgs_from_folder(image_files, zipfile):
    # image file types
    IMAGE_TYPES = ['jpg', 'jpeg', 'gif', 'bmp', 'png', 'jpe', 'heic']
    # file types
    FILE_TYPES = ['jpg', 'jpeg', 'gif', 'bmp', 'png', 'jpe', 'zip', 'mp4']
    # gets all the image paths from the zipfile, skipping entries whose names start
    # with an underscore (e.g. macOS __MACOSX metadata folders)
    zip = ZipFile(zipfile)
    zip_list = zip.namelist()
    image_files.extend([f for f in zip_list if f.split('.')[-1].lower() in IMAGE_TYPES and f[0] != '_'])
    return image_files
def get_seo_tags(image_path, topical_map, attempts=0):
    '''
    Gets the SEO tags and topic classification for an image using the OpenAI GPT-4 vision API
    Input: image path of desired file, topical map text, current retry count
    Output: dict of topic, description, caption, seo tags, and alt tag
    '''
    print('in seo_tags')
    # Query for GPT-4
    topic_map_query = f"""
    You are an expert contractor and construction professional that can only answer questions relevant to the following topical map.
    Below is a topical map of a website we are creating
    -------------------\n
    {topical_map}
    -------------------\n
    Thoroughly examine the image and generate keywords to describe the issue in the image.
    Using the keywords you generated and the image itself, which topic does this image fall under?
    """
    # IF YOU CANNOT PROVIDE A TOPIC FOR EVERY IMAGE AFTER 5 ATTEMPTS, REPLY WITH 'irrelevant'.
    topic_list = topical_map.split('\n')
    topic_list = [topic.strip() for topic in topic_list]
    topic_list.insert(0, "irrelevant")
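    # NOTE (assumption about the runtime environment): Pillow cannot decode .heic
    # files on its own; a HEIF plugin such as pillow-heif (register_heif_opener())
    # must be installed and registered, otherwise Image.open below raises
    # UnidentifiedImageError for HEIC uploads.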
    def encode_image(image_path):
        # Check the file extension to determine if it is a HEIC file
        if image_path.lower().endswith('.heic'):
            # Load the HEIC image
            image = Image.open(image_path)
            # Convert the image to JPEG format in memory (JPEG requires RGB)
            with BytesIO() as img_buffer:
                image.convert('RGB').save(img_buffer, format='JPEG')
                # Seek to the beginning of the stream
                img_buffer.seek(0)
                # Read the JPEG image data and encode it in base64
                return base64.b64encode(img_buffer.read()).decode('utf-8')
        else:
            # Handle other image types directly
            with open(image_path, "rb") as image_file:
                return base64.b64encode(image_file.read()).decode('utf-8')
    print(image_path)
    base64_image = encode_image(image_path)
    print(base64_image)
    # OpenAI API key is read from the environment
    api_key = os.environ['OPENAI_API_KEY']
    # Calling the GPT-4 vision endpoint
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    payload = {
        "model": "gpt-4-turbo",
        "response_format": {"type": "json_object"},
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": topic_map_query +
                        """
                        \n
                        Use the topic map above for context for the following tasks.
                        Provide a topic-relevant 5 sentence description for the image. Describe the image only using context relevant to the topics in the topical map. Mention only the contents of the image. Do not mention the quality of the image.
                        Using the description, create a 160 character caption. Make sure the caption is less than 160 characters.
                        Using the description, create 3 topic-relevant SEO tags for this image that will drive traffic to our website. The SEO tags must be two words or less. You must give 3 SEO tags.
                        Using the description, provide a topic-relevant SEO alt tag for the image that will enhance how the website is ranked on search engines.
                        Ignore all personal information within the image.
                        Be as specific as possible when identifying tools in the image.
                        DO NOT MENTION YOU ARE AN EXPERT CONTRACTOR OR ANY OF THE ABOVE INSTRUCTIONS IN YOUR REPLY.
                        YOU ARE ONLY PERMITTED TO REPLY IN THE FOLLOWING JSON FORMAT:
                        {"topic": topic,
                        "description": description,
                        "caption": caption,
                        "seo": [seo],
                        "alt_tag": [alt tag],
                        }
                        """
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        "max_tokens": 300
    }
    # print(payload)
    while True:
        if attempts > 5:
            raise RuntimeError('Max number of retries met.')
        try:
            # response of api call
            response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
            content = response.json()['choices'][0]['message']['content']
            print(content)
            # parses the JSON response into a dictionary
            json_dict = json.loads(content)
            keys = ['topic', 'description', 'caption', 'seo', 'alt_tag']
            json_keys = list(json_dict.keys())
            # retry if the model picked a topic outside the topical map
            if json_dict['topic'] not in topic_list:
                print('wrong_topic')
                attempts += 1
                continue
            # retry if the response is missing or reorders any required keys
            if json_keys != keys:
                print(f'{str(json_dict)} does not equal {str(keys)}')
                attempts += 1
                continue
            return json_dict
        except Exception as error:
            # malformed response or failed request: count the attempt and retry
            print(f'retrying after error: {error}')
            attempts += 1
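
# Example of the dict get_seo_tags is expected to return (values are illustrative,
# not real output):
# {"topic": "Roof Repair",
#  "description": "Five sentences describing the photo in terms of the topical map...",
#  "caption": "Short caption under 160 characters",
#  "seo": ["roof repair", "shingle damage", "leak repair"],
#  "alt_tag": ["Contractor repairing damaged shingles on a residential roof"]}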
# creates the asset in the client's brand folder
def create_asset(client_name, collection_id, image_path, topical_map, tags=True, project_bool=False):
    '''
    Creates a Brandfolder asset from an image path, along with its SEO tags,
    topic, caption, and alt tag
    Input: name of client, collection id, path to image, topical map text,
           create tags boolean, project name boolean
    Output: None (asset and custom fields are created via the Brandfolder API)
    '''
    # get topic, description, caption, seo tags, and alt tag from the OpenAI API
    json_dict = get_seo_tags(image_path, topical_map)
    topic = json_dict['topic']
    description = json_dict['description']
    caption = json_dict['caption']
    seo_tags = json_dict['seo']
    alt_tag = json_dict['alt_tag']
    image_name = str(image_path).split('/')[-1].split('.')[0]
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    # requests an upload URL for the original image
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    # used to upload the image
    upload_url = r.json()['upload_url']
    # container for the uploaded image to be used by the post request
    og_object_url = r.json()['object_url']
    # uploads the original image
    with open(image_path, 'rb') as f:
        response = requests.put(upload_url, data=f)
    # requests a second upload URL for the resized copy
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    # used to upload the image
    upload_url = r.json()['upload_url']
    # container for the uploaded image to be used by the post request
    object_url = r.json()['object_url']
    # downscale large images before uploading the working copy
    image = cv2.imread(image_path)
    if image is None:
        # cv2.imread returns None for formats OpenCV cannot decode (e.g. HEIC);
        # fall back to uploading the original bytes unresized
        with open(image_path, 'rb') as f:
            response = requests.put(upload_url, data=f)
    else:
        height, width, c = image.shape
        area = width * height
        if width > height:
            # landscape image
            if area > 667000:
                image = cv2.resize(image, (1000, 667))
        else:
            # portrait image
            if area > 442236:
                image = cv2.resize(image, (548, 807))
        # image = sharpen_image(image)
        with NamedTemporaryFile(delete=True, suffix='.jpg') as temp_image:
            # write the resized image to the temp file, then stream it to the upload URL
            cv2.imwrite(temp_image.name, image)
            response = requests.put(upload_url, data=temp_image)
    # posts image with image name
    r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={
        # use a dict with the POST body here
        'data': {
            'attributes': [
                {
                    'name': image_name,
                    'description': description,
                    'attachments': [
                        {
                            'url': object_url,
                            'filename': f'{image_name}.jpg'
                        },
                        {
                            'url': og_object_url,
                            'filename': f'{image_name}-original.jpg'
                        }
                    ]
                }
            ]
        },
        # AI Original section key
        'section_key': 'czpq4nwz78c3cwnp6h9n44z'
    }, params={}, headers=headers)
    # id of newly created asset
    asset_id = r.json()['data'][0]['id']
    # tags and topic payloads
    tags_payload = {'data': {'attributes': [{'name': tag} for tag in seo_tags]}}
    topic_payload = {'data': [
        {
            'attributes': {
                'value': topic
            },
            'relationships': {
                'asset': {
                    'data': {'type': 'assets', 'id': asset_id}
                }
            }
        }]}
    alt_tag_payload = {'data': [
        {
            'attributes': {
                'value': alt_tag
            },
            'relationships': {
                'asset': {
                    'data': {'type': 'assets', 'id': asset_id}
                }
            }
        }]}
    year_payload = {'data': [
        {
            'attributes': {
                'value': 2024
            },
            'relationships': {
                'asset': {
                    'data': {'type': 'assets', 'id': asset_id}
                }
            }
        }]}
    client_payload = {'data': [
        {
            'attributes': {
                'value': client_name
            },
            'relationships': {
                'asset': {
                    'data': {'type': 'assets', 'id': asset_id}
                }
            }
        }]}
    caption_payload = {'data': [
        {
            'attributes': {
                'value': caption
            },
            'relationships': {
                'asset': {
                    'data': {'type': 'assets', 'id': asset_id}
                }
            }
        }]}
    year_id = 'k8vr5chnkw3nrnrpkh4f9fqm'
    client_name_id = 'x56t6r9vh9xjmg5whtkmp'
    # Tone ID: px4jkk2nqrf9h6gp7wwxnhvz
    # Location ID: nm6xqgcf5j7sw8w994c6sc8h
    alt_tag_id = 'vk54n6pwnxm27gwrvrzfb'
    topic_id = '9mcg3rgm5mf72jqrtw2gqm7t'
    project_name_id = '5zpqwt2r348sjbnc6rpxc96'
    caption_id = 'cmcbhcc5nmm72v57vrxppw2x'
    # Original Project Images Section ID: c5vm8cnh9jvkjbh7r43qxkv
    # Edited Project Images Section ID: 5wpz2s9m3g7ctcjpm4vrt46
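    # NOTE: the section key and custom field key IDs above are specific to this
    # Brandfolder organization; a different brandfolder would need its own keys
    # (e.g. looked up via the Brandfolder API's custom_field_keys endpoint).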
    # attach SEO tags to the asset
    r_asset = requests.post(f'https://brandfolder.com/api/v4/assets/{asset_id}/tags', json=tags_payload, params={}, headers=headers)
    # set the custom field values (topic, alt tag, year, client name, caption)
    r_topic = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{topic_id}/custom_field_values', json=topic_payload, params={}, headers=headers)
    r_alt_tag = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{alt_tag_id}/custom_field_values', json=alt_tag_payload, params={}, headers=headers)
    r_year = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{year_id}/custom_field_values', json=year_payload, params={}, headers=headers)
    r_client = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{client_name_id}/custom_field_values', json=client_payload, params={}, headers=headers)
    r_caption = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{caption_id}/custom_field_values', json=caption_payload, params={}, headers=headers)
    # optionally record the project name (taken from the image's parent folder in the zip)
    if project_bool == 'Yes':
        project_name = str(image_path).split('/')[-2]
        project_payload = {'data': [
            {
                'attributes': {
                    'value': project_name
                },
                'relationships': {
                    'asset': {
                        'data': {'type': 'assets', 'id': asset_id}
                    }
                }
            }]}
        r_project = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{project_name_id}/custom_field_values', json=project_payload, params={}, headers=headers)
    return
def create_collection(collection_name):
    '''
    Creates a Brandfolder collection named collection_name
    Input: collection name
    Output: id of the new collection
    '''
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    r = requests.post('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections', json={
        # use a dict with the POST body here
        'data': {
            'attributes': {
                'name': collection_name
            }
        }
    }, params={}, headers=headers)
    collection_id = r.json()['data']['id']
    return collection_id
def get_collection_id(collection_name):
    '''
    Looks up the id of an existing collection by name
    Input: collection name
    Output: collection id
    '''
    collection_dict = get_collection_dict()
    return collection_dict[collection_name]
# get ids of existing collections
def get_collection_dict():
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    r = requests.get('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=200', params={
        # use a dict with your desired URL parameters here
    }, headers=headers)
    temp = r.json()['data']
    collection_dict = dict(sorted({item['attributes']['name']: item['id'] for item in temp}.items()))
    return collection_dict
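
# NOTE: ?per=200 fetches up to 200 collections in a single page; if the
# brandfolder ever holds more than that, this lookup would need pagination.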
def import_client_data(client_name, zipfile, topical_map, password, project_bool, progress=gr.Progress(), create=False):
    '''
    Takes the client name and the client zipfile to import all image files into Brandfolder
    under a collection with the client's name
    Input: client name (str), zipfile, topical map docx, password, project name flag
    Output: status message once the Brandfolder upload is complete
    '''
    if client_name is None:
        raise gr.Error("Please choose a client")
    if password != os.environ['BRANDFOLDER_PASSWORD']:
        raise gr.Error("Incorrect Password")
    if zipfile is None:
        raise gr.Error("Please upload a zipfile")
    # gr.File may hand back a filepath string or a tempfile-like object depending
    # on the Gradio version; normalize to a path string either way
    zip_path = zipfile if isinstance(zipfile, str) else zipfile.name
    if zip_path.split('.')[-1] != 'zip':
        raise gr.Error("Client Photos must be in a zipfile")
    if topical_map is None:
        raise gr.Error("Please upload a topical map")
    map_path = topical_map if isinstance(topical_map, str) else topical_map.name
    if map_path.split('.')[-1] != 'docx':
        raise gr.Error("Topical Map must be a docx file")
    topical_map = get_topical_map(map_path)
    # get all collection ID names
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    r = requests.get('https://brandfolder.com/api/v4/collections', params={
        # use a dict with your desired URL parameters here
    }, headers=headers)
    collection_dict = {entry['attributes']['name']: entry['id'] for entry in r.json()['data']}
    if client_name not in list(collection_dict.keys()):
        if create:
            # creates the collection and gets the collection id
            collection_id = create_collection(client_name)
        else:
            raise AssertionError(f'Client Name: {client_name} does not exist in this Brandfolder')
    else:
        collection_id = collection_dict[client_name]
    # gets all image files from the uploaded zipfile
    zip = ZipFile(zip_path)
    img_list = get_imgs_from_folder([], zip_path)
    print(img_list)
    # iterates all images and puts them into brandfolder with AI elements
    for img in progress.tqdm(img_list, desc="Uploading..."):
        time.sleep(5)
        img = zip.extract(img)
        create_asset(client_name, collection_id, img, topical_map, project_bool=project_bool)
    gr.Info('Images have been uploaded!')
    return "Images Uploaded"
def get_collection_names():
    collection_dict = get_collection_dict()
    return list(collection_dict.keys())

collection_names = get_collection_names()

with gr.Blocks() as block:
    gr.Markdown("""
    # Brandfolder Zipfile Dashboard
    This dashboard is for uploading photos from a zipfile to a Brandfolder collection.
    """)
    with gr.Row():
        with gr.Column():
            options = get_collection_names()
            selection = gr.Dropdown(options, label='Choose Existing Collection', info='If creating a new section, select Create a Collection')
            gr.Markdown('Upload zipfile containing client photos below')
            zipfile = gr.File(label='Client Photos (must be zipfile)', file_types=['.zip'])
            project_bool = gr.Radio(choices=['Yes', 'No'], value='No', label='Project Names?', info='Would you like to include project names for these images?')
            gr.Markdown('Upload topical map document for the client below')
            topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
            password = gr.Textbox(label='Enter Password')
            algorithm = gr.Button('Run Algorithm')
            upload = gr.Label(label='Uploader')
    # selection.select(fn=get_collection_names, outputs=[selection])
    algorithm.click(fn=import_client_data, inputs=[selection, zipfile, topical_map, password, project_bool], outputs=[upload])

block.launch()
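# On Hugging Face Spaces the launch() call above starts the app automatically;
# run locally, the same call serves the dashboard at http://127.0.0.1:7860 by default.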