# -*- coding: utf-8 -*- import os import random import xgboost as xgb from io import BytesIO import torch import re import pdfplumber import logging from PIL import Image, ImageDraw import numpy as np from timeit import default_timer as timer from PyPDF2 import PdfReader as pdf2_read from deepdoc.utils.file_utils import get_project_base_directory from deepdoc.vision import OCR, Recognizer, LayoutRecognizer, TableStructureRecognizer from deepdoc.utils import rag_tokenizer from copy import deepcopy from huggingface_hub import snapshot_download logging.getLogger("pdfminer").setLevel(logging.WARNING) class RAGFlowPdfParser: def __init__(self): self.ocr = OCR() if hasattr(self, "model_speciess"): self.layouter = LayoutRecognizer("layout." + self.model_speciess) else: self.layouter = LayoutRecognizer("layout") self.tbl_det = TableStructureRecognizer() self.updown_cnt_mdl = xgb.Booster() if torch.cuda.is_available(): self.updown_cnt_mdl.set_param({"device": "cuda"}) try: model_dir = os.path.join( get_project_base_directory(), "rag/res/deepdoc") self.updown_cnt_mdl.load_model(os.path.join( model_dir, "updown_concat_xgb.model")) except Exception as e: model_dir = snapshot_download( repo_id="InfiniFlow/text_concat_xgb_v1.0", local_dir=os.path.join(get_project_base_directory(), "rag/res/deepdoc"), local_dir_use_symlinks=False) self.updown_cnt_mdl.load_model(os.path.join( model_dir, "updown_concat_xgb.model")) self.page_from = 0 """ If you have trouble downloading HuggingFace models, -_^ this might help!! For Linux: export HF_ENDPOINT=https://hf-mirror.com For Windows: Good luck ^_- """ def __char_width(self, c): return (c["x1"] - c["x0"]) // max(len(c["text"]), 1) def __height(self, c): return c["bottom"] - c["top"] def _x_dis(self, a, b): return min(abs(a["x1"] - b["x0"]), abs(a["x0"] - b["x1"]), abs(a["x0"] + a["x1"] - b["x0"] - b["x1"]) / 2) def _y_dis( self, a, b): return ( b["top"] + b["bottom"] - a["top"] - a["bottom"]) / 2 def _match_proj(self, b): proj_patt = [ r"第[零一二三四五六七八九十百]+章", r"第[零一二三四五六七八九十百]+[条节]", r"[零一二三四五六七八九十百]+[、是  ]", r"[\((][零一二三四五六七八九十百]+[)\)]", r"[\((][0-9]+[)\)]", r"[0-9]+(、|\.[  ]|)|\.[^0-9./a-zA-Z_%><-]{4,})", r"[0-9]+\.[0-9.]+(、|\.[  ])", r"[⚫•➢①② ]", ] return any([re.match(p, b["text"]) for p in proj_patt]) def _updown_concat_features(self, up, down): w = max(self.__char_width(up), self.__char_width(down)) h = max(self.__height(up), self.__height(down)) y_dis = self._y_dis(up, down) LEN = 6 tks_down = rag_tokenizer.tokenize(down["text"][:LEN]).split(" ") tks_up = rag_tokenizer.tokenize(up["text"][-LEN:]).split(" ") tks_all = up["text"][-LEN:].strip() \ + (" " if re.match(r"[a-zA-Z0-9]+", up["text"][-1] + down["text"][0]) else "") \ + down["text"][:LEN].strip() tks_all = rag_tokenizer.tokenize(tks_all).split(" ") fea = [ up.get("R", -1) == down.get("R", -1), y_dis / h, down["page_number"] - up["page_number"], up["layout_type"] == down["layout_type"], up["layout_type"] == "text", down["layout_type"] == "text", up["layout_type"] == "table", down["layout_type"] == "table", True if re.search( r"([。?!;!?;+))]|[a-z]\.)$", up["text"]) else False, True if re.search(r"[,:‘“、0-9(+-]$", up["text"]) else False, True if re.search( r"(^.?[/,?;:\],。;:’”?!》】)-])", down["text"]) else False, True if re.match(r"[\((][^\(\)()]+[)\)]$", up["text"]) else False, True if re.search(r"[,,][^。.]+$", up["text"]) else False, True if re.search(r"[,,][^。.]+$", up["text"]) else False, True if re.search(r"[\((][^\))]+$", up["text"]) and re.search(r"[\))]", down["text"]) else False, self._match_proj(down), True if re.match(r"[A-Z]", down["text"]) else False, True if re.match(r"[A-Z]", up["text"][-1]) else False, True if re.match(r"[a-z0-9]", up["text"][-1]) else False, True if re.match(r"[0-9.%,-]+$", down["text"]) else False, up["text"].strip()[-2:] == down["text"].strip()[-2:] if len(up["text"].strip() ) > 1 and len( down["text"].strip()) > 1 else False, up["x0"] > down["x1"], abs(self.__height(up) - self.__height(down)) / min(self.__height(up), self.__height(down)), self._x_dis(up, down) / max(w, 0.000001), (len(up["text"]) - len(down["text"])) / max(len(up["text"]), len(down["text"])), len(tks_all) - len(tks_up) - len(tks_down), len(tks_down) - len(tks_up), tks_down[-1] == tks_up[-1], max(down["in_row"], up["in_row"]), abs(down["in_row"] - up["in_row"]), len(tks_down) == 1 and rag_tokenizer.tag(tks_down[0]).find("n") >= 0, len(tks_up) == 1 and rag_tokenizer.tag(tks_up[0]).find("n") >= 0 ] return fea @staticmethod def sort_X_by_page(arr, threashold): # sort using y1 first and then x1 arr = sorted(arr, key=lambda r: (r["page_number"], r["x0"], r["top"])) for i in range(len(arr) - 1): for j in range(i, -1, -1): # restore the order using th if abs(arr[j + 1]["x0"] - arr[j]["x0"]) < threashold \ and arr[j + 1]["top"] < arr[j]["top"] \ and arr[j + 1]["page_number"] == arr[j]["page_number"]: tmp = arr[j] arr[j] = arr[j + 1] arr[j + 1] = tmp return arr def _has_color(self, o): if o.get("ncs", "") == "DeviceGray": if o["stroking_color"] and o["stroking_color"][0] == 1 and o["non_stroking_color"] and \ o["non_stroking_color"][0] == 1: if re.match(r"[a-zT_\[\]\(\)-]+", o.get("text", "")): return False return True def _table_transformer_job(self, ZM): logging.info("Table processing...") imgs, pos = [], [] tbcnt = [0] MARGIN = 10 self.tb_cpns = [] assert len(self.page_layout) == len(self.page_images) for p, tbls in enumerate(self.page_layout): # for page tbls = [f for f in tbls if f["type"] == "table"] tbcnt.append(len(tbls)) if not tbls: continue for tb in tbls: # for table left, top, right, bott = tb["x0"] - MARGIN, tb["top"] - MARGIN, \ tb["x1"] + MARGIN, tb["bottom"] + MARGIN left *= ZM top *= ZM right *= ZM bott *= ZM pos.append((left, top)) imgs.append(self.page_images[p].crop((left, top, right, bott))) assert len(self.page_images) == len(tbcnt) - 1 if not imgs: return recos = self.tbl_det(imgs) tbcnt = np.cumsum(tbcnt) for i in range(len(tbcnt) - 1): # for page pg = [] for j, tb_items in enumerate( recos[tbcnt[i]: tbcnt[i + 1]]): # for table poss = pos[tbcnt[i]: tbcnt[i + 1]] for it in tb_items: # for table components it["x0"] = (it["x0"] + poss[j][0]) it["x1"] = (it["x1"] + poss[j][0]) it["top"] = (it["top"] + poss[j][1]) it["bottom"] = (it["bottom"] + poss[j][1]) for n in ["x0", "x1", "top", "bottom"]: it[n] /= ZM it["top"] += self.page_cum_height[i] it["bottom"] += self.page_cum_height[i] it["pn"] = i it["layoutno"] = j pg.append(it) self.tb_cpns.extend(pg) def gather(kwd, fzy=10, ption=0.6): eles = Recognizer.sort_Y_firstly( [r for r in self.tb_cpns if re.match(kwd, r["label"])], fzy) eles = Recognizer.layouts_cleanup(self.boxes, eles, 5, ption) return Recognizer.sort_Y_firstly(eles, 0) # add R,H,C,SP tag to boxes within table layout headers = gather(r".*header$") rows = gather(r".* (row|header)") spans = gather(r".*spanning") clmns = sorted([r for r in self.tb_cpns if re.match( r"table column$", r["label"])], key=lambda x: (x["pn"], x["layoutno"], x["x0"])) clmns = Recognizer.layouts_cleanup(self.boxes, clmns, 5, 0.5) for b in self.boxes: if b.get("layout_type", "") != "table": continue ii = Recognizer.find_overlapped_with_threashold(b, rows, thr=0.3) if ii is not None: b["R"] = ii b["R_top"] = rows[ii]["top"] b["R_bott"] = rows[ii]["bottom"] ii = Recognizer.find_overlapped_with_threashold( b, headers, thr=0.3) if ii is not None: b["H_top"] = headers[ii]["top"] b["H_bott"] = headers[ii]["bottom"] b["H_left"] = headers[ii]["x0"] b["H_right"] = headers[ii]["x1"] b["H"] = ii ii = Recognizer.find_horizontally_tightest_fit(b, clmns) if ii is not None: b["C"] = ii b["C_left"] = clmns[ii]["x0"] b["C_right"] = clmns[ii]["x1"] ii = Recognizer.find_overlapped_with_threashold(b, spans, thr=0.3) if ii is not None: b["H_top"] = spans[ii]["top"] b["H_bott"] = spans[ii]["bottom"] b["H_left"] = spans[ii]["x0"] b["H_right"] = spans[ii]["x1"] b["SP"] = ii def __ocr(self, pagenum, img, chars, ZM=3): bxs = self.ocr.detect(np.array(img)) if not bxs: self.boxes.append([]) return bxs = [(line[0], line[1][0]) for line in bxs] bxs = Recognizer.sort_Y_firstly( [{"x0": b[0][0] / ZM, "x1": b[1][0] / ZM, "top": b[0][1] / ZM, "text": "", "txt": t, "bottom": b[-1][1] / ZM, "page_number": pagenum} for b, t in bxs if b[0][0] <= b[1][0] and b[0][1] <= b[-1][1]], self.mean_height[-1] / 3 ) # merge chars in the same rect for c in Recognizer.sort_X_firstly( chars, self.mean_width[pagenum - 1] // 4): ii = Recognizer.find_overlapped(c, bxs) if ii is None: self.lefted_chars.append(c) continue ch = c["bottom"] - c["top"] bh = bxs[ii]["bottom"] - bxs[ii]["top"] if abs(ch - bh) / max(ch, bh) >= 0.7 and c["text"] != ' ': self.lefted_chars.append(c) continue if c["text"] == " " and bxs[ii]["text"]: if re.match(r"[0-9a-zA-Z,.?;:!%%]", bxs[ii]["text"][-1]): bxs[ii]["text"] += " " else: bxs[ii]["text"] += c["text"] for b in bxs: if not b["text"]: left, right, top, bott = b["x0"] * ZM, b["x1"] * \ ZM, b["top"] * ZM, b["bottom"] * ZM b["text"] = self.ocr.recognize(np.array(img), np.array([[left, top], [right, top], [right, bott], [left, bott]], dtype=np.float32)) del b["txt"] bxs = [b for b in bxs if b["text"]] if self.mean_height[-1] == 0: self.mean_height[-1] = np.median([b["bottom"] - b["top"] for b in bxs]) self.boxes.append(bxs) def _layouts_rec(self, ZM, drop=True): assert len(self.page_images) == len(self.boxes) self.boxes, self.page_layout = self.layouter( self.page_images, self.boxes, ZM, drop=drop) # cumlative Y for i in range(len(self.boxes)): self.boxes[i]["top"] += \ self.page_cum_height[self.boxes[i]["page_number"] - 1] self.boxes[i]["bottom"] += \ self.page_cum_height[self.boxes[i]["page_number"] - 1] def _text_merge(self): # merge adjusted boxes bxs = self.boxes def end_with(b, txt): txt = txt.strip() tt = b.get("text", "").strip() return tt and tt.find(txt) == len(tt) - len(txt) def start_with(b, txts): tt = b.get("text", "").strip() return tt and any([tt.find(t.strip()) == 0 for t in txts]) # horizontally merge adjacent box with the same layout i = 0 while i < len(bxs) - 1: b = bxs[i] b_ = bxs[i + 1] if b.get("layoutno", "0") != b_.get("layoutno", "1") or b.get("layout_type", "") in ["table", "figure", "equation"]: i += 1 continue if abs(self._y_dis(b, b_) ) < self.mean_height[bxs[i]["page_number"] - 1] / 3: # merge bxs[i]["x1"] = b_["x1"] bxs[i]["top"] = (b["top"] + b_["top"]) / 2 bxs[i]["bottom"] = (b["bottom"] + b_["bottom"]) / 2 bxs[i]["text"] += b_["text"] bxs.pop(i + 1) continue i += 1 continue dis_thr = 1 dis = b["x1"] - b_["x0"] if b.get("layout_type", "") != "text" or b_.get( "layout_type", "") != "text": if end_with(b, ",") or start_with(b_, "(,"): dis_thr = -8 else: i += 1 continue if abs(self._y_dis(b, b_)) < self.mean_height[bxs[i]["page_number"] - 1] / 5 \ and dis >= dis_thr and b["x1"] < b_["x1"]: # merge bxs[i]["x1"] = b_["x1"] bxs[i]["top"] = (b["top"] + b_["top"]) / 2 bxs[i]["bottom"] = (b["bottom"] + b_["bottom"]) / 2 bxs[i]["text"] += b_["text"] bxs.pop(i + 1) continue i += 1 self.boxes = bxs def _naive_vertical_merge(self): bxs = Recognizer.sort_Y_firstly( self.boxes, np.median( self.mean_height) / 3) i = 0 while i + 1 < len(bxs): b = bxs[i] b_ = bxs[i + 1] if b["page_number"] < b_["page_number"] and re.match( r"[0-9 •一—-]+$", b["text"]): bxs.pop(i) continue if not b["text"].strip(): bxs.pop(i) continue concatting_feats = [ b["text"].strip()[-1] in ",;:'\",、‘“;:-", len(b["text"].strip()) > 1 and b["text"].strip( )[-2] in ",;:'\",‘“、;:", b["text"].strip()[0] in "。;?!?”)),,、:", ] # features for not concating feats = [ b.get("layoutno", 0) != b_.get("layoutno", 0), b["text"].strip()[-1] in "。?!?", self.is_english and b["text"].strip()[-1] in ".!?", b["page_number"] == b_["page_number"] and b_["top"] - b["bottom"] > self.mean_height[b["page_number"] - 1] * 1.5, b["page_number"] < b_["page_number"] and abs( b["x0"] - b_["x0"]) > self.mean_width[b["page_number"] - 1] * 4, ] # split features detach_feats = [b["x1"] < b_["x0"], b["x0"] > b_["x1"]] if (any(feats) and not any(concatting_feats)) or any(detach_feats): print( b["text"], b_["text"], any(feats), any(concatting_feats), any(detach_feats)) i += 1 continue # merge up and down b["bottom"] = b_["bottom"] b["text"] += b_["text"] b["x0"] = min(b["x0"], b_["x0"]) b["x1"] = max(b["x1"], b_["x1"]) bxs.pop(i + 1) self.boxes = bxs def _concat_downward(self, concat_between_pages=True): # count boxes in the same row as a feature for i in range(len(self.boxes)): mh = self.mean_height[self.boxes[i]["page_number"] - 1] self.boxes[i]["in_row"] = 0 j = max(0, i - 12) while j < min(i + 12, len(self.boxes)): if j == i: j += 1 continue ydis = self._y_dis(self.boxes[i], self.boxes[j]) / mh if abs(ydis) < 1: self.boxes[i]["in_row"] += 1 elif ydis > 0: break j += 1 # concat between rows boxes = deepcopy(self.boxes) blocks = [] while boxes: chunks = [] def dfs(up, dp): chunks.append(up) i = dp while i < min(dp + 12, len(boxes)): ydis = self._y_dis(up, boxes[i]) smpg = up["page_number"] == boxes[i]["page_number"] mh = self.mean_height[up["page_number"] - 1] mw = self.mean_width[up["page_number"] - 1] if smpg and ydis > mh * 4: break if not smpg and ydis > mh * 16: break down = boxes[i] if not concat_between_pages and down["page_number"] > up["page_number"]: break if up.get("R", "") != down.get( "R", "") and up["text"][-1] != ",": i += 1 continue if re.match(r"[0-9]{2,3}/[0-9]{3}$", up["text"]) \ or re.match(r"[0-9]{2,3}/[0-9]{3}$", down["text"]) \ or not down["text"].strip(): i += 1 continue if not down["text"].strip(): i += 1 continue if up["x1"] < down["x0"] - 10 * \ mw or up["x0"] > down["x1"] + 10 * mw: i += 1 continue if i - dp < 5 and up.get("layout_type") == "text": if up.get("layoutno", "1") == down.get( "layoutno", "2"): dfs(down, i + 1) boxes.pop(i) return i += 1 continue fea = self._updown_concat_features(up, down) if self.updown_cnt_mdl.predict( xgb.DMatrix([fea]))[0] <= 0.5: i += 1 continue dfs(down, i + 1) boxes.pop(i) return dfs(boxes[0], 1) boxes.pop(0) if chunks: blocks.append(chunks) # concat within each block boxes = [] for b in blocks: if len(b) == 1: boxes.append(b[0]) continue t = b[0] for c in b[1:]: t["text"] = t["text"].strip() c["text"] = c["text"].strip() if not c["text"]: continue if t["text"] and re.match( r"[0-9\.a-zA-Z]+$", t["text"][-1] + c["text"][-1]): t["text"] += " " t["text"] += c["text"] t["x0"] = min(t["x0"], c["x0"]) t["x1"] = max(t["x1"], c["x1"]) t["page_number"] = min(t["page_number"], c["page_number"]) t["bottom"] = c["bottom"] if not t["layout_type"] \ and c["layout_type"]: t["layout_type"] = c["layout_type"] boxes.append(t) self.boxes = Recognizer.sort_Y_firstly(boxes, 0) def _filter_forpages(self): if not self.boxes: return findit = False i = 0 while i < len(self.boxes): if not re.match(r"(contents|目录|目次|table of contents|致谢|acknowledge)$", re.sub(r"( | |\u3000)+", "", self.boxes[i]["text"].lower())): i += 1 continue findit = True eng = re.match( r"[0-9a-zA-Z :'.-]{5,}", self.boxes[i]["text"].strip()) self.boxes.pop(i) if i >= len(self.boxes): break prefix = self.boxes[i]["text"].strip()[:3] if not eng else " ".join( self.boxes[i]["text"].strip().split(" ")[:2]) while not prefix: self.boxes.pop(i) if i >= len(self.boxes): break prefix = self.boxes[i]["text"].strip()[:3] if not eng else " ".join( self.boxes[i]["text"].strip().split(" ")[:2]) self.boxes.pop(i) if i >= len(self.boxes) or not prefix: break for j in range(i, min(i + 128, len(self.boxes))): if not re.match(prefix, self.boxes[j]["text"]): continue for k in range(i, j): self.boxes.pop(i) break if findit: return page_dirty = [0] * len(self.page_images) for b in self.boxes: if re.search(r"(··|··|··)", b["text"]): page_dirty[b["page_number"] - 1] += 1 page_dirty = set([i + 1 for i, t in enumerate(page_dirty) if t > 3]) if not page_dirty: return i = 0 while i < len(self.boxes): if self.boxes[i]["page_number"] in page_dirty: self.boxes.pop(i) continue i += 1 def _merge_with_same_bullet(self): i = 0 while i + 1 < len(self.boxes): b = self.boxes[i] b_ = self.boxes[i + 1] if not b["text"].strip(): self.boxes.pop(i) continue if not b_["text"].strip(): self.boxes.pop(i + 1) continue if b["text"].strip()[0] != b_["text"].strip()[0] \ or b["text"].strip()[0].lower() in set("qwertyuopasdfghjklzxcvbnm") \ or rag_tokenizer.is_chinese(b["text"].strip()[0]) \ or b["top"] > b_["bottom"]: i += 1 continue b_["text"] = b["text"] + "\n" + b_["text"] b_["x0"] = min(b["x0"], b_["x0"]) b_["x1"] = max(b["x1"], b_["x1"]) b_["top"] = b["top"] self.boxes.pop(i) def _extract_table_figure(self, need_image, ZM, return_html, need_position): tables = {} figures = {} # extract figure and table boxes i = 0 lst_lout_no = "" nomerge_lout_no = [] while i < len(self.boxes): if "layoutno" not in self.boxes[i]: i += 1 continue lout_no = str(self.boxes[i]["page_number"]) + \ "-" + str(self.boxes[i]["layoutno"]) if TableStructureRecognizer.is_caption(self.boxes[i]) or self.boxes[i]["layout_type"] in ["table caption", "title", "figure caption", "reference"]: nomerge_lout_no.append(lst_lout_no) if self.boxes[i]["layout_type"] == "table": if re.match(r"(数据|资料|图表)*来源[:: ]", self.boxes[i]["text"]): self.boxes.pop(i) continue if lout_no not in tables: tables[lout_no] = [] tables[lout_no].append(self.boxes[i]) self.boxes.pop(i) lst_lout_no = lout_no continue if need_image and self.boxes[i]["layout_type"] == "figure": if re.match(r"(数据|资料|图表)*来源[:: ]", self.boxes[i]["text"]): self.boxes.pop(i) continue if lout_no not in figures: figures[lout_no] = [] figures[lout_no].append(self.boxes[i]) self.boxes.pop(i) lst_lout_no = lout_no continue i += 1 # merge table on different pages nomerge_lout_no = set(nomerge_lout_no) tbls = sorted([(k, bxs) for k, bxs in tables.items()], key=lambda x: (x[1][0]["top"], x[1][0]["x0"])) i = len(tbls) - 1 while i - 1 >= 0: k0, bxs0 = tbls[i - 1] k, bxs = tbls[i] i -= 1 if k0 in nomerge_lout_no: continue if bxs[0]["page_number"] == bxs0[0]["page_number"]: continue if bxs[0]["page_number"] - bxs0[0]["page_number"] > 1: continue mh = self.mean_height[bxs[0]["page_number"] - 1] if self._y_dis(bxs0[-1], bxs[0]) > mh * 23: continue tables[k0].extend(tables[k]) del tables[k] def x_overlapped(a, b): return not any([a["x1"] < b["x0"], a["x0"] > b["x1"]]) # find captions and pop out i = 0 while i < len(self.boxes): c = self.boxes[i] # mh = self.mean_height[c["page_number"]-1] if not TableStructureRecognizer.is_caption(c): i += 1 continue # find the nearest layouts def nearest(tbls): nonlocal c mink = "" minv = 1000000000 for k, bxs in tbls.items(): for b in bxs: if b.get("layout_type", "").find("caption") >= 0: continue y_dis = self._y_dis(c, b) x_dis = self._x_dis( c, b) if not x_overlapped( c, b) else 0 dis = y_dis * y_dis + x_dis * x_dis if dis < minv: mink = k minv = dis return mink, minv tk, tv = nearest(tables) fk, fv = nearest(figures) # if min(tv, fv) > 2000: # i += 1 # continue if tv < fv and tk: tables[tk].insert(0, c) logging.debug( "TABLE:" + self.boxes[i]["text"] + "; Cap: " + tk) elif fk: figures[fk].insert(0, c) logging.debug( "FIGURE:" + self.boxes[i]["text"] + "; Cap: " + tk) self.boxes.pop(i) res = [] positions = [] def cropout(bxs, ltype, poss): nonlocal ZM pn = set([b["page_number"] - 1 for b in bxs]) if len(pn) < 2: pn = list(pn)[0] ht = self.page_cum_height[pn] b = { "x0": np.min([b["x0"] for b in bxs]), "top": np.min([b["top"] for b in bxs]) - ht, "x1": np.max([b["x1"] for b in bxs]), "bottom": np.max([b["bottom"] for b in bxs]) - ht } louts = [l for l in self.page_layout[pn] if l["type"] == ltype] ii = Recognizer.find_overlapped(b, louts, naive=True) if ii is not None: b = louts[ii] else: logging.warn( f"Missing layout match: {pn + 1},%s" % (bxs[0].get( "layoutno", ""))) left, top, right, bott = b["x0"], b["top"], b["x1"], b["bottom"] if right < left: right = left + 1 poss.append((pn + self.page_from, left, right, top, bott)) return self.page_images[pn] \ .crop((left * ZM, top * ZM, right * ZM, bott * ZM)) pn = {} for b in bxs: p = b["page_number"] - 1 if p not in pn: pn[p] = [] pn[p].append(b) pn = sorted(pn.items(), key=lambda x: x[0]) imgs = [cropout(arr, ltype, poss) for p, arr in pn] pic = Image.new("RGB", (int(np.max([i.size[0] for i in imgs])), int(np.sum([m.size[1] for m in imgs]))), (245, 245, 245)) height = 0 for img in imgs: pic.paste(img, (0, int(height))) height += img.size[1] return pic # crop figure out and add caption for k, bxs in figures.items(): txt = "\n".join([b["text"] for b in bxs]) if not txt: continue poss = [] res.append( (cropout( bxs, "figure", poss), [txt])) positions.append(poss) for k, bxs in tables.items(): if not bxs: continue bxs = Recognizer.sort_Y_firstly(bxs, np.mean( [(b["bottom"] - b["top"]) / 2 for b in bxs])) poss = [] res.append((cropout(bxs, "table", poss), self.tbl_det.construct_table(bxs, html=return_html, is_english=self.is_english))) positions.append(poss) assert len(positions) == len(res) if need_position: return list(zip(res, positions)) return res def proj_match(self, line): if len(line) <= 2: return if re.match(r"[0-9 ().,%%+/-]+$", line): return False for p, j in [ (r"第[零一二三四五六七八九十百]+章", 1), (r"第[零一二三四五六七八九十百]+[条节]", 2), (r"[零一二三四五六七八九十百]+[、  ]", 3), (r"[\((][零一二三四五六七八九十百]+[)\)]", 4), (r"[0-9]+(、|\.[  ]|\.[^0-9])", 5), (r"[0-9]+\.[0-9]+(、|[.  ]|[^0-9])", 6), (r"[0-9]+\.[0-9]+\.[0-9]+(、|[  ]|[^0-9])", 7), (r"[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(、|[  ]|[^0-9])", 8), (r".{,48}[::??]$", 9), (r"[0-9]+)", 10), (r"[\((][0-9]+[)\)]", 11), (r"[零一二三四五六七八九十百]+是", 12), (r"[⚫•➢✓]", 12) ]: if re.match(p, line): return j return def _line_tag(self, bx, ZM): pn = [bx["page_number"]] top = bx["top"] - self.page_cum_height[pn[0] - 1] bott = bx["bottom"] - self.page_cum_height[pn[0] - 1] page_images_cnt = len(self.page_images) if pn[-1] - 1 >= page_images_cnt: return "" while bott * ZM > self.page_images[pn[-1] - 1].size[1]: bott -= self.page_images[pn[-1] - 1].size[1] / ZM pn.append(pn[-1] + 1) if pn[-1] - 1 >= page_images_cnt: return "" return "@@{}\t{:.1f}\t{:.1f}\t{:.1f}\t{:.1f}##" \ .format("-".join([str(p) for p in pn]), bx["x0"], bx["x1"], top, bott) def __filterout_scraps(self, boxes, ZM): def width(b): return b["x1"] - b["x0"] def height(b): return b["bottom"] - b["top"] def usefull(b): if b.get("layout_type"): return True if width( b) > self.page_images[b["page_number"] - 1].size[0] / ZM / 3: return True if b["bottom"] - b["top"] > self.mean_height[b["page_number"] - 1]: return True return False res = [] while boxes: lines = [] widths = [] pw = self.page_images[boxes[0]["page_number"] - 1].size[0] / ZM mh = self.mean_height[boxes[0]["page_number"] - 1] mj = self.proj_match( boxes[0]["text"]) or boxes[0].get( "layout_type", "") == "title" def dfs(line, st): nonlocal mh, pw, lines, widths lines.append(line) widths.append(width(line)) width_mean = np.mean(widths) mmj = self.proj_match( line["text"]) or line.get( "layout_type", "") == "title" for i in range(st + 1, min(st + 20, len(boxes))): if (boxes[i]["page_number"] - line["page_number"]) > 0: break if not mmj and self._y_dis( line, boxes[i]) >= 3 * mh and height(line) < 1.5 * mh: break if not usefull(boxes[i]): continue if mmj or \ (self._x_dis(boxes[i], line) < pw / 10): \ # and abs(width(boxes[i])-width_mean)/max(width(boxes[i]),width_mean)<0.5): # concat following dfs(boxes[i], i) boxes.pop(i) break try: if usefull(boxes[0]): dfs(boxes[0], 0) else: logging.debug("WASTE: " + boxes[0]["text"]) except Exception as e: pass boxes.pop(0) mw = np.mean(widths) if mj or mw / pw >= 0.35 or mw > 200: res.append( "\n".join([c["text"] + self._line_tag(c, ZM) for c in lines])) else: logging.debug("REMOVED: " + "<<".join([c["text"] for c in lines])) return "\n\n".join(res) @staticmethod def total_page_number(fnm, binary=None): try: pdf = pdfplumber.open( fnm) if not binary else pdfplumber.open(BytesIO(binary)) return len(pdf.pages) except Exception as e: logging.error(str(e)) def __images__(self, fnm, zoomin=3, page_from=0, page_to=299, callback=None): self.lefted_chars = [] self.mean_height = [] self.mean_width = [] self.boxes = [] self.garbages = {} self.page_cum_height = [0] self.page_layout = [] self.page_from = page_from st = timer() try: self.pdf = pdfplumber.open(fnm) if isinstance( fnm, str) else pdfplumber.open(BytesIO(fnm)) self.page_images = [p.to_image(resolution=72 * zoomin).annotated for i, p in enumerate(self.pdf.pages[page_from:page_to])] self.page_chars = [[c for c in page.chars if self._has_color(c)] for page in self.pdf.pages[page_from:page_to]] self.total_page = len(self.pdf.pages) except Exception as e: logging.error(str(e)) self.outlines = [] try: self.pdf = pdf2_read(fnm if isinstance(fnm, str) else BytesIO(fnm)) outlines = self.pdf.outline def dfs(arr, depth): for a in arr: if isinstance(a, dict): self.outlines.append((a["/Title"], depth)) continue dfs(a, depth + 1) dfs(outlines, 0) except Exception as e: logging.warning(f"Outlines exception: {e}") if not self.outlines: logging.warning(f"Miss outlines") logging.info("Images converted.") self.is_english = [re.search(r"[a-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}", "".join( random.choices([c["text"] for c in self.page_chars[i]], k=min(100, len(self.page_chars[i]))))) for i in range(len(self.page_chars))] if sum([1 if e else 0 for e in self.is_english]) > len( self.page_images) / 2: self.is_english = True else: self.is_english = False self.is_english = False st = timer() for i, img in enumerate(self.page_images): chars = self.page_chars[i] if not self.is_english else [] self.mean_height.append( np.median(sorted([c["height"] for c in chars])) if chars else 0 ) self.mean_width.append( np.median(sorted([c["width"] for c in chars])) if chars else 8 ) self.page_cum_height.append(img.size[1] / zoomin) j = 0 while j + 1 < len(chars): if chars[j]["text"] and chars[j + 1]["text"] \ and re.match(r"[0-9a-zA-Z,.:;!%]+", chars[j]["text"] + chars[j + 1]["text"]) \ and chars[j + 1]["x0"] - chars[j]["x1"] >= min(chars[j + 1]["width"], chars[j]["width"]) / 2: chars[j]["text"] += " " j += 1 self.__ocr(i + 1, img, chars, zoomin) if callback and i % 6 == 5: callback(prog=(i + 1) * 0.6 / len(self.page_images), msg="") # print("OCR:", timer()-st) if not self.is_english and not any( [c for c in self.page_chars]) and self.boxes: bxes = [b for bxs in self.boxes for b in bxs] self.is_english = re.search(r"[\na-zA-Z0-9,/¸;:'\[\]\(\)!@#$%^&*\"?<>._-]{30,}", "".join([b["text"] for b in random.choices(bxes, k=min(30, len(bxes)))])) logging.info(f"Is it English: {self.is_english}") self.page_cum_height = np.cumsum(self.page_cum_height) assert len(self.page_cum_height) == len(self.page_images) + 1 def __call__(self, fnm, need_image=False, zoomin=3, return_html=False): self.__images__(fnm, zoomin) self._layouts_rec(zoomin) self._table_transformer_job(zoomin) self._text_merge() self._concat_downward() self._filter_forpages() tbls = self._extract_table_figure( need_image, zoomin, return_html, False) return self.__filterout_scraps(deepcopy(self.boxes), zoomin), tbls def remove_tag(self, txt): return re.sub(r"@@[\t0-9.-]+?##", "", txt) def crop(self, text, ZM=3, need_position=False): imgs = [] poss = [] for tag in re.findall(r"@@[0-9-]+\t[0-9.\t]+##", text): pn, left, right, top, bottom = tag.strip( "#").strip("@").split("\t") left, right, top, bottom = float(left), float( right), float(top), float(bottom) poss.append(([int(p) - 1 for p in pn.split("-")], left, right, top, bottom)) if not poss: if need_position: return None, None return max_width = max( np.max([right - left for (_, left, right, _, _) in poss]), 6) GAP = 6 pos = poss[0] poss.insert(0, ([pos[0][0]], pos[1], pos[2], max( 0, pos[3] - 120), max(pos[3] - GAP, 0))) pos = poss[-1] poss.append(([pos[0][-1]], pos[1], pos[2], min(self.page_images[pos[0][-1]].size[1] / ZM, pos[4] + GAP), min(self.page_images[pos[0][-1]].size[1] / ZM, pos[4] + 120))) positions = [] for ii, (pns, left, right, top, bottom) in enumerate(poss): right = left + max_width bottom *= ZM for pn in pns[1:]: bottom += self.page_images[pn - 1].size[1] imgs.append( self.page_images[pns[0]].crop((left * ZM, top * ZM, right * ZM, min( bottom, self.page_images[pns[0]].size[1]) )) ) if 0 < ii < len(poss) - 1: positions.append((pns[0] + self.page_from, left, right, top, min( bottom, self.page_images[pns[0]].size[1]) / ZM)) bottom -= self.page_images[pns[0]].size[1] for pn in pns[1:]: imgs.append( self.page_images[pn].crop((left * ZM, 0, right * ZM, min(bottom, self.page_images[pn].size[1]) )) ) if 0 < ii < len(poss) - 1: positions.append((pn + self.page_from, left, right, 0, min( bottom, self.page_images[pn].size[1]) / ZM)) bottom -= self.page_images[pn].size[1] if not imgs: if need_position: return None, None return height = 0 for img in imgs: height += img.size[1] + GAP height = int(height) width = int(np.max([i.size[0] for i in imgs])) pic = Image.new("RGB", (width, height), (245, 245, 245)) height = 0 for ii, img in enumerate(imgs): if ii == 0 or ii + 1 == len(imgs): img = img.convert('RGBA') overlay = Image.new('RGBA', img.size, (0, 0, 0, 0)) overlay.putalpha(128) img = Image.alpha_composite(img, overlay).convert("RGB") pic.paste(img, (0, int(height))) height += img.size[1] + GAP if need_position: return pic, positions return pic def get_position(self, bx, ZM): poss = [] pn = bx["page_number"] top = bx["top"] - self.page_cum_height[pn - 1] bott = bx["bottom"] - self.page_cum_height[pn - 1] poss.append((pn, bx["x0"], bx["x1"], top, min( bott, self.page_images[pn - 1].size[1] / ZM))) while bott * ZM > self.page_images[pn - 1].size[1]: bott -= self.page_images[pn - 1].size[1] / ZM top = 0 pn += 1 poss.append((pn, bx["x0"], bx["x1"], top, min( bott, self.page_images[pn - 1].size[1] / ZM))) return poss class PlainParser(object): def __call__(self, filename, from_page=0, to_page=100000, **kwargs): self.outlines = [] lines = [] try: self.pdf = pdf2_read( filename if isinstance( filename, str) else BytesIO(filename)) for page in self.pdf.pages[from_page:to_page]: lines.extend([t for t in page.extract_text().split("\n")]) outlines = self.pdf.outline def dfs(arr, depth): for a in arr: if isinstance(a, dict): self.outlines.append((a["/Title"], depth)) continue dfs(a, depth + 1) dfs(outlines, 0) except Exception as e: logging.warning(f"Outlines exception: {e}") if not self.outlines: logging.warning(f"Miss outlines") return [(l, "") for l in lines], [] def crop(self, ck, need_position): raise NotImplementedError @staticmethod def remove_tag(txt): raise NotImplementedError if __name__ == "__main__": pass