import os import pandas as pd os.system('cd fairseq;' 'pip install ./; cd ..') os.system('cd ezocr;' 'pip install .; cd ..') os.system('pip install --upgrade tensorflow-gpu==1.15;' 'pip install "modelscope[cv]" -f https://modelscope.oss-cn-beijing.aliyuncs.com/releases/repo.html') import torch import numpy as np from fairseq import utils, tasks from fairseq import checkpoint_utils from utils.eval_utils import eval_step from data.mm_data.ocr_dataset import ocr_resize from tasks.mm_tasks.ocr import OcrTask from PIL import Image, ImageDraw from torchvision import transforms from typing import List, Tuple import cv2 # from easyocrlite import ReaderLite import gradio as gr from modelscope.pipelines import pipeline from modelscope.utils.constant import Tasks from modelscope.outputs import OutputKeys from modelscope.preprocessors.image import load_image # Register refcoco task tasks.register_task('ocr', OcrTask) os.system('wget https://shuangqing-multimodal.oss-cn-zhangjiakou.aliyuncs.com/ocr_general_clean.pt; ' 'mkdir -p checkpoints; mv ocr_general_clean.pt checkpoints/ocr_general_clean.pt') # turn on cuda if GPU is available use_cuda = torch.cuda.is_available() # use fp16 only when GPU is available use_fp16 = True mean = [0.5, 0.5, 0.5] std = [0.5, 0.5, 0.5] Rect = Tuple[int, int, int, int] FourPoint = Tuple[Tuple[int, int], Tuple[int, int], Tuple[int, int], Tuple[int, int]] def four_point_transform(image: np.ndarray, rect: FourPoint) -> np.ndarray: (tl, tr, br, bl) = rect widthA = np.sqrt(((br[0] - bl[0]) ** 2) + ((br[1] - bl[1]) ** 2)) widthB = np.sqrt(((tr[0] - tl[0]) ** 2) + ((tr[1] - tl[1]) ** 2)) maxWidth = max(int(widthA), int(widthB)) # compute the height of the new image, which will be the # maximum distance between the top-right and bottom-right # y-coordinates or the top-left and bottom-left y-coordinates heightA = np.sqrt(((tr[0] - br[0]) ** 2) + ((tr[1] - br[1]) ** 2)) heightB = np.sqrt(((tl[0] - bl[0]) ** 2) + ((tl[1] - bl[1]) ** 2)) maxHeight = max(int(heightA), int(heightB)) dst = np.array( [[0, 0], [maxWidth - 1, 0], [maxWidth - 1, maxHeight - 1], [0, maxHeight - 1]], dtype="float32", ) # compute the perspective transform matrix and then apply it M = cv2.getPerspectiveTransform(rect, dst) warped = cv2.warpPerspective(image, M, (maxWidth, maxHeight)) return warped def get_images(img: str, reader: ReaderLite, **kwargs): results = reader.process(img, **kwargs) return results def draw_boxes(image, bounds, color='red', width=4): draw = ImageDraw.Draw(image) for i, bound in enumerate(bounds): p0, p1, p2, p3 = bound draw.text((p0[0]+5, p0[1]+5), str(i+1), fill=color, align='center') draw.line([*p0, *p1, *p2, *p3, *p0], fill=color, width=width) return image def encode_text(task, text, length=None, append_bos=False, append_eos=False): bos_item = torch.LongTensor([task.src_dict.bos()]) eos_item = torch.LongTensor([task.src_dict.eos()]) s = task.tgt_dict.encode_line( line=task.bpe.encode(text), add_if_not_exist=False, append_eos=False ).long() if length is not None: s = s[:length] if append_bos: s = torch.cat([bos_item, s]) if append_eos: s = torch.cat([s, eos_item]) return s def patch_resize_transform(patch_image_size=480, is_document=False): _patch_resize_transform = transforms.Compose( [ lambda image: ocr_resize( image, patch_image_size, is_document=is_document, split='test', ), transforms.ToTensor(), transforms.Normalize(mean=mean, std=std), ] ) return _patch_resize_transform # reader = ReaderLite(gpu=True) ocr_detection = pipeline(Tasks.ocr_detection, model='damo/cv_resnet18_ocr-detection-line-level_damo') overrides={"eval_cider": False, "beam": 5, "max_len_b": 64, "patch_image_size": 480, "orig_patch_image_size": 224, "no_repeat_ngram_size": 0, "seed": 42} models, cfg, task = checkpoint_utils.load_model_ensemble_and_task( utils.split_paths('checkpoints/ocr_general_clean.pt'), arg_overrides=overrides ) # Move models to GPU for model in models: model.eval() if use_fp16: model.half() if use_cuda and not cfg.distributed_training.pipeline_model_parallel: model.cuda() model.prepare_for_inference_(cfg) # Initialize generator generator = task.build_generator(models, cfg.generation) bos_item = torch.LongTensor([task.src_dict.bos()]) eos_item = torch.LongTensor([task.src_dict.eos()]) pad_idx = task.src_dict.pad() # Construct input for caption task def construct_sample(task, image: Image, patch_image_size=480): patch_image = patch_resize_transform(patch_image_size)(image).unsqueeze(0) patch_mask = torch.tensor([True]) src_text = encode_text(task, "图片上的文字是什么?", append_bos=True, append_eos=True).unsqueeze(0) src_length = torch.LongTensor([s.ne(pad_idx).long().sum() for s in src_text]) sample = { "id":np.array(['42']), "net_input": { "src_tokens": src_text, "src_lengths": src_length, "patch_images": patch_image, "patch_masks": patch_mask, }, "target": None } return sample # Function to turn FP32 to FP16 def apply_half(t): if t.dtype is torch.float32: return t.to(dtype=torch.half) return t def ocr(img): boxes = ocr_detection(img)[OutputKeys.POLYGONS] image = cv2.imread(img) out_img = Image.open(img) ocr_result = list() for i, box in boxes: # 因为检测结果是四边形,所以用透视变化转为长方形 post1 = box.reshape((4, 2)).astype(np.float32) width = box[4] - box[0] height = box[5] - box[1] post2 = np.float32([[0, 0], [width, 0], [width, height], [0, height]]) M = cv2.getPerspectiveTransform(post1, post2) new_img = cv2.warpPerspective(image, M, (width, height)) new_img_pil = Image.fromarray(cv2.cvtColor(new_img, cv2.COLOR_BGR2RGB)) # 开启文字识别 sample = construct_sample(task, new_img_pil, cfg.task.patch_image_size) sample = utils.move_to_cuda(sample) if use_cuda else sample sample = utils.apply_to_sample(apply_half, sample) if use_fp16 else sample with torch.no_grad(): result, scores = eval_step(task, generator, models, sample) ocr_result.append([str(i+1), result[0]['ocr'].replace(' ', '')]) result = pd.DataFrame(ocr_result, columns=['Box ID', 'Text']) # results = get_images(img, reader, text_confidence=0.7, text_threshold=0.4, # link_threshold=0.43, slope_ths=0., add_margin=0.02) # box_list, image_list = zip(*results) draw_boxes(out_img, boxes) # # ocr_result = [] # for i, (box, image) in enumerate(zip(box_list, image_list)): # image = Image.fromarray(image) # sample = construct_sample(task, image, cfg.task.patch_image_size) # sample = utils.move_to_cuda(sample) if use_cuda else sample # sample = utils.apply_to_sample(apply_half, sample) if use_fp16 else sample # # with torch.no_grad(): # result, scores = eval_step(task, generator, models, sample) # ocr_result.append([str(i+1), result[0]['ocr'].replace(' ', '')]) # # result = pd.DataFrame(ocr_result, columns=['Box ID', 'Text']) return out_img, result title = "Chinese OCR" description = "Gradio Demo for Chinese OCR based on OFA. "\ "Upload your own image or click any one of the examples, and click " \ "\"Submit\" and then wait for the generated OCR result. " \ "\n中文OCR体验区。欢迎上传图片,静待检测文字返回~" article = "

OFA Github " \ "Repo

" examples = [['shupai.png'], ['chinese.jpg'], ['gaidao.jpeg'], ['qiaodaima.png'], ['benpao.jpeg'], ['wanli.png'], ['xsd.jpg']] io = gr.Interface(fn=ocr, inputs=gr.inputs.Image(type='filepath', label='Image'), outputs=[gr.outputs.Image(type='pil', label='Image'), gr.outputs.Dataframe(headers=['Box ID', 'Text'], type='pandas', label='OCR Results')], title=title, description=description, article=article, examples=examples) io.launch()