"""Heuristic text/PDF chunkers.

``HuChunker`` splits Chinese-centric documents into retrieval-sized chunks,
either naively by length (``naive_text_chunk``) or by building a heading
hierarchy from numbering patterns (``text_chunks``).

NOTE(review): this file was recovered from a whitespace-collapsed source;
formatting reconstructed, logic preserved token-for-token except where a
fix is explicitly commented.
"""
import base64
import copy
import os
import re
from dataclasses import dataclass
from io import BytesIO
from typing import List, Optional

import magic
import numpy as np


class HuChunker:

    @dataclass
    class Fields:
        # Result container: plain-text chunks and table chunks.
        text_chunks: List = None
        table_chunks: List = None

    def __init__(self):
        # Deepest heading level recognized by the hierarchy builder.
        self.MAX_LVL = 12
        # (regex, level) pairs, probed in order by `_proj_match`. Lower level
        # numbers are more significant headings (chapter > section > item).
        self.proj_patt = [
            (r"第[零一二三四五六七八九十百]+章", 1),
            (r"第[零一二三四五六七八九十百]+[条节]", 2),
            (r"[零一二三四五六七八九十百]+[、 ]", 3),
            (r"[\((][零一二三四五六七八九十百]+[)\)]", 4),
            (r"[0-9]+(、|\.[ ]|\.[^0-9])", 5),
            (r"[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 6),
            (r"[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 7),
            (r"[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+(、|[ ]|[^0-9])", 8),
            (r".{,48}[::??]@", 9),
            # FIX(review): was r"[0-9]+)" — the unescaped ')' raises
            # re.error("unbalanced parenthesis") as soon as matching reaches
            # this entry; escape it so "1)"-style items match as intended.
            (r"[0-9]+\)", 10),
            (r"[\((][0-9]+[)\)]", 11),
            (r"[零一二三四五六七八九十百]+是", 12),
            (r"[⚫•➢✓ ]", 12)
        ]
        self.lines = []

    def _garbage(self, txt: str) -> bool:
        """Return True if *txt* is boilerplate (disclaimers, TOC, footers)."""
        patt = [
            r"(在此保证|不得以任何形式翻版|请勿传阅|仅供内部使用|未经事先书面授权)",
            r"(版权(归本公司)*所有|免责声明|保留一切权力|承担全部责任|特别声明|报告中涉及)",
            r"(不承担任何责任|投资者的通知事项:|任何机构和个人|本报告仅为|不构成投资)",
            r"(不构成对任何个人或机构投资建议|联系其所在国家|本报告由从事证券交易)",
            r"(本研究报告由|「认可投资者」|所有研究报告均以|请发邮件至)",
            r"(本报告仅供|市场有风险,投资需谨慎|本报告中提及的)",
            r"(本报告反映|此信息仅供|证券分析师承诺|具备证券投资咨询业务资格)",
            r"^(时间|签字|签章)[::]",
            r"(参考文献|目录索引|图表索引)",
            r"[ ]*年[ ]+月[ ]+日",
            r"^(中国证券业协会|[0-9]+年[0-9]+月[0-9]+日)$",
            r"\.{10,}",
            r"(———————END|帮我转发|欢迎收藏|快来关注我吧)"
        ]
        return any([re.search(p, txt) for p in patt])

    def _proj_match(self, line: str) -> Optional[int]:
        """Return the heading level of *line*, or None if no pattern matches."""
        for p, j in self.proj_patt:
            if re.match(p, line):
                return j
        return

    def _does_proj_match(self) -> List[Optional[int]]:
        """Heading level (or None) for every line in ``self.lines``."""
        mat = [None for _ in range(len(self.lines))]
        for i in range(len(self.lines)):
            mat[i] = self._proj_match(self.lines[i])
        return mat

    def naive_text_chunk(self, text, ti="", MAX_LEN=612):
        """Greedy length-based chunking.

        Splits *text* on blank lines, drops garbage, merges short pieces
        (< MAX_LEN/3 accumulated) and re-splits over-long ones at newline
        boundaries. If *ti* (a title) is given, each chunk is suffixed with
        a provenance tag unless it already carries one.
        """
        if text:
            self.lines = [l.strip().replace(u'\u3000', u' ')
                          .replace(u'\xa0', u'')
                          for l in text.split("\n\n")]
            self.lines = [l for l in self.lines if not self._garbage(l)]
            self.lines = [re.sub(r"([ ]+| )", " ", l)
                          for l in self.lines if l]
            if not self.lines:
                return []
        arr = self.lines

        res = [""]
        i = 0
        while i < len(arr):
            a = arr[i]
            if not a:
                i += 1
                continue
            if len(a) > MAX_LEN:
                a_ = a.split("\n")
                if len(a_) >= 2:
                    # Replace the over-long entry with a prefix that stays
                    # under MAX_LEN and the remainder; reprocess in place.
                    arr.pop(i)
                    for j in range(2, len(a_) + 1):
                        if len("\n".join(a_[:j])) >= MAX_LEN:
                            arr.insert(i, "\n".join(a_[:j - 1]))
                            arr.insert(i + 1, "\n".join(a_[j - 1:]))
                            break
                    else:
                        # NOTE(review): unreachable in practice (joining all
                        # parts reproduces `a`, which exceeds MAX_LEN), and
                        # `assert` is stripped under -O — kept for parity.
                        assert False, f"Can't split: {a}"
                    continue
            # Accumulate short pieces into the current chunk; start a new
            # chunk once it has grown past MAX_LEN / 3.
            if len(res[-1]) < MAX_LEN / 3:
                res[-1] += "\n" + a
            else:
                res.append(a)
            i += 1

        if ti:
            for i in range(len(res)):
                if res[i].find("——来自") >= 0:
                    continue
                res[i] += f"\t——来自“{ti}”"
        return res

    def _merge(self):
        """Merge consecutive lines of the same heading level (both < 256
        chars), then recompute and return the level matrix."""
        lines = [self.lines[0]] if self.lines else []
        for i in range(1, len(self.lines)):
            if self.mat[i] == self.mat[i - 1] \
                    and len(lines[-1]) < 256 \
                    and len(self.lines[i]) < 256:
                lines[-1] += "\n" + self.lines[i]
                continue
            lines.append(self.lines[i])
        self.lines = lines
        self.mat = self._does_proj_match()
        return self.mat

    def text_chunks(self, text):
        """Hierarchy-aware chunking.

        Builds a heading tree from ``proj_patt`` levels, walks every
        root-to-leaf path (DFS) to emit heading-path + body chunks, then
        sweeps up unread ("orphan") lines into length-bounded chunks.
        """
        if text:
            self.lines = [l.strip().replace(u'\u3000', u' ')
                          .replace(u'\xa0', u'')
                          for l in re.split(r"[\r\n]", text)]
            self.lines = [l for l in self.lines if not self._garbage(l)]
            self.lines = [l for l in self.lines if l]
        self.mat = self._does_proj_match()
        mat = self._merge()

        tree = []
        for i in range(len(self.lines)):
            tree.append({"proj": mat[i],
                         "children": [],
                         "read": False})

        # Find all children: a heading adopts, within its span (up to the
        # next heading of the same-or-higher significance), the leading
        # unnumbered lines plus every line at the next present sub-level.
        for i in range(len(self.lines) - 1):
            if tree[i]["proj"] is None:
                continue
            ed = i + 1
            while ed < len(tree) and (tree[ed]["proj"] is None
                                      or tree[ed]["proj"] > tree[i]["proj"]):
                ed += 1

            nxt = tree[i]["proj"] + 1
            st = set([p["proj"] for p in tree[i + 1: ed] if p["proj"]])
            while nxt not in st:
                nxt += 1
                if nxt > self.MAX_LVL:
                    break
            if nxt <= self.MAX_LVL:
                for j in range(i + 1, ed):
                    if tree[j]["proj"] is not None:
                        break
                    tree[i]["children"].append(j)
                for j in range(i + 1, ed):
                    if tree[j]["proj"] != nxt:
                        continue
                    tree[i]["children"].append(j)
            else:
                # No sub-level present: everything in the span is a child.
                for j in range(i + 1, ed):
                    tree[i]["children"].append(j)

        # Get DFS combinations: find all the paths to leaves.
        paths = []

        def dfs(i, path):
            nonlocal tree, paths
            path.append(i)
            tree[i]["read"] = True
            if len(self.lines[i]) > 256:
                paths.append(path)
                return
            if not tree[i]["children"]:
                # Drop trivially short standalone leaves (< 32 chars).
                if len(path) > 1 or len(self.lines[i]) >= 32:
                    paths.append(path)
                return
            for j in tree[i]["children"]:
                dfs(j, copy.deepcopy(path))

        for i, t in enumerate(tree):
            if t["read"]:
                continue
            dfs(i, [])

        # Concatenate the text along each path (heading trail + leaf).
        res = []
        lines = np.array(self.lines)
        for p in paths:
            if len(p) < 2:
                tree[p[0]]["read"] = False
                continue
            txt = "\n".join(lines[p[:-1]]) + "\n" + lines[p[-1]]
            res.append(txt)

        # Concatenate runs of continuous orphan (unread) lines, capping
        # each accumulated chunk at 256 chars.
        assert len(tree) == len(lines)
        ii = 0
        while ii < len(tree):
            if tree[ii]["read"]:
                ii += 1
                continue
            txt = lines[ii]
            e = ii + 1
            while e < len(tree) and not tree[e]["read"] and len(txt) < 256:
                txt += "\n" + lines[e]
                e += 1
            res.append(txt)
            ii = e

        # If a node has not been read, climb to its nearest more-significant
        # ancestor. NOTE(review): defined but never called — dead code in
        # the original; kept verbatim rather than silently removed.
        def find_daddy(st):
            nonlocal lines, tree
            proj = tree[st]["proj"]
            if len(self.lines[st]) > 512:
                return [st]
            if proj is None:
                proj = self.MAX_LVL + 1
            for i in range(st - 1, -1, -1):
                if tree[i]["proj"] and tree[i]["proj"] < proj:
                    a = [st] + find_daddy(i)
                    return a
            return []

        return res


class PdfChunker(HuChunker):
    """Chunker backed by an external PDF parser callable."""

    def __init__(self, pdf_parser):
        # `pdf_parser` is a project-supplied callable; signature inferred
        # from use: pdf_parser(path, return_html=True) -> (_, tables).
        self.pdf = pdf_parser
        super().__init__()

    # NOTE(review): `tableHtmls` is truncated mid-statement in the provided
    # source chunk; its visible prefix is preserved below (commented out)
    # rather than guessed at. Restore from the full source.
    # def tableHtmls(self, pdfnm):
    #     _, tbls = self.pdf(pdfnm, return_html=True)
    #     res = []
    #     for img, arr in tbls:
    #         if arr[0].find("