"""Evaluate a Donut model on the test split of a configured dataset."""

import json
import locale
import os
import re
import time
from functools import lru_cache

import numpy as np
import torch
from datasets import load_dataset
from donut import JSONParseEvaluator
from tqdm.auto import tqdm
from transformers import DonutProcessor, VisionEncoderDecoderModel

from config import settings

# Force every later call to locale.getpreferredencoding() to report UTF-8.
locale.getpreferredencoding = lambda: "UTF-8"

# Matches the first "<...>" tag in a decoded sequence (non-greedy).
_FIRST_TAG_RE = re.compile(r"<.*?>")


@lru_cache(maxsize=1)
def prepare_model():
    """Load the processor, model and test dataset named in ``settings``.

    The result is memoized with ``lru_cache(maxsize=1)``, so repeated calls
    within one process return the same objects instead of reloading them.

    Returns:
        A ``(processor, model, device, dataset)`` tuple where ``device`` is
        ``"cuda"`` when CUDA is available and ``"cpu"`` otherwise. The model
        is already in eval mode and moved to ``device``.
    """
    processor = DonutProcessor.from_pretrained(settings.processor)
    model = VisionEncoderDecoderModel.from_pretrained(settings.model)

    device = "cuda" if torch.cuda.is_available() else "cpu"

    model.eval()
    model.to(device)

    dataset = load_dataset(settings.dataset, split="test")

    return processor, model, device, dataset


def run_evaluate_donut():
    """Run the model over every test sample and score its JSON output.

    For each sample the image is encoded, a sequence is generated, decoded,
    converted to JSON with ``processor.token2json`` and compared against the
    sample's ``gt_parse`` ground truth using ``JSONParseEvaluator.cal_acc``.

    Returns:
        A ``(scores, mean_accuracy, processing_time)`` tuple. ``scores`` is a
        dict with keys ``"accuracies"`` (the per-sample scores) and
        ``"mean_accuracy"``. ``processing_time`` is the elapsed wall-clock
        time in seconds, including the (possibly cached) model preparation.
        ``mean_accuracy`` is NaN when the dataset is empty.
    """
    worker_pid = os.getpid()
    print(f"Handling evaluation request with worker PID: {worker_pid}")

    start_time = time.time()

    processor, model, device, dataset = prepare_model()
    tokenizer = processor.tokenizer

    # The decoder prompt and the evaluator do not depend on the sample, so
    # they are built once here rather than on every iteration.
    # NOTE(review): the prompt is an empty string, yet the code below strips a
    # "first task start token" from the output -- a "<...>" prompt may have
    # been lost upstream. Confirm the intended value.
    task_prompt = ""
    decoder_input_ids = tokenizer(
        task_prompt, add_special_tokens=False, return_tensors="pt"
    ).input_ids
    decoder_input_ids = decoder_input_ids.to(device)

    evaluator = JSONParseEvaluator()
    accs = []

    for sample in tqdm(dataset, total=len(dataset)):
        # prepare encoder inputs
        pixel_values = processor(
            sample["image"].convert("RGB"), return_tensors="pt"
        ).pixel_values
        pixel_values = pixel_values.to(device)

        # autoregressively generate sequence
        outputs = model.generate(
            pixel_values,
            decoder_input_ids=decoder_input_ids,
            max_length=model.decoder.config.max_position_embeddings,
            early_stopping=True,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            use_cache=True,
            num_beams=1,
            bad_words_ids=[[tokenizer.unk_token_id]],
            return_dict_in_generate=True,
        )

        # turn into JSON
        seq = processor.batch_decode(outputs.sequences)[0]
        seq = seq.replace(tokenizer.eos_token, "").replace(tokenizer.pad_token, "")
        # remove first task start token
        seq = _FIRST_TAG_RE.sub("", seq, count=1).strip()
        seq = processor.token2json(seq)

        ground_truth = json.loads(sample["ground_truth"])["gt_parse"]
        accs.append(evaluator.cal_acc(seq, ground_truth))

    processing_time = time.time() - start_time

    # Compute the mean once; skip np.mean on an empty list, which would emit
    # a RuntimeWarning before returning NaN anyway.
    mean_accuracy = np.mean(accs) if accs else float("nan")

    scores = {"accuracies": accs, "mean_accuracy": mean_accuracy}
    print(scores, f"length : {len(accs)}")
    print("Mean accuracy:", mean_accuracy)
    print(f"Evaluation done, worker PID: {worker_pid}")

    return scores, mean_accuracy, processing_time