import openai from openai import OpenAI import requests import base64 import os import ast import cv2 from PIL import Image, ImageSequence from tempfile import NamedTemporaryFile import time from zipfile import ZipFile import gradio as gr from docx import Document from io import BytesIO import pyheif import pandas as pd import numpy as np from tenacity import ( retry, stop_after_attempt, wait_random_exponential, ) import bf_trigger import chat_engine as chat_gen import content_generator as con_gen # for exponential backoff # FUNCTIONS brandfolder_api = os.environ['BRANDFOLDER_API_KEY'] def get_asset_info(asset_id): ''' Takes information from asset_id Input: asset_id Output: collection_id, collection_name, section_id ''' # asset_id = data['data']['attributes']['key'] headers = { 'Content-Type': 'application/json', 'Authorization': brandfolder_api } r = requests.get(f'https://brandfolder.com/api/v4/assets/{asset_id}?include=section,collections,custom_fields,attachments', params={}, headers=headers) # gets section_id try: section_id = r.json()['data']['relationships']['section']['data']['id'] except: section_id = '' # gets collection_id # gets collection_name try: collection_id = r.json()['data']['relationships']['collections']['data'][0]['id'] collection_name = [item['attributes']['name'] for item in r.json()['included'] if item['type']=='collections'][0] except: collection_id = '' collection_name = '' # gets asset_name, asset_type, and asset_url try: asset_type = [item['attributes']['value'] for item in r.json()['included'] if item['type'] == 'custom_field_values' and item['attributes']['value']=='Photo'][0] except: asset_type = '' try: asset_name = r.json()['data']['attributes']['name'] except: asset_name = '' try: access_key = [item['attributes']['value'] for item in r.json()['included'] if item['type'] == 'custom_field_values' and item['attributes']['key'] == 'What is your Access Code?'][0] except: access_key = '' try: asset_url = [item['attributes']['url'] for item in r.json()['included'] if item['type'] == 'attachments'][0] except: asset_url = '' try: client_name = [item['attributes']['value'] for item in r.json()['included'] if item['type'] == 'custom_field_values' and item['attributes']['key'] == 'Client Name'][0] except: client_name = '' try: project_name = [item['attributes']['value'] for item in r.json()['included'] if item['type'] == 'custom_field_values' and item['attributes']['key'] == 'List Project Name Photos Belong To'][0] except: project_name = '' return_dict = { "section_id": section_id, "collection_id": collection_id, "collection_name": collection_name, "asset_type": asset_type, "asset_name": asset_name, "access_key": access_key, "image_url": asset_url, "client_name": client_name, "project_name": project_name } return return_dict def get_all_collection_dict(): headers = { 'Accept': 'application/json', 'Authorization': brandfolder_api } r = requests.get('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=300', params={ # use a dict with your desired URL parameters here }, headers=headers) temp = r.json()['data'] collection_dict = {item['attributes']['name']:item['id'] for item in temp} return collection_dict def get_collection_names(): collection_dict = get_collection_dict() return list(collection_dict.keys()) def rename(filename): client = OpenAI() completion = client.chat.completions.create( model="gpt-4o", messages=[ {"role": "system", "content": "You are a helpful assistant specializing in renaming files."}, {"role": "user", "content": f"Provide a similar name for this filename: {filename}. Only return the filename and use hyphens in the filename."} ] ) return completion.choices[0].message.content def get_topical_map(path): document = Document(path) extracted_text = [] for paragraph in document.paragraphs: # Get the left indentation of the current paragraph (if any) left_indent = paragraph.paragraph_format.left_indent if left_indent == None: continue else: indent_level = int(left_indent.pt / 20) # Convert Twips to points and then to a simple indentation level # You might want to adjust the logic below depending on how you want to represent indentation indent_symbol = " " * indent_level # This creates a number of spaces based on the indentation level; adjust as needed # Construct the paragraph text with indentation representation formatted_text = f"{indent_symbol}{paragraph.text}" extracted_text.append(formatted_text) return "\n".join(extracted_text) # gets a list of images from the google drive folder def get_imgs_from_folder(image_files, zipfile): # image file types IMAGE_TYPES = ['jpg','jpeg','gif','bmp','png', 'jpe', 'heic', 'tiff', 'webp', 'heif', 'svg', 'raw', 'psd'] # file types FILE_TYPES = ['jpg','jpeg','gif','bmp','png', 'jpe', 'zip', 'mp4', 'heic', 'tiff', 'webp', 'heif', 'svg', 'raw', 'psd'] # gets all the image paths from the zipfile zip = ZipFile(zipfile) zip_list = zip.namelist() image_files.extend([f for f in zip_list if f.split('.')[-1].lower() in IMAGE_TYPES and f[0] != '_']) return image_files def get_seo_tags(image_path, topical_map, new_imgs, attempts=0, max_attempts=6): ''' Gets the seo tags and topic/sub-topic classification for an image using OpenAI GPT-4 Vision Preview Input: image path of desired file Output: dict of topic, sub-topic, and seo tags ''' if attempts > max_attempts: print("Maximum number of retries exceeded.") return {"error": "Max retries exceeded, operation failed."} print('in seo_tags') filenames = ', '.join(new_imgs) # Query for GPT-4 topic_map_query = f""" % You are an expert web designer that can only answer questions relevent to the following Topical Map. % Goal: Output the topic, description, caption, seo tags, alt_tags, and filename for this image using the Topical Map provided. % TOPCIAL MAP ```{topical_map}``` """ # IF YOU CANNOT PROVIDE AN TOPIC FOR EVERY IMAGE AFTER 5 ATTEMPTS, REPLY WITH 'irrelevant'. topic_list = topical_map.split('\n') topic_list = [topic.strip() for topic in topic_list] topic_list.insert(0, "irrelevant") def encode_image(image_path): # Check the file size (in bytes) file_size = os.path.getsize(image_path) # Define the maximum file size for compression (20 MB) max_size = 20 * 1024 * 1024 # 20 MB in bytes if image_path.lower().endswith('.heic'): # Read the HEIC file heif_file = pyheif.read(image_path) # Convert to a PIL image image = Image.frombytes( heif_file.mode, heif_file.size, heif_file.data, "raw", heif_file.mode, heif_file.stride, ) elif image_path.lower().endswith('.gif'): # Open GIF image with Image.open(image_path) as img: # Extract the first frame of the GIF image = img.convert('RGB') else: # Open other image types with PIL directly image = Image.open(image_path) # Convert image to RGB if it has an incompatible mode if image.mode not in ['RGB', 'L']: # L is for grayscale image = image.convert('RGB') # Use in-memory buffer for processing with BytesIO() as img_buffer: if file_size > max_size: # Adjust the quality to reduce the file size image.save(img_buffer, format='JPEG', quality=75) else: # Save the image without changing the quality if not needed image.save(img_buffer, format='JPEG', quality=85) # Seek to the beginning of the stream img_buffer.seek(0) # Read the JPEG image data and encode it in base64 return base64.b64encode(img_buffer.read()).decode('utf-8') print(image_path) base64_image = encode_image(image_path) # REMOVE WHEN SHARING FILE api_key = os.environ['OPENAI_API_KEY'] # Calling gpt-4 vision headers = { "Content-Type": "application/json", "Authorization": f"Bearer {api_key}" } # IF YOU CANNOT PROVIDE AN TOPIC FOR EVERY IMAGE AFTER 5 ATTEMPTS, REPLY WITH 'irrelevant'. payload = { "model": "gpt-4o", "response_format": {"type": "json_object"}, "messages": [ {'role': 'system', 'content': 'You are an expert web designer that can only answer questions relevent to the following topical map.' }, { "role": "user", "content": [ { "type": "text", "text": topic_map_query + """ % INSTRUCTIONS Step 1 - Generate keywords to describe this image Step 2 - Decide which topic in the Topicla Map this image fall under, using the keywords you generated and the image itself. You are only permitted to use the exact wording of the topic in the topical map. Step 3 - Provide a topic-relevant 5 sentence description for the image. Describe the image only using context relevant to the topics in the topical map. Adhere to the following guidelines when crafting the 5 sentence description: - Mention only the contents of the image. - Avoid mentioning the quality of the image. - Ignore all personal information within the image. - Be as specific as possible when identifying tools/items in the image. Step 4 - Using the description in Step 3, create one sentence caption. Make sure the caption is less than 160 characters. Adhere to the following guidelines when crafting the caption: - Use the active present tense to create a sense of immediacy and impact - Provide additional context or insights that the image alone cannot convey through descriptions and relevant mini-stories. - Identify the subject of the image and provide context without repeating what's already in the image - Use conversational language and complete sentences - Make the caption less than 160 characters. Use python to confirm the character count % Example caption: ``` "An experienced emergency plumber swiftly addresses a kitchen flood, minimizing damage and ensuring quick restoration." ``` Step 5 - Using the description in Step 3, create 3 topic-relevant SEO tags for this image that will drive traffic to our website. The SEO tags must be two words or less. You must give 3 SEO tags. Step 6 - Using the keywords in Step 1, provide a topic-relevant SEO alt tag for the image that will enhance how the website is ranked on search engines. Adhere to the following guidelines when crafting the alt tag - Maximize the difference between this alt tag and caption in Step 4 while maintaining accuracy and relevance. - Emphasize the expertise of the company through the alt tag by demonstrating a mastery of the subject matter of the image - Describe the image content by naturally incorporating relevant keywords into the alt tag - Make the alt tag less than 125 characters. Use Python to confirm the character coun % Example alt tag: ``` "Professional flood cleanup in a kitchen using a high-power vacuum by an expert with 20+ years in emergency plumbing services." ``` Step 7 - Review both the caption and the alt tag to ensure they are completely unique from each other, with no repeated phrases or sentences. Rephrase if necessary to maximize the difference between the alt tag and caption while maintaining accuracy and relevance. Step 8 - Using the description in Step 3, provide a new and unique filename for the image as well. Use hyphens for the filename. Do not include extension. Step 9 - YOU ARE ONLY PERMITTED TO OUTPUT THE TOPIC, DESCRIPTION, CAPTION, SEO, ALT_TAG, AND FILENAME IN THE FOLLOWING JSON FORMAT: % OUTPUT FORMAT: {"topic": topic, "description": description, "caption": caption, "seo": [seo], "alt_tag": [alt tag], "filename": filename } """ }, { "type": "image_url", "image_url": { "url": f"data:image/jpeg;base64, {base64_image}" } } ] } ], "max_tokens": 300 } try: response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) response_data = response.json() print(response_data) if response.status_code == 200 and 'choices' in response_data and len(response_data['choices']) > 0: keys = ['topic', 'description', 'caption', 'seo', 'alt_tag', 'filename'] json_dict = ast.literal_eval(response.json()['choices'][0]['message']['content']) if json_dict['topic'] not in topic_list: return get_seo_tags(image_path, topical_map, new_imgs, attempts=attempts+1) if set(json_dict.keys()) != set(keys): return get_seo_tags(image_path, topical_map, new_imgs, attempts=attempts+1) return json_dict else: print("API call failed or bad data, retrying...") return get_seo_tags(image_path, topical_map, new_imgs, attempts=attempts + 1) except Exception as e: time.sleep(5*attempts) print("Exception during API call:", str(e)) return get_seo_tags(image_path, topical_map, new_imgs, attempts=attempts + 1) def read_image(image_path): if image_path.lower().endswith('.heic'): # Read and convert HEIC file heif_file = pyheif.read(image_path) image = Image.frombytes( heif_file.mode, heif_file.size, heif_file.data, "raw", heif_file.mode, heif_file.stride, ) # Convert PIL image to OpenCV format image = np.array(image) image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) elif image_path.lower().endswith('.gif'): # Open GIF and convert the first frame to RGB with Image.open(image_path) as img: for frame in ImageSequence.Iterator(img): frame = frame.convert('RGB') # Convert PIL image to OpenCV format image = np.array(frame) image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR) break # Only process the first frame else: # Use OpenCV for other formats image = cv2.imread(image_path) return image def process_image(image_path): image = read_image(image_path) height, width, c = image.shape area = width * height if width > height: # Landscape image if area > 667000: image = cv2.resize(image, (1000, 667)) else: # Portrait image if area > 442236: image = cv2.resize(image, (548, 807)) return image def convert_heic_to_jpeg(heic_path): # Read the HEIC file heif_file = pyheif.read(heic_path) # Convert to a PIL image image = Image.frombytes( heif_file.mode, heif_file.size, heif_file.data, "raw", heif_file.mode, heif_file.stride, ) # Convert image to JPEG in memory jpeg_buffer = BytesIO() image.save(jpeg_buffer, format="JPEG") jpeg_buffer.seek(0) return jpeg_buffer def upload_image(image_path, upload_url): # Check if the image is a HEIC file print(image_path) if image_path.lower().endswith('.heic'): # Convert HEIC to JPEG data = convert_heic_to_jpeg(image_path) else: # Open other image types directly data = open(image_path, 'rb') # Upload the image response = requests.put(upload_url, data=data) # Ensure you close the file stream if opened directly if not image_path.lower().endswith('.heic'): data.close() return response def personalize_answer(answer, query_engine): if query_engine: prompt = f''' % You are an expert construction contracter describing this image % Goal: Add relevant company information to the Answer based on the context provided. ONLY OUTPUT THE ENHANCEMENT. DO NOT INCLUDE ANSWERS FROM STEP 1-3 IN OUTPUT. % Answer: ```{answer}``` % Instructions: Step 1 - Identify what relevant company information from the context is relevant to the Answer Step 2 - Enhance the Answer with relevant company information. This will be known as the Enhancement. Mention the expertise of the company in the Enhancement within the context of the Answer. Step 3 - Make the Enhancement the same character length as the Answer. Use Python to check that they are the same character length. Step 4 - Remove the triple quotes from the output. ONLY OUTPUT THE ENHANCEMENT. ''' response = query_engine.query(prompt) return response.response.replace("`", "") else: return answer # creates the asset in the client's brand folder def create_asset(client_name, collection_id, image_path, topical_map, new_imgs, query_engine=None, tags=True, project_bool=False): ''' Creates asset from image path. Also creates seo tags, topic, and alt tag for image Input: name of client, path to image, create tags boolean Output: id of asset ''' # get seo, topic, and sub-topic from OpenAI API json_dict = get_seo_tags(image_path, topical_map, new_imgs) if not json_dict: json_dict = get_seo_tags(image_path, topical_map, new_imgs) topic = json_dict['topic'] description = json_dict['description'] caption = json_dict['caption'] seo_tags = json_dict['seo'] alt_tag = json_dict['alt_tag'] image_name = json_dict['filename'] description = personalize_answer(description, query_engine) caption = personalize_answer(caption, query_engine) alt_tag = personalize_answer(alt_tag, query_engine) counter = 1 while image_name in new_imgs: image_name = f'{image_name}_{counter}' counter += 1 headers = { 'Accept': 'application/json', 'Authorization': os.environ['BRANDFOLDER_API_KEY'] } r = requests.get(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', params={ # use a dict with your desired URL parameters here }, headers=headers) asset_names = [item['attributes']['name'] for item in r.json()['data']] asset_names = new_imgs + asset_names while image_name in asset_names: image_name = rename(image_name) # binary upload of image_path r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers) # used to upload the image upload_url = r.json()['upload_url'] # container for the uploaded image to be used by the post request og_object_url = r.json()['object_url'] response = upload_image(image_path, upload_url) # binary upload of image_path r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers) # used to upload the image upload_url = r.json()['upload_url'] # container for the uploaded image to be used by the post request object_url = r.json()['object_url'] image = process_image(image_path) # image = sharpen_image(image) with NamedTemporaryFile(delete=True, suffix='.jpg') as temp_image: # fp = TemporaryFile() cv2.imwrite(temp_image.name, image, [int(cv2.IMWRITE_JPEG_QUALITY), 70]) # fp.seek(0) response = requests.put(upload_url, data=temp_image) # fp.close() # posts image with image name r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={ # use a dict with the POST body here 'data': { 'attributes': [ { 'name': image_name, 'description': description, 'attachments': [ { 'url': object_url, 'filename': f'{image_name}.jpg' }, { 'url': og_object_url, 'filename': f'{image_name}-original.jpg' } ] } ] }, # AI Processed section key 'section_key': 'czpq4nwz78c3cwnp6h9n44z' }, params={}, headers=headers) # id of newly created asset asset_id = r.json()['data'][0]['id'] # tags and topic payloads tags_payload = {'data': {'attributes': [{'name': tag} for tag in seo_tags]}} topic_payload = {'data': [ { 'attributes': { 'value': topic }, 'relationships': { 'asset': { 'data': {'type': 'assets', 'id': asset_id} }} }]} alt_tag_payload = {'data': [ { 'attributes': { 'value': alt_tag }, 'relationships': { 'asset': { 'data': {'type': 'assets', 'id': asset_id} }} }]} year_payload = {'data': [ { 'attributes': { 'value': 2024 }, 'relationships': { 'asset': { 'data': {'type': 'assets', 'id': asset_id} }} }]} client_payload = {'data': [ { 'attributes': { 'value': client_name }, 'relationships': { 'asset': { 'data': {'type': 'assets', 'id': asset_id} }} }]} caption_payload = {'data': [ { 'attributes': { 'value': caption }, 'relationships': { 'asset': { 'data': {'type': 'assets', 'id': asset_id} }} }]} year_id = 'k8vr5chnkw3nrnrpkh4f9fqm' client_name_id = 'x56t6r9vh9xjmg5whtkmp' # Tone ID: px4jkk2nqrf9h6gp7wwxnhvz # Location ID: nm6xqgcf5j7sw8w994c6sc8h alt_tag_id = 'vk54n6pwnxm27gwrvrzfb' topic_id = '9mcg3rgm5mf72jqrtw2gqm7t' project_name_id = '5zpqwt2r348sjbnc6rpxc96' caption_id = 'cmcbhcc5nmm72v57vrxppw2x' # Original Project Images Section ID: c5vm8cnh9jvkjbh7r43qxkv # Edited Project Images Section ID: 5wpz2s9m3g7ctcjpm4vrt46 r_asset = requests.post(f'https://brandfolder.com/api/v4/assets/{asset_id}/tags', json=tags_payload, params={}, headers=headers) # alt_tags r_topic = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{topic_id}/custom_field_values', json= topic_payload , params={ }, headers=headers) r_alt_tag = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{alt_tag_id}/custom_field_values', json= alt_tag_payload , params={ }, headers=headers) r_year = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{year_id}/custom_field_values', json= year_payload , params={ }, headers=headers) r_client = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{client_name_id}/custom_field_values', json= client_payload , params={ }, headers=headers) r_caption = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{caption_id}/custom_field_values', json= caption_payload , params={ }, headers=headers) if project_bool == 'Yes': project_name = str(image_path).split('/')[-2] project_payload = {'data': [ { 'attributes': { 'value': project_name }, 'relationships': { 'asset': { 'data': {'type': 'assets', 'id': asset_id} }} }]} r_project = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{project_name_id}/custom_field_values', json= project_payload , params={ }, headers=headers) return image_name def create_asset_no_ai(client_name, collection_id, image_path, project_bool=False): ''' Creates an asset without going through the AI process ''' image_name = str(image_path).split('/')[-1].split('.')[0] headers = { 'Accept': 'application/json', 'Authorization': 'eyJhbGciOiJIUzI1NiJ9.eyJvcmdhbml6YXRpb25fa2V5IjoiZmY0cmt0NDNoMzRtMjVoa2duNWJteDlmIiwiaWF0IjoxNzA1OTQ4NjI3LCJ1c2VyX2tleSI6IjhyNnhxeDR6bTdyN2Z4NnJqY25jM2IzIiwic3VwZXJ1c2VyIjpmYWxzZX0.xUPT9j08a0THBwW_0GkQjllJxmjeDGtcPeoIOu_w9Zs' } # binary upload of image_path r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers) # used to upload the image upload_url = r.json()['upload_url'] # container for the uploaded image to be used by the post request object_url = r.json()['object_url'] # uploads the image response = upload_image(image_path, upload_url) r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={ # use a dict with the POST body here 'data': { 'attributes': [ { 'name': image_name, 'attachments': [ { 'url': object_url, 'filename': f'{image_name}.jpg' } ] } ] }, # Original Project Assets 'section_key': 'c5vm8cnh9jvkjbh7r43qxkv' }, params={}, headers=headers) # id of newly created asset asset_id = r.json()['data'][0]['id'] year_payload = {'data': [ { 'attributes': { 'value': 2024 }, 'relationships': { 'asset': { 'data': {'type': 'assets', 'id': asset_id} }} }]} client_payload = {'data': [ { 'attributes': { 'value': client_name }, 'relationships': { 'asset': { 'data': {'type': 'assets', 'id': asset_id} }} }]} year_id = 'k8vr5chnkw3nrnrpkh4f9fqm' client_name_id = 'x56t6r9vh9xjmg5whtkmp' r_year = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{year_id}/custom_field_values', json= year_payload , params={ }, headers=headers) r_client = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{client_name_id}/custom_field_values', json= client_payload , params={ }, headers=headers) if project_bool.lower() == 'yes': project_name_id = '5zpqwt2r348sjbnc6rpxc96' project_name = str(image_path).split('/')[-2] project_payload = {'data': [ { 'attributes': { 'value': project_name }, 'relationships': { 'asset': { 'data': {'type': 'assets', 'id': asset_id} }} }]} r_project = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{project_name_id}/custom_field_values', json= project_payload , params={ }, headers=headers) return def create_collection(collection_name): ''' Creates collection with collection_name and tagline Input: collection name and tagline Output: request response ''' headers = { 'Accept': 'application/json', 'Authorization': os.environ['BRANDFOLDER_API_KEY'] } r = requests.post('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections', json={ # use a dict with the POST body here 'data': { 'attributes': { 'name': collection_name } } }, params={}, headers=headers) collection_id = r.json()['data']['id'] return collection_id def get_collection_id(collection_name): ''' Creates collection with collection_name and tagline Input: collection name and tagline Output: request response ''' headers = { 'Accept': 'application/json', 'Authorization': os.environ['BRANDFOLDER_API_KEY'] } r = requests.post('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections', json={ # use a dict with the POST body here 'data': { 'attributes': { 'name': collection_name } } }, params={}, headers=headers) collection_id = r.json()['data']['id'] return collection_id # get ids of existing collections def get_collection_dict(): headers = { 'Accept': 'application/json', 'Authorization': os.environ['BRANDFOLDER_API_KEY'] } r = requests.get('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=200', params={ # use a dict with your desired URL parameters here }, headers=headers) temp = r.json()['data'] collection_dict = dict(sorted({item['attributes']['name']:item['id'] for item in temp}.items())) return collection_dict def import_client_data(client_name, zipfile, topical_map, password, project_bool, ai_bool, query_engine, progress=gr.Progress(), create=False): ''' Takes the client neame and the client zipfile path to import all image files in the google drive into brandfolder under a collection with the client's name Input: client name (str), client_drive_path (str) Output: Completed Brandfolder ''' print(zipfile) if client_name == None: raise gr.Error("Please choose a client") if password != os.environ['BRANDFOLDER_PASSWORD']: raise gr.Error("Incorrect Password") if zipfile == None: raise gr.Error("Please upload a zipfile") if zipfile.split('.')[-1] != 'zip': raise gr.Error("Client Photos must be in a zipfile") if ai_bool.lower() == 'on': if topical_map == None: raise gr.Error("Please upload a topical map") if topical_map.split('.')[-1] != 'docx': raise gr.Error("Topical Map must be a docx file") topical_map = get_topical_map(topical_map) # get all collection ID names headers = { 'Accept': 'application/json', 'Authorization': os.environ['BRANDFOLDER_API_KEY'] } r = requests.get('https://brandfolder.com/api/v4/collections?per=200', params={ # use a dict with your desired URL parameters here }, headers=headers) collection_dict = {entry['attributes']['name']:entry['id'] for entry in r.json()['data']} if client_name not in list(collection_dict.keys()): if create==True: # creates the collection and gets the collection id collection_id = create_collection(client_name) else: AssertionError(f'Client Name: {client_name} does not exist in this Brandfolder') else: collection_id = collection_dict[client_name] # gets all image files from the google drive folder img_lists = [] img_dict = {} for zip in zipfile: zip_name = ZipFile(zip.name) unpack_list = get_imgs_from_folder([], zip) for img in unpack_list: img_dict.update({img:zip_name}) img_lists.append(unpack_list) img_list = sum(img_lists, []) new_imgs = [] error_imgs = [] error_imgs_text = 'No errors detected.' # iterates all images and puts them into brandfolder with AI elements for img in progress.tqdm(img_list, desc="Uploading..."): zip = img_dict[img] img = zip.extract(img) print(client_name) try: if ai_bool.lower() == 'on': time.sleep(15) new_img = create_asset(client_name, collection_id, img, topical_map, new_imgs, project_bool=project_bool, query_engine=query_engine[-1]) new_imgs.append(new_img) elif ai_bool.lower() == 'off': create_asset_no_ai(client_name, collection_id, img, project_bool=project_bool) except Exception as e: error_imgs.append(f'{str(img)}; error: {e}\n') print(f'An unexpected error occured processing {img}: {e}') gr.Info('Images have been uploaded!') if error_imgs: error_imgs_text = '\n'.join(error_imgs) return "Images Uploaded", error_imgs_text def get_collection_names(): collection_dict = get_collection_dict() return list(collection_dict.keys()) def upload_file(files, progress=gr.Progress()): file_paths = [] for file in progress.tqdm(files, desc="Uploading Files..."): gr.Textbox(file.name) file_paths.append(file.name) # Simulate processing return file_paths def chatbot_response(message, history, chat_engine): stream = chat_engine.stream_chat(message, chat_history=history) return stream def generate_content(csv_file, query_engine): print(csv_file) df = con_gen.get_content_csv(csv_file, query_engine[-1]) data_preview = df.head(10) file_name = './output.csv' df.to_csv('./output.csv') completion_status = "Done" return completion_status, data_preview, gr.DownloadButton(label='Download AI Content', value=file_name, visible=True) collection_names = get_collection_names() theme = gr.themes.Soft(primary_hue="blue", secondary_hue="pink").set( button_primary_background_fill = "#FF0000", button_primary_background_fill_dark = "#FF0000", ) llms = ['openai-gpt-4o', 'openai-gpt-4o-mini', 'openai-gpt-4-turbo', 'claude-sonnet-3.5', 'claude-opus-3', 'claude-sonnet-3', 'claude-haiku-3'] with gr.Blocks(theme=theme) as block: gr.Markdown(""" # Brandfolder Zipfile Dashboard This dashboard is for uploading photos from a zipfile to a brandfolder collection. """) chat_engine = gr.State([]) query_engine = gr.State([]) def generate_chat_engine(dna_documents, chat_engine, query_engine, api_type): chat, query, response = chat_gen.get_chat_engine(dna_documents, api_type) chat_engine.append(chat) query_engine.append(query) return chat_engine, query_engine, response, f"Chatbot is using {api_type}" with gr.Column(visible=True, elem_id='login') as login: options = get_collection_names() selection = gr.Dropdown(options, label='Choose Client', info='Choose which Client you want to generate content for') api_type = gr.Dropdown(llms, label='Choose LLM', info='Choose which LLM you want to use for content') password = gr.Textbox(label='Enter Password') dna_documents = gr.File(label='Upload DNA Documents', file_count='multiple') chat_gen_btn = gr.Button("Generate DNA LLM", variant='primary') chat_gen_progress = gr.Label(label='LLM Created') gr.Markdown(""" # Useful Links ### [Link to Brandfolder](https://brandfolder.com/organizations/contractorwebsiteservices) ### [Link to ChatGPT](https://chatgpt.com/) ### [Link to Claude](https://claude.ai/new) """ ) with gr.Tab("B"): with gr.Column(visible=True, elem_id='zipfile') as zipfile: with gr.Row(): with gr.Column(): gr.Markdown('## Upload zipfile containing client photos below') zipfile = gr.File(label='Client Photos (must be zipfile)', file_count='multiple', file_types=['.zip']) # upload_btn = gr.UploadButton("Upload Zipfile(s)", file_count='multiple') ai_bool = gr.Radio(choices=['On', 'Off'], label='AI Algorithm?', info = 'Would you like to use the AI Algorithm to upload these images?') project_bool = gr.Radio(choices=['Yes', 'No'], label='Project Names?', info='Would you like to include project names for these images?') gr.Markdown('## Upload topical map document for the client below') topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx']) algorithm = gr.Button('Run Algorithm') upload = gr.Label(label='Uploader') err_imgs = gr.Textbox(label="Images Not Processed") stop = gr.Button("Stop Run") with gr.Tab("C"): with gr.Column(visible=True, elem_id='trigger') as trigger: gr.Markdown(''' # Run AI in Brandfolder This button runs the AI algorithm using all the images stored in the Pre-Processed Images section in Brandfolder. The algorithm will move the new processed images to the AI Processed Images. ALL COPIES OF THE IMAGES IN THE PRE-PROCESSED SECTION WILL BE DELETED AFTER PUSHING THIS BUTTON ''') bf_options = get_collection_names() bf_selection = gr.Dropdown(bf_options, label='Choose Existing Collection') section = gr.Radio(choices=['Pre-Processed Images', 'Original Project Assets'], label='Which Sections is the data in?') bf_topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx']) bf_button = gr.Button('Run AI algorithm for Pre-Processed Images') bf_upload = gr.Label(label='Uploader') stop_bf = gr.Button('Stop Run') with gr.Tab("D"): with gr.Column(visible=True): gr.Markdown(''' # DNA LLM This DNA chatbot uses the uploaded dna documents to answer questions ''') chat_llm = gr.Radio(llms, value=api_type.value, label='Choose LLM', info='Choose which LLM you want to use for content') chatbot_status = gr.Label(label='Chabot_Status') chatbot = gr.Chatbot() msg = gr.Textbox() clear = gr.ClearButton([msg, chatbot]) def user(user_message, history): return "", history + [[user_message, None]] def bot(history, chat_engine): print(history) user_message = history[-1][0] # chat_history = [(ChatMessage(role=message[1],content=message['content'])) for message in history] bot_message = chatbot_response(user_message, history[-1][1], chat_engine[-1]) history[-1][1] = "" for character in bot_message.response_gen: history[-1][1] += character time.sleep(0.1) yield history def generate_new_chat_engine(dna_documents, chat_engine, chat_llm): chat = chat_gen.get_new_chat_engine(dna_documents, chat_llm) chat_engine[0] = chat return chat_engine, f"Chatbot is using {chat_llm}" with gr.Tab("E"): with gr.Column(visible=True): gr.Markdown(''' # Website Content Spreadsheet Upload a spreadsheet with descriptions of website content ''') website_layout_file = gr.File(label='Website Layout File') con_gen_btn = gr.Button('Generate Content') data_preview = gr.DataFrame(label='Processed DataFrame Preview') status = gr.Textbox(label='Completion Status') download_btn = gr.DownloadButton(label='Download Content', visible=False) # with gr.Column(visible=False, elem_id='offline') as offline: # gr.Markdown(''' # # AI Processed Images Algorithm # Runs the AI algorithm over the images in the AI Processed Images Section. # Use this only when the Brandfolder API is not uploading images properly. # The Images will not be reduced but the tags, descriptions, etc. for the images will be populated. # ''') # offline_options = get_collection_names() # offline_selection = gr.Dropdown(offline_options, label='Choose Existing Collection') # offline_topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx']) # offline_button = gr.Button('Run AI algorithm for AI Processed Images Section') # offline_upload = gr.Label(label='Uploader') # stop_offline = gr.Button("Stop Run") # selection.select(fn=get_collection_names, outputs=[selection]) # download_btn.click(download_file) msg.submit(user, [msg, chatbot], [msg, chatbot], queue=False).then( bot, [chatbot, chat_engine], chatbot) api_type.change(generate_chat_engine) clear.click(lambda: None, None, chatbot, queue=False) chat_llm.change(fn=generate_new_chat_engine, inputs=[dna_documents, chat_engine, chat_llm], outputs=[chat_engine, chatbot_status]) con_gen_btn.click(generate_content, inputs=[website_layout_file, query_engine], outputs=[status, data_preview, download_btn]) algo_event = algorithm.click(fn=import_client_data, inputs=[selection, zipfile, topical_map, password, project_bool, ai_bool, query_engine], outputs=[upload, err_imgs]) bf_event = bf_button.click(fn=bf_trigger.run_preprocess_ai, inputs=[bf_topical_map, bf_selection, section, query_engine], outputs=[bf_upload]) # offline_event = offline_button.click(fn=offline_update.run_preprocess_ai, inputs=[offline_topical_map, offline_selection], outputs=[offline_upload]) stop.click(fn=None, inputs=None, outputs=None, cancels=[algo_event]) stop_bf.click(fn=None, inputs=None, outputs=None, cancels=[bf_event]) # upload_btn.upload(upload_file, upload_btn, zipfile) chat_gen_btn.click(generate_chat_engine, inputs=[dna_documents, chat_engine, query_engine, api_type], outputs=[chat_engine, query_engine, chat_gen_progress, chatbot_status]) # stop_offline.click(fn=None, inputs=None, outputs=None, cancels=[offline_event]) block.queue(default_concurrency_limit=5) block.launch()