Spaces:
Runtime error
Runtime error
| from fastapi import FastAPI | |
| from fastapi.staticfiles import StaticFiles | |
| from fastapi.responses import FileResponse | |
| from fastapi import File, UploadFile | |
| from fastapi.responses import StreamingResponse | |
| from typing import List | |
| from pdf2image import convert_from_bytes | |
| import csv | |
| import io | |
| from transformers import pipeline | |
| app = FastAPI() | |
| """## Poppler dir""" | |
| poppler_path = "poppler-23.11.0/Library/bin" | |
| async def classify_doc(files: List[UploadFile] = File(...)): | |
| classificationResults = {} | |
| for file in files: | |
| try: | |
| contents = file.file.read() | |
| filename = file.filename | |
| if filename.endswith('.pdf'): | |
| try: | |
| pages = convert_from_bytes(contents, poppler_path = poppler_path) | |
| print(pages) | |
| for pagenum, image in enumerate(pages): | |
| classificationRes, dtype_conf = doctype_classify(image.convert('RGB'), filename) | |
| # add/update classification result dictionary | |
| if (classificationRes in classificationResults): | |
| classificationResults.update({classificationRes : classificationResults[classificationRes] + 1}) | |
| else: | |
| classificationResults.update({classificationRes : 1}) | |
| except Exception as err: | |
| print(err) | |
| return f"Error in opening {filename}, {err}" | |
| # png, jpg, jpeg files | |
| else: | |
| classificationRes = classify_acct_dtype_str(contents, filename) | |
| # add/update classification result dictionary | |
| if (classificationRes in classificationResults): | |
| classificationResults.update({classificationRes : classificationResults[classificationRes] + 1}) | |
| else: | |
| classificationResults.update({classificationRes : 1}) | |
| except Exception as err: | |
| print(Exception, err) | |
| return {"message": "There was an error in uploading file(s)"} | |
| finally: | |
| file.file.close() | |
| # Convert dictionary to CSV string | |
| csv_data = io.StringIO() | |
| csv_writer = csv.writer(csv_data) | |
| csv_writer.writerow(["Type", "Count"]) # Header row | |
| for key, value in classificationResults.items(): | |
| csv_writer.writerow([key, value]) | |
| return StreamingResponse( | |
| iter([csv_data.getvalue()]), | |
| media_type="text/csv", | |
| headers={"Content-Disposition": f"attachment; filename=data.csv"} | |
| ) | |
| # return {"message": f"{[file.filename for file in files]} : {[classifyFiles(file) for file in files]}"} | |
| def classifyFiles(file): | |
| try: | |
| contents = file.file.read() | |
| filename = file.filename | |
| classificationResults = [] | |
| if filename.endswith('.pdf'): | |
| try: | |
| pages = convert_from_bytes(open(file, 'rb').read()) | |
| for pagenum, image in enumerate(pages): | |
| if pagenum != 0 and pagenum < len(pages): | |
| classificationRes = classify_acct_dtype_str(contents, filename) | |
| # classificationResults[f"{pagenum:02d}"] = { | |
| # 'doctype': classificationRes | |
| # } | |
| except: | |
| return f"Error in opening {filename}" | |
| else: | |
| classificationRes = classify_acct_dtype_str(contents, filename) | |
| # classificationResults[f"{0:02d}"] = { | |
| # 'doctype' : classificationRes | |
| # } | |
| except Exception as err: | |
| print(Exception, err) | |
| return {"message": "There was an error in uploading file(s)"} | |
| finally: | |
| file.file.close() | |
| return classificationRes | |
| app.mount("/", StaticFiles(directory="static", html=True), name="static") | |
| def index() -> FileResponse: | |
| return FileResponse(path="/app/static/index.html", media_type="text/html") | |
| import re | |
| import torch | |
| from transformers import DonutProcessor, VisionEncoderDecoderModel | |
| from datasets import load_dataset | |
| import os | |
| from PIL import Image | |
| # Doc classifier model | |
| classifier_doctype_processor = DonutProcessor.from_pretrained("calumpianojericho/donutclassifier_acctdocs_by_doctype") | |
| classifier_doctype_model = VisionEncoderDecoderModel.from_pretrained("calumpianojericho/donutclassifier_acctdocs_by_doctype") | |
| """### Inference Code""" | |
| def inference(input, model, processor, threshold=1.0, task_prompt="", get_confidence=False): | |
| device = "cuda" if torch.cuda.is_available() else "cpu" | |
| model.to(device) | |
| is_confident = True | |
| decoder_input_ids = processor.tokenizer(task_prompt, add_special_tokens=False, return_tensors="pt").input_ids | |
| pil_img=input | |
| image = np.array(pil_img) | |
| pixel_values = processor(image, return_tensors="pt").pixel_values | |
| outputs = model.generate( | |
| pixel_values.to(device), | |
| decoder_input_ids=decoder_input_ids.to(device), | |
| max_length=model.decoder.config.max_position_embeddings, | |
| early_stopping=True, | |
| pad_token_id=processor.tokenizer.pad_token_id, | |
| eos_token_id= processor.tokenizer.eos_token_id, | |
| use_cache=True, | |
| num_beams=1, | |
| bad_words_ids=[[processor.tokenizer.unk_token_id]], | |
| return_dict_in_generate=True, | |
| output_scores=True, | |
| ) | |
| sequence = processor.batch_decode(outputs.sequences)[0] | |
| sequence = sequence.replace(processor.tokenizer.eos_token, "").replace(processor.tokenizer.pad_token, "") | |
| sequence = re.sub(r"<.*?>", "", sequence, count=1).strip() # remove first task start token | |
| seq = processor.token2json(sequence) | |
| if get_confidence: | |
| return seq, pred_confidence(outputs.scores, threshold) | |
| return seq | |
| def pred_confidence(output_scores, threshold): | |
| is_confident=True | |
| for score in output_scores: | |
| exp_scores = np.exp(score[0].cpu().numpy()) # scores are logits, we use the exp function so that all values are positive | |
| sum_exp = np.sum(exp_scores) # taking the sum of the token scores | |
| idx = np.argmax(exp_scores) # taking the index of the token with the highest score | |
| prob_max = exp_scores[idx]/sum_exp # normalizing the token with the highest score wrt the sum of all scores. Returns probability | |
| if prob_max < threshold: | |
| is_confident = False | |
| # print(prob_max) | |
| return is_confident | |
| CUDA_LAUNCH_BLOCKING=1 | |
| def parse_text(input, filename): | |
| model = base_model | |
| processor = base_processor | |
| seq = inference(input, model, processor, task_prompt="<s_synthdog>") | |
| return str(seq) | |
| def doctype_classify(input, filename): | |
| model = classifier_doctype_model | |
| processor = classifier_doctype_processor | |
| seq, is_confident = inference(input, model, processor, threshold=0.90, task_prompt="<s_classifier_acct>", get_confidence=True) | |
| return seq.get('class'), is_confident | |
| def account_classify(input, filename): | |
| model = classifier_account_model | |
| processor = classifier_account_processor | |
| seq, is_confident = inference(input, model, processor, threshold=0.999, task_prompt="<s_classifier_acct>", get_confidence=True) | |
| return seq.get('class'), is_confident | |
| """## Text processing/string matcher code""" | |
| import locale | |
| locale.getpreferredencoding = lambda: "UTF-8" | |
| """## Classify Document Images""" | |
| import numpy as np | |
| import csv | |
| import re | |
| import os | |
| import requests | |
| from io import BytesIO | |
| def classify_acct_dtype_str(content, filename): | |
| # response = requests.get("https://huggingface.co/datasets/Xenova/transformers.js-docs/resolve/main/city-streets.jpg") | |
| # ipt = Image.open(BytesIO(response.content)) | |
| try: | |
| ipt = Image.open(BytesIO(content)) | |
| dtype_inf, dtype_conf = doctype_classify(ipt, filename) | |
| except Exception as err: | |
| return f"Error in opening {filename}, {err}" | |
| return dtype_inf | |
| # classify_acct_dtype_str("https://huggingface.co/datasets/Xenova/transformers.js-docs/resolve/main/city-streets.jpg") |