|
from azure.core.credentials import AzureKeyCredential |
|
from azure.ai.formrecognizer import DocumentAnalysisClient |
|
from io import BytesIO |
|
from helpers import format_polygon, Logger |
|
import logging |
|
import os |
|
import sys |
|
|
|
# Send log records at INFO level and above to app.log, each prefixed with a
# timestamp and the level name.
logging.basicConfig(
    filename="app.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
)

# Swap stdout for the project's Logger so every print() in this module goes
# through it.
# NOTE(review): presumably Logger also writes to output.log -- confirm in helpers.
sys.stdout = Logger("output.log")

# Connection settings for the Azure client built in the functions below.
# Both variables are mandatory: a missing one raises KeyError at import time.
endpoint = os.environ["AZURE_API_ENDPOINT"]
key = os.environ["AZURE_API_KEY"]
|
|
|
def detect_document(content):
    """Analyze a document file with the ``prebuilt-document`` model.

    The file at ``content.name`` is uploaded for analysis and the result is
    rendered into four plain-text reports. Each report is written to the log
    via ``logging.info`` and printed.

    Args:
        content: Object whose ``name`` attribute is the path of the file to
            analyze.

    Returns:
        A 5-tuple ``(text_content, pair_content, document_content,
        table_content, name)``:

        * ``text_content`` -- the ``content`` attribute of the result.
        * ``pair_content`` -- one line per key/value pair that has both a
          key and a value.
        * ``document_content`` -- one line per text line of every page, with
          its bounding polygon; line numbers restart at 0 on each page.
        * ``table_content`` -- one header line per table followed by one
          line per cell.
        * ``name`` -- the file name from ``content.name`` without its
          directory and without its final extension.

    Raises:
        OSError: If the file at ``content.name`` cannot be opened.
    """
    document_analysis_client = DocumentAnalysisClient(
        endpoint=endpoint, credential=AzureKeyCredential(key)
    )

    # Keep the file open until the result is available so the stream stays
    # valid for the whole operation.
    with open(content.name, "rb") as f:
        poller = document_analysis_client.begin_analyze_document(
            "prebuilt-document", document=f
        )
        result = poller.result()

    text_content = result.content
    logging.info(text_content)
    print(text_content)

    # Reports are collected as lists of fragments and joined once, instead of
    # growing a string with += inside the loops.
    # NOTE(review): the `or ()` guards treat a missing (None) collection as
    # empty -- confirm whether the SDK can actually return None here.
    pair_parts = ["----Key-value pairs found in document----\n"]
    pair_parts.extend(
        f"Key '{kv_pair.key.content}' with Value '{kv_pair.value.content}' \n"
        for kv_pair in result.key_value_pairs or ()
        if kv_pair.key and kv_pair.value
    )
    pair_content = "".join(pair_parts)
    logging.info(pair_content)
    print(pair_content)

    line_parts = ["----Lines found in document----\n"]
    for page in result.pages or ():
        line_parts.extend(
            f"...Line # {line_idx} text '{line.content}' within bounding polygon "
            f"'{format_polygon(line.polygon)}' \n"
            for line_idx, line in enumerate(page.lines or ())
        )
    document_content = "".join(line_parts)
    logging.info(document_content)
    print(document_content)

    table_parts = ["----Tables found in document----\n"]
    for table_idx, table in enumerate(result.tables or ()):
        table_parts.append(
            f"Table # {table_idx} has {table.row_count} rows and "
            f"{table.column_count} columns\n"
        )
        table_parts.extend(
            f"...Cell[{cell.row_index}][{cell.column_index}] has content "
            f"'{cell.content}'\n"
            for cell in table.cells or ()
        )
    table_content = "".join(table_parts)
    logging.info(table_content)
    print(table_content)

    # Accept both Windows and POSIX separators regardless of the host OS,
    # then strip only the final extension: "report.v2.pdf" -> "report.v2".
    # Cutting at the first dot would give "report" and could make distinct
    # files collide.
    base_name = content.name.replace("\\", "/").rsplit("/", 1)[-1]
    name = os.path.splitext(base_name)[0]

    return (text_content, pair_content, document_content, table_content, name)
|
|
|
def detect_image(content):
    """Run the ``prebuilt-read`` model on an in-memory image.

    Args:
        content: Image object exposing ``save(stream, format=...)``
            (presumably a PIL image -- confirm against the caller).

    Returns:
        The ``content`` attribute of the analysis result, which is also
        logged via ``logging.info`` and printed.
    """
    client = DocumentAnalysisClient(
        endpoint=endpoint,
        credential=AzureKeyCredential(key),
    )

    # Serialize the image as PNG into memory, then rewind so the upload
    # starts reading from the first byte.
    png_buffer = BytesIO()
    content.save(png_buffer, format="PNG")
    png_buffer.seek(0)

    result = client.begin_analyze_document(
        "prebuilt-read", document=png_buffer
    ).result()

    extracted_text = result.content
    logging.info(extracted_text)
    print(extracted_text)
    return extracted_text
|
|