Spaces:
Running
Running
import logging | |
from io import BytesIO | |
from typing import TYPE_CHECKING, BinaryIO, Optional, Union | |
from pdf2zh import settings | |
from pdf2zh.casting import safe_int | |
from pdf2zh.pdfexceptions import PDFException | |
from pdf2zh.pdftypes import PDFObjRef, PDFStream, dict_value, int_value | |
from pdf2zh.psexceptions import PSEOF | |
from pdf2zh.psparser import KWD, PSKeyword, PSStackParser | |
if TYPE_CHECKING: | |
from pdf2zh.pdfdocument import PDFDocument | |
log = logging.getLogger(__name__) | |
class PDFSyntaxError(PDFException): | |
pass | |
# PDFParser stack holds all the base types plus PDFStream, PDFObjRef, and None | |
class PDFParser(PSStackParser[Union[PSKeyword, PDFStream, PDFObjRef, None]]): | |
"""PDFParser fetch PDF objects from a file stream. | |
It can handle indirect references by referring to | |
a PDF document set by set_document method. | |
It also reads XRefs at the end of every PDF file. | |
Typical usage: | |
parser = PDFParser(fp) | |
parser.read_xref() | |
parser.read_xref(fallback=True) # optional | |
parser.set_document(doc) | |
parser.seek(offset) | |
parser.nextobject() | |
""" | |
def __init__(self, fp: BinaryIO) -> None: | |
PSStackParser.__init__(self, fp) | |
self.doc: Optional[PDFDocument] = None | |
self.fallback = False | |
def set_document(self, doc: "PDFDocument") -> None: | |
"""Associates the parser with a PDFDocument object.""" | |
self.doc = doc | |
KEYWORD_R = KWD(b"R") | |
KEYWORD_NULL = KWD(b"null") | |
KEYWORD_ENDOBJ = KWD(b"endobj") | |
KEYWORD_STREAM = KWD(b"stream") | |
KEYWORD_XREF = KWD(b"xref") | |
KEYWORD_STARTXREF = KWD(b"startxref") | |
def do_keyword(self, pos: int, token: PSKeyword) -> None: | |
"""Handles PDF-related keywords.""" | |
if token in (self.KEYWORD_XREF, self.KEYWORD_STARTXREF): | |
self.add_results(*self.pop(1)) | |
elif token is self.KEYWORD_ENDOBJ: | |
self.add_results(*self.pop(4)) | |
elif token is self.KEYWORD_NULL: | |
# null object | |
self.push((pos, None)) | |
elif token is self.KEYWORD_R: | |
# reference to indirect object | |
if len(self.curstack) >= 2: | |
(_, _object_id), _ = self.pop(2) | |
object_id = safe_int(_object_id) | |
if object_id is not None: | |
obj = PDFObjRef(self.doc, object_id) | |
self.push((pos, obj)) | |
elif token is self.KEYWORD_STREAM: | |
# stream object | |
((_, dic),) = self.pop(1) | |
dic = dict_value(dic) | |
objlen = 0 | |
if not self.fallback: | |
try: | |
objlen = int_value(dic["Length"]) | |
except KeyError: | |
if settings.STRICT: | |
raise PDFSyntaxError("/Length is undefined: %r" % dic) | |
self.seek(pos) | |
try: | |
(_, line) = self.nextline() # 'stream' | |
except PSEOF: | |
if settings.STRICT: | |
raise PDFSyntaxError("Unexpected EOF") | |
return | |
pos += len(line) | |
self.fp.seek(pos) | |
data = bytearray(self.fp.read(objlen)) | |
self.seek(pos + objlen) | |
while 1: | |
try: | |
(linepos, line) = self.nextline() | |
except PSEOF: | |
if settings.STRICT: | |
raise PDFSyntaxError("Unexpected EOF") | |
break | |
if b"endstream" in line: | |
i = line.index(b"endstream") | |
objlen += i | |
if self.fallback: | |
data += line[:i] | |
break | |
objlen += len(line) | |
if self.fallback: | |
data += line | |
self.seek(pos + objlen) | |
# XXX limit objlen not to exceed object boundary | |
# log.debug( | |
# "Stream: pos=%d, objlen=%d, dic=%r, data=%r...", | |
# pos, | |
# objlen, | |
# dic, | |
# data[:10], | |
# ) | |
assert self.doc is not None | |
stream = PDFStream(dic, bytes(data), self.doc.decipher) | |
self.push((pos, stream)) | |
else: | |
# others | |
self.push((pos, token)) | |
class PDFStreamParser(PDFParser): | |
"""PDFStreamParser is used to parse PDF content streams | |
that is contained in each page and has instructions | |
for rendering the page. A reference to a PDF document is | |
needed because a PDF content stream can also have | |
indirect references to other objects in the same document. | |
""" | |
def __init__(self, data: bytes) -> None: | |
PDFParser.__init__(self, BytesIO(data)) | |
def flush(self) -> None: | |
self.add_results(*self.popall()) | |
KEYWORD_OBJ = KWD(b"obj") | |
def do_keyword(self, pos: int, token: PSKeyword) -> None: | |
if token is self.KEYWORD_R: | |
# reference to indirect object | |
(_, _object_id), _ = self.pop(2) | |
object_id = safe_int(_object_id) | |
if object_id is not None: | |
obj = PDFObjRef(self.doc, object_id) | |
self.push((pos, obj)) | |
return | |
elif token in (self.KEYWORD_OBJ, self.KEYWORD_ENDOBJ): | |
if settings.STRICT: | |
# See PDF Spec 3.4.6: Only the object values are stored in the | |
# stream; the obj and endobj keywords are not used. | |
raise PDFSyntaxError("Keyword endobj found in stream") | |
return | |
# others | |
self.push((pos, token)) | |