# cpu_tracs/utils/preprocessing.py
# Duplicated from GIZ/cpu-demo
from haystack.nodes.base import BaseComponent
from haystack.schema import Document
from haystack.nodes import PDFToTextOCRConverter, PDFToTextConverter
from haystack.nodes import TextConverter, DocxToTextConverter, PreProcessor
from typing import Callable, Dict, List, Optional, Text, Tuple, Union
from typing_extensions import Literal
import pandas as pd
import logging
import re
import string
from haystack.pipelines import Pipeline
def useOCR(file_path: str) -> Text:
    """
    Convert an image-only (scanned) PDF into text using the Haystack OCR
    converter (farm-haystack[ocr]).

    Params
    ----------
    file_path: path of the uploaded file, as returned by the application's
        ``add_upload`` function.

    Return
    ----------
    The content of the first document produced by the converter as a string,
    or an empty string when the converter produces no document at all.
    """
    # NOTE(review): the constructor call was truncated after
    # ``remove_numeric_tables=True,`` in the recovered source; any further
    # keyword arguments it carried could not be restored - confirm against
    # the upstream file.
    converter = PDFToTextOCRConverter(remove_numeric_tables=True)
    docs = converter.convert(file_path=file_path, meta=None)

    # Guard against an empty result instead of failing with IndexError.
    if not docs:
        return ""
    return docs[0].content
class FileConverter(BaseComponent):
    """
    Wrapper class to convert an uploaded document into text by calling the
    appropriate Haystack converter class. Falls back to OCR (see ``useOCR``)
    when the converted text is empty, which is what happens for image-only
    PDFs. The FileClassifier from Haystack is not used because it does not
    have a label/output class for images.

    1. https://haystack.deepset.ai/pipeline_nodes/custom-nodes
    2. https://docs.haystack.deepset.ai/docs/file_converters
    3. https://github.com/deepset-ai/haystack/tree/main/haystack/nodes/file_converter
    4. https://docs.haystack.deepset.ai/reference/file-converters-api
    """

    outgoing_edges = 1

    def run(self, file_name: str, file_path: str, encoding: Optional[str] = None,
            id_hash_keys: Optional[List[str]] = None,
            ) -> Tuple[dict, str]:
        """
        Required method to invoke the component in the pipeline
        implementation.

        Params
        ----------
        file_name: name of the file; its extension selects the converter
            ('.pdf', '.txt' or '.docx', matched case-insensitively).
        file_path: path of the uploaded file, as returned by the
            application's ``add_upload`` function.
        encoding: forwarded unchanged to the converter's ``convert`` call.
        id_hash_keys: forwarded unchanged to the converter's ``convert`` call
            and to the resulting Document.

        See the links provided in the class docstring for the other params.

        Return
        ----------
        output: dictionary with the key 'documents' holding a list with one
            Haystack Document.
        output_1: as there is only one outgoing edge, the string 'output_1'.

        Raises
        ----------
        ValueError: if the file extension is not one of the supported types.
        """
        lowered_name = file_name.lower()
        try:
            if lowered_name.endswith('.pdf'):
                converter = PDFToTextConverter(remove_numeric_tables=True)
            elif lowered_name.endswith('.txt'):
                converter = TextConverter(remove_numeric_tables=True)
            elif lowered_name.endswith('.docx'):
                converter = DocxToTextConverter()
            else:
                # Previously this case left ``converter`` unbound and failed
                # later with an unrelated UnboundLocalError.
                raise ValueError(
                    "Unsupported file type for '{}'".format(file_name))
        except Exception as e:
            # NOTE(review): the body of this handler was lost in the
            # recovered source; logging and re-raising is the conservative
            # choice - confirm against the upstream file.
            logging.error(e)
            raise

        documents = []

        # encoding is empty, probably should be utf-8
        converted = converter.convert(
            file_path=file_path, meta=None,
            encoding=encoding, id_hash_keys=id_hash_keys
        )
        # ``convert`` returns a list of documents (see ``useOCR``); only the
        # first one is used. An empty result is treated as empty text so the
        # OCR fallback below still gets a chance.
        text = converted[0].content if converted else ""

        # in case of scanned/images only PDF the content might contain only
        # the page separator (\f or \x0c). We check if that is so and use
        # the OCR to get the text.
        filtered = re.sub(r'\x0c', '', text)
        if filtered == "":
            logging.info("Using OCR")
            text = useOCR(file_path)

        # NOTE(review): the Document construction was partially lost in the
        # recovered source (only ``meta={"name": file_name},`` survived);
        # ``content`` and ``id_hash_keys`` are reconstructed - confirm.
        documents.append(Document(content=text,
                                  meta={"name": file_name},
                                  id_hash_keys=id_hash_keys))

        logging.info('file conversion succesful')
        output = {'documents': documents}
        return output, 'output_1'

    def run_batch(self, *args, **kwargs):
        """
        We do not have a requirement to process multiple files in one go,
        therefore nothing happens here; however, to use the custom node this
        method has to exist on the class.
        """
        return None
def basic(s: str, remove_punc: bool = False):
    """
    Performs basic cleaning of text.

    Params
    ----------
    s: string to be processed
    remove_punc: whether to remove all punctuation (including ',' and '.')

    Return
    ----------
    The processed string: URLs and newlines are replaced by spaces,
    punctuation is optionally deleted, single quotes are replaced by spaces,
    every ".." pair is removed and surrounding whitespace is stripped.
    """
    # Remove URLs: first whole lines that start with a URL, then any
    # remaining inline URL token.
    s = re.sub(r'^https?:\/\/.*[\r\n]*', ' ', s, flags=re.MULTILINE)
    s = re.sub(r"http\S+", " ", s)

    # Remove new line characters
    s = re.sub(r'\n', ' ', s)

    # Remove punctuations (deleted outright, not replaced by a space)
    if remove_punc:
        translator = str.maketrans('', '', string.punctuation)
        s = s.translate(translator)

    # Remove distracting single quotes and dotted pattern
    s = re.sub(r"'", " ", s)
    s = s.replace("..", "")

    return s.strip()
def paraLengthCheck(paraList, max_len = 100):
    """
    There are cases where the preprocessor cannot respect the word limit when
    using the respect-sentence-boundary flag, due to missing sentence
    boundaries. Therefore we run one more round of splitting here for those
    paragraphs.

    Params
    ----------
    paraList : list of paragraph objects; each must expose ``content`` (str)
        and ``meta`` (dict, optionally holding a 'page' entry).
    max_len : max number of words to be respected by paragraphs which
        bypassed the preprocessor strategy. Must be at least 1.

    Return
    ----------
    List of ``(text, page)`` tuples. Paragraphs within the limit are passed
    through with their content untouched; longer ones are cut into
    consecutive chunks of at most ``max_len`` words. ``page`` is None when
    the paragraph's meta has no 'page' entry.

    Raises
    ----------
    ValueError: if ``max_len`` is smaller than 1.
    """
    if max_len < 1:
        raise ValueError("max_len must be a positive integer")

    new_para_list = []
    for passage in paraList:
        page = passage.meta.get('page')
        # Split once and reuse; whitespace-separated tokens count as words.
        words = passage.content.split()

        if len(words) > max_len:
            # Stepping by max_len yields every full chunk plus the
            # remainder, and never an empty trailing chunk when the word
            # count is an exact multiple of max_len.
            for start in range(0, len(words), max_len):
                chunk = " ".join(words[start:start + max_len])
                new_para_list.append((chunk, page))
        else:
            # paragraphs which dont need any splitting
            new_para_list.append((passage.content, page))

    logging.info("New paragraphs length {}".format(len(new_para_list)))
    return new_para_list
class UdfPreProcessor(BaseComponent):
    """
    Class to preprocess the documents returned by FileConverter. It checks
    the splitting strategy, splits each document by words or sentences and
    then synthetically creates the paragraphs.

    1. https://docs.haystack.deepset.ai/docs/preprocessor
    2. https://docs.haystack.deepset.ai/reference/preprocessor-api
    3. https://github.com/deepset-ai/haystack/tree/main/haystack/nodes/preprocessor
    """

    outgoing_edges = 1

    def run(self, documents: List[Document], remove_punc: bool = False,
            split_by: Literal["sentence", "word"] = 'sentence',
            split_length: int = 2, split_respect_sentence_boundary: bool = False,
            split_overlap: int = 0):
        """
        Required method to invoke the component in the pipeline
        implementation.

        Params
        ----------
        documents: documents from the output dictionary returned by
            FileConverter.
        remove_punc: whether to remove all punctuation (including ',' and
            '.') from the paragraphs.
        split_by: document splitting strategy, either 'word' or 'sentence'.
        split_length: when synthetically creating the paragraphs from the
            document, defines the length of a paragraph.
        split_respect_sentence_boundary: only used with the 'word' strategy;
            it is forced to False when splitting by sentence.
        split_overlap: number of words or sentences that overlap when
            creating the paragraphs. This is done because one sentence or
            'some words' make sense when read together with others.

        Return
        ----------
        output: dictionary with four entries - 'documents' (list of Haystack
            Documents, one per paragraph), 'dataframe' (DataFrame built from
            those documents), 'text' (all paragraph texts joined by a space)
            and 'paraList' (list of paragraph texts).
        output_1: as there is only one outgoing edge, the string 'output_1'.
        """
        if split_by == 'sentence':
            split_respect_sentence_boundary = False

        # NOTE(review): most keyword arguments of this call were lost in the
        # recovered source (only ``split_respect_sentence_boundary`` and the
        # page-number comment survived). The split_* arguments are wired to
        # the parameters of this method and ``add_page_number`` follows the
        # surviving comment; any cleaning flags the original passed could
        # not be restored - confirm against the upstream file.
        preprocessor = PreProcessor(
            split_by=split_by,
            split_length=split_length,
            split_respect_sentence_boundary=split_respect_sentence_boundary,
            split_overlap=split_overlap,
            # will add page number only in case of PDF not for text/docx file.
            add_page_number=True,
        )

        # Accumulate the paragraphs of every input document; rebinding the
        # result inside the loop would keep only the last document and leave
        # the name unbound for an empty input list.
        docs_processed = []
        for document in documents:
            for item in preprocessor.process([document]):
                item.content = basic(item.content, remove_punc=remove_punc)
                docs_processed.append(item)

        df = pd.DataFrame(docs_processed)
        # Read the texts from the documents rather than from ``df.content``
        # so that an empty result does not fail on a missing column.
        para_list = [item.content for item in docs_processed]
        all_text = " ".join(para_list)
        logging.info('document split into {} paragraphs'.format(len(para_list)))
        output = {'documents': docs_processed,
                  'dataframe': df,
                  'text': all_text,
                  'paraList': para_list
                  }
        return output, "output_1"

    def run_batch(self, *args, **kwargs):
        """
        We do not have a requirement to process multiple files in one go,
        therefore nothing happens here; however, to use the custom node this
        method has to exist on the class.
        """
        return None
def processingpipeline():
    """
    Returns the preprocessing pipeline. Uses FileConverter and
    UdfPreProcessor from utils.preprocessing.

    The pipeline has two nodes: "FileConverter", fed by the "File" input,
    followed by "UdfPreProcessor", fed by "FileConverter".
    """
    preprocessing_pipeline = Pipeline()
    file_converter = FileConverter()
    custom_preprocessor = UdfPreProcessor()

    # The opening of this add_node call was missing from the recovered
    # source; without it the FileConverter node is never registered and the
    # second add_node has no "FileConverter" input to attach to.
    preprocessing_pipeline.add_node(component=file_converter,
                                    name="FileConverter", inputs=["File"])
    preprocessing_pipeline.add_node(component=custom_preprocessor,
                                    name='UdfPreProcessor', inputs=["FileConverter"])

    return preprocessing_pipeline