# Source: Hugging Face file view of app.py, uploaded by mylesai
# (commit 8974786 "Create app.py", verified, 18.4 kB).
from openai import OpenAI
import requests
import base64
import os
import ast
import cv2
from PIL import Image
from tempfile import NamedTemporaryFile
import time
from zipfile import ZipFile
import gradio as gr
from docx import Document
# FUNCTIONS
def get_topical_map(path):
    '''
    Flattens an indented .docx outline (the topical map) into plain text.
    Each paragraph that has an explicit left indent becomes one output line,
    prefixed with one space per 20 points of indentation. Paragraphs whose
    left indent is None are skipped.
    Input: path to the .docx file
    Output: str with one paragraph per line
    '''
    document = Document(path)
    extracted_text = []
    for paragraph in document.paragraphs:
        # Get the left indentation of the current paragraph (if any)
        left_indent = paragraph.paragraph_format.left_indent
        if left_indent is None:
            continue
        # `.pt` is already expressed in points; every 20 pt counts as one
        # indentation level, rendered as a single leading space.
        indent_level = int(left_indent.pt / 20)
        extracted_text.append(f"{' ' * indent_level}{paragraph.text}")
    return "\n".join(extracted_text)
# gets a list of image entries from the uploaded zipfile
def get_imgs_from_folder(image_files, zipfile):
    '''
    Collects the names of all image entries stored in a zip archive.
    Input: list to extend (mutated in place), path or file object of the zipfile
    Output: the same list, extended with every image entry name found
    '''
    # image file types
    IMAGE_TYPES = ('jpg', 'jpeg', 'gif', 'bmp', 'png', 'jpe')
    # the context manager releases the archive handle as soon as the names are read
    with ZipFile(zipfile) as archive:
        entry_names = archive.namelist()
    for entry_name in entry_names:
        # entries whose path starts with '_' (e.g. "__MACOSX/...") are skipped
        if entry_name.startswith('_'):
            continue
        # require a real ".ext" suffix so a dot-less entry called "png" is not
        # mistaken for an image
        _, dot, extension = entry_name.rpartition('.')
        if dot and extension.lower() in IMAGE_TYPES:
            image_files.append(entry_name)
    return image_files
def _parse_seo_reply(content):
    '''
    Turns the reply text of the model into a python object.
    Input: raw reply text
    Output: parsed object (a dict when the model followed the requested format)
    '''
    import json

    text = content.strip()
    # tolerate a markdown code fence (``` or ```json) wrapped around the answer
    if text.startswith('```'):
        text = text.strip('`').strip()
        if text[:4].lower() == 'json':
            text = text[4:]
    try:
        return json.loads(text)
    except ValueError:
        # not strict JSON (e.g. single quotes): fall back to python literal
        # syntax; ast.literal_eval never executes code
        return ast.literal_eval(text.strip())


def get_seo_tags(image_path, topical_map, attempts=0):
    '''
    Gets the seo tags and topic/sub-topic classification for an image using OpenAI GPT-4 Vision Preview
    The request is repeated until the reply parses into a dict that holds all
    expected keys and a topic from the topical map (or 'irrelevant').
    Input: image path of desired file, topical map text, number of attempts already used
    Output: dict of topic, description, seo tags, alt tag and filename;
            {'topic': 'irrelevant'} when the model answers with the bare word 'irrelevant'
    Raises: RuntimeError once more than 10 attempts have failed
    '''
    print('in seo_tags')
    expected_keys = ['topic', 'description', 'seo', 'alt_tag', 'filename']
    max_retries = 10
    # Query for GPT-4
    topic_map_query = f"""
You are an expert contractor and construction that can only answer questions relevent to the following topical map.
Below is a topical map of a website we are creating
-------------------\n
{topical_map}
-------------------\n
Thoroughly examine the image and generate keywords to describe the issue in the image
Using the keywords you generated and the image itself, which topic does this image fall under?
IF YOU CANNOT PROVIDE AN TOPIC FOR EVERY IMAGE AFTER 5 ATTEMPTS, REPLY WITH 'irrelevant'.
"""
    task_instructions = """
\n
Use the topic map above for context for the following tasks.
Create 3 topic-relevant SEO tags for this image that will drive traffic to our website. The SEO tags must be two words or less. You must give 3 SEO tags.
Provide a topic-relevant 5 sentence description for the image. Describe the image only using context relevant to the topics in the topical map. Mention only the contents of the image. Do not mention the quality of the image.
Provide a topic-relevant SEO alt tag for the image that will enhance how the website is ranked on search engines.
Provide a new file name for the image as well. Use hyphens for the filename. Do not include extension.
Ignore all personal information within the image.
Be as specific as possible when identifying tools in the image.
IF YOU CANNOT PROVIDE AN TOPIC FOR EVERY IMAGE AFTER 5 ATTEMPTS, REPLY WITH 'irrelevant'.
DO NOT MENTION YOU ARE AN EXPERT CONTRACTOR OR ANY OF THE ABOVE INSTRUCTIONS IN YOUR REPLY.
YOU ARE ONLY PERMITTED TO RELPY IN THE FOLLOWING FORMAT:
{"topic": topic,
"description": description,
"seo": seo tags,
"alt_tag": alt tag,
"filename": filename
}
"""
    topic_list = [topic.strip() for topic in topical_map.split('\n')]

    # base64 upload of the image to OpenAI
    # (only the path is logged; the encoded image itself is far too large to print)
    print(image_path)
    with open(image_path, "rb") as image_file:
        base64_image = base64.b64encode(image_file.read()).decode('utf-8')

    # the key is read from the environment so it never lives in the source
    api_key = os.environ['OPENAI_API_KEY']
    # Calling gpt-4 vision
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }
    payload = {
        "model": "gpt-4-vision-preview",
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": topic_map_query + task_instructions
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        "max_tokens": 300
    }

    last_error = None
    # one request per iteration; `attempts` may start above 0 when supplied by the caller
    while attempts <= max_retries:
        try:
            # response of api call
            response = requests.post("https://api.openai.com/v1/chat/completions",
                                     headers=headers, json=payload, timeout=120)
            body = response.json()
            print(body.get('usage'))
            content = body['choices'][0]['message']['content']
            print(content)
            if content.strip().strip('\'".').lower() == 'irrelevant':
                return {'topic': 'irrelevant'}
            # generates dictionary based on response
            json_dict = _parse_seo_reply(content)
            if isinstance(json_dict, dict) and all(key in json_dict for key in expected_keys):
                topic = json_dict['topic']
                if isinstance(topic, str) and (topic in topic_list or topic.lower() == 'irrelevant'):
                    return json_dict
            last_error = ValueError('reply is missing keys or names a topic outside the topical map')
        except (requests.RequestException, KeyError, IndexError, TypeError,
                AttributeError, ValueError, SyntaxError) as err:
            # network failure, error payload without 'choices', or unparsable reply
            last_error = err
        attempts += 1
    raise RuntimeError('Max number of retries met.') from last_error
# private helpers used by create_asset
def _unique_image_name(image_name, taken_names):
    '''
    Returns image_name unchanged when it is not in taken_names, otherwise
    image_name with the first free "_<n>" suffix (n = 1, 2, ...).
    '''
    if image_name not in taken_names:
        return image_name
    suffix = 1
    while f'{image_name}_{suffix}' in taken_names:
        suffix += 1
    return f'{image_name}_{suffix}'


def _request_upload_slot(headers):
    '''
    Asks Brandfolder for an upload request.
    Output: (upload_url to PUT the bytes to, object_url to reference in the asset)
    '''
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    r.raise_for_status()
    slot = r.json()
    return slot['upload_url'], slot['object_url']


def _post_asset(collection_id, section_key, image_name, description, object_url, filename, headers):
    '''
    Creates one asset with a single attachment inside the given collection/section.
    Output: decoded JSON body of the response
    '''
    r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={
        'data': {
            'attributes': [
                {
                    'name': image_name,
                    'description': description,
                    'attachments': [
                        {
                            'url': object_url,
                            'filename': filename
                        }
                    ]
                }
            ]
        },
        'section_key': section_key
    }, params={}, headers=headers)
    r.raise_for_status()
    return r.json()


def _post_custom_field(field_key_id, asset_id, value, headers):
    '''
    Stores `value` under the custom field `field_key_id` for the asset.
    A failed request is reported on stdout but does not abort the import.
    Output: request response
    '''
    payload = {'data':
               [
                   {
                       'attributes': {
                           'value': value
                       },
                       'relationships': {
                           'asset': {
                               'data': {'type': 'assets', 'id': asset_id}
                           }}
                   }]}
    r = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{field_key_id}/custom_field_values',
                      json=payload, params={}, headers=headers)
    if not r.ok:
        print(f'custom field {field_key_id} not set for asset {asset_id}: HTTP {r.status_code}')
    return r


# creates the asset in the client's brand folder
def create_asset(client_name, collection_id, image_path, topical_map, new_img_names, tags=True):
    '''
    Creates asset from image path. Also creates seo tags, topic, and alt tag for
    image
    The untouched file is stored in the staging section and a (possibly
    resized) copy in the AI section; tags and custom fields are attached to
    the second asset.
    Input: name of client, id of the collection, path to image, topical map text,
           names already given to earlier images, create tags boolean
    Output: (image_path, new image name), or (image_path, topic) without any
            upload when the topic is 'irrelevant'
    '''
    # Garbage Photos (STAGING) Section ID: rfqf67pbhn8hg6pjcj762q3q
    # AI Original Project Images Section ID: czpq4nwz78c3cwnp6h9n44z
    staging_section_key = 'rfqf67pbhn8hg6pjcj762q3q'
    ai_section_key = 'czpq4nwz78c3cwnp6h9n44z'
    print('in create_asset')
    # get seo, topic, and sub-topic from OpenAI API
    json_dict = get_seo_tags(image_path, topical_map)
    print(json_dict)
    topic = json_dict['topic']
    # returns nothing if topic is 'irrelevant'; checked before the other keys
    # are read because an irrelevant reply may carry only the topic
    if str(topic).lower() == 'irrelevant':
        return image_path, topic
    description = json_dict['description']
    seo_tags = json_dict['seo']
    if isinstance(seo_tags, str):
        # a comma separated string would otherwise be tagged character by character
        seo_tags = [tag.strip() for tag in seo_tags.split(',') if tag.strip()]
    alt_tag = json_dict['alt_tag']
    image_name = _unique_image_name(json_dict['filename'], new_img_names)

    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    # attachments keep the real extension of the bytes that are uploaded
    extension = os.path.splitext(image_path)[1].lstrip('.').lower()
    original_filename = f'{image_name}.{extension}' if extension else f'{image_name}.jpg'

    # binary upload of the untouched image into the staging section
    upload_url, object_url = _request_upload_slot(headers)
    with open(image_path, 'rb') as f:
        requests.put(upload_url, data=f).raise_for_status()
    _post_asset(collection_id, staging_section_key, image_name, description,
                object_url, original_filename, headers)

    # second upload: resized copy for the AI section
    upload_url, object_url = _request_upload_slot(headers)
    image = None if extension == 'gif' else cv2.imread(image_path)
    if image is None:
        # GIFs, and files OpenCV cannot decode, are uploaded byte-for-byte
        with open(image_path, 'rb') as f:
            requests.put(upload_url, data=f).raise_for_status()
        processed_filename = original_filename
    else:
        height, width = image.shape[:2]
        area = width * height
        # NOTE(review): the fixed target sizes ignore the source aspect ratio
        if width > height:
            # landscape image
            if area > 667000:
                image = cv2.resize(image, (1000, 667))
        else:
            # portrait image
            if area > 442236:
                image = cv2.resize(image, (548, 807))
        with NamedTemporaryFile(delete=True, suffix='.jpg') as temp_image:
            cv2.imwrite(temp_image.name, image)
            # OpenCV wrote through the path; read the result back from the start
            temp_image.seek(0)
            requests.put(upload_url, data=temp_image).raise_for_status()
        processed_filename = f'{image_name}.jpg'

    # posts image with image name
    created = _post_asset(collection_id, ai_section_key, image_name, description,
                          object_url, processed_filename, headers)
    # id of newly created asset
    asset_id = created['data'][0]['id']

    if tags:
        tags_payload = {'data': {'attributes': [{'name': tag} for tag in seo_tags]}}
        requests.post(f'https://brandfolder.com/api/v4/assets/{asset_id}/tags',
                      json=tags_payload, params={}, headers=headers)

    year_id = 'k8vr5chnkw3nrnrpkh4f9fqm'
    client_name_id = 'x56t6r9vh9xjmg5whtkmp'
    # Tone ID: px4jkk2nqrf9h6gp7wwxnhvz
    # Location ID: nm6xqgcf5j7sw8w994c6sc8h
    alt_tag_id = 'vk54n6pwnxm27gwrvrzfb'
    topic_id = '9mcg3rgm5mf72jqrtw2gqm7t'
    _post_custom_field(topic_id, asset_id, topic, headers)
    _post_custom_field(alt_tag_id, asset_id, alt_tag, headers)
    # NOTE(review): the year is a fixed literal and will go stale - confirm intent
    _post_custom_field(year_id, asset_id, 2024, headers)
    _post_custom_field(client_name_id, asset_id, client_name, headers)
    return image_path, image_name
def create_collection(collection_name):
    '''
    Creates a collection called collection_name in the Brandfolder
    Input: collection name
    Output: id of the newly created collection
    Raises: requests.HTTPError when Brandfolder rejects the request
    '''
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    r = requests.post('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections', json={
        # use a dict with the POST body here
        'data': {
            'attributes': {
                'name': collection_name
            }
        }
    }, params={}, headers=headers)
    # surface the HTTP error itself instead of a KeyError on the missing 'data' key
    r.raise_for_status()
    return r.json()['data']['id']
def get_collection_id(collection_name):
    '''
    Gets the id of the collection called collection_name.
    An existing collection is looked up first; a new collection is created
    only when none with that name exists, so repeated calls no longer create
    duplicates.
    Input: collection name
    Output: id of the collection
    '''
    # NOTE(review): only the collections returned by a single listing request
    # are searched - confirm whether the listing is paginated
    existing = get_collection_dict()
    if collection_name in existing:
        return existing[collection_name]
    return create_collection(collection_name)
# maps the name of every existing collection to its id
def get_collection_dict():
    '''
    Lists the collections of the Brandfolder.
    Input: nothing
    Output: dict of collection name -> collection id
    '''
    auth_headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }
    listing_url = 'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections'
    reply = requests.get(listing_url, params={}, headers=auth_headers)
    name_to_id = {}
    for collection in reply.json()['data']:
        # a later collection with the same name overwrites the earlier one
        name_to_id[collection['attributes']['name']] = collection['id']
    return name_to_id
def import_client_data(client_name, zipfile, topical_map, password, create=False):
    '''
    Takes the client name and the client zipfile path to import all image files of the zipfile into brandfolder under a collection
    with the client's name
    Input: client name (str), path of the zipfile (str), path of the topical map docx (str),
           dashboard password (str), create the collection when it is missing (bool)
    Output: nothing; the assets are created in Brandfolder
    Raises: gr.Error when an input is missing or invalid, or the collection does not exist
    '''
    from tempfile import TemporaryDirectory

    if client_name is None:
        raise gr.Error("Please choose a client")
    # SECURITY(review): the shared password is a literal in the source; the
    # environment variable lets a deployment override it - the literal
    # fallback should be removed once the variable is configured.
    expected_password = os.environ.get('BRANDFOLDER_DASHBOARD_PASSWORD', 'Br@ndf0lderAI')
    if password != expected_password:
        raise gr.Error("Incorrect Password")
    if zipfile is None:
        raise gr.Error("Please upload a zipfile")
    if zipfile.split('.')[-1].lower() != 'zip':
        raise gr.Error("Client Photos must be in a zipfile")
    if topical_map is None:
        raise gr.Error("Please upload a topical map")
    if topical_map.split('.')[-1].lower() != 'docx':
        raise gr.Error("Topical Map must be a docx file")
    topical_map = get_topical_map(topical_map)

    # get all collection ID names of the Brandfolder the dropdown was filled from
    collection_dict = get_collection_dict()
    if client_name in collection_dict:
        collection_id = collection_dict[client_name]
    elif create:
        # creates the collection and gets the collection id
        collection_id = create_collection(client_name)
    else:
        # raised (not just constructed) so the run stops before collection_id is used
        raise gr.Error(f'Client Name: {client_name} does not exist in this Brandfolder')

    # gets all image files from the zipfile
    img_list = get_imgs_from_folder([], zipfile)
    print(img_list)
    # iterates all images and puts them into brandfolder with AI elements
    old_img_paths = []  # path to old image
    new_img_names = []  # new image name
    # images are extracted into a scratch directory that is removed afterwards
    with ZipFile(zipfile) as archive, TemporaryDirectory() as workdir:
        for img in img_list:
            # pause between images; presumably to stay below the API rate limits
            time.sleep(5)
            extracted_path = archive.extract(img, path=workdir)
            old_img_path, new_img_name = create_asset(client_name, collection_id, extracted_path,
                                                      topical_map, new_img_names=new_img_names)
            # remembered so that later images can detect an already used name
            old_img_paths.append(old_img_path)
            new_img_names.append(new_img_name)
    return
# Dashboard: the collection names are fetched once at start-up to fill the dropdown
collection_dict = get_collection_dict()
collection_names = list(collection_dict.keys())
with gr.Blocks() as block:
    gr.Markdown("""
# Brandfolder Zipfile Dashboard
This dashboard is for uploading photos from a zipfile to a brandfolder collection.
""")
    with gr.Row():
        with gr.Column():
            options = collection_names
            selection = gr.Dropdown(options, label='Choose Existing Collection', info='If creating a new section, select Create a Collection')
            zipfile = gr.File(label='Client Photos (must be zipfile)')
            topical_map = gr.File(label='Topical Map')
            # type='password' masks the characters while they are typed
            password = gr.Textbox(label='Enter Password', type='password')
            algorithm = gr.Button('Run Algorithm')
            # the click handler receives the four inputs in this order and has no outputs
            algorithm.click(fn=import_client_data, inputs=[selection, zipfile, topical_map, password])
block.launch()