import asyncio
import re
import string

import pandas as pd
from aiogoogletrans import Translator
from nltk.tokenize import RegexpTokenizer
from spellchecker import SpellChecker


class Normalizer:
    """
    Text normalization helpers: lowercasing, whitespace/punctuation/HTML/emoji/
    link removal, acronym and contraction expansion, accent folding, query
    cleanup for Latin/Arabic text, translation and spell checking.

    Every helper is best-effort: on failure it prints a message and returns
    its input unchanged instead of raising.
    """

    # Default locations of the JSON lookup tables loaded by ``__init__``.
    _ACRONYMS_URL = "https://raw.githubusercontent.com/sugatagh/E-commerce-Text-Classification/main/JSON/english_acronyms.json"
    _CONTRACTIONS_URL = "https://raw.githubusercontent.com/sugatagh/E-commerce-Text-Classification/main/JSON/english_contractions.json"

    # string.punctuation minus the apostrophe (keeps contractions intact)
    # and the percent sign.
    _PUNCTUATION_TABLE = str.maketrans(
        "", "", string.punctuation.replace("'", "").replace("%", "")
    )

    # Non-greedy "<...>" match; "." does not match newlines, so a tag that
    # spans several lines is left untouched.
    _HTML_TAG_RE = re.compile(r"<.*?>")

    # NOTE: the last range spans U+24C2..U+1F251, so besides emojis it also
    # removes every other character in that range (e.g. CJK ideographs).
    _EMOJI_RE = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
        "\U00002702-\U000027B0"
        "\U000024C2-\U0001F251"
        "]+",
        flags=re.UNICODE,
    )

    # Anything starting with http:// or https://, or with "www.", up to the
    # next whitespace.
    _HTTP_RE = re.compile(r"(https?://\S+|www\.\S+)")

    # Same Arabic letter range that _normalize_query works with.
    _ARABIC_LETTER_RE = re.compile(r"[\u0621-\u064A]")

    def __init__(self, acronyms_url=None, contractions_url=None):
        """
        Initializes the Normalizer object.

        Reads the acronym and contraction tables with ``pd.read_json``; when
        the sources are URLs (the default) this performs network I/O, and any
        error raised by pandas propagates to the caller.

        Args:
            acronyms_url (str, optional): Source of the acronyms JSON.
                Defaults to the bundled GitHub URL.
            contractions_url (str, optional): Source of the contractions JSON.
                Defaults to the bundled GitHub URL.
        """
        # Letter variations dictionary: every character of a key is replaced
        # by the key's value.
        self._letter_variations = {
            "aàáâãäåāăą": "a",
            "cçćĉċč": "c",
            "eèéêëēĕėęě": "e",
            "gğ": "g",
            "hħĥ": "h",
            "iìíîïīĭįı": "i",
            "jĵ": "j",
            "nñńņň": "n",
            "oòóôõöøōŏő": "o",
            "ś": "s",
            "ß": "ss",
            "uùúûüūŭůűų": "u",
            "yýÿŷ": "y",
            "æ": "ae",
            "œ": "oe",
        }

        # Flatten to one entry per character, skipping identity mappings
        # ("a" -> "a") that would only cost regex work.
        self._variation_map = {
            char: replacement
            for variants, replacement in self._letter_variations.items()
            for char in variants
            if char != replacement
        }

        # Character class matching exactly one variant character. Matching
        # single characters only means a run of variants is folded character
        # by character rather than collapsed as a whole.
        self._pattern = (
            "[" + "".join(re.escape(char) for char in self._variation_map) + "]"
        )

        # Tokens are runs of word characters and apostrophes.
        self._regexp = RegexpTokenizer(r"[\w']+")

        # Dictionary of acronyms
        self._acronyms_dict = pd.read_json(
            acronyms_url or self._ACRONYMS_URL, typ="series"
        )
        self._acronyms_list = list(self._acronyms_dict.keys())

        # Dictionary of contractions
        self._contractions_dict = pd.read_json(
            contractions_url or self._CONTRACTIONS_URL, typ="series"
        )
        self._contractions_list = list(self._contractions_dict.keys())

        # Translator used by translate_text
        self._translator = Translator()

        # SpellChecker instances keyed by language code, filled lazily by
        # _get_spell_checker so dictionaries are loaded once per language.
        self._spell_checkers = {}

    # Converting to lowercase
    def _convert_to_lowercase(self, text: str) -> str:
        """
        Convert the input text to lowercase.

        Args:
            text (str): The input text to be converted.

        Returns:
            str: The lowercased text, or the input unchanged if it cannot be
            lowercased.
        """
        try:
            return text.lower()
        except Exception as e:
            print(f"An error occurred during lowercase conversion: {e}")
            return text

    # Removing whitespaces
    def _remove_whitespace(self, text: str) -> str:
        """
        Remove leading and trailing whitespace from the input text.

        Args:
            text (str): The input text to be processed.

        Returns:
            str: The stripped text, or the input unchanged on failure.
        """
        try:
            return text.strip()
        except Exception as e:
            print(f"An error occurred during whitespace removal: {e}")
            return text

    # Removing punctuations
    def _remove_punctuation(self, text: str) -> str:
        """
        Remove ASCII punctuation from the input text, except for apostrophes
        and percent signs.

        Args:
            text (str): The input text to be processed.

        Returns:
            str: The text without punctuation, or the input unchanged on
            failure.
        """
        try:
            return text.translate(self._PUNCTUATION_TABLE)
        except Exception as e:
            print(f"An error occurred during punctuation removal: {e}")
            return text

    # Removing HTML tags
    def _remove_html(self, text: str) -> str:
        """
        Remove HTML tags (any ``<...>`` span on a single line) from the text.

        Args:
            text (str): The input text containing HTML tags.

        Returns:
            str: The text with tags removed, or the input unchanged on failure.
        """
        try:
            return self._HTML_TAG_RE.sub(r"", text)
        except Exception as e:
            print(f"An error occurred during HTML tag removal: {e}")
            return text

    # Removing emojis
    def _remove_emoji(self, text: str) -> str:
        """
        Remove emojis from the input text.

        The character ranges are listed on ``_EMOJI_RE``; note that the widest
        range also covers non-emoji characters between U+24C2 and U+1F251.

        Args:
            text (str): The input text containing emojis.

        Returns:
            str: The text with emojis removed, or the input unchanged on
            failure.
        """
        try:
            return self._EMOJI_RE.sub(r"", text)
        except Exception as e:
            print(f"An error occurred during emoji removal: {e}")
            return text

    # Removing HTTP links
    def _remove_http(self, text: str) -> str:
        """
        Remove links from the input text: anything starting with ``http://``,
        ``https://`` or ``www.`` up to the next whitespace.

        Args:
            text (str): The input text containing links.

        Returns:
            str: The text with links removed, or the input unchanged on
            failure.
        """
        try:
            return self._HTTP_RE.sub("", text)
        except Exception as e:
            print(f"An error occurred during HTTP link removal: {e}")
            return text

    def _expand_tokens(self, text, expansions):
        """
        Tokenize ``text`` and replace every token that is a key of
        ``expansions`` with its (whitespace-split) expansion.

        Args:
            text (str): The text to tokenize with ``self._regexp``.
            expansions: Mapping (dict or pandas Series) from token to its
                expanded string.

        Returns:
            str: The resulting tokens joined by single spaces.
        """
        words = []
        for token in self._regexp.tokenize(text):
            replacement = expansions[token] if token in expansions else token
            words.extend(replacement.split())
        return " ".join(words)

    # Function to convert acronyms in a text
    def _convert_acronyms(self, text: str) -> str:
        """
        Expand acronyms in the text.

        Example of acronyms dictionary:
        {"LOL": "laugh out loud", "BRB": "be right back", "IDK": "I don't know"}

        The text is re-tokenized, so characters other than word characters and
        apostrophes are dropped and tokens are rejoined with single spaces.

        Args:
            text (str): The input text containing acronyms.

        Returns:
            str: The text with acronyms expanded, or the input unchanged on
            failure.
        """
        try:
            return self._expand_tokens(text, self._acronyms_dict)
        except Exception as e:
            print(f"An error occurred during acronym conversion: {e}")
            return text

    # Function to convert contractions in a text
    def _convert_contractions(self, text: str) -> str:
        """
        Expand contractions in the text.

        Example of contractions dictionary:
        {"I'm": "I am", "he's": "he is", "won't": "will not"}

        The text is re-tokenized, so characters other than word characters and
        apostrophes are dropped and tokens are rejoined with single spaces.

        Args:
            text (str): The input text containing contractions.

        Returns:
            str: The text with contractions expanded, or the input unchanged
            on failure.
        """
        try:
            return self._expand_tokens(text, self._contractions_dict)
        except Exception as e:
            print(f"An error occurred during contraction conversion: {e}")
            return text

    def _fix_letter_variations(self, query: str) -> str:
        """
        Replace accented/variant letters with their base counterparts
        (e.g. "é" -> "e", "ß" -> "ss", "æ" -> "ae").

        Only the lowercase variants listed in ``self._letter_variations`` are
        handled; other characters are left as they are.

        Args:
            query (str): The input query containing variations of letters.

        Returns:
            str: The query with variants replaced, or the input unchanged on
            failure.
        """
        try:
            return re.sub(
                self._pattern,
                lambda match: self._variation_map[match.group(0)],
                query,
            )
        except Exception as e:
            print(f"An error occurred during letter variation fixing: {e}")
            return query

    def _normalize_query(self, word: str) -> str:
        """
        Clean a query made of Latin and/or Arabic text.

        Steps, in order:
            1. Drop every character except Latin letters, Arabic letters,
               digits, whitespace, ``- % * . $`` and the German letters
               ä ö ü Ä Ö Ü ß.
            2. Drop runs of other characters (digits and symbols) sandwiched
               directly between two Latin letters, then between two Arabic
               letters.
            3. Shorten any run of three or more identical characters to two.
            4. Where an Arabic run touches a Latin run, keep only the Latin
               part (e.g. "صصphone" -> "phone", "phoneصص" -> "phone").
            5. Insert a space between digits and letters
               (e.g. "123phone" -> "123 phone", "phone123" -> "phone 123").
            6. Collapse whitespace runs to one space and strip the ends.

        Args:
            word (str): The input text to be cleaned.

        Returns:
            str: The cleaned text, or the input unchanged on failure.
        """
        try:
            # Step 1: whitelist of allowed characters.
            kept = re.sub(
                r"[^A-Za-z\s\-%*.$\u0621-\u064A0-9\u00E4\u00F6\u00FC\u00C4\u00D6\u00DC\u00df]",
                "",
                word,
                flags=re.UNICODE,
            )

            # Step 2a: between two Latin letters, drop runs that contain
            # neither letters (Latin/Arabic) nor whitespace.
            clean_text = re.sub(
                r"(?<=[a-zA-Z])([^A-Za-z\u0621-\u064A\s]+)(?=[a-zA-Z])", "", kept
            )

            # Step 2b: same position, but Arabic letters are dropped as well.
            clean_text = re.sub(
                r"(?<=[a-zA-Z])([^A-Za-z\s]+)(?=[a-zA-Z])", "", clean_text
            )

            # Step 2c: between two Arabic letters, drop non-Arabic,
            # non-whitespace runs.
            clean_text = re.sub(
                r"(?<=[\u0621-\u064A])([^\u0621-\u064A\s]+)(?=[\u0621-\u064A])",
                "",
                clean_text,
            )

            # Step 3: a character repeated 2+ times is rewritten as exactly
            # two, so only runs of three or more actually shrink.
            clean_text = re.sub(r"(.)(\1+)", r"\1\1", clean_text)

            # Step 4: drop an Arabic run glued in front of a Latin run
            # (e.g. صصphone -> phone) ...
            clean_text = re.sub(r"([\u0621-\u064A]+)([a-zA-Z]+)", r"\2", clean_text)

            # ... and an Arabic run glued behind a Latin run
            # (e.g. phoneصص -> phone).
            clean_text = re.sub(r"([a-zA-Z]+)([\u0621-\u064A]+)", r"\1", clean_text)

            # NOTE(review): the next two substitutions would keep the Arabic
            # side instead, but they look for the same Latin/Arabic adjacency
            # the two substitutions above have just removed, so they appear
            # to have no effect - confirm the intended precedence.
            clean_text = re.sub(r"([a-zA-Z]+)([\u0621-\u064A]+)", r"\2", clean_text)
            clean_text = re.sub(r"([\u0621-\u064A]+)([a-zA-Z]+)", r"\1", clean_text)

            # Step 5: separate digits from following letters
            # (e.g. 123phone -> 123 phone) ...
            clean_text = re.sub(
                r"(\d+)([a-zA-Z\u0621-\u064A]+)", r"\1 \2", clean_text
            )

            # ... and letters from following digits
            # (e.g. phone123 -> phone 123).
            clean_text = re.sub(
                r"([a-zA-Z\u0621-\u064A]+)(\d+)", r"\1 \2", clean_text
            )

            # Step 6: remove extra spaces.
            clean_text = re.sub(r"\s+", " ", clean_text)

            return clean_text.strip()
        except Exception as e:
            print(f"An error occurred during query normalization: {e}")
            return word

    def keep_one_char(self, word: str) -> str:
        """
        Keep only one occurrence of consecutive repeated characters
        (e.g. "helllo" -> "helo").

        Args:
            - word (str): The input word to modify.

        Returns:
            - str: The word with every run of identical characters reduced to
              one, or the input unchanged on failure.
        """
        try:
            return re.sub(r"(.)(\1+)", r"\1", word)
        except Exception as e:
            print(f"An error occurred during character repetition removal: {e}")
            return word

    def translate_text(self, text: str) -> str:
        """
        Translate the given text with ``self._translator`` and return the
        result lowercased and stripped.

        The translator coroutine is driven to completion on the current
        thread's event loop, so this must not be called from code that is
        already running inside that loop.

        Args:
            - text (str): The text to translate.

        Returns:
            - str: The translated text, or the lowercased and stripped
              original text if translation fails.
        """
        try:
            try:
                loop = asyncio.get_event_loop()
            except RuntimeError:
                # No current event loop (e.g. worker thread): create one
                # instead of silently skipping the translation.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            translated_text = (
                loop.run_until_complete(self._translator.translate(text))
                .text.lower()
                .strip()
            )
        except Exception as e:
            print(f"Text Translation failed: {e}")
            # Use original text if translation fails
            translated_text = text.lower().strip()
        return translated_text

    def _get_spell_checker(self, language):
        """
        Return a cached SpellChecker for ``language``, creating it on first
        use and falling back to English if it cannot be created.
        """
        cache = getattr(self, "_spell_checkers", None)
        if cache is None:
            cache = self._spell_checkers = {}
        if language not in cache:
            try:
                cache[language] = SpellChecker(language=language)
            except Exception:
                cache[language] = SpellChecker(language="en")
        return cache[language]

    def check_spelling(self, query: str) -> str:
        """
        Check the spelling of the input query and return the corrected version.

        The query is checked as Arabic when it contains at least one Arabic
        letter and as English otherwise. Each space-separated word is
        corrected on its own; when no correction is found the word is retried
        with repeated characters collapsed, and kept as-is if that fails too.

        Args:
            - query (str): The input query to check its spelling.

        Returns:
            - str: The corrected query, or the input unchanged on failure.
        """
        try:
            # Spaces and digits must not influence the language choice, so
            # look for Arabic letters rather than testing for "all ASCII
            # letters".
            input_language = "ar" if self._ARABIC_LETTER_RE.search(query) else "en"
            spell_checker = self._get_spell_checker(input_language)

            corrected_words = []
            for word in query.split(" "):
                corrected_word = spell_checker.correction(word)
                if corrected_word is None:
                    # Retry with runs of repeated characters collapsed.
                    corrected_word = spell_checker.correction(
                        self.keep_one_char(word)
                    )
                corrected_words.append(
                    word if corrected_word is None else corrected_word
                )

            return " ".join(corrected_words).strip()
        except Exception as e:
            print(f"An error occurred during spelling check: {e}")
            return query

    def clean_text(self, text: str) -> str:
        """
        Normalize the input text.

        Applies, in order: lowercasing, stripping, newline-to-space
        conversion, removal of ``[...]`` spans, links, HTML tags and emojis,
        letter-variation folding and query normalization.

        Args:
            text (str): The input text to be normalized.

        Returns:
            str: The normalized text; on failure, the text as it was when the
            failure occurred.
        """
        try:
            # Convert text to lowercase
            text = self._convert_to_lowercase(text)

            # Remove leading/trailing whitespace
            text = self._remove_whitespace(text)

            # Convert text to one line
            text = re.sub("\n", " ", text)

            # Remove square brackets together with their content
            text = re.sub(r"\[.*?\]", "", text)

            # Remove HTTP links
            text = self._remove_http(text)

            # Remove HTML tags
            text = self._remove_html(text)

            # Remove emojis
            text = self._remove_emoji(text)

            # Fix letter variations
            text = self._fix_letter_variations(text)

            # Normalize queries
            text = self._normalize_query(text)

            return text
        except Exception as e:
            print(f"An error occurred during text cleaning: {e}")
            return text