Axon_OCR / app.py
Sage
added error fixes
c070304
raw
history blame
12.2 kB
import os
import openai
import gradio as gr
import requests
import datetime
from io import BytesIO
from google.api_core.client_options import ClientOptions
from google.cloud import documentai_v1 as documentai
import json
from google.cloud import vision
import time
from settings import char_remove, gpt_model, RPFAAP2, RPFAAP1, project_id, project_location, processor_id
from tqdm import tqdm
import logging
import google
logging.basicConfig(filename='app.log', level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def chat_gpt_image(content, context):
openai.api_key = os.environ['GPT_API_KEY']
prompt = "You are an expert at identifying OCR errors and correcting them with the help of context, intuition and logic."
document = "The following text was scanned using OCR, your goal is to return a corrected version of the text"
prefix = "Additionally"
if context == "":
sequence = (document, content)
else:
sequence_1 = (prefix, context)
additional = (" ".join(sequence_1))
sequence = (additional, content)
final_content = (" ".join(sequence))
logging.info(final_content)
completion = openai.ChatCompletion.create(
model=gpt_model,
user="1",
temperature=0.1,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": final_content}
]
)
logging.info(completion.choices[0].message.content)
return(completion.choices[0].message.content)
def remove_na(string):
for char in char_remove:
string = string.replace(char, "")
return string
def chat_gpt_document(content, document_type, context):
openai.api_key = os.environ['GPT_API_KEY']
prompt = "You are an expert at identifying OCR errors and correcting them with the help of context, intuition and logic."
document_prefix = "The following text was scanned using OCR, your goal is to extract the important entities from the text and correct them with the help of the restrictions placed in the desired format. Remember to not make any changes on the labels of the desired format, simply extract the text, correct it and return only the desired format. Text:"
additional_prefix = "Additionally the text"
content_info = content[0]
content_name = content[1]
if document_type == "RPFAA Building P1":
document = "RPFAAP1.json"
desired_format = RPFAAP1
elif document_type == "RPFAA Building P2":
document = "RPFAAP2.json"
desired_format = RPFAAP2
else:
property_info = ["Please Select a Document Type"]
return json.dumps(property_info, indent=4)
if context == "":
sequence_1 = (document_prefix, content_info, desired_format)
else:
sequence_1 = (document_prefix, content_info, desired_format, additional_prefix, context)
content_1 = (" ".join(sequence_1))
logging.info(content_1)
completion_1 = openai.ChatCompletion.create(
model=gpt_model,
user="1",
temperature=0.1,
messages=[
{"role": "system", "content": prompt},
{"role": "user", "content": content_1}
]
)
logging.info(completion_1.choices[0].message.content)
input_string = remove_na(completion_1.choices[0].message.content)
with open(document) as f:
property_info = json.load(f)
#Adds the name of the file
property_info["File Name"] = content_name
#Fills in the information
for line in input_string.split('\n'):
if ':' in line:
key, value = line.split(':', 1)
key = key.strip()
for category in property_info:
if key in property_info[category]:
property_info[category][key] = value.strip()
break
else:
if key == "File Name":
property_info[key] = value.strip()
return json.dumps(property_info, indent=4)
def detect_image(content, lang):
credentials = json.loads(os.environ['CREDENTIALS'])
temp_file_path = 'temp_credentials.json'
with open(temp_file_path, 'w') as file:
json.dump(credentials, file)
os.environ['GOOGLE_APPLICATION_CREDENTIALS']=r'temp_credentials.json'
client = vision.ImageAnnotatorClient()
buffer = BytesIO()
content.save(buffer, format="PNG")
content = buffer.getvalue()
if lang == "Filpino":
hints = "tl"
else:
hints = "en"
image = vision.Image(content=content)
response = client.document_text_detection(image=image, image_context={"language_hints": [hints]})
if response.error.message:
raise Exception(
'{}\nFor more info on error messages, check: '
'https://cloud.google.com/apis/design/errors'.format(
response.error.message))
os.remove(temp_file_path)
logging.info(response)
return(response.full_text_annotation.text)
def detect_document(content):
credentials = json.loads(os.environ['CREDENTIALS'])
temp_file_path = 'temp_credentials.json'
with open(temp_file_path, 'w') as file:
json.dump(credentials, file)
os.environ['GOOGLE_APPLICATION_CREDENTIALS']=r'temp_credentials.json'
PROJECT_ID = project_id
LOCATION = project_location # Format is 'us' or 'eu'
PROCESSOR_ID = processor_id # Create processor in Cloud Console
content_extension = content.name.split(".")[-1]
if content_extension.upper() == "TIFF":
MIME_TYPE = "image/tiff"
elif content_extension.upper() =="PDF":
MIME_TYPE = "application/pdf"
elif content_extension.upper() =="PNG":
MIME_TYPE = "image/png"
elif content_extension.upper() =="JPG":
MIME_TYPE = "image/jpg"
else:
return("Please upload a valid MIME type")
docai_client = documentai.DocumentProcessorServiceClient(
client_options=ClientOptions(api_endpoint=f"{LOCATION}-documentai.googleapis.com")
)
RESOURCE_NAME = docai_client.processor_path(PROJECT_ID, LOCATION, PROCESSOR_ID)
with open(content.name, "rb") as image:
image_content = image.read()
raw_document = documentai.RawDocument(content=image_content, mime_type=MIME_TYPE)
request = documentai.ProcessRequest(name=RESOURCE_NAME, raw_document=raw_document)
result = docai_client.process_document(request=request)
document_object = result.document
name = content.name.split('\\')[-1]
name = name.split("/")[-1]
name = name.split('.')[0]
os.remove(temp_file_path)
return(document_object.text, name)
def image(content, lang, context):
return chat_gpt_image(detect_image(content, lang), context)
def document(content, document_type, context):
return chat_gpt_document(detect_document(content),document_type,context)
unprocessed_documents = []
global_document_type = None
global_context = None
def batch_document(content, document_type, context, progress = gr.Progress()):
logging.info(content)
combined_data = []
global global_document_type
global global_context
global_document_type = document_type
global_context = context
if progress == "None":
for x in content:
retries = 1
timeout = 1
i = 0
while True:
try:
data = json.loads(chat_gpt_document(detect_document(x),document_type,context))
combined_data.append(data)
break
except (openai.error.APIConnectionError, openai.error.AuthenticationError, openai.error.RateLimitError, google.api_core.exceptions.RetryError, requests.exceptions.RequestException) as e:
logging.error(f'Retry {i+1} failed: {e}')
if i < retries - 1:
logging.error(f'Retrying in {timeout} seconds...')
time.sleep(timeout)
i += 1
else:
unprocessed_documents.append(x)
break
else:
progress(0, desc="Starting")
for x in progress.tqdm(content, desc="Processing"):
retries = 1
timeout = 1
i = 0
while True:
try:
data = json.loads(chat_gpt_document(detect_document(x),document_type,context))
combined_data.append(data)
break
except (openai.error.APIConnectionError, openai.error.AuthenticationError, openai.error.RateLimitError, google.api_core.exceptions.RetryError, requests.exceptions.RequestException) as e:
logging.error(f'Retry {i+1} failed: {e}')
if i < retries - 1:
logging.error(f'Retrying in {timeout} seconds...')
time.sleep(timeout)
i += 1
else:
unprocessed_documents.append(x)
break
logging.info(combined_data)
logging.info(unprocessed_documents)
if document_type == "":
document_type = "error"
return save_json(combined_data, document_type)
def retry_unprocessed_documents():
# This function will use the documents stored in unprocessed_documents
# and call batch_document on them
global global_document_type
global global_context
global unprocessed_documents
if unprocessed_documents:
output = batch_document(unprocessed_documents, global_document_type, global_context, "None")
unprocessed_documents = []
return output
else:
unprocessed_documents = []
return save_json("No Unprocessed Documents", "No Unprocessed Documents")
def save_json(text, filename):
filename = filename+".json"
with open(filename, "w", encoding='utf-8') as outfile:
json.dump(text, outfile, ensure_ascii=False)
return filename
with gr.Blocks(title="Axon OCR", css=".markdown {text-align: center;}") as app:
gr.Markdown("""# Axon OCR
Attach Images or Files below and convert them to Text.""", elem_classes="markdown")
with gr.Tab("Scan Image"):
with gr.Row():
with gr.Column():
image_input = [gr.Image(type="pil"),
gr.Radio(["English", "Filipino"], label="Language", info="What is the document language? (Optional)"),
gr.Textbox(label="What kind of Image is this? (Optional)", placeholder="This is an image of an Official Reciept")]
image_output = gr.Textbox(label="Result")
image_button = gr.Button("Scan")
with gr.Tab("Scan Document"):
with gr.Row():
with gr.Column():
document_input = [gr.File(file_types=["pdf","tiff","image","text"]),
gr.Dropdown(["RPFAA Building P1", "RPFAA Building P2"], label="File Type", info="What type of document is this?"),
gr.Textbox(label="Any additional information? (Optional)", placeholder="This is document is an Official Reciept")]
document_output = gr.Textbox(label="Result")
document_button = gr.Button("Scan")
with gr.Tab("Batch Scan"):
with gr.Row():
with gr.Column():
batch_document_input = [gr.File(file_types=["pdf","tiff","image","text"], file_count="multiple"),
gr.Dropdown(["RPFAA Building P1", "RPFAA Building P2"], label="File Type", info="What type of document is this?"),
gr.Textbox(label="Any additional information? (Optional)", placeholder="This is document is an Official Reciept")]
batch_document_output = gr.File(label="Result")
batch_document_button = gr.Button("Scan")
retry_button = gr.Button("Retry Unprocessed Documents", label="Retry")
image_button.click(image, inputs=image_input, outputs=image_output)
document_button.click(document, inputs=document_input, outputs=document_output)
batch_document_button.click(batch_document, inputs=batch_document_input, outputs=batch_document_output)
retry_button.click(retry_unprocessed_documents, outputs=batch_document_output)
app.queue()
app.launch(auth=("username", "password"))