Spaces:
Runtime error
Runtime error
from fastapi import FastAPI | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.responses import FileResponse | |
from fastapi import File, UploadFile | |
from fastapi.responses import StreamingResponse | |
from typing import List | |
from pdf2image import convert_from_bytes | |
import csv | |
import io | |
from transformers import pipeline | |
app = FastAPI() | |
"""## Poppler dir""" | |
poppler_path = "poppler-23.11.0/Library/bin" | |
async def classify_doc(files: List[UploadFile] = File(...)): | |
classificationResults = {} | |
for file in files: | |
try: | |
contents = file.file.read() | |
filename = file.filename | |
if filename.endswith('.pdf'): | |
try: | |
pages = convert_from_bytes(contents, poppler_path = poppler_path) | |
print(pages) | |
for pagenum, image in enumerate(pages): | |
classificationRes, dtype_conf = doctype_classify(image.convert('RGB'), filename) | |
# add/update classification result dictionary | |
if (classificationRes in classificationResults): | |
classificationResults.update({classificationRes : classificationResults[classificationRes] + 1}) | |
else: | |
classificationResults.update({classificationRes : 1}) | |
except Exception as err: | |
print(err) | |
return f"Error in opening {filename}, {err}" | |
# png, jpg, jpeg files | |
else: | |
classificationRes = classify_acct_dtype_str(contents, filename) | |
# add/update classification result dictionary | |
if (classificationRes in classificationResults): | |
classificationResults.update({classificationRes : classificationResults[classificationRes] + 1}) | |
else: | |
classificationResults.update({classificationRes : 1}) | |
except Exception as err: | |
print(Exception, err) | |
return {"message": "There was an error in uploading file(s)"} | |
finally: | |
file.file.close() | |
# Convert dictionary to CSV string | |
csv_data = io.StringIO() | |
csv_writer = csv.writer(csv_data) | |
csv_writer.writerow(["Type", "Count"]) # Header row | |
for key, value in classificationResults.items(): | |
csv_writer.writerow([key, value]) | |
return StreamingResponse( | |
iter([csv_data.getvalue()]), | |
media_type="text/csv", | |
headers={"Content-Disposition": f"attachment; filename=data.csv"} | |
) | |
# return {"message": f"{[file.filename for file in files]} : {[classifyFiles(file) for file in files]}"} | |
def classifyFiles(file): | |
try: | |
contents = file.file.read() | |
filename = file.filename | |
classificationResults = [] | |
if filename.endswith('.pdf'): | |
try: | |
pages = convert_from_bytes(open(file, 'rb').read()) | |
for pagenum, image in enumerate(pages): | |
if pagenum != 0 and pagenum < len(pages): | |
classificationRes = classify_acct_dtype_str(contents, filename) | |
# classificationResults[f"{pagenum:02d}"] = { | |
# 'doctype': classificationRes | |
# } | |
except: | |
return f"Error in opening {filename}" | |
else: | |
classificationRes = classify_acct_dtype_str(contents, filename) | |
# classificationResults[f"{0:02d}"] = { | |
# 'doctype' : classificationRes | |
# } | |
except Exception as err: | |
print(Exception, err) | |
return {"message": "There was an error in uploading file(s)"} | |
finally: | |
file.file.close() | |
return classificationRes | |
app.mount("/", StaticFiles(directory="static", html=True), name="static") | |
def index() -> FileResponse: | |
return FileResponse(path="/app/static/index.html", media_type="text/html") | |
import re | |
import torch | |
from transformers import DonutProcessor, VisionEncoderDecoderModel | |
from datasets import load_dataset | |
import os | |
from PIL import Image | |
# Doc classifier model | |
classifier_doctype_processor = DonutProcessor.from_pretrained("calumpianojericho/donutclassifier_acctdocs_by_doctype") | |
classifier_doctype_model = VisionEncoderDecoderModel.from_pretrained("calumpianojericho/donutclassifier_acctdocs_by_doctype") | |
"""### Inference Code""" | |
def inference(input, model, processor, threshold=1.0, task_prompt="", get_confidence=False): | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
model.to(device) | |
is_confident = True | |
decoder_input_ids = processor.tokenizer(task_prompt, add_special_tokens=False, return_tensors="pt").input_ids | |
pil_img=input | |
image = np.array(pil_img) | |
pixel_values = processor(image, return_tensors="pt").pixel_values | |
outputs = model.generate( | |
pixel_values.to(device), | |
decoder_input_ids=decoder_input_ids.to(device), | |
max_length=model.decoder.config.max_position_embeddings, | |
early_stopping=True, | |
pad_token_id=processor.tokenizer.pad_token_id, | |
eos_token_id= processor.tokenizer.eos_token_id, | |
use_cache=True, | |
num_beams=1, | |
bad_words_ids=[[processor.tokenizer.unk_token_id]], | |
return_dict_in_generate=True, | |
output_scores=True, | |
) | |
sequence = processor.batch_decode(outputs.sequences)[0] | |
sequence = sequence.replace(processor.tokenizer.eos_token, "").replace(processor.tokenizer.pad_token, "") | |
sequence = re.sub(r"<.*?>", "", sequence, count=1).strip() # remove first task start token | |
seq = processor.token2json(sequence) | |
if get_confidence: | |
return seq, pred_confidence(outputs.scores, threshold) | |
return seq | |
def pred_confidence(output_scores, threshold): | |
is_confident=True | |
for score in output_scores: | |
exp_scores = np.exp(score[0].cpu().numpy()) # scores are logits, we use the exp function so that all values are positive | |
sum_exp = np.sum(exp_scores) # taking the sum of the token scores | |
idx = np.argmax(exp_scores) # taking the index of the token with the highest score | |
prob_max = exp_scores[idx]/sum_exp # normalizing the token with the highest score wrt the sum of all scores. Returns probability | |
if prob_max < threshold: | |
is_confident = False | |
# print(prob_max) | |
return is_confident | |
CUDA_LAUNCH_BLOCKING=1 | |
def parse_text(input, filename): | |
model = base_model | |
processor = base_processor | |
seq = inference(input, model, processor, task_prompt="<s_synthdog>") | |
return str(seq) | |
def doctype_classify(input, filename): | |
model = classifier_doctype_model | |
processor = classifier_doctype_processor | |
seq, is_confident = inference(input, model, processor, threshold=0.90, task_prompt="<s_classifier_acct>", get_confidence=True) | |
return seq.get('class'), is_confident | |
def account_classify(input, filename): | |
model = classifier_account_model | |
processor = classifier_account_processor | |
seq, is_confident = inference(input, model, processor, threshold=0.999, task_prompt="<s_classifier_acct>", get_confidence=True) | |
return seq.get('class'), is_confident | |
"""## Text processing/string matcher code""" | |
import locale | |
locale.getpreferredencoding = lambda: "UTF-8" | |
"""## Classify Document Images""" | |
import numpy as np | |
import csv | |
import re | |
import os | |
import requests | |
from io import BytesIO | |
def classify_acct_dtype_str(content, filename): | |
# response = requests.get("https://huggingface.co/datasets/Xenova/transformers.js-docs/resolve/main/city-streets.jpg") | |
# ipt = Image.open(BytesIO(response.content)) | |
try: | |
ipt = Image.open(BytesIO(content)) | |
dtype_inf, dtype_conf = doctype_classify(ipt, filename) | |
except Exception as err: | |
return f"Error in opening {filename}, {err}" | |
return dtype_inf | |
# classify_acct_dtype_str("https://huggingface.co/datasets/Xenova/transformers.js-docs/resolve/main/city-streets.jpg") |