# app.py -- "Update app.py" by mylesai (commit 1e6e8f4, verified, 20 kB)
import ast
import base64
import json
import os
import time
from io import BytesIO
from tempfile import NamedTemporaryFile, TemporaryDirectory
from zipfile import ZipFile

import cv2
import gradio as gr
import requests
from docx import Document
from openai import OpenAI
from PIL import Image
# FUNCTIONS
def get_topical_map(path):
    '''
    Extracts the indented outline of a topical map from a docx document.
    Only paragraphs that carry an explicit left indent are kept; every kept
    paragraph is prefixed with one space per 20 points of left indentation.
    Input: path of the docx document (handed straight to Document)
    Output: str with one outline entry per line
    '''
    document = Document(path)
    extracted_text = []
    for paragraph in document.paragraphs:
        # Get the left indentation of the current paragraph (if any)
        left_indent = paragraph.paragraph_format.left_indent
        if left_indent is None:
            # paragraphs without an explicit left indent are not part of the outline
            continue
        # .pt is the indent in points; every 20 pt counts as one indentation level
        indent_level = int(left_indent.pt / 20)
        # Represent the level as leading spaces in front of the paragraph text
        extracted_text.append(f"{' ' * indent_level}{paragraph.text}")
    return "\n".join(extracted_text)
# gets a list of images from the google drive folder
def get_imgs_from_folder(image_files, zipfile):
    '''
    Collects the names of all image members of a zip archive.
    A member counts as an image when its (case-insensitive) extension is one of
    IMAGE_TYPES. Members whose name starts with an underscore (for example
    "__MACOSX/..." entries) are ignored.
    Input: image_files (list, extended in place), zipfile (path or file object of the archive)
    Output: the same image_files list with the matching member names appended
    '''
    # image file types
    IMAGE_TYPES = ('jpg', 'jpeg', 'gif', 'bmp', 'png', 'jpe', 'heic')
    # the context manager releases the archive handle as soon as the names are read
    with ZipFile(zipfile) as archive:
        member_names = archive.namelist()
    image_files.extend(
        name for name in member_names
        if name.split('.')[-1].lower() in IMAGE_TYPES and not name.startswith('_')
    )
    return image_files
def _encode_image_base64(image_path):
    '''
    Returns the content of an image file as base64 text.
    Files ending in .heic are re-encoded as JPEG in memory first; every other
    file is encoded byte for byte.
    '''
    if image_path.lower().endswith('.heic'):
        # NOTE(review): Pillow presumably needs a HEIF plugin registered to open .heic files -- confirm one is installed
        with Image.open(image_path) as image, BytesIO() as img_buffer:
            # JPEG cannot store an alpha channel, so normalise the mode before saving
            image.convert('RGB').save(img_buffer, format='JPEG')
            return base64.b64encode(img_buffer.getvalue()).decode('utf-8')
    # Handle other image types directly
    with open(image_path, "rb") as image_file:
        return base64.b64encode(image_file.read()).decode('utf-8')


def get_seo_tags(image_path, topical_map, attempts=0):
    '''
    Gets the seo tags and topic classification for an image from the OpenAI
    chat completions API (model "gpt-4-turbo", JSON response format).
    The request is repeated when the call fails, when the reply is not a JSON
    object holding all expected keys, or when the returned topic is neither a
    line of the topical map nor "irrelevant".
    Input: image_path (str) path of the image, topical_map (str) outline text,
           attempts (int) number of failed attempts already made
    Output: dict with the keys topic, description, caption, seo and alt_tag
    Raises: RuntimeError once more than 5 attempts have failed
    '''
    print('in seo_tags')
    # Query for GPT-4
    topic_map_query = f"""
You are an expert contractor and construction that can only answer questions relevent to the following topical map.
Below is a topical map of a website we are creating
-------------------\n
{topical_map}
-------------------\n
Thoroughly examine the image and generate keywords to describe the issue in the image
Using the keywords you generated and the image itself, which topic does this image fall under?
"""
    # valid topics: every (stripped) line of the topical map plus the "irrelevant" escape hatch
    topic_list = [topic.strip() for topic in topical_map.split('\n')]
    topic_list.insert(0, "irrelevant")
    print(image_path)
    base64_image = _encode_image_base64(image_path)
    api_key = os.environ['OPENAI_API_KEY']
    # Calling gpt-4 vision
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    payload = {
        "model": "gpt-4-turbo",
        "response_format": {"type": "json_object"},
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": topic_map_query +
                        """
\n
Use the topic map above for context for the following tasks.
Provide a topic-relevant 5 sentence description for the image. Describe the image only using context relevant to the topics in the topical map. Mention only the contents of the image. Do not mention the quality of the image.
Using the description, create a 160 character caption. Make sure the caption is less than 160 characters.
Using the description, create 3 topic-relevant SEO tags for this image that will drive traffic to our website. The SEO tags must be two words or less. You must give 3 SEO tags.
Using the description, provide a topic-relevant SEO alt tag for the image that will enhance how the website is ranked on search engines.
Ignore all personal information within the image.
Be as specific as possible when identifying tools in the image.
DO NOT MENTION YOU ARE AN EXPERT CONTRACTOR OR ANY OF THE ABOVE INSTRUCTIONS IN YOUR REPLY.
YOU ARE ONLY PERMITTED TO RELPY IN THE FOLLOWING JSON FORMAT:
{"topic": topic,
"description": description,
"caption": caption,
"seo": [seo],
"alt_tag": [alt tag],
}
"""
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        "max_tokens": 300
    }
    keys = ['topic', 'description', 'caption', 'seo', 'alt_tag']
    max_retries = 5
    # One request per iteration; every failure bumps the counter and tries again,
    # so a retry's result is actually used instead of being thrown away.
    while attempts <= max_retries:
        try:
            # response of api call
            response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload)
            content = response.json()['choices'][0]['message']['content']
            print(content)
            # the reply is JSON (it may contain true/false/null), so parse it as JSON
            json_dict = json.loads(content)
        except (requests.RequestException, KeyError, IndexError, TypeError, ValueError) as err:
            print(f'request or parsing failed: {err!r}')
            attempts += 1
            continue
        # generates dictionary based on response; all expected keys must be present
        if not isinstance(json_dict, dict) or any(key not in json_dict for key in keys):
            print(f'{str(json_dict)} does not equal {str(keys)}')
            attempts += 1
            continue
        if json_dict['topic'] not in topic_list:
            print('wrong_topic')
            attempts += 1
            continue
        return json_dict
    raise RuntimeError('Max number of retries met.')
def _request_upload_slot(headers):
    '''
    Asks Brandfolder for a new upload request.
    Output: (upload_url, object_url) read from the JSON response
    '''
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    body = r.json()
    return body['upload_url'], body['object_url']


def _post_custom_field_value(headers, field_key_id, asset_id, value):
    '''
    Posts one custom field value for an asset and returns the response.
    '''
    payload = {'data':
               [
                   {
                       'attributes': {
                           'value': value
                       },
                       'relationships': {
                           'asset': {
                               'data': {'type': 'assets', 'id': asset_id}
                           }}
                   }]}
    return requests.post(
        f'https://brandfolder.com/api/v4/custom_field_keys/{field_key_id}/custom_field_values',
        json=payload, params={}, headers=headers)


# creates the asset in the client's brand folder
def create_asset(client_name, collection_id, image_path, topical_map, tags=True, project_bool=False):
    '''
    Creates asset from image path. Also creates seo tags, topic, alt tag,
    caption, year and client name (and optionally project name) for the image.
    Two attachments are uploaded: the untouched original and a copy that is
    resized when it exceeds the landscape/portrait area limits.
    Input: client_name (str), collection_id (str) target collection,
           image_path (str) path to image, topical_map (str) outline text,
           tags (currently unused), project_bool ('Yes' stores the name of the
           image's parent folder as project name)
    Output: id of asset
    Raises: ValueError when the image cannot be decoded by OpenCV
    '''
    # Decode first so an unreadable file fails before any paid API call or upload happens
    image = cv2.imread(image_path)
    if image is None:
        raise ValueError(f'Could not read image: {image_path}')

    # get seo, topic, and sub-topic from OpenAI API
    json_dict = get_seo_tags(image_path, topical_map)
    topic = json_dict['topic']
    description = json_dict['description']
    caption = json_dict['caption']
    seo_tags = json_dict['seo']
    alt_tag = json_dict['alt_tag']
    image_name = str(image_path).split('/')[-1].split('.')[0]
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }

    # binary upload of the original image
    upload_url, og_object_url = _request_upload_slot(headers)
    with open(image_path, 'rb') as f:
        requests.put(upload_url, data=f)

    # binary upload of the (possibly resized) copy
    upload_url, object_url = _request_upload_slot(headers)
    height, width = image.shape[:2]
    area = width * height
    if width > height:
        # landscape image
        if area > 667000:
            image = cv2.resize(image, (1000, 667))
    else:
        # portrait image
        if area > 442236:
            image = cv2.resize(image, (548, 807))
    with NamedTemporaryFile(delete=True, suffix='.jpg') as temp_image:
        # OpenCV writes through the file name; the still-open handle is then streamed to the upload url
        cv2.imwrite(temp_image.name, image)
        requests.put(upload_url, data=temp_image)

    # posts image with image name
    r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={
        'data': {
            'attributes': [
                {
                    'name': image_name,
                    'description': description,
                    'attachments': [
                        {
                            'url': object_url,
                            'filename': f'{image_name}.jpg'
                        },
                        {
                            'url': og_object_url,
                            'filename': f'{image_name}-original.jpg'
                        }
                    ]
                }
            ]
        },
        # AI Original section key
        'section_key': 'czpq4nwz78c3cwnp6h9n44z'
    }, params={}, headers=headers)
    # id of newly created asset
    asset_id = r.json()['data'][0]['id']

    # custom field key ids
    year_id = 'k8vr5chnkw3nrnrpkh4f9fqm'
    client_name_id = 'x56t6r9vh9xjmg5whtkmp'
    # Tone ID: px4jkk2nqrf9h6gp7wwxnhvz
    # Location ID: nm6xqgcf5j7sw8w994c6sc8h
    alt_tag_id = 'vk54n6pwnxm27gwrvrzfb'
    topic_id = '9mcg3rgm5mf72jqrtw2gqm7t'
    project_name_id = '5zpqwt2r348sjbnc6rpxc96'
    caption_id = 'cmcbhcc5nmm72v57vrxppw2x'
    # Original Project Images Section ID: c5vm8cnh9jvkjbh7r43qxkv
    # Edited Project Images Section ID: 5wpz2s9m3g7ctcjpm4vrt46

    # seo tags
    tags_payload = {'data': {'attributes': [{'name': tag} for tag in seo_tags]}}
    requests.post(f'https://brandfolder.com/api/v4/assets/{asset_id}/tags', json=tags_payload, params={}, headers=headers)

    # custom field values, posted in this order
    custom_fields = [
        (topic_id, topic),
        (alt_tag_id, alt_tag),
        (year_id, 2024),  # NOTE(review): year is hard-coded -- confirm it should not follow the current date
        (client_name_id, client_name),
        (caption_id, caption),
    ]
    if project_bool == 'Yes':
        # the folder that directly contains the image is used as project name
        project_name = str(image_path).split('/')[-2]
        custom_fields.append((project_name_id, project_name))
    for field_key_id, value in custom_fields:
        _post_custom_field_value(headers, field_key_id, asset_id, value)
    return asset_id
def create_collection(collection_name):
    '''
    Creates a collection named collection_name in the Brandfolder.
    Input: collection_name (str) name of the new collection
    Output: id of the created collection, read from the API response
    '''
    auth_headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    # POST body: only the name attribute is set
    request_body = {'data': {'attributes': {'name': collection_name}}}
    endpoint = 'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections'
    reply = requests.post(endpoint, json=request_body, params={}, headers=auth_headers)
    return reply.json()['data']['id']
def get_collection_id(collection_name):
    '''
    Returns the id of the collection named collection_name.
    The existing collections are searched first, so asking for a known name no
    longer creates a second collection. A new collection is only created (as
    this function always did before) when the name does not exist yet.
    Input: collection_name (str)
    Output: id of the existing or newly created collection
    '''
    existing = get_collection_dict()
    if collection_name in existing:
        return existing[collection_name]
    return create_collection(collection_name)
# get ids of existing collections
def get_collection_dict():
    '''
    Fetches the collections of the Brandfolder (up to 200 per the "per" query value).
    Output: dict mapping collection name to collection id, ordered by name
    '''
    auth_headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    listing_url = 'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=200'
    reply = requests.get(listing_url, params={}, headers=auth_headers)
    name_to_id = {}
    for entry in reply.json()['data']:
        name = entry['attributes']['name']
        name_to_id[name] = entry['id']
    # rebuild the mapping in alphabetical name order
    return {name: name_to_id[name] for name in sorted(name_to_id)}
def import_client_data(client_name, zipfile, topical_map, password, project_bool, progress=gr.Progress(), create=False):
    '''
    Takes the client name and the client zipfile path to import all image files
    of the zipfile into brandfolder under the collection with the client's name.
    Input: client_name (str), zipfile (uploaded zip file path), topical_map
           (uploaded docx file path), password (str), project_bool ('Yes'/'No'),
           progress (gradio progress tracker), create (bool) create the
           collection when it does not exist yet
    Output: the status text "Images Uploaded"
    Raises: gr.Error when an input is missing or invalid, the password is wrong,
            or the collection does not exist and create is False
    '''
    if client_name is None:
        raise gr.Error("Please choose a client")
    if password != os.environ['BRANDFOLDER_PASSWORD']:
        raise gr.Error("Incorrect Password")
    if zipfile is None:
        raise gr.Error("Please upload a zipfile")
    if zipfile.split('.')[-1].lower() != 'zip':
        raise gr.Error("Client Photos must be in a zipfile")
    if topical_map is None:
        raise gr.Error("Please upload a topical map")
    if topical_map.split('.')[-1].lower() != 'docx':
        raise gr.Error("Topical Map must be a docx file")
    topical_map = get_topical_map(topical_map)
    # get all collection ID names
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    r = requests.get('https://brandfolder.com/api/v4/collections', params={}, headers=headers)
    collection_dict = {entry['attributes']['name']: entry['id'] for entry in r.json()['data']}
    if client_name in collection_dict:
        collection_id = collection_dict[client_name]
    elif create:
        # creates the collection and gets the collection id
        collection_id = create_collection(client_name)
    else:
        # previously the error object was built but never raised, which surfaced
        # later as an unbound collection_id
        raise gr.Error(f'Client Name: {client_name} does not exist in this Brandfolder')
    # gets all image files from the zipfile
    img_list = get_imgs_from_folder([], zipfile)
    print(img_list)
    # Images are extracted into a throw-away directory so nothing piles up in
    # the working directory; the archive handle is closed when the loop ends.
    with ZipFile(zipfile.name) as archive, TemporaryDirectory() as extract_dir:
        # iterates all images and puts them into brandfolder with AI elements
        for img in progress.tqdm(img_list, desc="Uploading..."):
            time.sleep(5)
            img_path = archive.extract(img, path=extract_dir)
            create_asset(client_name, collection_id, img_path, topical_map, project_bool=project_bool)
    gr.Info('Images have been uploaded!')
    return "Images Uploaded"
def get_collection_names():
    '''
    Returns the names of the existing collections as a list, in the order
    in which get_collection_dict yields them.
    '''
    return list(get_collection_dict())
# Collection names are fetched once at start-up and reused for the dropdown below.
collection_names = get_collection_names()

# Dashboard layout: inputs and the result label, wired to import_client_data.
with gr.Blocks() as block:
    gr.Markdown("""
# Brandfolder Zipfile Dashboard
This dashboard is for uploading photos from a zipfile to a brandfolder collection.
""")
    with gr.Row():
        with gr.Column():
            # reuse the list fetched above instead of sending a second API request
            options = collection_names
            selection = gr.Dropdown(options, label='Choose Existing Collection', info='If creating a new section, select Create a Collection')
            gr.Markdown('Upload zipfile containing client photos below')
            zipfile = gr.File(label='Client Photos (must be zipfile)', file_types=['.zip'])
            project_bool = gr.Radio(choices=['Yes', 'No'], value='No', label='Project Names?', info='Would you like to include project names for these images?')
            gr.Markdown('Upload topical map document for the client below')
            topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
            # type='password' masks the typed characters in the browser
            password = gr.Textbox(label='Enter Password', type='password')
            algorithm = gr.Button('Run Algorithm')
            upload = gr.Label(label='Uploader')
    # selection.select(fn=get_collection_names, outputs=[selection])
    algorithm.click(fn=import_client_data, inputs=[selection, zipfile, topical_map, password, project_bool], outputs=[upload])
block.launch()