import concurrent.futures |
import logging |
import re |
import unicodedata |
from enum import Enum |
from string import Template |
from typing import Dict |
import numpy as np |
from pdfminer.converter import PDFConverter |
from pdfminer.layout import LTChar, LTFigure, LTLine, LTPage |
from pdfminer.pdffont import PDFCIDFont, PDFUnicodeNotDefined |
from pdfminer.pdfinterp import PDFGraphicState, PDFResourceManager |
from pdfminer.utils import apply_matrix_pt, mult_matrix |
from pymupdf import Font |
from tenacity import retry, wait_fixed |
from pdf2zh.translator import ( |
AnythingLLMTranslator, |
ArgosTranslator, |
AzureOpenAITranslator, |
AzureTranslator, |
BaseTranslator, |
BingTranslator, |
DeepLTranslator, |
DeepLXTranslator, |
DeepseekTranslator, |
DifyTranslator, |
GeminiTranslator, |
GoogleTranslator, |
GrokTranslator, |
GroqTranslator, |
ModelScopeTranslator, |
OllamaTranslator, |
OpenAIlikedTranslator, |
OpenAITranslator, |
QwenMtTranslator, |
SiliconTranslator, |
TencentTranslator, |
XinferenceTranslator, |
ZhipuTranslator, |
) |
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class PDFConverterEx(PDFConverter):
    """Converter that passes every finished page or figure to ``receive_layout``.

    ``end_page`` and ``end_figure`` return whatever ``receive_layout`` returns,
    and ``render_char`` records the character id and font object on each
    ``LTChar`` it creates.
    """

    def __init__(
        self,
        rsrcmgr: PDFResourceManager,
    ) -> None:
        """Initialise the base converter with fixed positional arguments."""
        # NOTE(review): the trailing arguments are presumably
        # (outfp, codec, pageno, laparams) of pdfminer's PDFConverter - confirm.
        PDFConverter.__init__(self, rsrcmgr, None, "utf-8", 1, None)

    def begin_page(self, page, ctm) -> None:
        """Start a new ``LTPage`` sized from the transformed crop box."""
        left, bottom, right, top = page.cropbox
        left, bottom = apply_matrix_pt(ctm, (left, bottom))
        right, top = apply_matrix_pt(ctm, (right, top))
        # The page box is re-based at the origin; only its extent is kept.
        page_box = (0, 0, abs(left - right), abs(bottom - top))
        self.cur_item = LTPage(page.pageno, page_box)

    def end_page(self, page):
        """Return the result of ``receive_layout`` for the current page item."""
        return self.receive_layout(self.cur_item)

    def begin_figure(self, name, bbox, matrix) -> None:
        """Push the current item and make a new ``LTFigure`` the current item."""
        parent = self.cur_item
        self._stack.append(parent)
        figure = LTFigure(name, bbox, mult_matrix(matrix, self.ctm))
        self.cur_item = figure
        # The figure carries the page id of the item it is nested in.
        figure.pageid = self._stack[-1].pageid

    def end_figure(self, _: str) -> None:
        """Close the current figure, attach it to its parent and lay it out."""
        finished = self.cur_item
        assert isinstance(finished, LTFigure), str(type(finished))
        parent = self._stack.pop()
        self.cur_item = parent
        parent.add(finished)
        return self.receive_layout(finished)

    def render_char(
        self,
        matrix,
        font,
        fontsize: float,
        scaling: float,
        rise: float,
        cid: int,
        ncs,
        graphicstate: PDFGraphicState,
    ) -> float:
        """Create an ``LTChar`` for ``cid``, add it to the current item.

        Returns:
            The ``adv`` attribute of the created ``LTChar``.
        """
        try:
            text = font.to_unichr(cid)
        except PDFUnicodeNotDefined:
            text = self.handle_undefined_char(font, cid)
        else:
            assert isinstance(text, str), str(type(text))
        width = font.char_width(cid)
        displacement = font.char_disp(cid)
        glyph = LTChar(
            matrix,
            font,
            fontsize,
            scaling,
            rise,
            text,
            width,
            displacement,
            ncs,
            graphicstate,
        )
        self.cur_item.add(glyph)
        # Keep the raw character id and the font object on the glyph so that
        # later stages can re-emit the character unchanged.
        glyph.cid = cid
        glyph.font = font
        return glyph.adv
class Paragraph:
    """Mutable record describing one paragraph's position, extent and font size.

    Attributes:
        y, x: stored start coordinates (floats).
        x0, x1: horizontal bounds (floats).
        y0, y1: vertical bounds (floats).
        size: font size (float).
        brk: boolean flag; presumably marks that the paragraph contains a
            line break - confirm against the code that sets it.
    """

    def __init__(self, y, x, x0, x1, y0, y1, size, brk):
        # Paired assignments keep the attributes in the order
        # y, x, x0, x1, y0, y1, size, brk.
        self.y, self.x = y, x
        self.x0, self.x1 = x0, x1
        self.y0, self.y1 = y0, y1
        self.size, self.brk = size, brk
class TranslateConverter(PDFConverterEx):
    """Converter that translates page text and emits PDF drawing operators.

    ``receive_layout`` splits a page into paragraphs, replaces preserved
    character runs with ``{vN}`` placeholders, translates the paragraph
    strings concurrently, and builds a PDF content-stream string that
    re-typesets the translated text together with the preserved runs.

    NOTE(review): ``self.fontmap`` and ``self.fontid`` are read by
    ``receive_layout`` but never assigned in this class; presumably the
    caller sets them on the instance before layout - confirm.
    """

    def __init__(
        self,
        rsrcmgr,
        vfont: str = None,
        vchar: str = None,
        thread: int = 0,
        layout=None,
        lang_in: str = "",
        lang_out: str = "",
        service: str = "",
        noto_name: str = "",
        noto: Font = None,
        envs: Dict = None,
        prompt: Template = None,
        ignore_cache: bool = False,
    ) -> None:
        """Store layout options and instantiate the requested translator.

        Args:
            rsrcmgr: resource manager forwarded to ``PDFConverterEx``.
            vfont: optional regex; fonts matching it mark characters as preserved.
            vchar: optional regex; characters matching it are preserved.
            thread: number of worker threads used for translation.
            layout: mapping indexed by page id whose values are 2-D arrays
                (``.shape`` and ``[row, col]`` indexing are used). Defaults
                to a fresh empty dict.
            lang_in: source language passed to the translator.
            lang_out: target language passed to the translator.
            service: ``"name"`` or ``"name:model"``; ``name`` selects the
                translator class by its ``name`` attribute.
            noto_name: font id used for characters not covered by ``"tiro"``.
            noto: font object used to measure and encode ``noto_name`` glyphs.
            envs: environment mapping forwarded to the translator.
            prompt: prompt template forwarded to the translator.
            ignore_cache: forwarded to the translator.

        Raises:
            ValueError: if no translator class has the requested name.
        """
        super().__init__(rsrcmgr)
        self.vfont = vfont
        self.vchar = vchar
        self.thread = thread
        # A fresh dict per instance: a ``{}`` default in the signature would
        # be shared by every instance created without an explicit layout.
        self.layout = {} if layout is None else layout
        self.noto_name = noto_name
        self.noto = noto
        self.translator: BaseTranslator = None
        # "service:model" -> ("service", "model"); the model part is optional.
        param = service.split(":", 1)
        service_name = param[0]
        service_model = param[1] if len(param) > 1 else None
        if not envs:
            envs = {}
        translator_classes = (
            GoogleTranslator,
            BingTranslator,
            DeepLTranslator,
            DeepLXTranslator,
            OllamaTranslator,
            XinferenceTranslator,
            AzureOpenAITranslator,
            OpenAITranslator,
            ZhipuTranslator,
            ModelScopeTranslator,
            SiliconTranslator,
            GeminiTranslator,
            AzureTranslator,
            TencentTranslator,
            DifyTranslator,
            AnythingLLMTranslator,
            ArgosTranslator,
            GrokTranslator,
            GroqTranslator,
            DeepseekTranslator,
            OpenAIlikedTranslator,
            QwenMtTranslator,
        )
        # No early exit: if several classes share a name, the last one wins.
        for translator in translator_classes:
            if service_name == translator.name:
                self.translator = translator(
                    lang_in,
                    lang_out,
                    service_model,
                    envs=envs,
                    prompt=prompt,
                    ignore_cache=ignore_cache,
                )
        if not self.translator:
            raise ValueError("Unsupported translation service")

    def receive_layout(self, ltpage: LTPage):
        """Translate one page (or figure) and return its PDF operator string.

        The method works in three phases: (A) walk the layout children and
        build paragraph strings with ``{vN}`` placeholders for preserved
        runs, (B) translate the paragraph strings in a thread pool, and
        (C) typeset the translated strings into text/line operators.

        Args:
            ltpage: layout container to process; its ``pageid`` selects the
                entry of ``self.layout`` and its ``width`` bounds the
                horizontal jump allowed inside a preserved run.

        Returns:
            A string of the form ``"BT ... ET "`` containing the operators.
        """
        # Paragraph state
        sstk: list[str] = []  # paragraph text, one string per paragraph
        pstk: list[Paragraph] = []  # paragraph geometry, parallel to sstk
        vbkt: int = 0  # count of currently open "(" inside preserved runs
        # Preserved run currently being collected
        vstk: list[LTChar] = []  # characters of the run
        vlstk: list[LTLine] = []  # lines belonging to the run
        vfix: float = 0  # vertical offset of the run relative to nearby text
        # Completed preserved runs, indexed by the N of "{vN}"
        var: list[list[LTChar]] = []
        varl: list[list[LTLine]] = []
        varf: list[float] = []
        vlen: list[float] = []  # width of each run
        # Global state
        lstk: list[LTLine] = []  # lines that belong to no preserved run
        xt: LTChar = None  # previous character
        xt_cls: int = -1  # layout class of the previous character
        vmax: float = ltpage.width / 4  # max horizontal jump inside a run
        ops: str = ""

        def vflag(font: str, char: str):
            """Return True if the font name or character marks a preserved glyph."""
            if isinstance(font, bytes):
                try:
                    font = font.decode('utf-8')
                except UnicodeDecodeError:
                    font = ""
            # Keep only the part after the last "+" of the font name.
            font = font.split("+")[-1]
            if re.match(r"\(cid:", char):
                return True
            # Font-name rule: user pattern if given, otherwise the built-in one
            # (the built-in names look like TeX/math/monospace fonts).
            if self.vfont:
                if re.match(self.vfont, font):
                    return True
            else:
                if re.match(
                    r"(CM[^R]|MS.M|XY|MT|BL|RM|EU|LA|RS|LINE|LCIRCLE|TeX-|rsfs|txsy|wasy|stmary|.*Mono|.*Code|.*Ital|.*Sym|.*Math)",
                    font,
                ):
                    return True
            # Character rule: user pattern if given, otherwise Unicode
            # modifier/mark/math/separator categories or U+0370..U+03FF
            # (the Greek and Coptic block).
            if self.vchar:
                if re.match(self.vchar, char):
                    return True
            else:
                if (
                    char
                    and char != " "
                    and (
                        unicodedata.category(char[0])
                        in ["Lm", "Mn", "Sk", "Sm", "Zl", "Zp", "Zs"]
                        or ord(char[0]) in range(0x370, 0x400)
                    )
                ):
                    return True
            return False

        ############################################################
        # A. Parse the original layout
        for child in ltpage:
            if isinstance(child, LTChar):
                cur_v = False  # whether this character is preserved
                layout = self.layout[ltpage.pageid]
                h, w = layout.shape
                # Look up the layout class at the character's clamped origin.
                cx, cy = np.clip(int(child.x0), 0, w - 1), np.clip(int(child.y0), 0, h - 1)
                cls = layout[cy, cx]
                # Bullets are always treated as class 0.
                if child.get_text() == "•":
                    cls = 0
                if (
                    cls == 0  # class 0 is always preserved
                    # noticeably smaller than the paragraph font
                    or (cls == xt_cls and len(sstk[-1].strip()) > 1 and child.size < pstk[-1].size * 0.79)
                    or vflag(child.fontname, child.get_text())
                    # both diagonal matrix entries zero (rotated text)
                    or (child.matrix[0] == 0 and child.matrix[3] == 0)
                ):
                    cur_v = True
                # Parentheses opened inside a preserved run stay in that run.
                if not cur_v:
                    if vstk and child.get_text() == "(":
                        cur_v = True
                        vbkt += 1
                    if vbkt and child.get_text() == ")":
                        cur_v = True
                        vbkt -= 1
                # Decide whether the current preserved run ends here.
                if (
                    not cur_v
                    or cls != xt_cls
                    or (sstk[-1] != "" and abs(child.x0 - xt.x0) > vmax)
                ):
                    if vstk:
                        # Text to the right of the run, in the same class,
                        # fixes the run's vertical offset.
                        if (
                            not cur_v
                            and cls == xt_cls
                            and child.x0 > max([vch.x0 for vch in vstk])
                        ):
                            vfix = vstk[0].y0 - child.y0
                        # A paragraph consisting only of a run must not be
                        # continued: resetting xt_cls forces a new paragraph.
                        if sstk[-1] == "":
                            xt_cls = -1
                        sstk[-1] += f"{{v{len(var)}}}"
                        var.append(vstk)
                        varl.append(vlstk)
                        varf.append(vfix)
                        vstk = []
                        vlstk = []
                        vfix = 0
                # Plain text, or the first character of a new run.
                if not vstk:
                    if cls == xt_cls:
                        if child.x0 > xt.x1 + 1:
                            # horizontal gap -> inline space
                            sstk[-1] += " "
                        elif child.x1 < xt.x0:
                            # jumped back to the left -> line wrap
                            sstk[-1] += " "
                            pstk[-1].brk = True
                    else:
                        # Class changed: start a new paragraph at this character.
                        sstk.append("")
                        pstk.append(Paragraph(child.y0, child.x0, child.x0, child.x0, child.y0, child.y1, child.size, False))
                if not cur_v:
                    # A larger character, or the second character of the
                    # paragraph, redefines the paragraph font size.
                    if (
                        child.size > pstk[-1].size
                        or len(sstk[-1].strip()) == 1
                    ) and child.get_text() != " ":
                        pstk[-1].y -= child.size - pstk[-1].size
                        pstk[-1].size = child.size
                    sstk[-1] += child.get_text()
                else:
                    # Text to the left of a new run, in the same class,
                    # fixes the run's vertical offset.
                    if (
                        not vstk
                        and cls == xt_cls
                        and child.x0 > xt.x0
                    ):
                        vfix = child.y0 - xt.y0
                    vstk.append(child)
                # Grow the paragraph bounding box.
                pstk[-1].x0 = min(pstk[-1].x0, child.x0)
                pstk[-1].x1 = max(pstk[-1].x1, child.x1)
                pstk[-1].y0 = min(pstk[-1].y0, child.y0)
                pstk[-1].y1 = max(pstk[-1].y1, child.y1)
                xt = child
                xt_cls = cls
            elif isinstance(child, LTFigure):
                pass
            elif isinstance(child, LTLine):
                layout = self.layout[ltpage.pageid]
                h, w = layout.shape
                cx, cy = np.clip(int(child.x0), 0, w - 1), np.clip(int(child.y0), 0, h - 1)
                cls = layout[cy, cx]
                # A line in the class of the open run belongs to that run;
                # every other line is redrawn globally.
                if vstk and cls == xt_cls:
                    vlstk.append(child)
                else:
                    lstk.append(child)
            else:
                pass
        # Flush a run that is still open at the end of the page.
        if vstk:
            sstk[-1] += f"{{v{len(var)}}}"
            var.append(vstk)
            varl.append(vlstk)
            varf.append(vfix)
        log.debug("\n==========[VSTACK]==========\n")
        # Width of each run: rightmost edge minus the first character's left edge.
        for id, v in enumerate(var):
            l = max([vch.x1 for vch in v]) - v[0].x0
            log.debug(f'< {l:.1f} {v[0].x0:.1f} {v[0].y0:.1f} {v[0].cid} {v[0].fontname} {len(varl[id])} > v{id} = {"".join([ch.get_text() for ch in v])}')
            vlen.append(l)

        ############################################################
        # B. Translate the paragraphs
        log.debug("\n==========[SSTACK]==========\n")

        @retry(wait=wait_fixed(1))
        def worker(s: str):
            """Translate one paragraph; blank text and lone placeholders pass through."""
            if not s.strip() or re.match(r"^\{v\d+\}$", s):
                return s
            try:
                new = self.translator.translate(s)
                return new
            except BaseException as e:
                # Full traceback only when debug logging is enabled.
                if log.isEnabledFor(logging.DEBUG):
                    log.exception(e)
                else:
                    log.exception(e, exc_info=False)
                raise e

        # ThreadPoolExecutor raises ValueError for max_workers <= 0 and the
        # constructor default is 0, so non-positive counts fall back to 1.
        max_workers = self.thread
        if max_workers is not None and max_workers <= 0:
            max_workers = 1
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers
        ) as executor:
            news = list(executor.map(worker, sstk))

        ############################################################
        # C. Typeset the translated document
        def raw_string(fcur: str, cstk: str):
            """Hex-encode ``cstk`` for font ``fcur``."""
            if fcur == self.noto_name:
                # Four hex digits per character, from noto.has_glyph().
                return "".join(["%04x" % self.noto.has_glyph(ord(c)) for c in cstk])
            elif isinstance(self.fontmap[fcur], PDFCIDFont):
                # CID fonts: four hex digits per code point.
                return "".join(["%04x" % ord(c) for c in cstk])
            else:
                # Other fonts: two hex digits per code point.
                return "".join(["%02x" % ord(c) for c in cstk])

        # Default line-height factor per target language; 1.1 otherwise.
        LANG_LINEHEIGHT_MAP = {
            "zh-cn": 1.4, "zh-tw": 1.4, "zh-hans": 1.4, "zh-hant": 1.4, "zh": 1.4,
            "ja": 1.1, "ko": 1.2, "en": 1.2, "ar": 1.0, "ru": 0.8, "uk": 0.8, "ta": 0.8
        }
        default_line_height = LANG_LINEHEIGHT_MAP.get(self.translator.lang_out.lower(), 1.1)
        _x, _y = 0, 0  # previous pen position, used only for debug trace lines
        ops_list = []

        def gen_op_txt(font, size, x, y, rtxt):
            """Operators: select font (Tf), set text matrix (Tm), show hex text (TJ)."""
            return f"/{font} {size:f} Tf 1 0 0 1 {x:f} {y:f} Tm [<{rtxt}>] TJ "

        def gen_op_line(x, y, xlen, ylen, linewidth):
            """Operators: leave the text object, stroke one line, re-enter it."""
            return f"ET q 1 0 0 1 {x:f} {y:f} cm [] 0 d 0 J {linewidth:f} w 0 0 m {xlen:f} {ylen:f} l S Q BT "

        for id, new in enumerate(news):
            x: float = pstk[id].x  # current pen x, starts at the paragraph start
            y: float = pstk[id].y  # paragraph start y
            x0: float = pstk[id].x0  # left bound
            x1: float = pstk[id].x1  # right bound
            height: float = pstk[id].y1 - pstk[id].y0
            size: float = pstk[id].size
            brk: bool = pstk[id].brk
            cstk: str = ""  # buffered characters sharing one font
            fcur: str = None  # font id of the buffered characters
            lidx = 0  # number of line wraps so far
            tx = x  # x position where the buffered text starts
            fcur_ = fcur
            ptr = 0
            log.debug(f"< {y} {x} {x0} {x1} {size} {brk} > {sstk[id]} | {new}")

            # Operators are collected with a line index and given their final
            # y only after the paragraph's line height is known.
            ops_vals: list[dict] = []

            while ptr < len(new):
                # Tolerant placeholder match: "{v12}", "{ V 1 2 }", ...
                vy_regex = re.match(
                    r"\{\s*v([\d\s]+)\}", new[ptr:], re.IGNORECASE
                )
                mod = 0  # width taken back when the run ends in a modifier
                if vy_regex:
                    ptr += len(vy_regex.group(0))
                    try:
                        vid = int(vy_regex.group(1).replace(" ", ""))
                        adv = vlen[vid]
                    except Exception:
                        # Skip placeholders whose index does not exist.
                        continue
                    if var[vid][-1].get_text() and unicodedata.category(var[vid][-1].get_text()[0]) in ["Lm", "Mn", "Sk"]:
                        mod = var[vid][-1].width
                else:
                    ch = new[ptr]
                    fcur_ = None
                    # Use "tiro" when it maps the code point to itself; any
                    # failure falls through to the noto font.
                    try:
                        if fcur_ is None and self.fontmap["tiro"].to_unichr(ord(ch)) == ch:
                            fcur_ = "tiro"
                    except Exception:
                        pass
                    if fcur_ is None:
                        fcur_ = self.noto_name
                    if fcur_ == self.noto_name:
                        adv = self.noto.char_lengths(ch, size)[0]
                    else:
                        adv = self.fontmap[fcur_].char_width(ord(ch)) * size
                    ptr += 1
                # Flush the buffer on font change, on a placeholder, or when
                # the next glyph would pass the right bound (with 10% slack).
                if (
                    fcur_ != fcur
                    or vy_regex
                    or x + adv > x1 + 0.1 * size
                ):
                    if cstk:
                        ops_vals.append({
                            "type": OpType.TEXT,
                            "font": fcur,
                            "size": size,
                            "x": tx,
                            "dy": 0,
                            "rtxt": raw_string(fcur, cstk),
                            "lidx": lidx
                        })
                        cstk = ""
                # Wrap only when the original paragraph itself wrapped.
                if brk and x + adv > x1 + 0.1 * size:
                    x = x0
                    lidx += 1
                if vy_regex:
                    fix = 0
                    # The stored vertical offset applies only after some text.
                    if fcur is not None:
                        fix = varf[vid]
                    # Re-emit each preserved character with its own font and
                    # size, positioned relative to the run's first character.
                    for vch in var[vid]:
                        vc = chr(vch.cid)
                        ops_vals.append({
                            "type": OpType.TEXT,
                            "font": self.fontid[vch.font],
                            "size": vch.size,
                            "x": x + vch.x0 - var[vid][0].x0,
                            "dy": fix + vch.y0 - var[vid][0].y0,
                            "rtxt": raw_string(self.fontid[vch.font], vc),
                            "lidx": lidx
                        })
                        if log.isEnabledFor(logging.DEBUG):
                            lstk.append(LTLine(0.1, (_x, _y), (x + vch.x0 - var[vid][0].x0, fix + y + vch.y0 - var[vid][0].y0)))
                            _x, _y = x + vch.x0 - var[vid][0].x0, fix + y + vch.y0 - var[vid][0].y0
                    # Re-emit the run's lines, skipping very thick ones.
                    for l in varl[vid]:
                        if l.linewidth < 5:
                            ops_vals.append({
                                "type": OpType.LINE,
                                "x": l.pts[0][0] + x - var[vid][0].x0,
                                "dy": l.pts[0][1] + fix - var[vid][0].y0,
                                "linewidth": l.linewidth,
                                "xlen": l.pts[1][0] - l.pts[0][0],
                                "ylen": l.pts[1][1] - l.pts[0][1],
                                "lidx": lidx
                            })
                else:
                    if not cstk:
                        tx = x
                        # Drop a space that would start a wrapped line.
                        if x == x0 and ch == " ":
                            adv = 0
                        else:
                            cstk += ch
                    else:
                        cstk += ch
                adv -= mod
                fcur = fcur_
                x += adv
                if log.isEnabledFor(logging.DEBUG):
                    lstk.append(LTLine(0.1, (_x, _y), (x, y)))
                    _x, _y = x, y
            # Flush whatever text is still buffered.
            if cstk:
                ops_vals.append({
                    "type": OpType.TEXT,
                    "font": fcur,
                    "size": size,
                    "x": tx,
                    "dy": 0,
                    "rtxt": raw_string(fcur, cstk),
                    "lidx": lidx
                })

            # Shrink the line height in 0.05 steps until the wrapped lines fit
            # the original paragraph height or the factor drops below 1.
            line_height = default_line_height
            while (lidx + 1) * size * line_height > height and line_height >= 1:
                line_height -= 0.05

            for vals in ops_vals:
                if vals["type"] == OpType.TEXT:
                    ops_list.append(gen_op_txt(vals["font"], vals["size"], vals["x"], vals["dy"] + y - vals["lidx"] * size * line_height, vals["rtxt"]))
                elif vals["type"] == OpType.LINE:
                    ops_list.append(gen_op_line(vals["x"], vals["dy"] + y - vals["lidx"] * size * line_height, vals["xlen"], vals["ylen"], vals["linewidth"]))

        # Redraw the global lines, skipping very thick ones.
        for l in lstk:
            if l.linewidth < 5:
                ops_list.append(gen_op_line(l.pts[0][0], l.pts[0][1], l.pts[1][0] - l.pts[0][0], l.pts[1][1] - l.pts[0][1], l.linewidth))

        ops = f"BT {''.join(ops_list)}ET "
        return ops
class OpType(Enum):
    """Kind of drawing operation: a text run or a straight line."""

    TEXT = "text"  # text-showing operation
    LINE = "line"  # line-stroking operation