Application / pdf2txt_v1.py
FrankWu's picture
Upload 5 files
e2dccf7 verified
# -*- coding: utf-8 -*-
"""
Created by Shengbo.Zhang on 2021/08/13
"""
import io
import re
import os
import csv
import logging
from docx import Document
from pdf2docx import Converter
from Pdf2Txt.config import *
from pdfminer.layout import LAParams
from pdfminer.pdfpage import PDFPage
from pdfminer.converter import TextConverter
from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter
from Pdf2Txt.config import _check_ann_title_processable
# 关闭pdf2docx模块中Converter的日志输出
logging.disable(logging.INFO)
logging.disable(logging.WARNING)
def _get_txt_from_pdf(pdf_path, out_path):
'''
读取Pdf文件,直接将其转换为Txt文本格式
:param pdf_path: 输入的pdf公告文件的完整路径
:param out_path: 输出的txt结果文件的完整路径
:return: bool
'''
manager = PDFResourceManager()
output = io.StringIO()
converter = TextConverter(manager, output, laparams=LAParams())
interpreter = PDFPageInterpreter(manager, converter)
with open(pdf_path, 'rb') as infile:
content = []
for page in PDFPage.get_pages(infile, check_extractable=True):
interpreter.process_page(page)
convertedPDF = output.getvalue()
# print(convertedPDF)
content.append(convertedPDF)
# print(len(content))
# print(content)
for idx, val in enumerate(content):
val = re.sub('\n+','\n', val)
val = re.sub('\n +', '', val)
val = val.replace(' ', '')
content[idx] = val
with open(out_path, 'wb') as f:
f.write(''.join(content).encode('utf-8'))
output.close()
converter.close()
f.close()
return True
def _get_cleaned_txt(txtPath, out_path):
'''
对Txt文件进行内容格式清洗(暂时仅供测试)
:param txtPath: 输入的txt文件的完整路径
:param out_path: 输出的txt文件的完整路径
:return: bool
'''
with open(txtPath, 'rb')as f:
content = f.read().decode('utf-8')
p = re.compile(r'(?<=##)\S.+(?=##)|[\u4e00-\u9fff+\u3002\uFF0C]')
x = ''.join(re.findall(p, content))
final_result = re.sub(u"[\uFF0C|\u3002|\u002B]{2,}", "", x)
with open(out_path, "w")as txtPath:
txtPath.write(final_result)
# print(final_result)
return True
def get_docx_from_pdf(pdf_path, out_path):
'''
读取Pdf文件,将其转换为Docx格式并存在本地
:param pdf_path: 输入的pdf公告文件的完整路径
:param out_path: 输出的中间docx结果文件的完整路径
:return: bool
'''
try:
cv = Converter(pdf_path)
cv.convert(out_path)
except Exception:
return False
for p in cv.pages:
if not p.finalized:
cv.close()
return False
cv.close()
return True
def _find_key_indexs(str, key):
'''
给定一个父字符串和子串,在父串中查找子串的所有索引位置,并返回一个包含所有下标的列表
:param str: 父字符串
:param key: 子字符串
:return: list
'''
lstKey = []
countStr = str.count(key)
if countStr < 1:
return []
elif countStr == 1:
indexKey = str.find(key)
return [indexKey]
else:
indexKey = str.find(key)
lstKey.append(indexKey)
while countStr > 1:
str_new = str[indexKey + 1:len(str) + 1]
indexKey_new = str_new.find(key)
indexKey = indexKey + 1 + indexKey_new
lstKey.append(indexKey)
countStr -= 1
lstKey.sort(reverse=True)
return lstKey
def _insert_char_into_str(str, idx, char):
'''
给定一个父字符串、下标位置、子串,在父串中的下标位置插入子串,并返回一个新的字符串
:param str: 父字符串
:param idx: 插入位置索引
:param char: 子字符串
:return: str
'''
tmp = list(str)
tmp.insert(idx, char)
return ''.join(tmp)
def _is_chinese(str):
'''
给定一个字符串,判断该字符串是否全是中文
:param str: 字符串
:return: bool
'''
for ch in str:
if '\u4e00' <= ch <= '\u9fff':
return True
return False
def _get_table_row_feat(str):
'''
给定一个空格分割的表格行字符串,计算它的特征(01组成的字符串)
:param str: 字符串
:return: 字符串
'''
s = str.split()
r = ''
for c in s:
try:
_ = float(c)
r += '1'
except Exception:
r += '0'
return r
def _check_if_include_first_proper(s, corpus):
'''
检查字符串s中是否包含语料列表first_corpus中的某一内容
:param s: 字符串
:param corpus: 字符串列表
:return: [bool, str]
'''
for i in corpus:
if i in s:
return [True, i]
return [False, '']
def _check_if_include_second_proper(s, corpus):
'''
检查字符串s中是否包含语料列表first_corpus中的某一内容
:param s: 字符串
:param corpus: 字符串列表
:return: list
'''
res = []
for i in corpus:
if i in s:
res.append([True, i])
else:
res.append([False, i])
return res
def _match_and_insert(string, pattern, substring):
'''
匹配string字符串中的pattern,计算所有pattern在string中的首个字符索引位置,并在string从后向前插入substring至这些位置
:param string: 待匹配的字符串
:param pattern: 匹配模式
:param substring: 待插入的子字符串
:return: 插入后的字符串
'''
idx_list = []
for j in re.finditer(pattern, string):
idx_list.append(j.span()[0])
# 将匹配模式的所有索引下标进行倒序排列,方便后续插入end_flag
idx_list.sort(reverse=True)
if idx_list != []:
for k in idx_list:
if k > 0 and string[k-1] != '“':
string = _insert_char_into_str(string, k, substring)
return string
def _match_and_delete(string, pattern):
'''
匹配string字符串中的pattern,计算pattern在string中的首个字符索引位置,删除该索引前2个位置的换行符\n
:param string: 待匹配的字符串
:param pattern: 匹配模式
:return: 删除'\n\n'子字符串后的字符串
'''
matcher = re.search(pattern, string)
if matcher:
k = matcher.span()[0]
if k >= 2 and string[k-1] == '\n' and string[k-2] == '\n':
string = string[:k-2] + string[k:]
return string
def get_txt_from_docx(doc):
'''
读取Docx文件中每个自然行的材料内容
:param doc: 一个Document对象实例
:param out_path: 输出的txt结果文件的完整路径
:return: bool(转换是否成功), list(格式化修正后的文本列表)
'''
# 公告中的编号符号的集合,例如:'(1)', '1、'
NUMBER_1 = '123456789一二三四五六七八九十'
# 数字与大小写的英文字母的集合
NUMBER_2 = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz'
# 提取docx文件中的单行文本至初始文本列表paras中
paras = [para.text+'\n' for i, para in enumerate(doc.paragraphs)]
# 存储首轮格式化修正文本的列表new_paras
new_paras = []
# new_paras中各字符串的长度
new_paras_len_cnt = []
try:
# 遍历paras文本列表中的各字符串
for val in paras:
# 若该行文本为空,或者是页面号,或者是’单位‘,则跳过
if val == '\n' or re.search('^[0-9]+ \n$', val) or val[:2] == '单位':
continue
# 否则,将该行文本添加进new_paras文本列表中
new_paras.append(val.lstrip())
# 计算该行文本的字符长度
new_paras_len_cnt.append(len(val))
# 正文标识符,指示正文开始的行号
line_mark = 0
# 遍历new_paras的前10行,目的是处理公告的头部信息,例如:证券代码、证券简称、公告编号、公告标题等
for i, val in enumerate(new_paras[:10]):
# 如果出现制表符,或者空格数大于1,则仅保留一个空格
if '\t' in val or val.count(' ') > 2:
new_paras[i] = ' '.join(val.split()) + '\n'
if '证券代码:' in new_paras[i]:
continue
# 如果行末尾是'有限公司',则去掉可能的空格
if val.replace(' ', '')[-5:] == '有限公司\n':
new_paras[i] = val.replace(' ', '')
continue
# 循环检查下一行,直到行末尾是'公告',或'股东大会的通知',或'董事意见函'(应与mongo.py中process_files()相对应)
# 此时认为抵达正文的起始位置,后续处理将从第line_mark行开始
if _check_ann_title_processable(val.replace(' ', ''), exp=1):
new_paras[i] = val.replace(' ', '')
line_mark = i + 1
break
else:
new_paras[i] = val.replace('\n', '').replace(' ', '')
# 计算new_paras中各行的平均字符长度
mean_len = sum(new_paras_len_cnt)//len(new_paras_len_cnt)
# 遍历new_paras
for i, _ in enumerate(new_paras):
# 如果是正文部分
if i >= line_mark:
# 去掉该行中的一些符号(空格、特殊符号、英文逗号)
new_paras[i] = new_paras[i]\
.replace(' ', '')\
.replace(' ', '')\
.replace('', '')\
.replace(',', '')
# 如果该行长度大于平均长度,并且(下一行首部为非编号,或者下一行首部是编号且以换行结尾),则认为该行在段落中,故去掉该行换行符
if i < len(new_paras)-1 and \
len(new_paras[i]) >= mean_len and \
((new_paras[i + 1].replace('(','').replace('(','')[0] not in NUMBER_1) or
new_paras[i + 1][-1] == '\n'):
new_paras[i] = new_paras[i].replace('\n', '')
# 如果该行的下一行长度大于等于3,并且该行的下一行首部是非编号,且不包含关键字'年',则认为该行在段落中,故去掉该行换行符
if i < len(new_paras)-2 and \
len(new_paras[i + 1]) >= 3 and \
new_paras[i + 1].replace('(','').replace('(','')[0] in NUMBER_2 and \
(not '.' in new_paras[i+1][:3]) and \
(not '、' in new_paras[i+1][:3]) and \
(not '年' in new_paras[i+1]):
new_paras[i] = new_paras[i].replace('\n', '')
# 查找该行中,中文冒号符号':'的所有索引位置j
for j in _find_key_indexs(new_paras[i], ':'):
# 如果该行的j+1位置不是换行,并且该行不包括中文括号'()'与书名号'《》',且该行不包含一级专用名词,则认为该行应独立成
# 段落,故在该行j+1位置插入换行符
# 注:执行插入操作时,若有多个位置进行插入,则总是从后往前插入,确保插入后索引仍然正确
if j < len(new_paras[i])-1 and new_paras[i][j+1] != '\n' and \
('(' not in new_paras[i]) and ('《' not in new_paras[i]) and \
(')' not in new_paras[i]) and ('》' not in new_paras[i]) and \
(not _check_if_include_first_proper(new_paras[i], FIRST_PROPER_CORPUS)[0]):
new_paras[i] = _insert_char_into_str(new_paras[i], j+1, '\n')
# 查找该行中,中文左括号符号'('的所有索引位置j
for j in _find_key_indexs(new_paras[i], '('):
# 如果该行的j+1位置是编号,并且该行的上一行末尾不是换行,且该行j-1位置为非中文和非书名号,则认为该行的下一行应独立成
# 段落,故在该行j位置插入换行符
if new_paras[i][j+1] in NUMBER_1 and new_paras[i-1][-1] != '\n' and \
(not _is_chinese(new_paras[i][j-1])) and new_paras[i][j-1] != '》':
new_paras[i] = _insert_char_into_str(new_paras[i], j, '\n')
# 查找该行中,英文左括号符号'('的所有索引位置j
for j in _find_key_indexs(new_paras[i], '('):
# 如果该行的j+1位置是编号,并且该行的上一行末尾不是换行,且该行j-1位置为非中文和非书名号,则认为该行的下一行应独立成
# 段落,故在该行j位置插入换行符
if new_paras[i][j + 1] in NUMBER_1 and new_paras[i - 1][-1] != '\n' and \
(not _is_chinese(new_paras[i][j - 1])) and new_paras[i][j - 1] != '》':
new_paras[i] = _insert_char_into_str(new_paras[i], j, '\n')
# 查找该行中,中文顿号符号'、'的所有索引位置j
for j in _find_key_indexs(new_paras[i], '、'):
# 如果该行的j-1位置是编号(不超过9或十),并且该行的上一行末尾不是换行,则认为该行的下一行应独立成段落,故在该行j-1
# 位置插入换行符
if (j-2) < len(new_paras[i]) and new_paras[i][j-1] in NUMBER_1 and new_paras[i][j-2] not in NUMBER_1 \
and new_paras[i][j-2] in '。;.;' and new_paras[i-1][-1] != '\n':
new_paras[i] = _insert_char_into_str(new_paras[i], j-1, '\n')
continue
# 如果该行的j-1与j-2位置都是编号(超过9或十),并且该行的上一行末尾不是换行,则认为该行的下一行应独立成段落,故在该
# 行j-2位置插入换行符
if (j-3) < len(new_paras[i]) and new_paras[i][j-1] in NUMBER_1 and new_paras[i][j-2] in NUMBER_1 \
and new_paras[i][j-3] in '。;.;' and new_paras[i-1][-1] != '\n':
new_paras[i] = _insert_char_into_str(new_paras[i], j-2, '\n')
# 修正某些情形下'特此公告。'未自成段落的情况
if new_paras[i] == '特此公告。\n':
if new_paras[i-1][-1] != '\n':
new_paras[i] = '\n特此公告。\n'
if new_paras[i+1][-1] != '\n':
new_paras[i+1] += '\n'
# 如果该行的下一行中含有独立的一级专用名词,则认为该行的下一行应独立成段落,故在该行的末尾插入缺省的换行符
if (i+1) < len(new_paras):
tmp_flag, tmp_str = _check_if_include_first_proper(new_paras[i+1], FIRST_PROPER_CORPUS)
if tmp_flag:
tmp_idx = new_paras[i+1].index(tmp_str) - 1
if tmp_idx >= 0 and new_paras[i+1][tmp_idx] != '(':
if new_paras[i][-1] != '\n':
new_paras[i] += '\n'
# 将new_paras中的若干字符串连接成为一个字符串str_sum
str_sum = ''.join(new_paras)
# 将str_num字符串按照换行符进行分割,生成次轮格式化修正文本的列表final_paras
final_paras = str_sum.split('\n')
# 遍历final_paras
for i, val in enumerate(final_paras):
# 每一自然段落的末尾符号,这里为两个换行符,便于清晰地查看最终生成的txt文本
end_flag = '\n\n'
# 给final_paras中的每一行添加一个末尾符号
final_paras[i] += end_flag
# 在该行中查找匹配到的所有形如: '(1)', '(2)' 的模式
# 此处认为该模式的起始位置应独立成段落,例如'\n(1)XXX...', '\n(1)XXX...'
if '(' in final_paras[i]:
final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[0-9]{1,2}[\)\)]+', end_flag)
# 在该行中,查找所有的中文左括号符号'('与中文右括号符号')',计算它们各自的数量
# 如果两符号的数量不相等,则认为该行处在段落中,故应去掉该行末尾的end_flag
if len(_find_key_indexs(final_paras[i], '(')) != len(_find_key_indexs(final_paras[i], ')')):
final_paras[i] = final_paras[i][:-2]
# 将final_paras中的若干字符串连接成为一个字符串str_sum
str_sum = ''.join(final_paras)
# 将str_num字符串按照换行符进行分割,生成终轮格式化修正文本的列表,覆盖掉之前的final_paras
final_paras = str_sum.split('\n\n')
# 遍历final_paras
for i, val in enumerate(final_paras):
# 每一自然段落的末尾符号,这里为两个换行符,便于清晰地查看最终生成的txt文本
end_flag = '\n\n'
# 给final_paras中的每一行再次添加上一个末尾符号
final_paras[i] += end_flag
# 修正某些情形下'重要内容提示:'未自成段落的情况
if '重要内容提示:' in final_paras[i]:
idx = final_paras[i].index('重要内容提示:')
if final_paras[i][idx+7] != '\n':
final_paras[i] = _insert_char_into_str(final_paras[i], idx+7, '\n\n')
if idx > 0:
if final_paras[i][idx-1] != '\n':
final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n')
# 修正某些情形下'表决结果:'及其跟随的结果未自成段落的情况
if '表决结果:' in final_paras[i]:
if final_paras[i][:5] == '表决结果:':
final_paras[i] = final_paras[i][:-2]
elif final_paras[i][-7:] == '表决结果:\n\n':
idx = final_paras[i].find('表决结果:')
final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n')
final_paras[i] = final_paras[i][:-2]
else:
idx = final_paras[i].find('表决结果:')
final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n')
# 检查该行中的所有二级专用名词(指一级专用名词后所在段落中出现的专用名词,不应独立成段落)
for is_include, s_include in _check_if_include_second_proper(final_paras[i], SECOND_PROPER_CORPUS):
if is_include:
# 如果该行中含有某一二级专用名词,并且名词后有换行符,则去掉该行的换行符
if final_paras[i][final_paras[i].index(s_include)+len(s_include)] == '\n':
final_paras[i] = final_paras[i].replace('\n', '')
# 在该行中查找匹配到的所有形如: '(一)', '(1)', '(一)', '(1)' 的模式
# 此处认为该模式的起始位置应独立成段落,例如'\n(一)XXX...', '\n(1)XXX...'
if '(' in final_paras[i]:
final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}[\)\)]+', end_flag)
final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[0-9]{1,2}[\)\)]+', end_flag)
# 在该行中查找匹配到的所有形如: '一、', '1、' 的模式
# 此处认为该模式的起始位置应独立成段落,例如'\n一、XXX...', '\n1、XXX...'
if '、' in final_paras[i]:
final_paras[i] = _match_and_insert(final_paras[i], '[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、', end_flag)
final_paras[i] = _match_and_insert(final_paras[i], '[0-9]{1,2}、', end_flag)
# 这里对形如: 'XXX第一、二组、三组的XXX' 的特例进行处理,即去掉前序错误添加的换行符
final_paras[i] = _match_and_delete(final_paras[i], '[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、[\S]+、[\S]+')
final_paras[i] = _match_and_delete(final_paras[i], '[0-9]+、[0-9]+')
# 再次检查'●'符号,若非独立成行则在该符号前添加换行符
for j in _find_key_indexs(final_paras[i], '●'):
if j > 0:
final_paras[i] = _insert_char_into_str(final_paras[i], j, end_flag)
# 如果上述处理流程出现任何异常抛掷,则返回(False, []),标志转换失败
except Exception:
return False, []
# 返回(True, final_paras),标志转换成功
return True, final_paras
def get_table_from_docx(doc, txt, out_path="", is_out_flag=False):
'''
读取Docx文件中每个表格的材料内容
:param doc: 一个Document对象实例
:param txt: 一个字符串列表,包含PDF的正文文本内容
:param out_path: 输出的csv结果文件的完整路径
:param is_out_flag: 是否输出csv结果文件,默认不输出
:return: list, list
'''
data = []
table_txt = []
attach_txt = {}
for table in doc.tables[:]:
table_txt.append('-----表格-----\n')
for i, row in enumerate(table.rows[:]):
row_content = []
for cell in row.cells[:]:
c = cell.text
new_c = c.replace('\n', '').replace(' ','').replace('\t','').replace(',','')
row_content.append(new_c)
if row_content == []:
continue
if '本公司' in row_content[0]:
tmp = ''
for line in row_content:
tmp += line.strip()
tmp += '\n\n'
attach_txt['000'] = tmp
continue
if '证券代码' in row_content[0]:
tmp = '^'
for line in row_content:
tmp += line.strip()+' '
tmp += '$\n'
txt.insert(tmp, 0)
continue
data.append(row_content)
new_row = '^' + '\t'.join(row_content) + '$\n'
if new_row.replace('\t','') != '^$\n':
table_txt.append(new_row)
data.append('-----表格-----\n')
table_txt.append('-----表格-----\n')
flag = False
for i, val in enumerate(table_txt):
if val == '-----表格-----\n':
if not flag:
flag = True
else:
table_txt[i] = '^$\n'
else:
flag = False
table_txt = list(filter(lambda x: x != '^$\n', table_txt))
for i, val in enumerate(table_txt):
if val == '-----表格-----\n' and (i > 0) and (i < len(table_txt)-1):
feat1 = _get_table_row_feat(table_txt[i-1].replace('\n', ''))
feat2 = _get_table_row_feat(table_txt[i+1].replace('\n', ''))
if feat1 == feat2:
table_txt[i] = '^$\n'
if len(table_txt) == 1 and table_txt[0] == '-----表格-----\n':
table_txt[0] = '^$\n'
for i, val in enumerate(table_txt):
if val == '-----表格-----':
continue
if val == '^$\n':
table_txt[i] = ''
continue
table_txt[i] = val[1:][:-2] + '\n'
txt.extend(table_txt)
if is_out_flag:
f = open(out_path, 'w+', newline='')
writer = csv.writer(f)
for i, val in enumerate(data):
if i == 0 and val == '\n':
continue
writer.writerow(val)
f.close()
return txt, attach_txt
def refine_pdf2txt_list_result(txt, att_txt):
'''
对txt字符串列表进行最后的校对,还原或附加误识别为表格的正文内容
:param txt: 一个字符串列表,包含PDF的正文文本内容
:param att_txt: 一些误识别为表格的正文内容
:return: list
'''
for id, val in enumerate(txt):
if id > 10: break
else:
if val[-6:-2] == '有限公司':
txt[id] = val[:-2]
continue
if '000' in att_txt and _check_ann_title_processable(val, exp=2):
txt.insert(id+1, att_txt['000'])
break
return txt
def write_pdf2txt_list_result(out_path, txt, out_mode_flag=True):
'''
将txt字符串列表写为txt文本文件
:param out_path: 生成的txt文本文件的路径
:param txt: 一个字符串列表,包含PDF的正文和表格
:param out_mode_flag: 是否添加段头标识'^'和段尾标识'$'
:return: bool
'''
with open(out_path, "w", encoding='utf-8') as f:
if not out_mode_flag:
for line in txt:
if line != '^$\n':
f.write(line)
else:
strs = ''.join(txt)
paras = strs.split('\n')
for line in paras:
if line != '':
f.write('^' + line + '$\n')
return True
def get_pdf2txt_str_result(txt, out_mode_flag=True):
'''
将txt字符串列表内元素拼接为完整的txt内容
:param txt: 一个字符串列表,包含PDF的正文和表格
:param out_mode_flag: 是否添加段头标识'^'和段尾标识'$'
:return: str
'''
txt_str = ""
for line in txt:
if not out_mode_flag:
for line in txt:
if line != '^$\n':
txt_str += line
else:
strs = ''.join(txt)
paras = strs.split('\n')
for line in paras:
if line != '':
txt_str += ('^' + line + '$\n')
return txt_str
def find_all_local_file(base, extension):
'''
找出给定目录下所有的指定后缀格式的文件路径
:param base: 目录路径
:param extension: 后缀格式,例如: '.pdf'
:return: str
'''
for root, ds, fs in os.walk(base):
for f in fs:
if f.endswith(extension.lower()) or f.endswith(extension.upper()):
fullname = os.path.join(root, f).replace('/', '//').replace('\\', '//')
yield fullname