import asyncio
import re
import string

import pandas as pd
from aiogoogletrans import Translator
from spellchecker import SpellChecker
from nltk.tokenize import RegexpTokenizer

class Normalizer:
    """
    A class for text normalization tasks such as converting to lowercase,
    removing whitespace, punctuation, HTML tags, emojis, etc.
    """

    def __init__(self):
        """
        Initializes the Normalizer object.
        """
        # Mapping of accented/variant letters to their base counterparts
        self._letter_variations = {
            "aàáâãäåāăą": "a",
            "cçćĉċč": "c",
            "eèéêëēĕėęě": "e",
            "gğ": "g",
            "hħĥ": "h",
            "iìíîïīĭįı": "i",
            "jĵ": "j",
            "nñńņň": "n",
            "oòóôõöøōŏő": "o",
            "ś": "s",
            "ß": "ss",
            "uùúûüūŭůűų": "u",
            "yýÿŷ": "y",
            "æ": "ae",
            "œ": "oe",
        }
        # Build a regex alternation matching any single character listed above
        pattern_parts = []
        for variation in self._letter_variations:
            for char in variation:
                pattern_parts.append(re.escape(char))
        self._pattern = "|".join(pattern_parts)
        # Tokenizer that keeps word-internal apostrophes (e.g. "don't")
        self._regexp = RegexpTokenizer(r"[\w']+")
        # Dictionary of acronyms
        acronyms_url = "https://raw.githubusercontent.com/sugatagh/E-commerce-Text-Classification/main/JSON/english_acronyms.json"
        self._acronyms_dict = pd.read_json(acronyms_url, typ="series")
        self._acronyms_list = list(self._acronyms_dict.keys())
        # Dictionary of contractions
        contractions_url = "https://raw.githubusercontent.com/sugatagh/E-commerce-Text-Classification/main/JSON/english_contractions.json"
        self._contractions_dict = pd.read_json(contractions_url, typ="series")
        self._contractions_list = list(self._contractions_dict.keys())
        # Translator used for translation to English
        self._translator = Translator()

    # Converting to lowercase
    def _convert_to_lowercase(self, text):
        """
        Convert the input text to lowercase.

        Args:
            text (str): The input text to be converted.

        Returns:
            str: The input text converted to lowercase.
        """
        try:
            return text.lower()
        except Exception as e:
            print(f"An error occurred during lowercase conversion: {e}")
            return text

    # Removing whitespaces
    def _remove_whitespace(self, text):
        """
        Remove leading and trailing whitespaces from the input text.

        Args:
            text (str): The input text to be processed.

        Returns:
            str: The input text with leading and trailing whitespaces removed.
        """
        try:
            return text.strip()
        except Exception as e:
            print(f"An error occurred during whitespace removal: {e}")
            return text

    # Removing punctuations
    def _remove_punctuation(self, text):
        """
        Remove punctuation marks from the input text, except for apostrophes and percent signs.

        Args:
            text (str): The input text to be processed.

        Returns:
            str: The input text with punctuation marks removed.
        """
        try:
            punct_str = string.punctuation
            # Keep apostrophes (so contractions stay intact) and percent signs
            punct_str = punct_str.replace("'", "").replace("%", "")
            return text.translate(str.maketrans("", "", punct_str))
        except Exception as e:
            print(f"An error occurred during punctuation removal: {e}")
            return text
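
    # Illustrative example (not part of the original file); apostrophes and
    # percent signs are preserved, all other punctuation is dropped:
    #   _remove_punctuation("wow!! it's 50% off...")  ->  "wow it's 50% off"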

    # Removing HTML tags
    def _remove_html(self, text):
        """
        Remove HTML tags from the input text.

        Args:
            text (str): The input text containing HTML tags.

        Returns:
            str: The input text with HTML tags removed.
        """
        try:
            html = re.compile(r"<.*?>")
            return html.sub(r"", text)
        except Exception as e:
            print(f"An error occurred during HTML tag removal: {e}")
            return text
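
    # Illustrative example (not part of the original file):
    #   _remove_html("<p>brand new</p>")  ->  "brand new"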

    # Removing emojis
    def _remove_emoji(self, text):
        """
        Remove emojis from the input text.

        Args:
            text (str): The input text containing emojis.

        Returns:
            str: The input text with emojis removed.
        """
        try:
            emoji_pattern = re.compile(
                "["
                "\U0001F600-\U0001F64F"  # emoticons
                "\U0001F300-\U0001F5FF"  # symbols & pictographs
                "\U0001F680-\U0001F6FF"  # transport & map symbols
                "\U0001F1E0-\U0001F1FF"  # flags (iOS)
                "\U00002702-\U000027B0"
                "\U000024C2-\U0001F251"
                "]+",
                flags=re.UNICODE,
            )
            return emoji_pattern.sub(r"", text)
        except Exception as e:
            print(f"An error occurred during emoji removal: {e}")
            return text
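
    # Illustrative example (not part of the original file); whitespace around a
    # removed emoji is left in place:
    #   _remove_emoji("great phone 😍🚀")  ->  "great phone "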

    # Removing HTTP links
    def _remove_http(self, text):
        """
        Remove HTTP links from the input text.

        Args:
            text (str): The input text containing HTTP links.

        Returns:
            str: The input text with HTTP links removed.
        """
        try:
            http = r"https?://\S+|www\.\S+"  # match URLs starting with http(s):// or www.
            pattern = r"({})".format(http)
            return re.sub(pattern, "", text)
        except Exception as e:
            print(f"An error occurred during HTTP link removal: {e}")
            return text
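
    # Illustrative example (not part of the original file); the surrounding
    # spaces remain, so a double space can be left where the URL was:
    #   _remove_http("see https://example.com for details")  ->  "see  for details"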

    # Function to convert acronyms in a text
    def _convert_acronyms(self, text):
        """
        Convert acronyms in the text.

        Example of acronyms dictionary:
        {"LOL": "laugh out loud", "BRB": "be right back", "IDK": "I don't know"}

        Args:
            text (str): The input text containing acronyms.

        Returns:
            str: The input text with acronyms expanded.
        """
        try:
            words = []
            for word in self._regexp.tokenize(text):
                if word in self._acronyms_list:
                    words = words + self._acronyms_dict[word].split()
                else:
                    words = words + word.split()
            text_converted = " ".join(words)
            return text_converted
        except Exception as e:
            print(f"An error occurred during acronym conversion: {e}")
            return text
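
    # Illustrative example (not part of the original file); the exact output
    # depends on the downloaded acronyms dictionary. Assuming it maps "asap"
    # to "as soon as possible":
    #   _convert_acronyms("ship it asap")  ->  "ship it as soon as possible"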

    # Function to convert contractions in a text
    def _convert_contractions(self, text):
        """
        Convert contractions in the text.

        Example of contractions dictionary:
        {"I'm": "I am", "he's": "he is", "won't": "will not"}

        Args:
            text (str): The input text containing contractions.

        Returns:
            str: The input text with contractions expanded.
        """
        try:
            words = []
            for word in self._regexp.tokenize(text):
                if word in self._contractions_list:
                    words = words + self._contractions_dict[word].split()
                else:
                    words = words + word.split()
            text_converted = " ".join(words)
            return text_converted
        except Exception as e:
            print(f"An error occurred during contraction conversion: {e}")
            return text
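
    # Illustrative example (not part of the original file); the exact output
    # depends on the downloaded contractions dictionary. Assuming it maps
    # "don't" to "do not":
    #   _convert_contractions("don't ship yet")  ->  "do not ship yet"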

    def _fix_letter_variations(self, query):
        """
        Replace variations of letters with their base counterparts.

        Args:
            query (str): The input query containing variations of letters.

        Returns:
            str: The normalized query with variations replaced by their base counterparts.
        """

        def replace_variation(match):
            """
            Helper function to replace a matched variation with its base counterpart.

            Args:
                match (re.Match): The match object representing the found variation.

            Returns:
                str: The base letter if the matched character appears in letter_variations;
                otherwise, the matched character unchanged.
            """
            for key in self._letter_variations.keys():
                if match.group(0) in key:
                    return self._letter_variations[key]
            return match.group(0)

        try:
            # Fixing the query
            normalized_query = re.sub(self._pattern, replace_variation, query)
            return normalized_query
        except Exception as e:
            print(f"An error occurred during letter variation fixing: {e}")
            return query
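
    # Illustrative example (not part of the original file):
    #   _fix_letter_variations("crème brûlée")  ->  "creme brulee"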

    def _normalize_query(self, word: str):
        """
        Clean the input text by performing the following steps:

        1. Remove characters outside the allowed set (Latin and Arabic letters, digits,
           whitespace, - % * . $, plus German umlauts and ß).
        2. Remove non-letter characters squeezed between letters.
        3. Collapse runs of a repeated character down to two occurrences.
        4. Strip letters of one script glued onto a word of the other script.
        5. Add a space between digits and letters (e.g. 123phone -> 123 phone).
        6. Collapse extra whitespace.

        Args:
            word (str): The input text to be cleaned.

        Returns:
            str: The cleaned text.
        """
        try:
            # Keep only Latin/Arabic letters, digits, whitespace, - % * . $,
            # German umlauts and ß; drop everything else
            word = re.sub(
                r"[^A-Za-z\s\-%*.$\u0621-\u064A0-9\u00E4\u00F6\u00FC\u00C4\u00D6\u00DC\u00df]",
                "",
                word,
                flags=re.UNICODE,
            )
            # Remove non-letter characters (except Arabic letters and whitespace)
            # squeezed between Latin letters
            clean_text = re.sub(
                r"(?<=[a-zA-Z])([^A-Za-z\u0621-\u064A\s]+)(?=[a-zA-Z])", "", word
            )
            # Remove any remaining non-Latin, non-space characters between Latin letters
            clean_text = re.sub(r"(?<=[a-zA-Z])([^A-Za-z\s]+)(?=[a-zA-Z])", "", clean_text)
            # Remove non-Arabic characters between Arabic letters
            clean_text = re.sub(
                r"(?<=[\u0621-\u064A])([^\u0621-\u064A\s]+)(?=[\u0621-\u064A])",
                "",
                clean_text,
            )
            # Collapse runs of a repeated character down to two occurrences
            clean_text = re.sub(r"(.)(\1+)", r"\1\1", clean_text)
            # Drop Arabic letters prefixed to a Latin word, keeping the Latin part
            # (e.g. صصphone -> phone)
            clean_text = re.sub(r"([\u0621-\u064A]+)([a-zA-Z]+)", r"\2", clean_text)
            # Drop Arabic letters appended to a Latin word, keeping the Latin part
            # (e.g. phoneصص -> phone)
            clean_text = re.sub(r"([a-zA-Z]+)([\u0621-\u064A]+)", r"\1", clean_text)
            # Drop Latin letters prefixed to an Arabic word, keeping the Arabic part
            clean_text = re.sub(r"([a-zA-Z]+)([\u0621-\u064A]+)", r"\2", clean_text)
            # Drop Latin letters appended to an Arabic word, keeping the Arabic part
            clean_text = re.sub(r"([\u0621-\u064A]+)([a-zA-Z]+)", r"\1", clean_text)
            # Add a space between leading digits and the letters that follow
            # (e.g. 123phone -> 123 phone)
            clean_text = re.sub(r"(\d+)([a-zA-Z\u0621-\u064A]+)", r"\1 \2", clean_text)
            # Add a space between letters and the digits that follow
            clean_text = re.sub(r"([a-zA-Z\u0621-\u064A]+)(\d+)", r"\1 \2", clean_text)
            # Collapse extra whitespace
            clean_text = re.sub(r"\s+", " ", clean_text)
            return clean_text.strip()
        except Exception as e:
            print(f"An error occurred during query normalization: {e}")
            return word
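
    # Illustrative example (not part of the original file):
    #   _normalize_query("123phone!!!")  ->  "123 phone"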

    def keep_one_char(self, word: str) -> str:
        """
        Keep only one occurrence of consecutive repeated characters in the input word.

        Args:
            word (str): The input word to modify.

        Returns:
            str: The modified word with only one occurrence of consecutive repeated characters.
        """
        try:
            return re.sub(r"(.)(\1+)", r"\1", word)
        except Exception as e:
            print(f"An error occurred during character repetition removal: {e}")
            return word
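
    # Illustrative example (not part of the original file); every run of a
    # repeated character is collapsed to a single occurrence:
    #   keep_one_char("heeellooo")  ->  "helo"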

    def translate_text(self, text: str) -> str:
        """
        Translate the given text to English and return the translated text.

        Args:
            text (str): The text to translate.

        Returns:
            str: The translated text, lowercased and stripped.
        """
        try:
            loop = asyncio.get_event_loop()
            translated_text = (
                loop.run_until_complete(self._translator.translate(text))
                .text.lower()
                .strip()
            )
        except Exception as e:
            print(f"Text translation failed: {e}")
            # Use the original text if translation fails
            translated_text = text.lower().strip()
        return translated_text
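
    # Illustrative usage (not part of the original file); requires network access
    # to the aiogoogletrans service, so the output is approximate:
    #   translate_text("teléfono barato")  ->  "cheap phone"
    # On failure the lowercased, stripped original text is returned instead.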

    def check_spelling(self, query: str) -> str:
        """
        Check the spelling of the input query and return the corrected version.

        Args:
            query (str): The input query to check its spelling.

        Returns:
            str: The corrected query.
        """
        try:
            # Pick the spell-checker language with a simple heuristic: Arabic if the
            # query contains Arabic characters, English otherwise.
            # (Language detection via self._translator.detect(query) is disabled.)
            input_language = "ar" if re.search(r"[\u0621-\u064A]", query) else "en"
            # Initialize SpellChecker with the chosen language, falling back to
            # English if that dictionary is unavailable
            try:
                spell_checker = SpellChecker(language=input_language)
            except Exception:
                spell_checker = SpellChecker(language="en")
            # Build the corrected query word by word
            result_query = ""
            for word in query.split(" "):
                # Get the corrected version of the word
                corrected_word = spell_checker.correction(word)
                # If no correction is found, retry after collapsing repeated characters
                if corrected_word is None:
                    corrected_word = spell_checker.correction(self.keep_one_char(word))
                # If still not found, keep the original word
                result_query += (corrected_word if corrected_word is not None else word) + " "
            # Remove trailing whitespace and return the corrected query
            return result_query.strip()
        except Exception as e:
            print(f"An error occurred during spelling check: {e}")
            return query
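
    # Illustrative example (not part of the original file); the exact output
    # depends on pyspellchecker's bundled English dictionary, but roughly:
    #   check_spelling("speling mistake")  ->  "spelling mistake"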

    def clean_text(self, text):
        """
        Normalize the input text.

        Args:
            text (str): The input text to be normalized.

        Returns:
            str: The normalized text.
        """
        try:
            # Convert text to lowercase
            text = self._convert_to_lowercase(text)
            # Remove leading/trailing whitespace
            text = self._remove_whitespace(text)
            # Collapse the text onto one line
            text = re.sub(r"\n", " ", text)
            # Remove text enclosed in square brackets
            text = re.sub(r"\[.*?\]", "", text)
            # Remove HTTP links
            text = self._remove_http(text)
            # Remove HTML tags
            text = self._remove_html(text)
            # Remove emojis
            text = self._remove_emoji(text)
            # Fix letter variations
            text = self._fix_letter_variations(text)
            # Normalize the query text
            text = self._normalize_query(text)
            return text
        except Exception as e:
            print(f"An error occurred during text cleaning: {e}")
            return text
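

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). It only exercises the
# public clean_text() pipeline; constructing Normalizer downloads two JSON
# dictionaries from GitHub, so network access is assumed.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    normalizer = Normalizer()
    sample = "Chéck out https://example.com <b>NEW</b> phoooneee 123abc 😍"
    print(normalizer.clean_text(sample))
    # Expected output (approximately): "check out new phoonee 123 abc"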