Spaces:
Running
Running
import openai | |
import os, sys, json, inspect, glob, tiktoken, shutil, yaml | |
import openpyxl | |
from openpyxl import Workbook, load_workbook | |
import google.generativeai as palm | |
from langchain.chat_models import AzureChatOpenAI | |
from google.oauth2 import service_account | |
currentdir = os.path.dirname(os.path.abspath( | |
inspect.getfile(inspect.currentframe()))) | |
parentdir = os.path.dirname(currentdir) | |
sys.path.append(parentdir) | |
parentdir = os.path.dirname(parentdir) | |
sys.path.append(parentdir) | |
from general_utils import get_cfg_from_full_path, num_tokens_from_string | |
from embeddings_db import VoucherVisionEmbedding | |
from OCR_google_cloud_vision import detect_text, overlay_boxes_on_image | |
from LLM_chatGPT_3_5 import OCR_to_dict, OCR_to_dict_16k | |
from LLM_PaLM import OCR_to_dict_PaLM | |
# from LLM_Falcon import OCR_to_dict_Falcon | |
from prompts import PROMPT_UMICH_skeleton_all_asia, PROMPT_OCR_Organized, PROMPT_UMICH_skeleton_all_asia_GPT4, PROMPT_OCR_Organized_GPT4, PROMPT_JSON | |
from prompt_catalog import PromptCatalog | |
''' | |
* For the prefix_removal, the image names have 'MICH-V-' prior to the barcode, so that is used for matching | |
but removed for output. | |
* There is also code active to replace the LLM-predicted "Catalog Number" with the correct number since it is known. | |
The LLMs do usually assign the barcode to the correct field, but it's not needed since it is already known.
- Look for ####################### Catalog Number pre-defined | |
''' | |
''' | |
Prior to StructuredOutputParser: | |
response = openai.ChatCompletion.create( | |
model=MODEL, | |
temperature = 0, | |
messages=[ | |
{"role": "system", "content": "You are a helpful assistant acting as a transcription expert and your job is to transcribe herbarium specimen labels based on OCR data and reformat it to meet Darwin Core Archive Standards into a Python dictionary based on certain rules."}, | |
{"role": "user", "content": prompt}, | |
], | |
max_tokens=2048, | |
) | |
# print the model's response | |
return response.choices[0].message['content'] | |
''' | |
class VoucherVision(): | |
def __init__(self, cfg, logger, dir_home, path_custom_prompts, Project, Dirs):
    '''Store the run configuration, then load API keys and run setup().

    Args:
        cfg: Nested configuration mapping; setup() reads it under 'leafmachine'.
        logger: Logger used for all progress messages.
        dir_home: Stored on the instance unchanged.
        path_custom_prompts: Path to a custom prompt YAML file.
        Project: Stored on the instance unchanged.
        Dirs: Object exposing the output directory attributes.
    '''
    # Collaborators handed in by the caller.
    self.cfg, self.logger = cfg, logger
    self.dir_home = dir_home
    self.path_custom_prompts = path_custom_prompts
    self.Project, self.Dirs = Project, Dirs

    # Placeholders that later steps fill in.
    self.headers = None
    self.prompt_version = None
    self.client = None

    # Credentials are loaded first, then the configuration-driven setup runs.
    self.set_API_keys()
    self.setup()
def setup(self): | |
self.logger.name = f'[Transcription]' | |
self.logger.info(f'Setting up OCR and LLM') | |
self.db_name = self.cfg['leafmachine']['project']['embeddings_database_name'] | |
self.path_domain_knowledge = self.cfg['leafmachine']['project']['path_to_domain_knowledge_xlsx'] | |
self.build_new_db = self.cfg['leafmachine']['project']['build_new_embeddings_database'] | |
self.continue_run_from_partial_xlsx = self.cfg['leafmachine']['project']['continue_run_from_partial_xlsx'] | |
self.prefix_removal = self.cfg['leafmachine']['project']['prefix_removal'] | |
self.suffix_removal = self.cfg['leafmachine']['project']['suffix_removal'] | |
self.catalog_numerical_only = self.cfg['leafmachine']['project']['catalog_numerical_only'] | |
self.prompt_version0 = self.cfg['leafmachine']['project']['prompt_version'] | |
self.use_domain_knowledge = self.cfg['leafmachine']['project']['use_domain_knowledge'] | |
self.catalog_name_options = ["Catalog Number", "catalog_number"] | |
self.utility_headers = ["tokens_in", "tokens_out", "path_to_crop","path_to_original","path_to_content","path_to_helper",] | |
self.map_prompt_versions() | |
self.map_dir_labels() | |
self.map_API_options() | |
self.init_embeddings() | |
self.init_transcription_xlsx() | |
'''Logging''' | |
self.logger.info(f'Transcribing dataset --- {self.dir_labels}') | |
self.logger.info(f'Saving transcription batch to --- {self.path_transcription}') | |
self.logger.info(f'Saving individual transcription files to --- {self.Dirs.transcription_ind}') | |
self.logger.info(f'Starting transcription...') | |
self.logger.info(f' LLM MODEL --> {self.version_name}') | |
self.logger.info(f' Using Azure API --> {self.is_azure}') | |
self.logger.info(f' Model name passed to API --> {self.model_name}') | |
self.logger.info(f' API access token is found in PRIVATE_DATA.yaml --> {self.has_key}') | |
# def map_API_options(self): | |
# self.chat_version = self.cfg['leafmachine']['LLM_version'] | |
# version_mapping = { | |
# 'GPT 4': ('OpenAI GPT 4', False, 'GPT_4', self.has_key_openai), | |
# 'GPT 3.5': ('OpenAI GPT 3.5', False, 'GPT_3_5', self.has_key_openai), | |
# 'Azure GPT 3.5': ('(Azure) OpenAI GPT 3.5', True, 'Azure_GPT_3_5', self.has_key_azure_openai), | |
# 'Azure GPT 4': ('(Azure) OpenAI GPT 4', True, 'Azure_GPT_4', self.has_key_azure_openai), | |
# 'PaLM 2': ('Google PaLM 2', None, None, self.has_key_palm2) | |
# } | |
# if self.chat_version not in version_mapping: | |
# supported_LLMs = ", ".join(version_mapping.keys()) | |
# raise Exception(f"Unsupported LLM: {self.chat_version}. Requires one of: {supported_LLMs}") | |
# self.version_name, self.is_azure, self.model_name, self.has_key = version_mapping[self.chat_version] | |
def map_API_options(self):
    '''Resolve the configured LLM label into name, Azure flag, model id and key status.

    Reads cfg['leafmachine']['LLM_version'] and the OPENAI_API_KEY, AZURE_API_KEY
    and PALM_API_KEY environment variables, then sets self.version_name,
    self.is_azure, self.model_name and self.has_key.

    Raises:
        Exception: if the configured label is not one of the supported LLMs.
    '''
    self.chat_version = self.cfg['leafmachine']['LLM_version']

    # A key counts as present only when the variable is set and non-empty.
    has_openai = bool(os.getenv('OPENAI_API_KEY'))
    has_azure = bool(os.getenv('AZURE_API_KEY'))
    has_palm = bool(os.getenv('PALM_API_KEY'))

    # label -> (display name, uses Azure, internal model id, key available)
    llm_table = {
        'GPT 4': ('OpenAI GPT 4', False, 'GPT_4', has_openai),
        'GPT 3.5': ('OpenAI GPT 3.5', False, 'GPT_3_5', has_openai),
        'Azure GPT 3.5': ('(Azure) OpenAI GPT 3.5', True, 'Azure_GPT_3_5', has_azure),
        'Azure GPT 4': ('(Azure) OpenAI GPT 4', True, 'Azure_GPT_4', has_azure),
        'PaLM 2': ('Google PaLM 2', None, None, has_palm),
    }

    selected = llm_table.get(self.chat_version)
    if selected is None:
        supported_llms = ", ".join(llm_table)
        raise Exception(f"Unsupported LLM: {self.chat_version}. Requires one of: {supported_llms}")

    self.version_name, self.is_azure, self.model_name, self.has_key = selected
def map_prompt_versions(self):
    '''Translate the configured prompt label into an internal prompt id.

    Sets self.prompt_version_map, self.prompt_version (the mapped id, or
    self.path_custom_prompts when the label is not a built-in one) and
    self.is_predefined_prompt.
    '''
    builtin_prompts = {
        "Version 1": "prompt_v1_verbose",
        "Version 1 No Domain Knowledge": "prompt_v1_verbose_noDomainKnowledge",
        "Version 2": "prompt_v2_json_rules",
        "Version 1 PaLM 2": 'prompt_v1_palm2',
        "Version 1 PaLM 2 No Domain Knowledge": 'prompt_v1_palm2_noDomainKnowledge',
        "Version 2 PaLM 2": 'prompt_v2_palm2',
    }
    self.prompt_version_map = builtin_prompts

    requested_label = self.prompt_version0
    custom_fallback = self.path_custom_prompts
    if requested_label in builtin_prompts:
        self.prompt_version = builtin_prompts[requested_label]
    else:
        # Unknown labels fall back to the custom prompt file path.
        self.prompt_version = custom_fallback

    self.is_predefined_prompt = self.is_in_prompt_version_map(self.prompt_version)
def is_in_prompt_version_map(self, value):
    '''Return True when value is one of the ids stored in self.prompt_version_map.'''
    known_prompt_ids = self.prompt_version_map.values()
    return value in known_prompt_ids
def init_embeddings(self):
    '''Build the embeddings helper when domain knowledge is enabled, else clear it.'''
    if not self.use_domain_knowledge:
        self.Voucher_Vision_Embedding = None
        return

    self.logger.info(f'*** USING DOMAIN KNOWLEDGE ***')
    self.logger.info(f'*** Initializing vector embeddings database ***')
    self.initialize_embeddings()
def map_dir_labels(self):
    '''Choose the image directory and list every entry inside it.

    Sets self.dir_labels to the 'label' folder under
    Dirs.save_per_annotation_class when cfg['leafmachine']['use_RGB_label_images']
    is truthy, otherwise to Dirs.save_original; self.img_paths receives the glob
    of everything directly inside that directory.
    '''
    use_label_crops = self.cfg['leafmachine']['use_RGB_label_images']
    self.dir_labels = (
        os.path.join(self.Dirs.save_per_annotation_class, 'label')
        if use_label_crops
        else self.Dirs.save_original
    )

    everything_inside = os.path.join(self.dir_labels, "*")
    self.img_paths = glob.glob(everything_inside)
def load_rules_config(self):
    '''Parse the custom prompt YAML file at self.path_custom_prompts.

    Returns:
        The parsed YAML content, or None when the file is not valid YAML (the
        parser error is printed).
    '''
    try:
        with open(self.path_custom_prompts, 'r') as yaml_file:
            return yaml.safe_load(yaml_file)
    except yaml.YAMLError as parse_error:
        print(parse_error)
        return None
def generate_xlsx_headers(self):
    '''Return the custom prompt's field names followed by the utility headers.

    The field names are the keys of rules_config_json['rules']['Dictionary'].
    '''
    field_template = self.rules_config_json['rules']["Dictionary"]
    return list(field_template) + self.utility_headers
def init_transcription_xlsx(self):
    '''Select the column headers for the active prompt and open the output workbook.

    Sets self.HEADERS_v1_n22, self.HEADERS_v2_n26, self.path_transcription,
    self.headers and self.headers_used, then delegates to
    create_or_load_excel_with_headers.

    Raises:
        ValueError: for a predefined prompt id that has no header set here.
    '''
    v1_fields = ["Catalog Number","Genus","Species","subspecies","variety","forma","Country","State","County","Locality Name","Min Elevation","Max Elevation","Elevation Units","Verbatim Coordinates","Datum","Cultivated","Habitat","Collectors","Collector Number","Verbatim Date","Date","End Date"]
    v2_fields = ["catalog_number","genus","species","subspecies","variety","forma","country","state","county","locality_name","min_elevation","max_elevation","elevation_units","verbatim_coordinates","decimal_coordinates","datum","cultivated","habitat","plant_description","collectors","collector_number","determined_by","multiple_names","verbatim_date","date","end_date"]
    self.HEADERS_v1_n22 = v1_fields + self.utility_headers
    self.HEADERS_v2_n26 = v2_fields + self.utility_headers

    # Output workbook location.
    self.path_transcription = os.path.join(self.Dirs.transcription, "transcribed.xlsx")

    v2_prompts = ('prompt_v2_json_rules', 'prompt_v2_palm2')
    v1_prompts = ('prompt_v1_verbose', 'prompt_v1_verbose_noDomainKnowledge', 'prompt_v1_palm2', 'prompt_v1_palm2_noDomainKnowledge')

    if self.prompt_version in v2_prompts:
        self.headers, self.headers_used = self.HEADERS_v2_n26, 'HEADERS_v2_n26'
    elif self.prompt_version in v1_prompts:
        self.headers, self.headers_used = self.HEADERS_v1_n22, 'HEADERS_v1_n22'
    elif self.is_predefined_prompt:
        # A predefined id that is in neither group has no header definition.
        raise ValueError("Predefined prompt is not handled in this context.")
    else:
        # Custom prompt: the headers come from the rules file.
        self.rules_config_json = self.load_rules_config()
        self.headers = self.generate_xlsx_headers()
        self.headers_used = 'CUSTOM'

    self.create_or_load_excel_with_headers(self.path_transcription, self.headers)
def pick_model(self, vendor, nt):
    '''Map a vendor key and prompt token count to (API model name, long-form flag).

    Only 'GPT_3_5' depends on nt: prompts above 6000 tokens use the 16k model
    and set the flag. Unknown vendor keys return None.
    '''
    if vendor == 'GPT_3_5':
        if nt > 6000:
            return "gpt-3.5-turbo-16k-0613", True
        return "gpt-3.5-turbo", False

    # Vendors whose model name does not depend on the prompt length.
    fixed_models = {
        'GPT_4': "gpt-4",
        'Azure_GPT_3_5': "gpt-35-turbo",
        'Azure_GPT_4': "gpt-4",
    }
    if vendor in fixed_models:
        return fixed_models[vendor], False
    return None
def create_or_load_excel_with_headers(self, file_path, headers, show_head=False):
    '''Create the transcription workbook, or resume from a partial one.

    When self.continue_run_from_partial_xlsx points at an existing file, that
    workbook is loaded, the images it already lists are recorded in
    self.completed_specimens, the JSON folders referenced by its first data row
    are copied next to the new output, and every stored path is re-rooted under
    the current run. Otherwise a new workbook containing only the header row is
    created. Either way the workbook is saved to file_path.

    Args:
        file_path: Destination .xlsx path; the part before 'Transcription' is
            used as the root of the current run when rewriting paths.
        headers: Column names; their positions locate the path columns.
        show_head: Print the first rows after saving (forced on when resuming).

    Raises:
        ValueError: when resuming and headers lacks one of the path columns.
    '''
    output_dir_names = ['Archival_Components', 'Config_File', 'Cropped_Images', 'Logs', 'Original_Images', 'Transcription']
    path_columns = ('path_to_crop', 'path_to_original', 'path_to_content', 'path_to_helper')
    self.completed_specimens = []

    def relocate(old_path, dir_name):
        # Keep what follows dir_name in old_path and graft it onto the part of
        # file_path that precedes 'Transcription'.
        return file_path.split('Transcription')[0] + dir_name + old_path.split(dir_name)[1]

    partial_xlsx = self.continue_run_from_partial_xlsx
    if partial_xlsx is not None and os.path.isfile(partial_xlsx):
        workbook = load_workbook(filename=partial_xlsx)
        sheet = workbook.active
        show_head = True

        # Locate the path columns (1-based). Previously a missing header was only
        # printed and the code then failed with a NameError further down.
        try:
            crop_col, original_col, content_col, helper_col = [headers.index(name) + 1 for name in path_columns]
        except ValueError as err:
            raise ValueError(
                f"Cannot resume from {partial_xlsx}: headers must contain {', '.join(path_columns)}"
            ) from err

        # Every image listed in the partial workbook counts as completed.
        crop_paths = next(sheet.iter_cols(min_col=crop_col, max_col=crop_col, values_only=True, min_row=2), ())
        for old_path in crop_paths:
            if not isinstance(old_path, str):
                continue  # blank cell
            for dir_name in output_dir_names:
                if dir_name in old_path:
                    self.completed_specimens.append(os.path.basename(relocate(old_path, dir_name)))
        print(f"{len(self.completed_specimens)} images are already completed")

        ### Copy the JSON files over
        # Only the first data row is inspected; its folder is copied wholesale.
        for colu in (content_col, helper_col):
            first_row = next(sheet.iter_rows(min_row=2, min_col=colu, max_col=colu), None)
            old_path = first_row[0].value if first_row else None
            if not isinstance(old_path, str) or 'Transcription' not in old_path:
                continue  # empty sheet, blank cell, or a path outside a Transcription folder

            old_dir = os.path.dirname(old_path)
            new_dir = os.path.dirname(relocate(old_path, 'Transcription'))
            if os.path.isdir(old_dir):
                os.makedirs(new_dir, exist_ok=True)
                for filename in os.listdir(old_dir):
                    source = os.path.join(old_dir, filename)
                    if os.path.isfile(source):
                        shutil.copy2(source, new_dir)  # copy2 preserves metadata

        ### Update the file names
        for colu in (crop_col, original_col, content_col, helper_col):
            for row in sheet.iter_rows(min_row=2, min_col=colu, max_col=colu):
                for cell in row:
                    old_path = cell.value
                    if not isinstance(old_path, str):
                        continue  # blank cell
                    # When several directory names match, the last one wins.
                    for dir_name in output_dir_names:
                        if dir_name in old_path:
                            cell.value = relocate(old_path, dir_name)
    else:
        # Create a new workbook and write the headers in the first row.
        workbook = Workbook()
        sheet = workbook.active
        for i, header in enumerate(headers, start=1):
            sheet.cell(row=1, column=i, value=header)

    # Save the workbook
    workbook.save(file_path)

    if show_head:
        print("continue_run_from_partial_xlsx:")
        for i, row in enumerate(sheet.iter_rows(values_only=True)):
            print(row)
            if i == 3:  # stop after the first 4 rows (0-indexed)
                print("\n")
                break
def add_data_to_excel_from_response(self, path_transcription, response, filename_without_extension, path_to_crop, path_to_content, path_to_helper, nt_in, nt_out):
    '''Append one transcription result as a new row of the workbook.

    Args:
        path_transcription: Path of the .xlsx file to load and save.
        response: Mapping of header name to value (or to a dict carrying a
            'value' entry), or a JSON string encoding such a mapping.
        filename_without_extension: Written to the catalog-number column in
            place of whatever the LLM predicted.
        path_to_crop: Image path; also used to derive path_to_original.
        path_to_content: Written to the 'path_to_content' column.
        path_to_helper: Written to the 'path_to_helper' column.
        nt_in: Written to the 'tokens_in' column.
        nt_out: Written to the 'tokens_out' column.

    Returns:
        None. If response is a string that is not valid JSON, the failure is
        printed and nothing is written.
    '''
    wb = openpyxl.load_workbook(path_transcription)
    sheet = wb.active
    # find the next empty row
    next_row = sheet.max_row + 1

    if isinstance(response, str):
        try:
            response = json.loads(response)
        except json.JSONDecodeError:
            print(f"Failed to parse response: {response}")
            return

    # Utility columns whose value is passed straight through.
    passthrough = {
        "path_to_crop": path_to_crop,
        "path_to_content": path_to_content,
        "path_to_helper": path_to_helper,
        "tokens_in": nt_in,
        "tokens_out": nt_out,
    }

    for i, header in enumerate(sheet[1], start=1):
        name = header.value
        if (name in response) and (name not in self.catalog_name_options):  ####################### Catalog Number pre-defined
            cell_value = response[name]
            if isinstance(cell_value, dict):
                # Structured answers carry the text under 'value'.
                cell_value = cell_value.get('value', '')
            try:
                sheet.cell(row=next_row, column=i, value=cell_value)
            except Exception:
                # openpyxl refuses containers and strings with control characters.
                # Fall back to the first list item, or to printable text, instead of
                # indexing blindly (which failed on empty lists and dicts).
                if isinstance(cell_value, (list, tuple)):
                    fallback = cell_value[0] if cell_value else ''
                else:
                    fallback = ''.join(ch for ch in str(cell_value) if ch.isprintable() or ch in '\t\n\r')
                sheet.cell(row=next_row, column=i, value=fallback)
        elif name in self.catalog_name_options:
            sheet.cell(row=next_row, column=i, value=filename_without_extension)
        elif name == "path_to_original":
            # Label crops sit four directories below the run root, originals two.
            levels_up = 4 if self.cfg['leafmachine']['use_RGB_label_images'] else 2
            base = path_to_crop
            for _ in range(levels_up):
                base = os.path.dirname(base)
            path_to_original = os.path.join(base, 'Original_Images', os.path.basename(path_to_crop))
            sheet.cell(row=next_row, column=i, value=path_to_original)
        elif name in passthrough:
            sheet.cell(row=next_row, column=i, value=passthrough[name])

    # save the workbook
    wb.save(path_transcription)
def configure_azure_llm(self):
    '''Build self.llm as an AzureChatOpenAI client from environment variables.

    Raises:
        ValueError: if any of the six AZURE_* variables is unset or empty.
    '''
    variable_names = (
        'AZURE_API_VERSION',
        'AZURE_API_KEY',
        'AZURE_API_BASE',
        'AZURE_ORGANIZATION',
        'AZURE_API_TYPE',
        'AZURE_DEPLOYMENT_NAME',
    )
    azure_env = {name: os.getenv(name) for name in variable_names}

    # Every setting is required; an empty string counts as missing.
    if not all(azure_env.values()):
        raise ValueError("Missing Azure configuration in environment variables.")

    self.llm = AzureChatOpenAI(
        deployment_name=azure_env['AZURE_DEPLOYMENT_NAME'],
        openai_api_version=azure_env['AZURE_API_VERSION'],
        openai_api_key=azure_env['AZURE_API_KEY'],
        openai_api_base=azure_env['AZURE_API_BASE'],
        openai_organization=azure_env['AZURE_ORGANIZATION'],
        openai_api_type=azure_env['AZURE_API_TYPE'],
    )
def set_API_keys(self):
    '''Read API credentials from the environment and configure each client.

    Sets self.has_key_openai, self.has_key_google_OCR and self.has_key_palm2.
    When present: GOOGLE_APPLICATION_CREDENTIALS (a JSON string) creates
    self.client, AZURE_API_KEY triggers configure_azure_llm(), PALM_API_KEY
    configures the PaLM module, and OPENAI_API_KEY is set on the openai module.
    '''
    env = os.environ
    openai_api_key = env.get('OPENAI_API_KEY')
    google_credentials_json = env.get('GOOGLE_APPLICATION_CREDENTIALS')
    palm_api_key = env.get('PALM_API_KEY')

    self.has_key_openai = openai_api_key is not None
    self.has_key_google_OCR = google_credentials_json is not None
    self.has_key_palm2 = palm_api_key is not None

    if self.has_key_google_OCR:
        # The variable holds the service-account JSON itself.
        credentials_info = json.loads(google_credentials_json)
        google_credentials = service_account.Credentials.from_service_account_info(credentials_info)
        from google.cloud import vision
        self.client = vision.ImageAnnotatorClient(credentials=google_credentials)

    if env.get('AZURE_API_KEY') is not None:
        self.configure_azure_llm()

    if self.has_key_palm2:
        palm.configure(api_key=palm_api_key)

    if self.has_key_openai:
        openai.api_key = openai_api_key
        env["OPENAI_API_KEY"] = openai_api_key
def initialize_embeddings(self):
    '''Create the VoucherVisionEmbedding search helper from the stored settings.

    The database name and the domain-knowledge path are passed positionally;
    the logger and the build_new_db flag are passed by keyword.
    '''
    embedding_options = {
        'logger': self.logger,
        'build_new_db': self.build_new_db,
    }
    self.Voucher_Vision_Embedding = VoucherVisionEmbedding(
        self.db_name,
        self.path_domain_knowledge,
        **embedding_options,
    )
def clean_catalog_number(self, data, filename_without_extension):
    '''Overwrite the catalog number in data with the one taken from the filename.

    The filename has the configured prefix and suffix removed and, when
    catalog_numerical_only is set, is reduced to its digits. The digits are
    taken from the filename, not from the value the LLM predicted (which may be
    missing or None).

    Args:
        data: Transcription dict. Modified in place when it already holds the
            catalog key; otherwise a new dict with that key first is returned.
        filename_without_extension: Image file name without its extension.

    Returns:
        The dict with the catalog number set.

    Raises:
        TypeError: if data is not a dict.
        ValueError: if self.headers_used is not a known header set.
    '''
    if not isinstance(data, dict):
        raise TypeError("Data is not of type dict.")

    if self.headers_used == 'HEADERS_v1_n22':
        catalog_key = "Catalog Number"
    elif self.headers_used in ['HEADERS_v2_n26', 'CUSTOM']:
        catalog_key = "catalog_number"
    else:
        raise ValueError("Invalid headers used.")

    if catalog_key not in data:
        # Put the catalog number in front of the existing fields.
        data = {catalog_key: None, **data}

    catalog_number = filename_without_extension
    if self.prefix_removal:
        catalog_number = catalog_number.replace(self.prefix_removal, "")
    if self.suffix_removal:
        catalog_number = catalog_number.replace(self.suffix_removal, "")
    if self.catalog_numerical_only:
        catalog_number = self.remove_non_numbers(catalog_number)

    data[catalog_key] = catalog_number
    return data
def write_json_to_file(self, filepath, data):
    '''Write data to filepath; dicts are serialised as indented JSON first.'''
    with open(filepath, 'w') as out_file:
        payload = json.dumps(data, indent=4, sort_keys=False) if isinstance(data, dict) else data
        out_file.write(payload)
def create_null_json(self):
    '''Return a new empty dict as the placeholder transcription.'''
    empty_record = dict()
    return empty_record
def remove_non_numbers(self, s):
    '''Return s with every character that is not a digit removed.'''
    digits_only = [symbol for symbol in s if symbol.isdigit()]
    return ''.join(digits_only)
def create_null_row(self, filename_without_extension, path_to_crop, path_to_content, path_to_helper):
    '''Build a placeholder row: blank fields plus the catalog number and paths.

    The catalog number is derived from the filename the same way
    clean_catalog_number does it (prefix removed, then suffix removed, then
    optionally digits only). Previously the suffix step discarded the prefix
    step and the field stayed empty when neither was configured.
    path_to_original follows the directory depth used by
    add_data_to_excel_from_response.

    Args:
        filename_without_extension: Image file name without its extension.
        path_to_crop: Path of the image that was processed.
        path_to_content: Value for the 'path_to_content' column.
        path_to_helper: Value for the 'path_to_helper' column.

    Returns:
        dict keyed by self.headers; headers with no known value map to ''.
    '''
    catalog_number = filename_without_extension
    if self.prefix_removal:
        catalog_number = catalog_number.replace(self.prefix_removal, "")
    if self.suffix_removal:
        catalog_number = catalog_number.replace(self.suffix_removal, "")
    if self.catalog_numerical_only:
        catalog_number = self.remove_non_numbers(catalog_number)

    # Label crops sit four directories below the run root, originals two.
    levels_up = 4 if self.cfg['leafmachine']['use_RGB_label_images'] else 2
    base = path_to_crop
    for _ in range(levels_up):
        base = os.path.dirname(base)
    path_to_original = os.path.join(base, 'Original_Images', os.path.basename(path_to_crop))

    known_values = {
        "path_to_crop": path_to_crop,
        "path_to_original": path_to_original,
        "path_to_content": path_to_content,
        "path_to_helper": path_to_helper,
    }

    json_dict = {}
    for header in self.headers:
        if header in self.catalog_name_options:
            json_dict[header] = catalog_number
        else:
            json_dict[header] = known_values.get(header, '')
    return json_dict
def setup_GPT(self, prompt_version, gpt):
    '''Build the prompt for the current OCR text and choose the model to call.

    Args:
        prompt_version: Internal prompt id (or the custom prompt path).
        gpt: Vendor key forwarded to pick_model.

    Returns:
        (MODEL, prompt, use_long_form, n_fields, xlsx_headers, nt) where nt is
        the prompt's token count under the cl100k_base encoding.

    Raises:
        ValueError: when a predefined prompt id has no builder for the current
            use_domain_knowledge setting. Previously this surfaced as an
            UnboundLocalError on `prompt`.
    '''
    Catalog = PromptCatalog()
    self.logger.info(f'Length of OCR raw -- {len(self.OCR)}')

    prompt_parts = None
    if not self.is_predefined_prompt:
        prompt_parts = Catalog.prompt_v2_custom(self.path_custom_prompts, OCR=self.OCR)
    elif self.use_domain_knowledge:
        # Find a similar example from the domain knowledge
        domain_knowledge_example = self.Voucher_Vision_Embedding.query_db(self.OCR, 1)
        similarity = self.Voucher_Vision_Embedding.get_similarity()
        if prompt_version == 'prompt_v1_verbose':
            prompt_parts = Catalog.prompt_v1_verbose(OCR=self.OCR, domain_knowledge_example=domain_knowledge_example, similarity=similarity)
    elif prompt_version == 'prompt_v1_verbose_noDomainKnowledge':
        prompt_parts = Catalog.prompt_v1_verbose_noDomainKnowledge(OCR=self.OCR)
    elif prompt_version == 'prompt_v2_json_rules':
        prompt_parts = Catalog.prompt_v2_json_rules(OCR=self.OCR)

    if prompt_parts is None:
        raise ValueError(
            f"No prompt builder for prompt version {prompt_version!r} "
            f"with use_domain_knowledge={self.use_domain_knowledge}"
        )
    prompt, n_fields, xlsx_headers = prompt_parts

    nt = num_tokens_from_string(prompt, "cl100k_base")
    self.logger.info(f'Prompt token length --- {nt}')

    MODEL, use_long_form = self.pick_model(gpt, nt)
    self.logger.info(f'Waiting for {gpt} API call --- Using {MODEL}')
    return MODEL, prompt, use_long_form, n_fields, xlsx_headers, nt
# def setup_GPT(self, opt, gpt): | |
# if opt == 'dict': | |
# # Find a similar example from the domain knowledge | |
# domain_knowledge_example = self.Voucher_Vision_Embedding.query_db(self.OCR, 1) | |
# similarity= self.Voucher_Vision_Embedding.get_similarity() | |
# self.logger.info(f'Length of OCR raw -- {len(self.OCR)}') | |
# # prompt = PROMPT_UMICH_skeleton_all_asia_GPT4(self.OCR, domain_knowledge_example, similarity) | |
# prompt, n_fields, xlsx_headers = | |
# nt = num_tokens_from_string(prompt, "cl100k_base") | |
# self.logger.info(f'Prompt token length --- {nt}') | |
# MODEL, use_long_form = self.pick_model(gpt, nt) | |
# ### Direct GPT ### | |
# self.logger.info(f'Waiting for {MODEL} API call --- Using chatGPT --- Content') | |
# return MODEL, prompt, use_long_form | |
# elif opt == 'helper': | |
# prompt = PROMPT_OCR_Organized_GPT4(self.OCR) | |
# nt = num_tokens_from_string(prompt, "cl100k_base") | |
# MODEL, use_long_form = self.pick_model(gpt, nt) | |
# self.logger.info(f'Length of OCR raw -- {len(self.OCR)}') | |
# self.logger.info(f'Prompt token length --- {nt}') | |
# self.logger.info(f'Waiting for {MODEL} API call --- Using chatGPT --- Helper') | |
# return MODEL, prompt, use_long_form | |
def use_chatGPT(self, is_azure, progress_report, gpt):
    '''Run OCR and an OpenAI/Azure chat model over every image not yet completed.

    Args:
        is_azure: When true, the chosen model name is set as the deployment on
            self.llm; otherwise self.llm is cleared before the call.
        progress_report: Optional progress object (set_n_batches, update_batch,
            reset_batch); may be None.
        gpt: Vendor key forwarded to setup_GPT.

    Returns:
        (final_JSON_response, total_tokens_in, total_tokens_out). The first item
        is the saved result of the last image that produced an LLM response,
        parsed from JSON when it is a JSON string; None if there was none.
    '''
    total_tokens_in = 0
    total_tokens_out = 0
    final_JSON_response = None
    MODEL = None  # stays None until the first LLM call is prepared
    n_images = len(self.img_paths)

    if progress_report is not None:
        progress_report.set_n_batches(n_images)

    for i, path_to_crop in enumerate(self.img_paths):
        if progress_report is not None:
            progress_report.update_batch(f"Working on image {i+1} of {n_images}")

        if os.path.basename(path_to_crop) in self.completed_specimens:
            self.logger.info(f'[Skipping] specimen {os.path.basename(path_to_crop)} already processed')
            continue

        filename_without_extension, txt_file_path, txt_file_path_OCR, txt_file_path_OCR_bounds, jpg_file_path_OCR_helper = self.generate_paths(path_to_crop, i)

        # Use Google Vision API to get OCR
        self.logger.info(f'Working on {i+1}/{n_images} --- Starting OCR')
        self.OCR, self.bounds, self.text_to_box_mapping = detect_text(path_to_crop, self.client)
        self.logger.info(f'Working on {i+1}/{n_images} --- Finished OCR')

        if len(self.OCR) > 0:
            self.logger.info(f'Working on {i+1}/{n_images} --- Creating OCR Overlay Image')
            self.overlay_image = overlay_boxes_on_image(path_to_crop, self.bounds, self.cfg['leafmachine']['do_create_OCR_helper_image'])
            self.logger.info(f'Working on {i+1}/{n_images} --- Saved OCR Overlay Image')

            self.write_json_to_file(txt_file_path_OCR, {"OCR": self.OCR})
            self.write_json_to_file(txt_file_path_OCR_bounds, {"OCR_Bounds": self.bounds})
            self.overlay_image.save(jpg_file_path_OCR_helper)

            # Setup Dict
            MODEL, prompt, use_long_form, n_fields, xlsx_headers, nt_in = self.setup_GPT(self.prompt_version, gpt)

            if is_azure:
                self.llm.deployment_name = MODEL
            else:
                self.llm = None

            # Send OCR to chatGPT and return formatted dictionary
            ocr_to_dict = OCR_to_dict_16k if use_long_form else OCR_to_dict
            response_candidate = ocr_to_dict(is_azure, self.logger, MODEL, prompt, self.llm, self.prompt_version)
            nt_out = num_tokens_from_string(response_candidate, "cl100k_base")
        else:
            # No OCR text means no LLM call, so no tokens either way. nt_in used to
            # be left unset here (UnboundLocalError on the first image, a stale
            # count on later ones).
            response_candidate = None
            nt_in = 0
            nt_out = 0

        total_tokens_in += nt_in
        total_tokens_out += nt_out

        final_JSON_response0 = self.save_json_and_xlsx(response_candidate, filename_without_extension, path_to_crop, txt_file_path, jpg_file_path_OCR_helper, nt_in, nt_out)
        if response_candidate is not None:
            final_JSON_response = final_JSON_response0

        self.logger.info(f'Formatted JSON\n{final_JSON_response}')
        self.logger.info(f'Finished {MODEL} API calls\n')

    if progress_report is not None:
        progress_report.reset_batch(f"Batch Complete")

    # The saved result may be a fenced JSON string; parse it when possible and
    # otherwise hand it back unchanged.
    if isinstance(final_JSON_response, str):
        try:
            final_JSON_response = json.loads(final_JSON_response.strip('```').replace('json\n', '', 1).replace('json', '', 1))
        except json.JSONDecodeError:
            pass

    return final_JSON_response, total_tokens_in, total_tokens_out
def use_PaLM(self, progress_report):
    '''Run OCR and PaLM 2 over every image not yet completed.

    Args:
        progress_report: Optional progress object (set_n_batches, update_batch,
            reset_batch); may be None.

    Returns:
        (final_JSON_response, total_tokens_in, total_tokens_out). The first item
        is the saved result of the last image that produced an LLM response, or
        None if there was none.
    '''
    total_tokens_in = 0
    total_tokens_out = 0
    final_JSON_response = None
    n_images = len(self.img_paths)

    if progress_report is not None:
        progress_report.set_n_batches(n_images)

    for i, path_to_crop in enumerate(self.img_paths):
        if progress_report is not None:
            progress_report.update_batch(f"Working on image {i+1} of {n_images}")

        if os.path.basename(path_to_crop) in self.completed_specimens:
            self.logger.info(f'[Skipping] specimen {os.path.basename(path_to_crop)} already processed')
            continue

        filename_without_extension, txt_file_path, txt_file_path_OCR, txt_file_path_OCR_bounds, jpg_file_path_OCR_helper = self.generate_paths(path_to_crop, i)

        # Use Google Vision API to get OCR. The start/finish messages now bracket
        # the OCR call, as in use_chatGPT; "Starting OCR" used to be logged after it.
        self.logger.info(f'Working on {i+1}/{n_images} --- Starting OCR')
        self.OCR, self.bounds, self.text_to_box_mapping = detect_text(path_to_crop, self.client)
        self.logger.info(f'Working on {i+1}/{n_images} --- Finished OCR')

        if len(self.OCR) > 0:
            # Quote characters are spelled out as words before prompting.
            self.OCR = self.OCR.replace("\'", "Minutes").replace('\"', "Seconds")

            self.logger.info(f'Working on {i+1}/{n_images} --- Creating OCR Overlay Image')
            self.overlay_image = overlay_boxes_on_image(path_to_crop, self.bounds, self.cfg['leafmachine']['do_create_OCR_helper_image'])
            self.logger.info(f'Working on {i+1}/{n_images} --- Saved OCR Overlay Image')

            self.write_json_to_file(txt_file_path_OCR, {"OCR": self.OCR})
            self.write_json_to_file(txt_file_path_OCR_bounds, {"OCR_Bounds": self.bounds})
            self.overlay_image.save(jpg_file_path_OCR_helper)

            # Send OCR to PaLM and return formatted dictionary
            response_candidate, nt_in = OCR_to_dict_PaLM(self.logger, self.OCR, self.prompt_version, self.Voucher_Vision_Embedding)
            nt_out = num_tokens_from_string(response_candidate, "cl100k_base")
        else:
            # No OCR text means no LLM call, so no tokens either way. nt_in used to
            # be left unset here (UnboundLocalError on the first image, a stale
            # count on later ones).
            response_candidate = None
            nt_in = 0
            nt_out = 0

        total_tokens_in += nt_in
        total_tokens_out += nt_out

        final_JSON_response0 = self.save_json_and_xlsx(response_candidate, filename_without_extension, path_to_crop, txt_file_path, jpg_file_path_OCR_helper, nt_in, nt_out)
        if response_candidate is not None:
            final_JSON_response = final_JSON_response0

        self.logger.info(f'Formatted JSON\n{final_JSON_response}')
        self.logger.info(f'Finished PaLM 2 API calls\n')

    if progress_report is not None:
        progress_report.reset_batch(f"Batch Complete")

    return final_JSON_response, total_tokens_in, total_tokens_out
''' | |
def use_falcon(self, progress_report): | |
for i, path_to_crop in enumerate(self.img_paths): | |
progress_report.update_batch(f"Working on image {i+1} of {len(self.img_paths)}") | |
if os.path.basename(path_to_crop) in self.completed_specimens: | |
self.logger.info(f'[Skipping] specimen {os.path.basename(path_to_crop)} already processed') | |
else: | |
filename_without_extension = os.path.splitext(os.path.basename(path_to_crop))[0] | |
txt_file_path = os.path.join(self.Dirs.transcription_ind, filename_without_extension + '.json') | |
txt_file_path_helper = os.path.join(self.Dirs.transcription_ind_helper, filename_without_extension + '.json') | |
self.logger.info(f'Working on {i+1}/{len(self.img_paths)} --- {filename_without_extension}') | |
# Use Google Vision API to get OCR | |
self.OCR, self.bounds, self.text_to_box_mapping = detect_text(path_to_crop) | |
if len(self.OCR) > 0: | |
self.OCR = self.OCR.replace("\'", "Minutes").replace('\"', "Seconds") | |
# Send OCR to Falcon and return formatted dictionary | |
response = OCR_to_dict_Falcon(self.logger, self.OCR, self.Voucher_Vision_Embedding) | |
# response_helper = OCR_to_helper_Falcon(self.logger, OCR) # Assuming you have a similar helper function for Falcon | |
response_helper = None | |
self.logger.info(f'Finished Falcon API calls\n') | |
else: | |
response = None | |
if (response is not None) and (response_helper is not None): | |
# Save transcriptions to json files | |
self.write_json_to_file(txt_file_path, response) | |
# self.write_json_to_file(txt_file_path_helper, response_helper) | |
# add to the xlsx file | |
self.add_data_to_excel_from_response(self.path_transcription, response, filename_without_extension, path_to_crop, txt_file_path, txt_file_path_helper) | |
progress_report.reset_batch() | |
''' | |
def generate_paths(self, path_to_crop, i):
    """Build the per-specimen output paths for one image and log progress.

    The image's base name with its extension removed is used as the stem of
    every output file; each file goes into a directory read from ``self.Dirs``.

    Args:
        path_to_crop: Path of the image being processed.
        i: Zero-based index of the image; logged as ``i+1`` out of
            ``len(self.img_paths)``.

    Returns:
        A 5-tuple: ``(stem, transcription .json path, OCR .json path,
        OCR-bounds .json path, OCR-helper .jpg path)``.
    """
    image_name = os.path.basename(path_to_crop)
    filename_without_extension = os.path.splitext(image_name)[0]
    json_name = filename_without_extension + '.json'
    jpg_name = filename_without_extension + '.jpg'

    # One output file per destination directory, all sharing the same stem.
    transcription_json = os.path.join(self.Dirs.transcription_ind, json_name)
    ocr_json = os.path.join(self.Dirs.transcription_ind_OCR, json_name)
    ocr_bounds_json = os.path.join(self.Dirs.transcription_ind_OCR_bounds, json_name)
    ocr_helper_jpg = os.path.join(self.Dirs.transcription_ind_OCR_helper, jpg_name)

    self.logger.info(f'Working on {i+1}/{len(self.img_paths)} --- {filename_without_extension}')
    return (
        filename_without_extension,
        transcription_json,
        ocr_json,
        ocr_bounds_json,
        ocr_helper_jpg,
    )
def save_json_and_xlsx(self, response, filename_without_extension, path_to_crop, txt_file_path, jpg_file_path_OCR_helper, nt_in, nt_out):
    """Write one specimen's transcription to its JSON file and the spreadsheet.

    Args:
        response: The LLM transcription, or ``None`` when there is nothing to
            record for this specimen.
        filename_without_extension: Image stem identifying the specimen.
        path_to_crop: Path of the source image.
        txt_file_path: Destination JSON file for the transcription.
        jpg_file_path_OCR_helper: Path of the OCR helper image.
        nt_in: Input token count, forwarded to the spreadsheet row.
        nt_out: Output token count, forwarded to the spreadsheet row.

    Returns:
        The object that was written to ``txt_file_path``: the result of
        ``clean_catalog_number`` for a real response, otherwise the
        placeholder from ``create_null_json``.
    """
    if response is not None:
        # Completed transcription: clean the catalog number, then persist it.
        response = self.clean_catalog_number(response, filename_without_extension)
        self.write_json_to_file(txt_file_path, response)
        self.add_data_to_excel_from_response(
            self.path_transcription, response, filename_without_extension,
            path_to_crop, txt_file_path, jpg_file_path_OCR_helper, nt_in, nt_out)
        return response

    # No transcription: write a placeholder JSON ...
    response = self.create_null_json()
    self.write_json_to_file(txt_file_path, response)

    # ... and a null spreadsheet row with zeroed token counts.
    null_row = self.create_null_row(
        filename_without_extension, path_to_crop, txt_file_path, jpg_file_path_OCR_helper)
    self.add_data_to_excel_from_response(
        self.path_transcription, null_row, filename_without_extension,
        path_to_crop, txt_file_path, jpg_file_path_OCR_helper, nt_in=0, nt_out=0)
    return response
def process_specimen_batch(self, progress_report, is_real_run=False):
    """Transcribe the current batch with the configured LLM backend.

    Dispatches to ``use_chatGPT`` when ``self.model_name`` is truthy and to
    ``use_PaLM`` otherwise. On any failure the batch is marked as failed on
    the progress reporter (if one was given), the error is logged with its
    traceback, every handler attached to ``self.logger`` is closed and
    removed, and the original exception is re-raised.

    Args:
        progress_report: Optional progress reporter; ``None`` is accepted.
        is_real_run: When true, the overall progress label is updated
            before transcription starts.

    Returns:
        Tuple ``(final_json_response, total_tokens_in, total_tokens_out)``
        as produced by the selected backend.

    Raises:
        Exception: If ``self.has_key`` is falsy (no API key available).
        BaseException: Anything raised by the backend, re-raised after
            clean-up.
    """
    # progress_report is optional: the failure path below already guards
    # against None, so the start-of-run update must do the same.
    if is_real_run and progress_report is not None:
        progress_report.update_overall("Transcribing Labels")

    try:
        if not self.has_key:
            self.logger.info(f'No API key found for {self.version_name}')
            raise Exception(f"No API key found for {self.version_name}")

        if self.model_name:
            final_json_response, total_tokens_in, total_tokens_out = self.use_chatGPT(
                self.is_azure, progress_report, self.model_name)
        else:
            final_json_response, total_tokens_in, total_tokens_out = self.use_PaLM(progress_report)
        return final_json_response, total_tokens_in, total_tokens_out
    except BaseException:
        # Deliberately broad (same reach as the former bare ``except``):
        # clean-up must run for every failure, and the exception is always
        # re-raised, so nothing is swallowed.
        if progress_report is not None:
            progress_report.reset_batch("Batch Failed")
        self.logger.error("LLM call failed. Ending batch. process_specimen_batch()", exc_info=True)
        # Iterate over a copy: removeHandler mutates logger.handlers.
        for handler in self.logger.handlers[:]:
            handler.close()
            self.logger.removeHandler(handler)
        raise
def process_specimen_batch_OCR_test(self, path_to_crop):
    """Run OCR over every entry of a directory (OCR-only test helper).

    Each entry returned by ``os.listdir(path_to_crop)`` is passed to
    ``detect_text`` together with ``self.client``. The three results are
    stored on ``self.OCR``, ``self.bounds`` and ``self.text_to_box_mapping``,
    so after the loop only the last entry's results remain.

    Args:
        path_to_crop: Directory whose entries are sent to OCR.
    """
    entry_names = os.listdir(path_to_crop)
    for entry_name in entry_names:
        ocr_result = detect_text(os.path.join(path_to_crop, entry_name), self.client)
        self.OCR, self.bounds, self.text_to_box_mapping = ocr_result
def space_saver(cfg, Dirs, logger):
    """Delete temporary output folders of a finished run, as configured.

    The project folder is ``<dir_output>/<Dirs.run_name>``. Two flags under
    ``cfg['leafmachine']['project']`` are checked in order:

    * ``delete_temps_keep_VVE``: remove ``Archival_Components`` and
      ``Config_File``.
    * ``delete_all_temps`` (only consulted when the first flag is falsy):
      additionally remove ``Original_Images`` and ``Cropped_Images``, and
      delete every sub-directory of ``Transcription`` while leaving the
      files directly inside it (the xlsx) in place.

    In either case ``logger.name`` is overwritten with
    ``'[DELETE TEMP FILES]'`` before the info message is logged.

    Args:
        cfg: Nested configuration mapping.
        Dirs: Object exposing ``run_name``.
        logger: Logger used for the status message.
    """
    project_cfg = cfg['leafmachine']['project']
    dir_out = project_cfg['dir_output']
    path_project = os.path.join(dir_out, Dirs.run_name)

    def _remove_existing(folder_names):
        # Remove each named folder under the project dir if it is present.
        for folder_name in folder_names:
            target = os.path.join(path_project, folder_name)
            if os.path.exists(target):
                shutil.rmtree(target)

    if project_cfg['delete_temps_keep_VVE']:
        logger.name = '[DELETE TEMP FILES]'
        logger.info("Deleting temporary files. Keeping files required for VoucherVisionEditor.")
        _remove_existing(['Archival_Components', 'Config_File'])
    elif project_cfg['delete_all_temps']:
        logger.name = '[DELETE TEMP FILES]'
        logger.info("Deleting ALL temporary files!")
        _remove_existing(['Archival_Components', 'Config_File', 'Original_Images', 'Cropped_Images'])

        # Empty the transcription folder of sub-directories, but keep the xlsx.
        transcription_path = os.path.join(path_project, 'Transcription')
        if os.path.exists(transcription_path):
            for entry in os.listdir(transcription_path):
                entry_path = os.path.join(transcription_path, entry)
                # Only directories are removed; plain files are kept.
                if os.path.isdir(entry_path) and os.path.exists(entry_path):
                    shutil.rmtree(entry_path)