from openai import OpenAI
import requests
import base64
import os
import json
import cv2
from PIL import Image, ImageSequence
from tempfile import NamedTemporaryFile
import time
from zipfile import ZipFile
import gradio as gr
from docx import Document
from io import BytesIO
import pyheif
import numpy as np
from tenacity import (  # for exponential backoff
    retry,
    stop_after_attempt,
    wait_random_exponential,
)

import bf_trigger
# import change_ui as change
import offline_update

# FUNCTIONS

brandfolder_api = os.environ['BRANDFOLDER_API_KEY']


def get_asset_info(asset_id):
    '''
    Retrieves information about an asset.

    Input: asset_id
    Output: dict of section, collection, custom field, and attachment info
    '''
    headers = {
        'Content-Type': 'application/json',
        'Authorization': brandfolder_api
    }

    r = requests.get(
        f'https://brandfolder.com/api/v4/assets/{asset_id}?include=section,collections,custom_fields,attachments',
        params={}, headers=headers)
    data = r.json()
    included = data.get('included', [])

    # gets section_id
    try:
        section_id = data['data']['relationships']['section']['data']['id']
    except (KeyError, TypeError):
        section_id = ''

    # gets collection_id and collection_name
    try:
        collection_id = data['data']['relationships']['collections']['data'][0]['id']
        collection_name = [item['attributes']['name'] for item in included
                           if item['type'] == 'collections'][0]
    except (KeyError, IndexError, TypeError):
        collection_id = ''
        collection_name = ''

    # gets asset_type, asset_name, and asset_url
    try:
        asset_type = [item['attributes']['value'] for item in included
                      if item['type'] == 'custom_field_values'
                      and item['attributes']['value'] == 'Photo'][0]
    except (KeyError, IndexError):
        asset_type = ''

    try:
        asset_name = data['data']['attributes']['name']
    except (KeyError, TypeError):
        asset_name = ''

    try:
        access_key = [item['attributes']['value'] for item in included
                      if item['type'] == 'custom_field_values'
                      and item['attributes']['key'] == 'What is your Access Code?'][0]
    except (KeyError, IndexError):
        access_key = ''

    try:
        asset_url = [item['attributes']['url'] for item in included
                     if item['type'] == 'attachments'][0]
    except (KeyError, IndexError):
        asset_url = ''

    try:
        client_name = [item['attributes']['value'] for item in included
                       if item['type'] == 'custom_field_values'
                       and item['attributes']['key'] == 'Client Name'][0]
    except (KeyError, IndexError):
        client_name = ''

    try:
        project_name = [item['attributes']['value'] for item in included
                        if item['type'] == 'custom_field_values'
                        and item['attributes']['key'] == 'List Project Name Photos Belong To'][0]
    except (KeyError, IndexError):
        project_name = ''

    return {
        "section_id": section_id,
        "collection_id": collection_id,
        "collection_name": collection_name,
        "asset_type": asset_type,
        "asset_name": asset_name,
        "access_key": access_key,
        "image_url": asset_url,
        "client_name": client_name,
        "project_name": project_name
    }


def get_all_collection_dict():
    headers = {
        'Accept': 'application/json',
        'Authorization': brandfolder_api
    }

    r = requests.get(
        'https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=300',
        params={}, headers=headers)

    temp = r.json()['data']
    collection_dict = {item['attributes']['name']: item['id'] for item in temp}
    return collection_dict


def rename(filename):
    client = OpenAI()
    completion = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "You are a helpful assistant specializing in renaming files."},
            {"role": "user", "content": f"Provide a similar name for this filename: {filename}. "
                                        "Only return the filename and use hyphens in the filename."}
        ]
    )
    return completion.choices[0].message.content
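# NOTE: tenacity is imported above for exponential backoff but was never wired
# in. A minimal sketch of how an OpenAI call could be wrapped, assuming the
# default behavior of retrying on any exception is acceptable
# (`rename_with_backoff` is an illustrative helper, not used elsewhere):
@retry(wait=wait_random_exponential(min=1, max=60), stop=stop_after_attempt(6))
def rename_with_backoff(filename):
    # retries rename() up to 6 times with randomized exponential backoff
    return rename(filename)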
def get_topical_map(path):
    document = Document(path)
    extracted_text = []

    for paragraph in document.paragraphs:
        # Get the left indentation of the current paragraph (if any)
        left_indent = paragraph.paragraph_format.left_indent
        if left_indent is None:
            continue
        # Convert the indentation width to a simple indentation level
        indent_level = int(left_indent.pt / 20)
        # This creates a number of spaces based on the indentation level; adjust as needed
        indent_symbol = " " * indent_level

        # Construct the paragraph text with indentation representation
        formatted_text = f"{indent_symbol}{paragraph.text}"
        extracted_text.append(formatted_text)

    return "\n".join(extracted_text)


# gets a list of images from the uploaded zipfile
def get_imgs_from_folder(image_files, zipfile):
    # image file types
    IMAGE_TYPES = ['jpg', 'jpeg', 'gif', 'bmp', 'png', 'jpe', 'heic', 'tiff',
                   'webp', 'heif', 'svg', 'raw', 'psd']
    # file types (superset including archives/video; currently unused)
    FILE_TYPES = ['jpg', 'jpeg', 'gif', 'bmp', 'png', 'jpe', 'zip', 'mp4', 'heic',
                  'tiff', 'webp', 'heif', 'svg', 'raw', 'psd']

    # gets all the image paths from the zipfile, skipping metadata entries
    zip_file = ZipFile(zipfile)
    zip_list = zip_file.namelist()
    image_files.extend([f for f in zip_list
                        if f.split('.')[-1].lower() in IMAGE_TYPES and f[0] != '_'])
    return image_files
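# For reference, get_topical_map() flattens the docx outline into plain text in
# which nesting is conveyed by leading spaces, e.g. (hypothetical map):
#
#   Kitchen Remodeling
#    Cabinet Installation
#    Countertop Replacement
#
# get_seo_tags() below embeds that text in its prompt and only accepts a topic
# that appears verbatim in the map (or the literal fallback "irrelevant").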
def get_seo_tags(image_path, topical_map, new_imgs, attempts=0, max_attempts=6):
    '''
    Gets the SEO tags and topic classification for an image using OpenAI GPT-4o vision.

    Input: image path of desired file
    Output: dict of topic, description, caption, seo tags, alt tag, and filename
    '''
    if attempts > max_attempts:
        print("Maximum number of retries exceeded.")
        return {"error": "Max retries exceeded, operation failed."}

    print('in seo_tags')

    # Query for GPT-4o
    topic_map_query = f"""
    % You are an expert web designer that can only answer questions relevant to the following Topical Map.
    % Goal: Output the topic, description, caption, seo tags, alt_tags, and filename for this image using the Topical Map provided.
    % TOPICAL MAP
    ```{topical_map}```
    """

    topic_list = [topic.strip() for topic in topical_map.split('\n')]
    topic_list.insert(0, "irrelevant")

    def encode_image(image_path):
        # Check the file size (in bytes)
        file_size = os.path.getsize(image_path)
        # Define the maximum file size for compression (20 MB)
        max_size = 20 * 1024 * 1024

        if image_path.lower().endswith('.heic'):
            # Read the HEIC file and convert it to a PIL image
            heif_file = pyheif.read(image_path)
            image = Image.frombytes(
                heif_file.mode,
                heif_file.size,
                heif_file.data,
                "raw",
                heif_file.mode,
                heif_file.stride,
            )
        elif image_path.lower().endswith('.gif'):
            # Extract the first frame of the GIF
            with Image.open(image_path) as img:
                image = img.convert('RGB')
        else:
            # Open other image types with PIL directly
            image = Image.open(image_path)

        # Convert image to RGB if it has an incompatible mode
        if image.mode not in ['RGB', 'L']:  # L is grayscale
            image = image.convert('RGB')

        # Use an in-memory buffer for processing
        with BytesIO() as img_buffer:
            if file_size > max_size:
                # Reduce the quality to shrink the file size
                image.save(img_buffer, format='JPEG', quality=75)
            else:
                # Save the image without reducing the quality as much
                image.save(img_buffer, format='JPEG', quality=85)
            # Seek to the beginning of the stream
            img_buffer.seek(0)
            # Read the JPEG image data and encode it in base64
            return base64.b64encode(img_buffer.read()).decode('utf-8')

    print(image_path)
    base64_image = encode_image(image_path)

    # REMOVE WHEN SHARING FILE
    api_key = os.environ['OPENAI_API_KEY']

    # Calling GPT-4o vision over HTTP
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}"
    }

    payload = {
        "model": "gpt-4o",
        "response_format": {"type": "json_object"},
        "messages": [
            {
                'role': 'system',
                'content': 'You are an expert web designer that can only answer questions relevant to the following topical map.'
            },
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": topic_map_query + """
    % INSTRUCTIONS
    Step 1 - Generate keywords to describe this image.
    Step 2 - Decide which topic in the Topical Map this image falls under, using the keywords you generated and the image itself. You are only permitted to use the exact wording of the topic in the topical map.
    Step 3 - Provide a topic-relevant 5 sentence description for the image. Describe the image only using context relevant to the topics in the topical map. Adhere to the following guidelines when crafting your 5 sentence description:
    - Mention only the contents of the image.
    - Do not mention the quality of the image.
    - Ignore all personal information within the image.
    - Be as specific as possible when identifying tools/items in the image.
    Step 4 - Using the description in Step 3, create a 160 character caption. Make sure the caption is less than 160 characters.
    Step 5 - Using the description in Step 3, create 3 topic-relevant SEO tags for this image that will drive traffic to our website. The SEO tags must be two words or less. You must give 3 SEO tags.
    Step 6 - Using the description in Step 3, provide a topic-relevant SEO alt tag for the image that will enhance how the website is ranked on search engines.
    Step 7 - Using the description in Step 3, provide a new and unique filename for the image as well. Use hyphens for the filename. Do not include an extension.
    Step 8 - YOU ARE ONLY PERMITTED TO OUTPUT THE TOPIC, DESCRIPTION, CAPTION, SEO, ALT_TAG, AND FILENAME IN THE FOLLOWING JSON FORMAT:
    % OUTPUT FORMAT: {"topic": topic, "description": description, "caption": caption, "seo": [seo], "alt_tag": [alt tag], "filename": filename}
    """
                    },
                    {
                        "type": "image_url",
                        "image_url": {
                            "url": f"data:image/jpeg;base64,{base64_image}"
                        }
                    }
                ]
            }
        ],
        "max_tokens": 300
    }

    try:
        response = requests.post("https://api.openai.com/v1/chat/completions",
                                 headers=headers, json=payload)
        response_data = response.json()
        print(response_data)

        if response.status_code == 200 and 'choices' in response_data and len(response_data['choices']) > 0:
            keys = ['topic', 'description', 'caption', 'seo', 'alt_tag', 'filename']
            # response_format is json_object, so the content parses as JSON
            json_dict = json.loads(response_data['choices'][0]['message']['content'])

            # retry when the reply is malformed or names a topic outside the map
            if set(json_dict.keys()) != set(keys):
                return get_seo_tags(image_path, topical_map, new_imgs, attempts=attempts + 1)
            if json_dict['topic'] not in topic_list:
                return get_seo_tags(image_path, topical_map, new_imgs, attempts=attempts + 1)

            return json_dict
        else:
            print("API call failed or bad data, retrying...")
            return get_seo_tags(image_path, topical_map, new_imgs, attempts=attempts + 1)
    except Exception as e:
        time.sleep(5 * attempts)
        print("Exception during API call:", str(e))
        return get_seo_tags(image_path, topical_map, new_imgs, attempts=attempts + 1)
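# A reply that passes get_seo_tags() validation has exactly these keys and a
# topic copied verbatim from the map (values below are hypothetical):
#
# {
#     "topic": "Cabinet Installation",
#     "description": "Five sentences describing the image...",
#     "caption": "A caption of fewer than 160 characters.",
#     "seo": ["custom cabinets", "kitchen remodel", "cabinet install"],
#     "alt_tag": ["Newly installed custom kitchen cabinets"],
#     "filename": "custom-kitchen-cabinet-install"
# }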
def read_image(image_path):
    if image_path.lower().endswith('.heic'):
        # Read and convert HEIC file
        heif_file = pyheif.read(image_path)
        image = Image.frombytes(
            heif_file.mode,
            heif_file.size,
            heif_file.data,
            "raw",
            heif_file.mode,
            heif_file.stride,
        )
        # Convert PIL image to OpenCV format
        image = np.array(image)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
    elif image_path.lower().endswith('.gif'):
        # Open GIF and convert the first frame to RGB
        with Image.open(image_path) as img:
            for frame in ImageSequence.Iterator(img):
                frame = frame.convert('RGB')
                # Convert PIL image to OpenCV format
                image = np.array(frame)
                image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
                break  # Only process the first frame
    else:
        # Use OpenCV for other formats
        image = cv2.imread(image_path)
    return image


def process_image(image_path):
    image = read_image(image_path)
    height, width, c = image.shape
    area = width * height
    if width > height:
        # Landscape image
        if area > 667000:
            image = cv2.resize(image, (1000, 667))
    else:
        # Portrait image
        if area > 442236:
            image = cv2.resize(image, (548, 807))
    return image
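# process_image() only ever downsizes: landscape frames above 1000x667 pixels
# of area are forced to 1000x667, and portrait frames above 548x807 of area to
# 548x807. A quick sanity check (assumes some large landscape photo on disk):
#
# resized = process_image('sample.jpg')
# print(resized.shape)  # (667, 1000, 3)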
def convert_heic_to_jpeg(heic_path):
    # Read the HEIC file and convert it to a PIL image
    heif_file = pyheif.read(heic_path)
    image = Image.frombytes(
        heif_file.mode,
        heif_file.size,
        heif_file.data,
        "raw",
        heif_file.mode,
        heif_file.stride,
    )

    # Convert image to JPEG in memory
    jpeg_buffer = BytesIO()
    image.save(jpeg_buffer, format="JPEG")
    jpeg_buffer.seek(0)
    return jpeg_buffer


def upload_image(image_path, upload_url):
    print(image_path)
    # Check if the image is a HEIC file
    if image_path.lower().endswith('.heic'):
        # Convert HEIC to JPEG
        data = convert_heic_to_jpeg(image_path)
    else:
        # Open other image types directly
        data = open(image_path, 'rb')

    # Upload the image
    response = requests.put(upload_url, data=data)

    # Ensure the file stream is closed if it was opened directly
    if not image_path.lower().endswith('.heic'):
        data.close()

    return response


# creates the asset in the client's Brandfolder collection
def create_asset(client_name, collection_id, image_path, topical_map, new_imgs,
                 tags=True, project_bool=False):
    '''
    Creates an asset from an image path. Also creates the SEO tags, topic,
    caption, and alt tag for the image.

    Input: name of client, collection id, path to image, topical map text,
           list of image names already created in this run
    Output: name of the created asset
    '''
    # get seo tags, topic, and related fields from the OpenAI API
    json_dict = get_seo_tags(image_path, topical_map, new_imgs)
    if not json_dict or 'error' in json_dict:
        raise RuntimeError(f'Could not generate SEO tags for {image_path}')

    topic = json_dict['topic']
    description = json_dict['description']
    caption = json_dict['caption']
    seo_tags = json_dict['seo']
    alt_tag = json_dict['alt_tag']
    image_name = json_dict['filename']

    # avoid collisions with names generated earlier in this run
    base_name = image_name
    counter = 1
    while image_name in new_imgs:
        image_name = f'{base_name}-{counter}'
        counter += 1

    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }

    r = requests.get(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets',
                     params={}, headers=headers)

    asset_names = [item['attributes']['name'] for item in r.json()['data']]
    asset_names = new_imgs + asset_names

    # avoid collisions with assets already in the collection
    while image_name in asset_names:
        image_name = rename(image_name)

    # request an upload slot for the original image
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    # used to upload the image
    upload_url = r.json()['upload_url']
    # container for the uploaded image to be used by the post request
    og_object_url = r.json()['object_url']

    response = upload_image(image_path, upload_url)

    # request a second upload slot for the processed image
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    upload_url = r.json()['upload_url']
    object_url = r.json()['object_url']

    image = process_image(image_path)

    with NamedTemporaryFile(delete=True, suffix='.jpg') as temp_image:
        cv2.imwrite(temp_image.name, image, [int(cv2.IMWRITE_JPEG_QUALITY), 70])
        response = requests.put(upload_url, data=temp_image)

    # posts image with image name
    r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={
        'data': {
            'attributes': [
                {
                    'name': image_name,
                    'description': description,
                    'attachments': [
                        {
                            'url': object_url,
                            'filename': f'{image_name}.jpg'
                        },
                        {
                            'url': og_object_url,
                            'filename': f'{image_name}-original.jpg'
                        }
                    ]
                }
            ]
        },
        # AI Processed section key
        'section_key': 'czpq4nwz78c3cwnp6h9n44z'
    }, params={}, headers=headers)

    # id of newly created asset
    asset_id = r.json()['data'][0]['id']

    # tags and custom field payloads
    tags_payload = {'data': {'attributes': [{'name': tag} for tag in seo_tags]}}

    topic_payload = {'data': [{
        'attributes': {'value': topic},
        'relationships': {'asset': {'data': {'type': 'assets', 'id': asset_id}}}
    }]}

    alt_tag_payload = {'data': [{
        'attributes': {'value': alt_tag},
        'relationships': {'asset': {'data': {'type': 'assets', 'id': asset_id}}}
    }]}

    year_payload = {'data': [{
        'attributes': {'value': 2024},
        'relationships': {'asset': {'data': {'type': 'assets', 'id': asset_id}}}
    }]}

    client_payload = {'data': [{
        'attributes': {'value': client_name},
        'relationships': {'asset': {'data': {'type': 'assets', 'id': asset_id}}}
    }]}

    caption_payload = {'data': [{
        'attributes': {'value': caption},
        'relationships': {'asset': {'data': {'type': 'assets', 'id': asset_id}}}
    }]}

    # custom field key ids
    year_id = 'k8vr5chnkw3nrnrpkh4f9fqm'
    client_name_id = 'x56t6r9vh9xjmg5whtkmp'
    # Tone ID: px4jkk2nqrf9h6gp7wwxnhvz
    # Location ID: nm6xqgcf5j7sw8w994c6sc8h
    alt_tag_id = 'vk54n6pwnxm27gwrvrzfb'
    topic_id = '9mcg3rgm5mf72jqrtw2gqm7t'
    project_name_id = '5zpqwt2r348sjbnc6rpxc96'
    caption_id = 'cmcbhcc5nmm72v57vrxppw2x'
    # Original Project Images Section ID: c5vm8cnh9jvkjbh7r43qxkv
    # Edited Project Images Section ID: 5wpz2s9m3g7ctcjpm4vrt46

    r_asset = requests.post(f'https://brandfolder.com/api/v4/assets/{asset_id}/tags',
                            json=tags_payload, params={}, headers=headers)
    r_topic = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{topic_id}/custom_field_values',
                            json=topic_payload, params={}, headers=headers)
    r_alt_tag = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{alt_tag_id}/custom_field_values',
                              json=alt_tag_payload, params={}, headers=headers)
    r_year = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{year_id}/custom_field_values',
                           json=year_payload, params={}, headers=headers)
    r_client = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{client_name_id}/custom_field_values',
                             json=client_payload, params={}, headers=headers)
    r_caption = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{caption_id}/custom_field_values',
                              json=caption_payload, params={}, headers=headers)

    if project_bool == 'Yes':
        project_name = str(image_path).split('/')[-2]
        project_payload = {'data': [{
            'attributes': {'value': project_name},
            'relationships': {'asset': {'data': {'type': 'assets', 'id': asset_id}}}
        }]}
        r_project = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{project_name_id}/custom_field_values',
                                  json=project_payload, params={}, headers=headers)

    return image_name
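# Typical call, assuming a collection id from get_collection_dict() and the
# plain-text map from get_topical_map() (all values hypothetical):
#
# topical_map = get_topical_map('topical-map.docx')
# name = create_asset('Acme Builders', 'abc123', 'photos/kitchen.jpg',
#                     topical_map, new_imgs=[], project_bool='No')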
def create_asset_no_ai(client_name, collection_id, image_path, project_bool=False):
    '''
    Creates an asset without going through the AI process.
    '''
    image_name = str(image_path).split('/')[-1].split('.')[0]

    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }

    # binary upload of image_path
    r = requests.get('https://brandfolder.com/api/v4/upload_requests', params={}, headers=headers)
    # used to upload the image
    upload_url = r.json()['upload_url']
    # container for the uploaded image to be used by the post request
    object_url = r.json()['object_url']

    # uploads the image
    response = upload_image(image_path, upload_url)

    r = requests.post(f'https://brandfolder.com/api/v4/collections/{collection_id}/assets', json={
        'data': {
            'attributes': [
                {
                    'name': image_name,
                    'attachments': [
                        {
                            'url': object_url,
                            'filename': f'{image_name}.jpg'
                        }
                    ]
                }
            ]
        },
        # Original Project Assets section
        'section_key': 'c5vm8cnh9jvkjbh7r43qxkv'
    }, params={}, headers=headers)

    # id of newly created asset
    asset_id = r.json()['data'][0]['id']

    year_payload = {'data': [{
        'attributes': {'value': 2024},
        'relationships': {'asset': {'data': {'type': 'assets', 'id': asset_id}}}
    }]}

    client_payload = {'data': [{
        'attributes': {'value': client_name},
        'relationships': {'asset': {'data': {'type': 'assets', 'id': asset_id}}}
    }]}

    year_id = 'k8vr5chnkw3nrnrpkh4f9fqm'
    client_name_id = 'x56t6r9vh9xjmg5whtkmp'

    r_year = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{year_id}/custom_field_values',
                           json=year_payload, params={}, headers=headers)
    r_client = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{client_name_id}/custom_field_values',
                             json=client_payload, params={}, headers=headers)

    if project_bool.lower() == 'yes':
        project_name_id = '5zpqwt2r348sjbnc6rpxc96'
        project_name = str(image_path).split('/')[-2]
        project_payload = {'data': [{
            'attributes': {'value': project_name},
            'relationships': {'asset': {'data': {'type': 'assets', 'id': asset_id}}}
        }]}
        r_project = requests.post(f'https://brandfolder.com/api/v4/custom_field_keys/{project_name_id}/custom_field_values',
                                  json=project_payload, params={}, headers=headers)

    return
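# Non-AI uploads land in the Original Project Assets section and only populate
# the Year and Client Name custom fields (plus Project Name when requested).
# Example call (hypothetical values):
#
# create_asset_no_ai('Acme Builders', 'abc123', 'photos/deck.jpg', project_bool='No')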
def create_collection(collection_name):
    '''
    Creates a collection named collection_name.

    Input: collection name
    Output: id of the new collection
    '''
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }

    r = requests.post('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections', json={
        'data': {
            'attributes': {
                'name': collection_name
            }
        }
    }, params={}, headers=headers)

    collection_id = r.json()['data']['id']
    return collection_id


def get_collection_id(collection_name):
    '''
    Looks up the id of an existing collection by name.

    Input: collection name
    Output: collection id
    '''
    return get_collection_dict()[collection_name]


# get ids of existing collections
def get_collection_dict():
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }

    r = requests.get('https://brandfolder.com/api/v4/brandfolders/988cgqcg8xsrr5g9h7gtsqkg/collections?per=200',
                     params={}, headers=headers)

    temp = r.json()['data']
    collection_dict = dict(sorted({item['attributes']['name']: item['id'] for item in temp}.items()))
    return collection_dict
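# get_collection_dict() returns a name -> id mapping sorted by name, e.g.
# (hypothetical): {'Acme Builders': 'abc123', 'Zen Landscaping': 'zzz999'},
# so get_collection_id('Acme Builders') would return 'abc123'.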
def import_client_data(client_name, zipfile, topical_map, password, project_bool, ai_bool,
                       progress=gr.Progress(), create=False):
    '''
    Takes the client name and the uploaded zipfile(s) and imports all image
    files they contain into Brandfolder under a collection with the client's name.

    Input: client name (str), zipfile path(s)
    Output: status string and a report of images that failed to process
    '''
    print(zipfile)
    if client_name is None:
        raise gr.Error("Please choose a client")
    if password != os.environ['BRANDFOLDER_PASSWORD']:
        raise gr.Error("Incorrect Password")
    if zipfile is None:
        raise gr.Error("Please upload a zipfile")

    # normalize to a list of paths: gr.File with file_count='multiple' may hand
    # back a single value, plain paths, or file wrappers with a .name attribute
    zip_paths = zipfile if isinstance(zipfile, list) else [zipfile]
    zip_paths = [getattr(f, 'name', f) for f in zip_paths]
    for zip_path in zip_paths:
        if zip_path.split('.')[-1] != 'zip':
            raise gr.Error("Client Photos must be in a zipfile")

    if ai_bool.lower() == 'on':
        if topical_map is None:
            raise gr.Error("Please upload a topical map")
        if topical_map.split('.')[-1] != 'docx':
            raise gr.Error("Topical Map must be a docx file")
        topical_map = get_topical_map(topical_map)

    # get all collection names and ids
    headers = {
        'Accept': 'application/json',
        'Authorization': os.environ['BRANDFOLDER_API_KEY']
    }

    r = requests.get('https://brandfolder.com/api/v4/collections?per=200',
                     params={}, headers=headers)

    collection_dict = {entry['attributes']['name']: entry['id'] for entry in r.json()['data']}

    if client_name not in collection_dict:
        if create:
            # creates the collection and gets the collection id
            collection_id = create_collection(client_name)
        else:
            raise gr.Error(f'Client Name: {client_name} does not exist in this Brandfolder')
    else:
        collection_id = collection_dict[client_name]

    # gets all image files from the uploaded zipfile(s)
    img_lists = []
    img_dict = {}
    for zip_path in zip_paths:
        zip_obj = ZipFile(zip_path)
        unpack_list = get_imgs_from_folder([], zip_path)
        for img in unpack_list:
            img_dict[img] = zip_obj
        img_lists.append(unpack_list)
    img_list = sum(img_lists, [])

    new_imgs = []
    error_imgs = []
    error_imgs_text = 'No errors detected.'

    # iterates over all images and puts them into Brandfolder, with AI elements if requested
    for img in progress.tqdm(img_list, desc="Uploading..."):
        zip_obj = img_dict[img]
        img = zip_obj.extract(img)
        print(client_name)
        try:
            if ai_bool.lower() == 'on':
                time.sleep(15)
                new_img = create_asset(client_name, collection_id, img, topical_map,
                                       new_imgs, project_bool=project_bool)
                new_imgs.append(new_img)
            elif ai_bool.lower() == 'off':
                create_asset_no_ai(client_name, collection_id, img, project_bool=project_bool)
        except Exception as e:
            error_imgs.append(f'{str(img)}; error: {e}\n')
            print(f'An unexpected error occurred processing {img}: {e}')

    gr.Info('Images have been uploaded!')
    if error_imgs:
        error_imgs_text = '\n'.join(error_imgs)
    return "Images Uploaded", error_imgs_text


def get_collection_names():
    collection_dict = get_collection_dict()
    return list(collection_dict.keys())


def upload_file(files):
    file_paths = [file.name for file in files]
    return file_paths


collection_names = get_collection_names()
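# import_client_data() returns a status string plus a newline-joined report of
# any images that failed; the UI below binds these to the 'Uploader' label and
# the 'Images Not Processed' textbox.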
with gr.Blocks() as block:
    gr.Markdown("""
    # Brandfolder Zipfile Dashboard
    This dashboard is for uploading photos from a zipfile to a Brandfolder collection.
    """)

    with gr.Column(visible=True, elem_id='login') as login:
        password = gr.Textbox(label='Enter Password')
        # with gr.Row():
        #     login_to_zipfile_btn = gr.Button("Login")

    with gr.Column(visible=True, elem_id='zipfile') as zipfile:
        with gr.Row():
            with gr.Column():
                options = get_collection_names()
                selection = gr.Dropdown(options, label='Choose Existing Collection',
                                        info='If creating a new section, select Create a Collection')
                gr.Markdown('## Upload zipfile containing client photos below')
                zipfile = gr.File(label='Client Photos (must be zipfile)', file_count='multiple',
                                  file_types=['.zip'], interactive=False)
                upload_btn = gr.UploadButton("Upload Zipfile(s)", file_count='multiple')
                ai_bool = gr.Radio(choices=['On', 'Off'], label='AI Algorithm?',
                                   info='Would you like to use the AI Algorithm to upload these images?')
                project_bool = gr.Radio(choices=['Yes', 'No'], label='Project Names?',
                                        info='Would you like to include project names for these images?')
                gr.Markdown('## Upload topical map document for the client below')
                topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
                algorithm = gr.Button('Run Algorithm')
                upload = gr.Label(label='Uploader')
                err_imgs = gr.Textbox(label="Images Not Processed")
                stop = gr.Button("Stop Run")
        # with gr.Row():
        #     zipfile_to_login_btn = gr.Button("Back to Login")
        #     zipfile_to_trigger_btn = gr.Button("Brandfolder Trigger")
        #     zipfile_to_offline_btn = gr.Button("Offline Image Update")

    with gr.Column(visible=True, elem_id='trigger') as trigger:
        gr.Markdown('''
        # Run AI in Brandfolder
        This button runs the AI algorithm on all the images stored in the Pre-Processed Images
        section in Brandfolder and moves the newly processed images to the AI Processed Images section.
        ALL COPIES OF THE IMAGES IN THE PRE-PROCESSED SECTION WILL BE DELETED AFTER PUSHING THIS BUTTON.
        ''')
        bf_options = get_collection_names()
        bf_selection = gr.Dropdown(bf_options, label='Choose Existing Collection')
        section = gr.Radio(choices=['Pre-Processed Images', 'Original Project Assets'],
                           label='Which section is the data in?')
        bf_topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
        bf_button = gr.Button('Run AI algorithm for Pre-Processed Images')
        bf_upload = gr.Label(label='Uploader')
        stop_bf = gr.Button('Stop Run')
        # with gr.Row():
        #     trigger_to_zipfile_btn = gr.Button("Zipfile Upload")
        #     trigger_to_offline_btn = gr.Button("Offline Image Update")

    with gr.Column(visible=False, elem_id='offline') as offline:
        gr.Markdown('''
        # AI Processed Images Algorithm
        Runs the AI algorithm over the images in the AI Processed Images section. Use this only
        when the Brandfolder API is not uploading images properly. The images will not be resized,
        but the tags, descriptions, etc. for the images will be populated.
        ''')
        offline_options = get_collection_names()
        offline_selection = gr.Dropdown(offline_options, label='Choose Existing Collection')
        offline_topical_map = gr.File(label='Topical Map (must be docx)', file_types=['.docx'])
        offline_button = gr.Button('Run AI algorithm for AI Processed Images Section')
        offline_upload = gr.Label(label='Uploader')
        stop_offline = gr.Button("Stop Run")
        # with gr.Row():
        #     offline_to_zipfile_btn = gr.Button("Zipfile Upload")
        #     offline_to_trigger_btn = gr.Button("Brandfolder Trigger")

    # selection.select(fn=get_collection_names, outputs=[selection])
    algo_event = algorithm.click(fn=import_client_data,
                                 inputs=[selection, zipfile, topical_map, password, project_bool, ai_bool],
                                 outputs=[upload, err_imgs])
    bf_event = bf_button.click(fn=bf_trigger.run_preprocess_ai,
                               inputs=[bf_topical_map, bf_selection, section],
                               outputs=[bf_upload])
    offline_event = offline_button.click(fn=offline_update.run_preprocess_ai,
                                         inputs=[offline_topical_map, offline_selection],
                                         outputs=[offline_upload])
    stop.click(fn=None, inputs=None, outputs=None, cancels=[algo_event])
    stop_bf.click(fn=None, inputs=None, outputs=None, cancels=[bf_event])
    upload_btn.upload(upload_file, upload_btn, zipfile)
    # stop_offline.click(fn=None, inputs=None, outputs=None, cancels=[offline_event])

    # login_to_zipfile_btn.click(
    #     fn=None,
    #     inputs=None,
    #     outputs=[login, zipfile],
    #     js=change.login_to_zipfile_js
    # )
    # zipfile_to_login_btn.click(
    #     fn=None,
    #     inputs=None,
    #     outputs=[login, zipfile],
    #     js=change.zipfile_to_login_js
    # )
    # zipfile_to_trigger_btn.click(
    #     fn=None,
    #     inputs=None,
    #     outputs=[zipfile, trigger],
    #     js=change.zipfile_to_trigger_js
    # )
    # trigger_to_zipfile_btn.click(
    #     fn=None,
    #     inputs=None,
    #     outputs=[zipfile, trigger],
    #     js=change.trigger_to_zipfile_js
    # )
    # zipfile_to_offline_btn.click(
    #     fn=None,
    #     inputs=None,
    #     outputs=[zipfile, offline],
    #     js=change.zipfile_to_offline_js
    # )
    # trigger_to_offline_btn.click(
    #     fn=None,
    #     inputs=None,
    #     outputs=[trigger, offline],
    #     js=change.trigger_to_offline_js
    # )
    # offline_to_zipfile_btn.click(
    #     fn=None,
    #     inputs=None,
    #     outputs=[zipfile, offline],
    #     js=change.offline_to_zipfile_js
    # )
    # offline_to_trigger_btn.click(
    #     fn=None,
    #     inputs=None,
    #     outputs=[trigger, offline],
    #     js=change.offline_to_trigger_js
    # )

block.queue(default_concurrency_limit=5)
block.launch()