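# Hugging Face Spaces page banner captured together with this script:
# "Spaces: Runtime error". This is page chrome, not part of the program.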
import os
import subprocess
import sys
from typing import List, Tuple

import pandas as pd

# The local `fairseq` and `ezocr` source directories are pip-installed at
# start-up, before the third-party imports below (the `fairseq` imports need
# the first one; `ezocr` presumably provides `easyocrlite` -- TODO confirm).
for _local_pkg in ('fairseq', 'ezocr'):
    if not os.path.isdir(_local_pkg):
        # The former `cd <dir>; pip install .` shell string kept going when
        # `cd` failed and then ran pip against the current directory instead.
        print(f'warning: local package directory {_local_pkg!r} not found; '
              'skipping its installation', file=sys.stderr)
        continue
    # Use the running interpreter's pip so the package lands in the
    # environment that performs the imports below.
    _pip = subprocess.run([sys.executable, '-m', 'pip', 'install', '.'],
                          cwd=_local_pkg)
    if _pip.returncode != 0:
        # Stay best-effort like the original (an already-installed copy may
        # still import fine), but make the failure visible.
        print(f'warning: pip install of {_local_pkg!r} exited with status '
              f'{_pip.returncode}', file=sys.stderr)

import torch
import numpy as np
from fairseq import utils, tasks
from fairseq import checkpoint_utils
from utils.eval_utils import eval_step
from data.mm_data.ocr_dataset import ocr_resize
from tasks.mm_tasks.ocr import OcrTask
from PIL import Image, ImageDraw
from torchvision import transforms
import cv2
from easyocrlite import ReaderLite
import gradio as gr
# Register the OCR task under the name 'ocr'
tasks.register_task('ocr', OcrTask)

# Fetch the OCR checkpoint once; later starts reuse the file already on disk.
_checkpoint_url = 'https://shuangqing-multimodal.oss-cn-zhangjiakou.aliyuncs.com/ocr_general_clean.pt'
_checkpoint_path = 'checkpoints/ocr_general_clean.pt'
if not os.path.exists(_checkpoint_path):
    import subprocess

    os.makedirs('checkpoints', exist_ok=True)
    # Download to a temporary name and rename afterwards, so an interrupted
    # transfer is never mistaken for a complete checkpoint.
    _partial_path = _checkpoint_path + '.part'
    subprocess.run(['wget', '-O', _partial_path, _checkpoint_url], check=True)
    os.replace(_partial_path, _checkpoint_path)
# turn on cuda if GPU is available
use_cuda = torch.cuda.is_available()
# use fp16 only when GPU is available (previously hard-coded to True, which
# contradicted this rule on CPU-only hosts)
use_fp16 = use_cuda
# Per-channel normalisation statistics passed to transforms.Normalize
mean = [0.5, 0.5, 0.5]
std = [0.5, 0.5, 0.5]
# Type aliases: a 4-int rectangle and a quadrilateral given as four (x, y)
# corner points.
Rect = Tuple[int, int, int, int]
FourPoint = Tuple[Tuple[int, int], Tuple[int, int], Tuple[int, int], Tuple[int, int]]
def four_point_transform(image: np.ndarray, rect: FourPoint) -> np.ndarray:
    """Warp the quadrilateral ``rect`` of ``image`` into an upright rectangle.

    Args:
        image: Source image array handed to ``cv2.warpPerspective``.
        rect: Four corner points unpacked in the order top-left, top-right,
            bottom-right, bottom-left.

    Returns:
        The warped image of size ``(maxWidth, maxHeight)``.
    """
    (tl, tr, br, bl) = rect

    def _edge_length(p, q):
        # Euclidean distance between two corner points.
        return np.sqrt(((p[0] - q[0]) ** 2) + ((p[1] - q[1]) ** 2))

    # compute the width of the new image, which will be the larger of the
    # bottom edge and top edge lengths
    maxWidth = max(int(_edge_length(br, bl)), int(_edge_length(tr, tl)))
    # compute the height of the new image, which will be the larger of the
    # right edge and left edge lengths
    maxHeight = max(int(_edge_length(tr, br)), int(_edge_length(tl, bl)))
    dst = np.array(
        [[0, 0], [maxWidth - 1, 0], [maxWidth - 1, maxHeight - 1], [0, maxHeight - 1]],
        dtype="float32",
    )
    # cv2.getPerspectiveTransform needs float32 point arrays; ``rect`` may
    # arrive as a plain tuple of tuples (see its annotation), so convert it.
    src = np.asarray(rect, dtype="float32")
    # compute the perspective transform matrix and then apply it
    M = cv2.getPerspectiveTransform(src, dst)
    warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight))
    return warped
def get_images(img: str, reader: ReaderLite, **kwargs):
    """Run ``reader.process`` on ``img`` and return its result unchanged.

    Args:
        img: Image passed straight through to ``reader.process``.
        reader: Reader whose ``process`` method is called.
        **kwargs: Extra keyword options forwarded to ``reader.process``.
    """
    return reader.process(img, **kwargs)
def draw_boxes(image, bounds, color='red', width=4):
    """Outline every box in ``bounds`` on ``image`` and label it with its index.

    Args:
        image: Image to draw on; it is modified in place.
        bounds: Iterable of boxes, each made of four (x, y) corner points.
        color: Colour used for both the outline and the label.
        width: Line width of the outline.

    Returns:
        The same ``image`` object that was passed in.
    """
    canvas = ImageDraw.Draw(image)
    for number, (p0, p1, p2, p3) in enumerate(bounds, start=1):
        # 1-based label, offset slightly from the first corner.
        canvas.text((p0[0]+5, p0[1]+5), str(number), fill=color, align='center')
        # Closed outline: back to the first corner at the end.
        outline = [*p0, *p1, *p2, *p3, *p0]
        canvas.line(outline, fill=color, width=width)
    return image
def encode_text(task, text, length=None, append_bos=False, append_eos=False):
    """Encode ``text`` into a 1-D long tensor of token ids.

    The text is passed through ``task.bpe.encode`` and then
    ``task.tgt_dict.encode_line`` (no new symbols added, no EOS appended).

    Args:
        task: Object providing ``bpe``, ``tgt_dict`` and ``src_dict``.
        text: Raw text to encode.
        length: If not ``None``, keep at most this many tokens (applied before
            BOS/EOS are attached).
        append_bos: Prepend ``task.src_dict.bos()`` when true.
        append_eos: Append ``task.src_dict.eos()`` when true.

    Returns:
        The token id tensor.
    """
    tokens = task.tgt_dict.encode_line(
        line=task.bpe.encode(text),
        add_if_not_exist=False,
        append_eos=False,
    ).long()
    if length is not None:
        tokens = tokens[:length]
    # The marker tensors are built only when they are actually attached.
    if append_bos:
        tokens = torch.cat([torch.LongTensor([task.src_dict.bos()]), tokens])
    if append_eos:
        tokens = torch.cat([tokens, torch.LongTensor([task.src_dict.eos()])])
    return tokens
def patch_resize_transform(patch_image_size=480, is_document=False):
    """Build the image preprocessing pipeline used before inference.

    The pipeline applies ``ocr_resize`` (with ``split='test'``), converts the
    result to a tensor, and normalises it with the module-level ``mean`` and
    ``std``.

    Args:
        patch_image_size: Size argument forwarded to ``ocr_resize``.
        is_document: Flag forwarded to ``ocr_resize``.

    Returns:
        A ``transforms.Compose`` callable.
    """
    def _resize(image):
        return ocr_resize(
            image, patch_image_size, is_document=is_document, split='test',
        )

    steps = [
        _resize,
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std),
    ]
    return transforms.Compose(steps)
# Text detector; follow the CUDA availability flag instead of hard-coding
# gpu=True.
reader = ReaderLite(gpu=use_cuda)

# Generation/config overrides applied when the checkpoint is loaded
overrides = {"eval_cider": False, "beam": 5, "max_len_b": 64, "patch_image_size": 480,
             "orig_patch_image_size": 224, "no_repeat_ngram_size": 0, "seed": 42}
models, cfg, task = checkpoint_utils.load_model_ensemble_and_task(
    utils.split_paths('checkpoints/ocr_general_clean.pt'),
    arg_overrides=overrides
)

# Prepare each model for inference: eval mode, optional fp16, optional move
# to GPU
for model in models:
    model.eval()
    if use_fp16:
        model.half()
    if use_cuda and not cfg.distributed_training.pipeline_model_parallel:
        model.cuda()
    model.prepare_for_inference_(cfg)

# Initialize generator
generator = task.build_generator(models, cfg.generation)

# Special-token tensors and padding index taken from the source dictionary;
# pad_idx is read by construct_sample.
bos_item = torch.LongTensor([task.src_dict.bos()])
eos_item = torch.LongTensor([task.src_dict.eos()])
pad_idx = task.src_dict.pad()
# Construct input for the OCR task
def construct_sample(task, image: Image.Image, patch_image_size=480):
    """Build the single-item sample dict fed to the generator for one crop.

    Args:
        task: Task object passed on to ``encode_text``.
        image: PIL image of one text region.
        patch_image_size: Size forwarded to ``patch_resize_transform``.

    Returns:
        A dict with ``id``, ``net_input`` (``src_tokens``, ``src_lengths``,
        ``patch_images``, ``patch_masks``) and ``target`` set to ``None``.
    """
    # Preprocess the crop and add a leading batch dimension.
    patch_image = patch_resize_transform(patch_image_size)(image).unsqueeze(0)
    patch_mask = torch.tensor([True])
    # Fixed prompt, wrapped in BOS/EOS, with a leading batch dimension.
    src_text = encode_text(task, "图片上的文字是什么?", append_bos=True, append_eos=True).unsqueeze(0)
    # Count of non-padding tokens per row; pad_idx is the module-level value.
    src_length = torch.LongTensor([s.ne(pad_idx).long().sum() for s in src_text])
    sample = {
        "id": np.array(['42']),
        "net_input": {
            "src_tokens": src_text,
            "src_lengths": src_length,
            "patch_images": patch_image,
            "patch_masks": patch_mask,
        },
        "target": None,
    }
    return sample
# Function to turn FP32 to FP16
def apply_half(t):
    """Return ``t`` cast to ``torch.half`` if it is float32, else ``t`` itself.

    Args:
        t: A tensor (anything exposing ``dtype`` and ``to``).

    Returns:
        A half-precision copy for float32 input; the original object for any
        other dtype.
    """
    if t.dtype is torch.float32:
        return t.to(dtype=torch.half)
    return t
def ocr(img):
    """Detect text regions in an image file and recognise the text in each.

    Args:
        img: Path of the image; it is opened with PIL and also passed to the
            detector via ``get_images``.

    Returns:
        A pair ``(out_img, table)``: the opened image with the detected boxes
        drawn on it, and a DataFrame with columns ``'Box ID'`` and ``'Text'``
        holding one row per detected region. The table is empty when nothing
        is detected.
    """
    out_img = Image.open(img)
    detections = list(get_images(img, reader, text_confidence=0.7, text_threshold=0.4,
                                 link_threshold=0.43, slope_ths=0., add_margin=0.02))
    if not detections:
        # No text found: unpacking zip(*[]) below would raise ValueError, so
        # return the untouched image with an empty result table instead.
        return out_img, pd.DataFrame([], columns=['Box ID', 'Text'])

    box_list, image_list = zip(*detections)
    draw_boxes(out_img, box_list)

    ocr_result = []
    for i, image in enumerate(image_list):
        crop = Image.fromarray(image)
        sample = construct_sample(task, crop, cfg.task.patch_image_size)
        if use_cuda:
            sample = utils.move_to_cuda(sample)
        if use_fp16:
            sample = utils.apply_to_sample(apply_half, sample)
        with torch.no_grad():
            step_result, scores = eval_step(task, generator, models, sample)
        # Box IDs are 1-based to match the labels drawn on the image; spaces
        # are stripped from the recognised text.
        ocr_result.append([str(i+1), step_result[0]['ocr'].replace(' ', '')])

    table = pd.DataFrame(ocr_result, columns=['Box ID', 'Text'])
    return out_img, table
# Texts shown on the demo page
title = "Chinese OCR"
description = (
    "Gradio Demo for Chinese OCR based on OFA. "
    "Upload your own image or click any one of the examples, and click "
    "\"Submit\" and then wait for the generated OCR result. "
    "\n中文OCR体验区。欢迎上传图片,静待检测文字返回~"
)
article = (
    "<p style='text-align: center'><a href='https://github.com/OFA-Sys/OFA' target='_blank'>OFA Github "
    "Repo</a></p> "
)
# Example image files offered on the page (paths relative to the working
# directory)
examples = [['shupai.png'], ['chinese.jpg'], ['gaidao.jpeg'], ['qiaodaima.png'],
            ['benpao.jpeg'], ['wanli.png'], ['xsd.jpg']]

# NOTE(review): gr.inputs / gr.outputs are Gradio's legacy component
# namespaces; newer Gradio releases expose gr.Image / gr.Dataframe instead.
# Confirm against the Gradio version this app is pinned to before changing.
io = gr.Interface(
    fn=ocr,
    inputs=gr.inputs.Image(type='filepath', label='Image'),
    outputs=[
        gr.outputs.Image(type='pil', label='Image'),
        gr.outputs.Dataframe(headers=['Box ID', 'Text'], type='pandas', label='OCR Results'),
    ],
    title=title,
    description=description,
    article=article,
    examples=examples,
)
io.launch()