import os import openai import gradio as gr import requests import datetime from io import BytesIO from google.api_core.client_options import ClientOptions from google.cloud import documentai_v1 as documentai import json from google.cloud import vision import time from settings import char_remove, gpt_api_key, gpt_model, RPFAAP2, RPFAAP1, project_id, project_location, processor_id from tqdm import tqdm import logging logging.basicConfig(filename='app.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def chat_gpt_image(content, context): openai.api_key = gpt_api_key prompt = "You are an expert at identifying OCR errors and correcting them with the help of context, intuition and logic." document = "The following text was scanned using OCR, your goal is to return a corrected version of the text" prefix = "Additionally" if context == "": sequence = (document, content) else: sequence_1 = (prefix, context) additional = (" ".join(sequence_1)) sequence = (additional, content) final_content = (" ".join(sequence)) logging.info(final_content) completion = openai.ChatCompletion.create( model=gpt_model, user="1", messages=[ {"role": "system", "content": prompt}, {"role": "user", "content": final_content} ] ) logging.info(completion.choices[0].message.content) return(completion.choices[0].message.content) def remove_na(string): for char in char_remove: string = string.replace(char, "") return string def chat_gpt_document(content, document_type, context): openai.api_key = gpt_api_key prompt = "You are an expert at identifying OCR errors and correcting them with the help of context, intuition and logic." document_prefix = "The following text was scanned using OCR, your goal is to extract the important entities from the text and correct them with the help of the restrictions placed in the desired format. Remember to not make any changes on the labels of the desired format, simply extract the text, correct it and return only the desired format. Text:" additional_prefix = "Additionally the text" content_info = content[0] content_name = content[1] if document_type == "RPFAA Building P1": document = "RPFAAP1.json" desired_format = RPFAAP1 elif document_type == "RPFAA Building P2": document = "RPFAAP2.json" desired_format = RPFAAP2 else: document = "" desired_format = "" if context == "": sequence_1 = (document_prefix, content_info, desired_format) else: sequence_1 = (document_prefix, content_info, desired_format, additional_prefix, context) content_1 = (" ".join(sequence_1)) logging.info(content_1) completion_1 = openai.ChatCompletion.create( model=gpt_model, user="1", messages=[ {"role": "system", "content": prompt}, {"role": "user", "content": content_1} ] ) logging.info(completion_1.choices[0].message.content) input_string = remove_na(completion_1.choices[0].message.content) with open(document) as f: property_info = json.load(f) #Adds the name of the file property_info["File Name"] = content_name #Fills in the information for line in input_string.split('\n'): if ':' in line: key, value = line.split(':', 1) key = key.strip() for category in property_info: if key in property_info[category]: property_info[category][key] = value.strip() break else: if key == "File Name": property_info[key] = value.strip() return json.dumps(property_info, indent=4) # def get_openai_api_usage(): # openai.api_key = "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" # api_key = "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" # org_id = "org-lqZ72EJMjCjjXdRwPNfys6YO" # session = requests.Session() # headers = { # "Authorization": f"Bearer {api_key}", # "OpenAI-Organization": org_id # } # # Define the start and end dates for the usage data # today = datetime.date.today() # start_date = today - datetime.timedelta(days=30) # end_date = today # # Make the API call to retrieve the usage data # url = f"https://api.openai.com/v1/usage?date=2023-03-29" # response = session.get(url, headers=headers) # response.raise_for_status() # usage_data = response.json().get("data", []) # print(usage_data) # for item in usage_data: # print(f"Date: {item['aggregation_timestamp']}") # print(f"Requests: {item['n_requests']}") # print(f"Tokens: {item['n_context_tokens_total']}") # print(f"Model ID: {item['n_generated_tokens_total']}") def detect_image(content, lang): os.environ['GOOGLE_APPLICATION_CREDENTIALS']=r'advance-river-381411-c7be39c33cff.json' client = vision.ImageAnnotatorClient() buffer = BytesIO() content.save(buffer, format="PNG") content = buffer.getvalue() if lang == "Filpino": hints = "tl" else: hints = "en" image = vision.Image(content=content) response = client.document_text_detection(image=image, image_context={"language_hints": [hints]}) if response.error.message: raise Exception( '{}\nFor more info on error messages, check: ' 'https://cloud.google.com/apis/design/errors'.format( response.error.message)) return(response.full_text_annotation.text) def detect_document(content): os.environ['GOOGLE_APPLICATION_CREDENTIALS']=r'advance-river-381411-c7be39c33cff.json' PROJECT_ID = project_id LOCATION = project_location # Format is 'us' or 'eu' PROCESSOR_ID = processor_id # Create processor in Cloud Console content_extension = content.name.split(".")[-1] if content_extension.upper() == "TIFF": MIME_TYPE = "image/tiff" elif content_extension.upper() =="PDF": MIME_TYPE = "application/pdf" elif content_extension.upper() =="PNG": MIME_TYPE = "image/png" elif content_extension.upper() =="JPG": MIME_TYPE = "image/jpg" else: return("Please upload a valid MIME type") docai_client = documentai.DocumentProcessorServiceClient( client_options=ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com") ) RESOURCE_NAME = docai_client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID) with open(content.name, "rb") as image: image_content = image.read() raw_document = documentai.RawDocument(content=image_content, mime_type=MIME_TYPE) request = documentai.ProcessRequest(name=RESOURCE_NAME, raw_document=raw_document) result = docai_client.process_document(request=request) document_object = result.document name = content.name.split('\\')[-1] name = name.split('.')[0] return(document_object.text, name) def image(content, lang, context): return chat_gpt_image(detect_image(content, lang), context) def document(content, document_type, context): return chat_gpt_document(detect_document(content),document_type,context) def batch_document(content, document_type, context, progress=gr.Progress()): progress(0, desc="Starting") retries = 5 timeout = 5 i = 0 j = 0 combined_data = [] for x in progress.tqdm(content, desc="Processing"): while True: try: data = json.loads(chat_gpt_document(detect_document(x),document_type,context)) combined_data.append(data) break except openai.error.APIConnectionError: logging.error(f'Retry {i+1} failed: openai.error.APIConnectionError') if i < retries - 1: logging.error(f'Retrying in {timeout} seconds...') time.sleep(timeout) i += 1 except openai.error.RateLimitError: logging.error(f'Retry {j+1} failed: openai.error.RateLimitError') if j < retries - 1: logging.error(f'Retrying in {timeout} seconds...') time.sleep(timeout) j += 1 logging.info(combined_data) return save_json(combined_data, document_type) def save_json(text, filename): filename = filename+".json" with open(filename, "w") as outfile: json.dump(text, outfile) return filename with gr.Blocks(title="Ottico OCR", css=".markdown {text-align: center;}") as app: gr.Markdown("""# Ottico OCR Attach Images or Files below and convert them to Text.""", elem_classes="markdown") with gr.Tab("Scan Image"): with gr.Row(): with gr.Column(): image_input = [gr.Image(type="pil"), gr.Radio(["English", "Filipino"], label="Language", info="What is the document language? (Optional)"), gr.Textbox(label="What kind of Image is this? (Optional)", placeholder="This is an image of an Official Reciept")] image_output = gr.Textbox(label="Result") image_button = gr.Button("Scan") with gr.Tab("Scan Document"): with gr.Row(): with gr.Column(): document_input = [gr.File(file_types=["pdf","tiff","image","text"]), gr.Dropdown(["RPFAA Building P1", "RPFAA Building P2"], label="File Type", info="What type of document is this?"), gr.Textbox(label="Any additional information? (Optional)", placeholder="This is document is an Official Reciept")] document_output = gr.Textbox(label="Result") document_button = gr.Button("Scan") with gr.Tab("Batch Scan"): with gr.Row(): with gr.Column(): batch_document_input = [gr.File(file_types=["pdf","tiff","image","text"], file_count="multiple"), gr.Dropdown(["RPFAA Building P1", "RPFAA Building P2"], label="File Type", info="What type of document is this?"), gr.Textbox(label="Any additional information? (Optional)", placeholder="This is document is an Official Reciept")] batch_document_output = gr.File(label="Result") batch_document_button = gr.Button("Scan") image_button.click(image, inputs=image_input, outputs=image_output) document_button.click(document, inputs=document_input, outputs=document_output) batch_document_button.click(batch_document, inputs=batch_document_input, outputs=batch_document_output) app.queue() app.launch(share=True, auth=("username", "password"))