# -*- coding: utf-8 -*- """ Created by Shengbo.Zhang on 2021/08/13 """ import io import re import os import csv import logging from docx import Document from pdf2docx import Converter from Pdf2Txt.config import * from pdfminer.layout import LAParams from pdfminer.pdfpage import PDFPage from pdfminer.converter import TextConverter from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter from Pdf2Txt.config import _check_ann_title_processable # 关闭pdf2docx模块中Converter的日志输出 logging.disable(logging.INFO) logging.disable(logging.WARNING) def _get_txt_from_pdf(pdf_path, out_path): ''' 读取Pdf文件,直接将其转换为Txt文本格式 :param pdf_path: 输入的pdf公告文件的完整路径 :param out_path: 输出的txt结果文件的完整路径 :return: bool ''' manager = PDFResourceManager() output = io.StringIO() converter = TextConverter(manager, output, laparams=LAParams()) interpreter = PDFPageInterpreter(manager, converter) with open(pdf_path, 'rb') as infile: content = [] for page in PDFPage.get_pages(infile, check_extractable=True): interpreter.process_page(page) convertedPDF = output.getvalue() # print(convertedPDF) content.append(convertedPDF) # print(len(content)) # print(content) for idx, val in enumerate(content): val = re.sub('\n+','\n', val) val = re.sub('\n +', '', val) val = val.replace(' ', '') content[idx] = val with open(out_path, 'wb') as f: f.write(''.join(content).encode('utf-8')) output.close() converter.close() f.close() return True def _get_cleaned_txt(txtPath, out_path): ''' 对Txt文件进行内容格式清洗(暂时仅供测试) :param txtPath: 输入的txt文件的完整路径 :param out_path: 输出的txt文件的完整路径 :return: bool ''' with open(txtPath, 'rb')as f: content = f.read().decode('utf-8') p = re.compile(r'(?<=##)\S.+(?=##)|[\u4e00-\u9fff+\u3002\uFF0C]') x = ''.join(re.findall(p, content)) final_result = re.sub(u"[\uFF0C|\u3002|\u002B]{2,}", "", x) with open(out_path, "w")as txtPath: txtPath.write(final_result) # print(final_result) return True def get_docx_from_pdf(pdf_path, out_path): ''' 读取Pdf文件,将其转换为Docx格式并存在本地 :param pdf_path: 输入的pdf公告文件的完整路径 :param out_path: 输出的中间docx结果文件的完整路径 :return: bool ''' try: cv = Converter(pdf_path) cv.convert(out_path) except Exception: return False for p in cv.pages: if not p.finalized: cv.close() return False cv.close() return True def _find_key_indexs(str, key): ''' 给定一个父字符串和子串,在父串中查找子串的所有索引位置,并返回一个包含所有下标的列表 :param str: 父字符串 :param key: 子字符串 :return: list ''' lstKey = [] countStr = str.count(key) if countStr < 1: return [] elif countStr == 1: indexKey = str.find(key) return [indexKey] else: indexKey = str.find(key) lstKey.append(indexKey) while countStr > 1: str_new = str[indexKey + 1:len(str) + 1] indexKey_new = str_new.find(key) indexKey = indexKey + 1 + indexKey_new lstKey.append(indexKey) countStr -= 1 lstKey.sort(reverse=True) return lstKey def _insert_char_into_str(str, idx, char): ''' 给定一个父字符串、下标位置、子串,在父串中的下标位置插入子串,并返回一个新的字符串 :param str: 父字符串 :param idx: 插入位置索引 :param char: 子字符串 :return: str ''' tmp = list(str) tmp.insert(idx, char) return ''.join(tmp) def _is_chinese(str): ''' 给定一个字符串,判断该字符串是否全是中文 :param str: 字符串 :return: bool ''' for ch in str: if '\u4e00' <= ch <= '\u9fff': return True return False def _get_table_row_feat(str): ''' 给定一个空格分割的表格行字符串,计算它的特征(01组成的字符串) :param str: 字符串 :return: 字符串 ''' s = str.split() r = '' for c in s: try: _ = float(c) r += '1' except Exception: r += '0' return r def _check_if_include_first_proper(s, corpus): ''' 检查字符串s中是否包含语料列表first_corpus中的某一内容 :param s: 字符串 :param corpus: 字符串列表 :return: [bool, str] ''' for i in corpus: if i in s: return [True, i] return [False, ''] def _check_if_include_second_proper(s, corpus): ''' 检查字符串s中是否包含语料列表first_corpus中的某一内容 :param s: 字符串 :param corpus: 字符串列表 :return: list ''' res = [] for i in corpus: if i in s: res.append([True, i]) else: res.append([False, i]) return res def _match_and_insert(string, pattern, substring): ''' 匹配string字符串中的pattern,计算所有pattern在string中的首个字符索引位置,并在string从后向前插入substring至这些位置 :param string: 待匹配的字符串 :param pattern: 匹配模式 :param substring: 待插入的子字符串 :return: 插入后的字符串 ''' idx_list = [] for j in re.finditer(pattern, string): idx_list.append(j.span()[0]) # 将匹配模式的所有索引下标进行倒序排列,方便后续插入end_flag idx_list.sort(reverse=True) if idx_list != []: for k in idx_list: if k > 0 and string[k-1] != '“': string = _insert_char_into_str(string, k, substring) return string def _match_and_delete(string, pattern): ''' 匹配string字符串中的pattern,计算pattern在string中的首个字符索引位置,删除该索引前2个位置的换行符\n :param string: 待匹配的字符串 :param pattern: 匹配模式 :return: 删除'\n\n'子字符串后的字符串 ''' matcher = re.search(pattern, string) if matcher: k = matcher.span()[0] if k >= 2 and string[k-1] == '\n' and string[k-2] == '\n': string = string[:k-2] + string[k:] return string def get_txt_from_docx(doc): ''' 读取Docx文件中每个自然行的材料内容 :param doc: 一个Document对象实例 :param out_path: 输出的txt结果文件的完整路径 :return: bool(转换是否成功), list(格式化修正后的文本列表) ''' # 公告中的编号符号的集合,例如:'(1)', '1、' NUMBER_1 = '123456789一二三四五六七八九十' # 数字与大小写的英文字母的集合 NUMBER_2 = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz' # 提取docx文件中的单行文本至初始文本列表paras中 paras = [para.text+'\n' for i, para in enumerate(doc.paragraphs)] # 存储首轮格式化修正文本的列表new_paras new_paras = [] # new_paras中各字符串的长度 new_paras_len_cnt = [] try: # 遍历paras文本列表中的各字符串 for val in paras: # 若该行文本为空,或者是页面号,或者是’单位‘,则跳过 if val == '\n' or re.search('^[0-9]+ \n$', val) or val[:2] == '单位': continue # 否则,将该行文本添加进new_paras文本列表中 new_paras.append(val.lstrip()) # 计算该行文本的字符长度 new_paras_len_cnt.append(len(val)) # 正文标识符,指示正文开始的行号 line_mark = 0 # 遍历new_paras的前10行,目的是处理公告的头部信息,例如:证券代码、证券简称、公告编号、公告标题等 for i, val in enumerate(new_paras[:10]): # 如果出现制表符,或者空格数大于1,则仅保留一个空格 if '\t' in val or val.count(' ') > 2: new_paras[i] = ' '.join(val.split()) + '\n' if '证券代码:' in new_paras[i]: continue # 如果行末尾是'有限公司',则去掉可能的空格 if val.replace(' ', '')[-5:] == '有限公司\n': new_paras[i] = val.replace(' ', '') continue # 循环检查下一行,直到行末尾是'公告',或'股东大会的通知',或'董事意见函'(应与mongo.py中process_files()相对应) # 此时认为抵达正文的起始位置,后续处理将从第line_mark行开始 if _check_ann_title_processable(val.replace(' ', ''), exp=1): new_paras[i] = val.replace(' ', '') line_mark = i + 1 break else: new_paras[i] = val.replace('\n', '').replace(' ', '') # 计算new_paras中各行的平均字符长度 mean_len = sum(new_paras_len_cnt)//len(new_paras_len_cnt) # 遍历new_paras for i, _ in enumerate(new_paras): # 如果是正文部分 if i >= line_mark: # 去掉该行中的一些符号(空格、特殊符号、英文逗号) new_paras[i] = new_paras[i]\ .replace(' ', '')\ .replace(' ', '')\ .replace('', '')\ .replace(',', '') # 如果该行长度大于平均长度,并且(下一行首部为非编号,或者下一行首部是编号且以换行结尾),则认为该行在段落中,故去掉该行换行符 if i < len(new_paras)-1 and \ len(new_paras[i]) >= mean_len and \ ((new_paras[i + 1].replace('(','').replace('(','')[0] not in NUMBER_1) or new_paras[i + 1][-1] == '\n'): new_paras[i] = new_paras[i].replace('\n', '') # 如果该行的下一行长度大于等于3,并且该行的下一行首部是非编号,且不包含关键字'年',则认为该行在段落中,故去掉该行换行符 if i < len(new_paras)-2 and \ len(new_paras[i + 1]) >= 3 and \ new_paras[i + 1].replace('(','').replace('(','')[0] in NUMBER_2 and \ (not '.' in new_paras[i+1][:3]) and \ (not '、' in new_paras[i+1][:3]) and \ (not '年' in new_paras[i+1]): new_paras[i] = new_paras[i].replace('\n', '') # 查找该行中,中文冒号符号':'的所有索引位置j for j in _find_key_indexs(new_paras[i], ':'): # 如果该行的j+1位置不是换行,并且该行不包括中文括号'()'与书名号'《》',且该行不包含一级专用名词,则认为该行应独立成 # 段落,故在该行j+1位置插入换行符 # 注:执行插入操作时,若有多个位置进行插入,则总是从后往前插入,确保插入后索引仍然正确 if j < len(new_paras[i])-1 and new_paras[i][j+1] != '\n' and \ ('(' not in new_paras[i]) and ('《' not in new_paras[i]) and \ (')' not in new_paras[i]) and ('》' not in new_paras[i]) and \ (not _check_if_include_first_proper(new_paras[i], FIRST_PROPER_CORPUS)[0]): new_paras[i] = _insert_char_into_str(new_paras[i], j+1, '\n') # 查找该行中,中文左括号符号'('的所有索引位置j for j in _find_key_indexs(new_paras[i], '('): # 如果该行的j+1位置是编号,并且该行的上一行末尾不是换行,且该行j-1位置为非中文和非书名号,则认为该行的下一行应独立成 # 段落,故在该行j位置插入换行符 if new_paras[i][j+1] in NUMBER_1 and new_paras[i-1][-1] != '\n' and \ (not _is_chinese(new_paras[i][j-1])) and new_paras[i][j-1] != '》': new_paras[i] = _insert_char_into_str(new_paras[i], j, '\n') # 查找该行中,英文左括号符号'('的所有索引位置j for j in _find_key_indexs(new_paras[i], '('): # 如果该行的j+1位置是编号,并且该行的上一行末尾不是换行,且该行j-1位置为非中文和非书名号,则认为该行的下一行应独立成 # 段落,故在该行j位置插入换行符 if new_paras[i][j + 1] in NUMBER_1 and new_paras[i - 1][-1] != '\n' and \ (not _is_chinese(new_paras[i][j - 1])) and new_paras[i][j - 1] != '》': new_paras[i] = _insert_char_into_str(new_paras[i], j, '\n') # 查找该行中,中文顿号符号'、'的所有索引位置j for j in _find_key_indexs(new_paras[i], '、'): # 如果该行的j-1位置是编号(不超过9或十),并且该行的上一行末尾不是换行,则认为该行的下一行应独立成段落,故在该行j-1 # 位置插入换行符 if (j-2) < len(new_paras[i]) and new_paras[i][j-1] in NUMBER_1 and new_paras[i][j-2] not in NUMBER_1 \ and new_paras[i][j-2] in '。;.;' and new_paras[i-1][-1] != '\n': new_paras[i] = _insert_char_into_str(new_paras[i], j-1, '\n') continue # 如果该行的j-1与j-2位置都是编号(超过9或十),并且该行的上一行末尾不是换行,则认为该行的下一行应独立成段落,故在该 # 行j-2位置插入换行符 if (j-3) < len(new_paras[i]) and new_paras[i][j-1] in NUMBER_1 and new_paras[i][j-2] in NUMBER_1 \ and new_paras[i][j-3] in '。;.;' and new_paras[i-1][-1] != '\n': new_paras[i] = _insert_char_into_str(new_paras[i], j-2, '\n') # 修正某些情形下'特此公告。'未自成段落的情况 if new_paras[i] == '特此公告。\n': if new_paras[i-1][-1] != '\n': new_paras[i] = '\n特此公告。\n' if new_paras[i+1][-1] != '\n': new_paras[i+1] += '\n' # 如果该行的下一行中含有独立的一级专用名词,则认为该行的下一行应独立成段落,故在该行的末尾插入缺省的换行符 if (i+1) < len(new_paras): tmp_flag, tmp_str = _check_if_include_first_proper(new_paras[i+1], FIRST_PROPER_CORPUS) if tmp_flag: tmp_idx = new_paras[i+1].index(tmp_str) - 1 if tmp_idx >= 0 and new_paras[i+1][tmp_idx] != '(': if new_paras[i][-1] != '\n': new_paras[i] += '\n' # 将new_paras中的若干字符串连接成为一个字符串str_sum str_sum = ''.join(new_paras) # 将str_num字符串按照换行符进行分割,生成次轮格式化修正文本的列表final_paras final_paras = str_sum.split('\n') # 遍历final_paras for i, val in enumerate(final_paras): # 每一自然段落的末尾符号,这里为两个换行符,便于清晰地查看最终生成的txt文本 end_flag = '\n\n' # 给final_paras中的每一行添加一个末尾符号 final_paras[i] += end_flag # 在该行中查找匹配到的所有形如: '(1)', '(2)' 的模式 # 此处认为该模式的起始位置应独立成段落,例如'\n(1)XXX...', '\n(1)XXX...' if '(' in final_paras[i]: final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[0-9]{1,2}[\)\)]+', end_flag) # 在该行中,查找所有的中文左括号符号'('与中文右括号符号')',计算它们各自的数量 # 如果两符号的数量不相等,则认为该行处在段落中,故应去掉该行末尾的end_flag if len(_find_key_indexs(final_paras[i], '(')) != len(_find_key_indexs(final_paras[i], ')')): final_paras[i] = final_paras[i][:-2] # 将final_paras中的若干字符串连接成为一个字符串str_sum str_sum = ''.join(final_paras) # 将str_num字符串按照换行符进行分割,生成终轮格式化修正文本的列表,覆盖掉之前的final_paras final_paras = str_sum.split('\n\n') # 遍历final_paras for i, val in enumerate(final_paras): # 每一自然段落的末尾符号,这里为两个换行符,便于清晰地查看最终生成的txt文本 end_flag = '\n\n' # 给final_paras中的每一行再次添加上一个末尾符号 final_paras[i] += end_flag # 修正某些情形下'重要内容提示:'未自成段落的情况 if '重要内容提示:' in final_paras[i]: idx = final_paras[i].index('重要内容提示:') if final_paras[i][idx+7] != '\n': final_paras[i] = _insert_char_into_str(final_paras[i], idx+7, '\n\n') if idx > 0: if final_paras[i][idx-1] != '\n': final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n') # 修正某些情形下'表决结果:'及其跟随的结果未自成段落的情况 if '表决结果:' in final_paras[i]: if final_paras[i][:5] == '表决结果:': final_paras[i] = final_paras[i][:-2] elif final_paras[i][-7:] == '表决结果:\n\n': idx = final_paras[i].find('表决结果:') final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n') final_paras[i] = final_paras[i][:-2] else: idx = final_paras[i].find('表决结果:') final_paras[i] = _insert_char_into_str(final_paras[i], idx, '\n\n') # 检查该行中的所有二级专用名词(指一级专用名词后所在段落中出现的专用名词,不应独立成段落) for is_include, s_include in _check_if_include_second_proper(final_paras[i], SECOND_PROPER_CORPUS): if is_include: # 如果该行中含有某一二级专用名词,并且名词后有换行符,则去掉该行的换行符 if final_paras[i][final_paras[i].index(s_include)+len(s_include)] == '\n': final_paras[i] = final_paras[i].replace('\n', '') # 在该行中查找匹配到的所有形如: '(一)', '(1)', '(一)', '(1)' 的模式 # 此处认为该模式的起始位置应独立成段落,例如'\n(一)XXX...', '\n(1)XXX...' if '(' in final_paras[i]: final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}[\)\)]+', end_flag) final_paras[i] = _match_and_insert(final_paras[i], '[\(\(]+[0-9]{1,2}[\)\)]+', end_flag) # 在该行中查找匹配到的所有形如: '一、', '1、' 的模式 # 此处认为该模式的起始位置应独立成段落,例如'\n一、XXX...', '\n1、XXX...' if '、' in final_paras[i]: final_paras[i] = _match_and_insert(final_paras[i], '[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、', end_flag) final_paras[i] = _match_and_insert(final_paras[i], '[0-9]{1,2}、', end_flag) # 这里对形如: 'XXX第一、二组、三组的XXX' 的特例进行处理,即去掉前序错误添加的换行符 final_paras[i] = _match_and_delete(final_paras[i], '[\u96f6\u4e00\u4e8c\u4e09\u56db\u4e94\u516d\u4e03\u516b\u4e5d\u5341]{1,2}、[\S]+、[\S]+') final_paras[i] = _match_and_delete(final_paras[i], '[0-9]+、[0-9]+') # 再次检查'●'符号,若非独立成行则在该符号前添加换行符 for j in _find_key_indexs(final_paras[i], '●'): if j > 0: final_paras[i] = _insert_char_into_str(final_paras[i], j, end_flag) # 如果上述处理流程出现任何异常抛掷,则返回(False, []),标志转换失败 except Exception: return False, [] # 返回(True, final_paras),标志转换成功 return True, final_paras def get_table_from_docx(doc, txt, out_path="", is_out_flag=False): ''' 读取Docx文件中每个表格的材料内容 :param doc: 一个Document对象实例 :param txt: 一个字符串列表,包含PDF的正文文本内容 :param out_path: 输出的csv结果文件的完整路径 :param is_out_flag: 是否输出csv结果文件,默认不输出 :return: list, list ''' data = [] table_txt = [] attach_txt = {} for table in doc.tables[:]: table_txt.append('-----表格-----\n') for i, row in enumerate(table.rows[:]): row_content = [] for cell in row.cells[:]: c = cell.text new_c = c.replace('\n', '').replace(' ','').replace('\t','').replace(',','') row_content.append(new_c) if row_content == []: continue if '本公司' in row_content[0]: tmp = '' for line in row_content: tmp += line.strip() tmp += '\n\n' attach_txt['000'] = tmp continue if '证券代码' in row_content[0]: tmp = '^' for line in row_content: tmp += line.strip()+' ' tmp += '$\n' txt.insert(tmp, 0) continue data.append(row_content) new_row = '^' + '\t'.join(row_content) + '$\n' if new_row.replace('\t','') != '^$\n': table_txt.append(new_row) data.append('-----表格-----\n') table_txt.append('-----表格-----\n') flag = False for i, val in enumerate(table_txt): if val == '-----表格-----\n': if not flag: flag = True else: table_txt[i] = '^$\n' else: flag = False table_txt = list(filter(lambda x: x != '^$\n', table_txt)) for i, val in enumerate(table_txt): if val == '-----表格-----\n' and (i > 0) and (i < len(table_txt)-1): feat1 = _get_table_row_feat(table_txt[i-1].replace('\n', '')) feat2 = _get_table_row_feat(table_txt[i+1].replace('\n', '')) if feat1 == feat2: table_txt[i] = '^$\n' if len(table_txt) == 1 and table_txt[0] == '-----表格-----\n': table_txt[0] = '^$\n' for i, val in enumerate(table_txt): if val == '-----表格-----': continue if val == '^$\n': table_txt[i] = '' continue table_txt[i] = val[1:][:-2] + '\n' txt.extend(table_txt) if is_out_flag: f = open(out_path, 'w+', newline='') writer = csv.writer(f) for i, val in enumerate(data): if i == 0 and val == '\n': continue writer.writerow(val) f.close() return txt, attach_txt def refine_pdf2txt_list_result(txt, att_txt): ''' 对txt字符串列表进行最后的校对,还原或附加误识别为表格的正文内容 :param txt: 一个字符串列表,包含PDF的正文文本内容 :param att_txt: 一些误识别为表格的正文内容 :return: list ''' for id, val in enumerate(txt): if id > 10: break else: if val[-6:-2] == '有限公司': txt[id] = val[:-2] continue if '000' in att_txt and _check_ann_title_processable(val, exp=2): txt.insert(id+1, att_txt['000']) break return txt def write_pdf2txt_list_result(out_path, txt, out_mode_flag=True): ''' 将txt字符串列表写为txt文本文件 :param out_path: 生成的txt文本文件的路径 :param txt: 一个字符串列表,包含PDF的正文和表格 :param out_mode_flag: 是否添加段头标识'^'和段尾标识'$' :return: bool ''' with open(out_path, "w", encoding='utf-8') as f: if not out_mode_flag: for line in txt: if line != '^$\n': f.write(line) else: strs = ''.join(txt) paras = strs.split('\n') for line in paras: if line != '': f.write('^' + line + '$\n') return True def get_pdf2txt_str_result(txt, out_mode_flag=True): ''' 将txt字符串列表内元素拼接为完整的txt内容 :param txt: 一个字符串列表,包含PDF的正文和表格 :param out_mode_flag: 是否添加段头标识'^'和段尾标识'$' :return: str ''' txt_str = "" for line in txt: if not out_mode_flag: for line in txt: if line != '^$\n': txt_str += line else: strs = ''.join(txt) paras = strs.split('\n') for line in paras: if line != '': txt_str += ('^' + line + '$\n') return txt_str def find_all_local_file(base, extension): ''' 找出给定目录下所有的指定后缀格式的文件路径 :param base: 目录路径 :param extension: 后缀格式,例如: '.pdf' :return: str ''' for root, ds, fs in os.walk(base): for f in fs: if f.endswith(extension.lower()) or f.endswith(extension.upper()): fullname = os.path.join(root, f).replace('/', '//').replace('\\', '//') yield fullname