|
|
|
"""Extract pdf structure in XML format""" |
|
import logging |
|
import os.path |
|
import re |
|
import sys |
|
from typing import Any, Container, Dict, Iterable, List, Optional, TextIO, Union, cast |
|
from argparse import ArgumentParser |
|
|
|
import pdfminer |
|
from pdfminer.pdfdocument import PDFDocument, PDFNoOutlines, PDFXRefFallback |
|
from pdfminer.pdfpage import PDFPage |
|
from pdfminer.pdfparser import PDFParser |
|
from pdfminer.pdftypes import PDFObjectNotFound, PDFValueError |
|
from pdfminer.pdftypes import PDFStream, PDFObjRef, resolve1, stream_value |
|
from pdfminer.psparser import PSKeyword, PSLiteral, LIT |
|
from pdfminer.utils import isnumber |
|
|
|
# Configure the root logger with the default handler so that warnings emitted
# by this module are shown when it is run as a command-line tool.
logging.basicConfig()

# Module-level logger used for diagnostics.
logger = logging.getLogger(__name__)
|
|
|
# Characters replaced by numeric character references: control characters
# (0-31), "&", "<", ">", parentheses, double and single quotes, backslash,
# and everything from DEL (127) up to 255.
ESC_PAT = re.compile(r'[\000-\037&<>()"\042\047\134\177-\377]')


def escape(s: Union[str, bytes]) -> str:
    """Return *s* with every character matched by ESC_PAT written as ``&#N;``.

    Bytes input is decoded as latin-1 first, so each byte becomes exactly one
    character; ``N`` is the decimal code point of the replaced character.
    Characters outside the pattern (including code points above 255) are
    passed through unchanged.
    """
    text = s.decode("latin-1") if isinstance(s, bytes) else s

    def _char_ref(match: "re.Match[str]") -> str:
        return "&#%d;" % ord(match.group(0))

    return ESC_PAT.sub(_char_ref, text)
|
|
|
|
|
def _write_stream_bytes(out: TextIO, data: bytes) -> None:
    """Write raw stream *data* to *out* regardless of the stream's mode.

    The payload is first offered to ``out.write`` unchanged, which is what a
    binary stream expects.  A text stream rejects bytes with ``TypeError``; in
    that case the bytes go to the stream's underlying ``buffer`` when it has
    one, and are otherwise decoded as latin-1 so that every byte maps to
    exactly one character.
    """
    try:
        out.write(data)  # type: ignore[arg-type]
    except TypeError:
        binary = getattr(out, "buffer", None)
        if binary is None:
            out.write(data.decode("latin-1"))
        else:
            # Flush pending text first so the bytes land in the right order.
            out.flush()
            binary.write(data)


def _xml_text(value: object) -> str:
    """Return *value* as text that is safe to place inside an XML element.

    Strings and bytes go through ``escape`` (bytes are decoded there instead
    of being rendered as a ``b'...'`` repr); anything else is stringified.
    """
    if isinstance(value, (str, bytes)):
        return escape(value)
    return str(value)


def dumpxml(out: TextIO, obj: object, codec: Optional[str] = None) -> None:
    """Serialize a PDF object to *out* as XML.

    Supported objects and the markup they produce:

    * ``None`` -> ``<null />``
    * ``dict`` -> ``<dict>`` with one ``<key>``/``<value>`` pair per entry
    * ``list`` -> ``<list>`` with one serialized item per line
    * ``str`` / ``bytes`` -> ``<string>`` with escaped content
    * ``PDFStream`` -> depends on *codec* (see below)
    * ``PDFObjRef`` -> ``<ref id="..." />``
    * ``PSKeyword`` / ``PSLiteral`` -> ``<keyword>`` / ``<literal>``
    * numbers -> ``<number>``

    :param out: stream receiving the output.
    :param obj: object to serialize.
    :param codec: only consulted for ``PDFStream`` objects. ``"raw"`` writes
        the result of ``get_rawdata()`` and ``"binary"`` the result of
        ``get_data()``, both without any markup. ``"text"`` writes the
        stream attributes followed by the escaped data in a ``<data>``
        element. Any other value writes the stream attributes only.
        The codec is not forwarded to nested objects.
    :raises TypeError: if *obj* is of an unsupported type.
    """
    if obj is None:
        out.write("<null />")
        return

    if isinstance(obj, dict):
        out.write('<dict size="%d">\n' % len(obj))
        for (k, v) in obj.items():
            # Keys come from the PDF itself; escape them to keep the XML
            # well-formed.
            out.write("<key>%s</key>\n" % _xml_text(k))
            out.write("<value>")
            dumpxml(out, v)
            out.write("</value>\n")
        out.write("</dict>")
        return

    if isinstance(obj, list):
        out.write('<list size="%d">\n' % len(obj))
        for v in obj:
            dumpxml(out, v)
            out.write("\n")
        out.write("</list>")
        return

    if isinstance(obj, (str, bytes)):
        out.write('<string size="%d">%s</string>' % (len(obj), escape(obj)))
        return

    if isinstance(obj, PDFStream):
        if codec == "raw":
            _write_stream_bytes(out, obj.get_rawdata())
        elif codec == "binary":
            _write_stream_bytes(out, obj.get_data())
        else:
            out.write("<stream>\n<props>\n")
            dumpxml(out, obj.attrs)
            out.write("\n</props>\n")
            if codec == "text":
                data = obj.get_data()
                out.write('<data size="%d">%s</data>\n' % (len(data), escape(data)))
            out.write("</stream>")
        return

    if isinstance(obj, PDFObjRef):
        out.write('<ref id="%d" />' % obj.objid)
        return

    if isinstance(obj, PSKeyword):
        # The name may be bytes; _xml_text decodes and escapes it.
        out.write("<keyword>%s</keyword>" % _xml_text(obj.name))
        return

    if isinstance(obj, PSLiteral):
        # The name may be str or bytes; _xml_text handles both.
        out.write("<literal>%s</literal>" % _xml_text(obj.name))
        return

    if isnumber(obj):
        out.write("<number>%s</number>" % obj)
        return

    raise TypeError(obj)
|
|
|
|
|
def dumptrailers(
    out: TextIO, doc: PDFDocument, show_fallback_xref: bool = False
) -> None:
    """Write the trailer of each xref in *doc* as a ``<trailer>`` element.

    Xrefs that are ``PDFXRefFallback`` instances are skipped unless
    *show_fallback_xref* is true.  When the document has nothing but
    fallback xrefs and they are not being shown, a warning pointing at
    ``--show-fallback-xref`` is logged.
    """
    found_regular_xref = False
    for xref in doc.xrefs:
        is_fallback = isinstance(xref, PDFXRefFallback)
        if not is_fallback:
            found_regular_xref = True
        if is_fallback and not show_fallback_xref:
            continue
        out.write("<trailer>\n")
        dumpxml(out, xref.get_trailer())
        out.write("\n</trailer>\n\n")

    if found_regular_xref or show_fallback_xref:
        return

    logger.warning(
        "This PDF does not have an xref. Use --show-fallback-xref if "
        "you want to display the content of a fallback xref that "
        "contains all objects."
    )
|
|
|
|
|
def dumpallobjs(
    out: TextIO,
    doc: PDFDocument,
    codec: Optional[str] = None,
    show_fallback_xref: bool = False,
) -> None:
    """Write every object of *doc*, followed by its trailers, inside ``<pdf>``.

    Object ids are collected from all xrefs of the document; an id that
    appears in more than one xref is written only once, and ids that resolve
    to ``None`` are skipped.

    :param out: stream receiving the XML.
    :param doc: document whose objects are dumped.
    :param codec: passed to ``dumpxml`` for each top-level object.
    :param show_fallback_xref: passed to ``dumptrailers``.
    """
    visited = set()
    out.write("<pdf>")
    for xref in doc.xrefs:
        for objid in xref.get_objids():
            if objid in visited:
                continue
            visited.add(objid)
            try:
                obj = doc.getobj(objid)
                if obj is None:
                    continue
                out.write('<object id="%d">\n' % objid)
                dumpxml(out, obj, codec=codec)
                out.write("\n</object>\n\n")
            except PDFObjectNotFound as e:
                # Report through the logger rather than print(): stdout may be
                # the very stream the XML is written to.
                logger.warning("not found: %r", e)
    dumptrailers(out, doc, show_fallback_xref)
    out.write("</pdf>")
|
|
|
|
|
def _dump_outline_entries(outfp: TextIO, doc: PDFDocument) -> None:
    """Write the outline entries of *doc* to *outfp* as ``<outlines>`` XML."""
    # Map each page's id to its 1-based page number.
    pages = {
        page.pageid: pageno
        for (pageno, page) in enumerate(PDFPage.create_pages(doc), 1)
    }

    def resolve_dest(dest: object) -> Any:
        # Named destinations (string or literal) are looked up in the document.
        if isinstance(dest, (str, bytes)):
            dest = resolve1(doc.get_dest(dest))
        elif isinstance(dest, PSLiteral):
            dest = resolve1(doc.get_dest(dest.name))
        # A dictionary destination keeps the actual target under "D".
        if isinstance(dest, dict):
            dest = dest["D"]
        if isinstance(dest, PDFObjRef):
            dest = dest.resolve()
        return dest

    try:
        outlines = doc.get_outlines()
        outfp.write("<outlines>\n")
        for (level, title, dest, a, se) in outlines:
            pageno = None
            if dest:
                dest = resolve_dest(dest)
                pageno = pages[dest[0].objid]
            elif a:
                action = a
                if isinstance(action, dict):
                    subtype = action.get("S")
                    # Only "GoTo" actions carrying a destination are followed.
                    if subtype and repr(subtype) == "/'GoTo'" and action.get("D"):
                        dest = resolve_dest(action["D"])
                        pageno = pages[dest[0].objid]
            s = escape(title)
            outfp.write('<outline level="{!r}" title="{}">\n'.format(level, s))
            if dest is not None:
                outfp.write("<dest>")
                dumpxml(outfp, dest)
                outfp.write("</dest>\n")
            if pageno is not None:
                outfp.write("<pageno>%r</pageno>\n" % pageno)
            outfp.write("</outline>\n")
        outfp.write("</outlines>\n")
    except PDFNoOutlines:
        # A document without outlines is not an error here.
        pass


def dumpoutline(
    outfp: TextIO,
    fname: str,
    objids: Any,
    pagenos: Container[int],
    password: str = "",
    dumpall: bool = False,
    codec: Optional[str] = None,
    extractdir: Optional[str] = None,
) -> None:
    """Write the outline (table of contents) of a PDF file as XML.

    Each entry becomes an ``<outline>`` element carrying its level and
    escaped title, with a ``<dest>`` child when a destination is known and a
    ``<pageno>`` child when the destination maps to a page.

    :param outfp: stream receiving the XML.
    :param fname: path of the PDF file to read.
    :param password: password used to open the document.
    :param objids: not used by this function.
    :param pagenos: not used by this function.
    :param dumpall: not used by this function.
    :param codec: not used by this function.
    :param extractdir: not used by this function.
    """
    # The context manager and the finally clause release the file and the
    # parser even when reading the document or writing the outline fails.
    with open(fname, "rb") as fp:
        parser = PDFParser(fp)
        try:
            doc = PDFDocument(parser, password)
            _dump_outline_entries(outfp, doc)
        finally:
            parser.close()
|
|
|
|
|
# Literals matched against the "Type" entry of PDF objects in extractembedded.
# They are compared by identity (`is`), so this presumably relies on LIT
# returning the same object for the same name -- verify against pdfminer.
LITERAL_FILESPEC = LIT("Filespec")

LITERAL_EMBEDDEDFILE = LIT("EmbeddedFile")
|
|
|
|
|
def extractembedded(fname: str, password: str, extractdir: str) -> None:
    """Extract the files embedded in a PDF into *extractdir*.

    Every object whose "Type" is the ``Filespec`` literal is extracted once.
    The output name is the object id, zero-padded to six digits, followed by
    the base name taken from the file specification.

    :param fname: path of the PDF file to read.
    :param password: password used to open the document.
    :param extractdir: directory receiving the extracted files; it is created
        if it does not exist.
    :raises PDFValueError: if a file specification does not reference an
        ``EmbeddedFile`` stream.
    :raises IOError: if the output file already exists.
    """

    def extract1(objid: int, obj: Dict[str, Any]) -> None:
        # Prefer the "UF" entry over "F", for both the name and the stream.
        filename = os.path.basename(obj.get("UF") or cast(bytes, obj.get("F")).decode())
        fileref = obj["EF"].get("UF") or obj["EF"].get("F")
        fileobj = doc.getobj(fileref.objid)
        if not isinstance(fileobj, PDFStream):
            error_msg = (
                "unable to process PDF: reference for %r is not a "
                "PDFStream" % filename
            )
            raise PDFValueError(error_msg)
        if fileobj.get("Type") is not LITERAL_EMBEDDEDFILE:
            raise PDFValueError(
                "unable to process PDF: reference for %r "
                "is not an EmbeddedFile" % (filename)
            )
        path = os.path.join(extractdir, "%.6d-%s" % (objid, filename))
        if os.path.exists(path):
            raise IOError("file exists: %r" % path)
        print("extracting: %r" % path)
        target_dir = os.path.dirname(path)
        # An empty directory part means the current directory; makedirs("")
        # would fail, so only create a directory when there is one to create.
        if target_dir:
            os.makedirs(target_dir, exist_ok=True)
        # The context manager closes the file even if decoding or writing the
        # stream data fails.
        with open(path, "wb") as out:
            out.write(fileobj.get_data())

    with open(fname, "rb") as fp:
        parser = PDFParser(fp)
        doc = PDFDocument(parser, password)
        extracted_objids = set()
        for xref in doc.xrefs:
            for objid in xref.get_objids():
                obj = doc.getobj(objid)
                if (
                    objid not in extracted_objids
                    and isinstance(obj, dict)
                    and obj.get("Type") is LITERAL_FILESPEC
                ):
                    extracted_objids.add(objid)
                    extract1(objid, obj)
|
|
|
|
|
def dumppdf(
    outfp: TextIO,
    fname: str,
    objids: Iterable[int],
    pagenos: Container[int],
    password: str = "",
    dumpall: bool = False,
    codec: Optional[str] = None,
    extractdir: Optional[str] = None,
    show_fallback_xref: bool = False,
) -> None:
    """Dump selected parts of a PDF file to *outfp*.

    The selections are independent and applied in this order: the objects
    listed in *objids*, the pages listed in *pagenos*, and all objects when
    *dumpall* is set.  When none of the three is requested, only the
    trailers are written.  A final newline is appended unless *codec* is
    ``"raw"`` or ``"binary"``.

    :param outfp: stream receiving the output.
    :param fname: path of the PDF file to read.
    :param objids: ids of the objects to dump.
    :param pagenos: zero-based indexes of the pages to dump.  With a codec
        the page's content streams are dumped, otherwise its attributes.
    :param password: password used to open the document.
    :param dumpall: dump every object of the document.
    :param codec: stream codec passed to ``dumpxml``.
    :param extractdir: not used by this function.
    :param show_fallback_xref: passed on to ``dumpallobjs``/``dumptrailers``.
    """
    # The context manager closes the input file even if dumping fails.
    with open(fname, "rb") as fp:
        parser = PDFParser(fp)
        doc = PDFDocument(parser, password)
        if objids:
            for objid in objids:
                obj = doc.getobj(objid)
                dumpxml(outfp, obj, codec=codec)
        if pagenos:
            for (pageno, page) in enumerate(PDFPage.create_pages(doc)):
                if pageno in pagenos:
                    if codec:
                        for obj in page.contents:
                            obj = stream_value(obj)
                            dumpxml(outfp, obj, codec=codec)
                    else:
                        dumpxml(outfp, page.attrs)
        if dumpall:
            dumpallobjs(outfp, doc, codec, show_fallback_xref)
        if (not objids) and (not pagenos) and (not dumpall):
            dumptrailers(outfp, doc, show_fallback_xref)
    if codec not in ("raw", "binary"):
        outfp.write("\n")
|
|
|
|
|
def create_parser() -> ArgumentParser:
    """Build the command-line argument parser for this tool.

    The options fall into three groups: the procedure to run
    (``--extract-toc`` and ``--extract-embedded`` are mutually exclusive),
    options used while parsing the PDF, and options controlling the output
    (the three stream codecs are mutually exclusive).
    """
    parser = ArgumentParser(description=__doc__, add_help=True)
    parser.add_argument(
        "files",
        type=str,
        default=None,
        nargs="+",
        help="One or more paths to PDF files.",
    )

    parser.add_argument(
        "--version",
        "-v",
        action="version",
        version="pdfminer.six v{}".format(pdfminer.__version__),
    )
    parser.add_argument(
        "--debug",
        "-d",
        default=False,
        action="store_true",
        help="Use debug logging level.",
    )
    procedure_parser = parser.add_mutually_exclusive_group()
    procedure_parser.add_argument(
        "--extract-toc",
        "-T",
        default=False,
        action="store_true",
        help="Extract structure of outline",
    )
    procedure_parser.add_argument(
        "--extract-embedded", "-E", type=str, help="Extract embedded files"
    )

    parse_params = parser.add_argument_group(
        "Parser", description="Used during PDF parsing"
    )
    parse_params.add_argument(
        "--page-numbers",
        type=int,
        default=None,
        nargs="+",
        help="A space-separated list of page numbers to parse.",
    )
    parse_params.add_argument(
        "--pagenos",
        "-p",
        type=str,
        help="A comma-separated list of page numbers to parse. Included for "
        "legacy applications, use --page-numbers for more idiomatic "
        "argument entry.",
    )
    parse_params.add_argument(
        "--objects",
        "-i",
        type=str,
        help="Comma separated list of object numbers to extract",
    )
    parse_params.add_argument(
        "--all",
        "-a",
        default=False,
        action="store_true",
        help="If the structure of all objects should be extracted",
    )
    parse_params.add_argument(
        "--show-fallback-xref",
        action="store_true",
        help="Additionally show the fallback xref. Use this if the PDF "
        "has zero or only invalid xref's. This setting is ignored if "
        "--extract-toc or --extract-embedded is used.",
    )
    parse_params.add_argument(
        "--password",
        "-P",
        type=str,
        default="",
        help="The password to use for decrypting PDF file.",
    )

    output_params = parser.add_argument_group(
        "Output", description="Used during output generation."
    )
    output_params.add_argument(
        "--outfile",
        "-o",
        type=str,
        default="-",
        help='Path to file where output is written. Or "-" (default) to '
        "write to stdout.",
    )
    codec_parser = output_params.add_mutually_exclusive_group()
    codec_parser.add_argument(
        "--raw-stream",
        "-r",
        default=False,
        action="store_true",
        help="Write stream objects without encoding",
    )
    codec_parser.add_argument(
        "--binary-stream",
        "-b",
        default=False,
        action="store_true",
        help="Write stream objects with binary encoding",
    )
    codec_parser.add_argument(
        "--text-stream",
        "-t",
        default=False,
        action="store_true",
        help="Write stream objects as plain text",
    )

    return parser
|
|
|
|
|
def main(argv: Optional[List[str]] = None) -> None:
    """Run the command-line tool.

    Parses *argv* (``sys.argv`` when ``None``), then processes each input
    file with the selected procedure: outline extraction, embedded-file
    extraction, or the regular dump.

    :param argv: argument list to parse instead of ``sys.argv[1:]``.
    """
    parser = create_parser()
    args = parser.parse_args(args=argv)

    if args.debug:
        logging.getLogger().setLevel(logging.DEBUG)

    objids = [int(x) for x in args.objects.split(",")] if args.objects else []

    # Page numbers are 1-based on the command line and 0-based internally.
    if args.page_numbers:
        pagenos = {x - 1 for x in args.page_numbers}
    elif args.pagenos:
        pagenos = {int(x) - 1 for x in args.pagenos.split(",")}
    else:
        pagenos = set()

    password = args.password

    codec: Optional[str] = None
    if args.raw_stream:
        codec = "raw"
    elif args.binary_stream:
        codec = "binary"
    elif args.text_stream:
        codec = "text"

    # Open the output last, so invalid arguments do not leave behind an
    # empty or truncated output file.
    owns_outfp = args.outfile != "-"
    outfp = open(args.outfile, "w") if owns_outfp else sys.stdout

    try:
        for fname in args.files:
            if args.extract_toc:
                dumpoutline(
                    outfp,
                    fname,
                    objids,
                    pagenos,
                    password=password,
                    dumpall=args.all,
                    codec=codec,
                    extractdir=None,
                )
            elif args.extract_embedded:
                extractembedded(
                    fname, password=password, extractdir=args.extract_embedded
                )
            else:
                dumppdf(
                    outfp,
                    fname,
                    objids,
                    pagenos,
                    password=password,
                    dumpall=args.all,
                    codec=codec,
                    extractdir=None,
                    show_fallback_xref=args.show_fallback_xref,
                )
    finally:
        if owns_outfp:
            outfp.close()
        else:
            # Leave sys.stdout open for the rest of the process; closing it
            # would break any later output when main() is called from code.
            outfp.flush()
|
|
|
|
|
# Run the command-line interface when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|