Spaces:
Runtime error
Runtime error
| import asyncio | |
| import string, re | |
| import pandas as pd | |
| from aiogoogletrans import Translator | |
| from spellchecker import SpellChecker | |
| from nltk.tokenize import RegexpTokenizer | |
| from langdetect import detect | |
| from fake_useragent import UserAgent | |
| import json | |
| import requests | |
| from fastapi import HTTPException | |
class Normalizer:
    """
    Text normalization helpers for search queries.

    Provides lowercasing, whitespace/punctuation/HTML/URL/emoji removal,
    accent folding, acronym and contraction expansion, query clean-up,
    translation to English, spell checking and Google query suggestions.

    Note:
        Creating an instance downloads the acronym and contraction
        dictionaries over the network (see ``__init__``).
    """

    # Groups of accented letters mapped to their plain replacement.
    _LETTER_VARIATIONS = {
        "aàáâãäåāăą": "a",
        "cçćĉċč": "c",
        "eèéêëēĕėęě": "e",
        "gğ": "g",
        "hħĥ": "h",
        "iìíîïīĭįı": "i",
        "jĵ": "j",
        "nñńņň": "n",
        "oòóôõöøōŏő": "o",
        "ś": "s",
        "ß": "ss",
        "uùúûüūŭůűų": "u",
        "yýÿŷ": "y",
        "æ": "ae",
        "œ": "oe",
    }
    # Per-character translation table derived from the groups above;
    # str.translate supports multi-character replacements ("ß" -> "ss").
    _VARIATION_TABLE = str.maketrans(
        {
            char: replacement
            for group, replacement in _LETTER_VARIATIONS.items()
            for char in group
        }
    )
    # All ASCII punctuation except the apostrophe (keeps contractions intact)
    # and the percent sign.
    _PUNCT_TABLE = str.maketrans(
        "", "", string.punctuation.replace("'", "").replace("%", "")
    )
    _HTML_TAG_RE = re.compile(r"<.*?>")
    # Strings beginning with http:// or https://, or with "www.".
    _URL_RE = re.compile(r"(https?://\S+|www\.\S+)")
    # NOTE(review): the last range (U+24C2..U+1F251) is very broad and also
    # matches CJK and many other non-emoji characters. It is kept unchanged
    # here to preserve existing behavior -- confirm whether that is intended.
    _EMOJI_RE = re.compile(
        "["
        "\U0001F600-\U0001F64F"  # emoticons
        "\U0001F300-\U0001F5FF"  # symbols & pictographs
        "\U0001F680-\U0001F6FF"  # transport & map symbols
        "\U0001F1E0-\U0001F1FF"  # flags (iOS)
        "\U00002702-\U000027B0"
        "\U000024C2-\U0001F251"
        "]+"
    )
    _ACRONYMS_URL = "https://raw.githubusercontent.com/sugatagh/E-commerce-Text-Classification/main/JSON/english_acronyms.json"
    _CONTRACTIONS_URL = "https://raw.githubusercontent.com/sugatagh/E-commerce-Text-Classification/main/JSON/english_contractions.json"
    _SUGGEST_URL = "https://suggestqueries.google.com/complete/search"
    # Seconds to wait for the suggest endpoint before giving up.
    _SUGGEST_TIMEOUT = 10

    def __init__(self):
        """
        Initializes the Normalizer object.

        Downloads the acronym and contraction dictionaries with
        ``pd.read_json`` and creates the tokenizer and translator. Any
        download error propagates to the caller.
        """
        # Retained as instance attributes for backward compatibility;
        # _fix_letter_variations itself uses the class-level table.
        self._letter_variations = dict(self._LETTER_VARIATIONS)
        self._pattern = "|".join(
            re.escape(char) for group in self._letter_variations for char in group
        )
        # Tokenizer: runs of word characters and apostrophes
        self._regexp = RegexpTokenizer(r"[\w']+")
        # Dictionary of acronyms
        self._acronyms_dict = pd.read_json(self._ACRONYMS_URL, typ="series")
        self._acronyms_list = list(self._acronyms_dict.keys())
        # Dictionary of contractions
        self._contractions_dict = pd.read_json(self._CONTRACTIONS_URL, typ="series")
        self._contractions_list = list(self._contractions_dict.keys())
        # Translator used by translate_text
        self._translator = Translator()

    # Converting to lowercase
    def _convert_to_lowercase(self, text):
        """
        Convert the input text to lowercase.

        Args:
            text (str): The input text to be converted.

        Returns:
            str: The lowercased text, or the input unchanged if it cannot be
            lowercased.
        """
        try:
            return text.lower()
        except Exception as e:
            print(f"An error occurred during lowercase conversion: {e}")
            return text

    # Removing whitespaces
    def _remove_whitespace(self, text):
        """
        Remove leading and trailing whitespace from the input text.

        Args:
            text (str): The input text to be processed.

        Returns:
            str: The stripped text, or the input unchanged on error.
        """
        try:
            return text.strip()
        except Exception as e:
            print(f"An error occurred during whitespace removal: {e}")
            return text

    # Removing punctuations
    def _remove_punctuation(self, text):
        """
        Remove ASCII punctuation, except apostrophes and percent signs.

        Args:
            text (str): The input text to be processed.

        Returns:
            str: The text without punctuation, or the input unchanged on error.
        """
        try:
            return text.translate(self._PUNCT_TABLE)
        except Exception as e:
            print(f"An error occurred during punctuation removal: {e}")
            return text

    # Removing HTML tags
    def _remove_html(self, text):
        """
        Remove HTML tags (anything between ``<`` and the next ``>``).

        Args:
            text (str): The input text containing HTML tags.

        Returns:
            str: The text with tags removed, or the input unchanged on error.
        """
        try:
            return self._HTML_TAG_RE.sub("", text)
        except Exception as e:
            print(f"An error occurred during HTML tag removal: {e}")
            return text

    # Removing emojis
    def _remove_emoji(self, text):
        """
        Remove characters covered by the emoji code-point ranges.

        Args:
            text (str): The input text containing emojis.

        Returns:
            str: The text with matches removed, or the input unchanged on error.
        """
        try:
            return self._EMOJI_RE.sub("", text)
        except Exception as e:
            print(f"An error occurred during emoji removal: {e}")
            return text

    # Removing HTTP links
    def _remove_http(self, text):
        """
        Remove links starting with ``http://``, ``https://`` or ``www.``.

        Args:
            text (str): The input text containing links.

        Returns:
            str: The text with links removed, or the input unchanged on error.
        """
        try:
            return self._URL_RE.sub("", text)
        except Exception as e:
            print(f"An error occurred during HTTP link removal: {e}")
            return text

    def _expand_tokens(self, text, mapping):
        """
        Tokenize ``text`` and replace every token found in ``mapping``.

        Args:
            text (str): The input text.
            mapping: Key -> expansion lookup (dict or pandas Series).

        Returns:
            str: The tokens, expanded where possible, joined by single spaces.
        """
        words = []
        for token in self._regexp.tokenize(text):
            # Membership on a dict / Series index is a hash lookup.
            expansion = mapping[token] if token in mapping else token
            words.extend(expansion.split())
        return " ".join(words)

    # Function to convert acronyms in a text
    def _convert_acronyms(self, text):
        """
        Convert acronyms in the text.

        Example of acronyms dictionary:
        {"LOL": "laugh out loud", "BRB": "be right back", "IDK": "I don't know"}

        Args:
            text (str): The input text containing acronyms.

        Returns:
            str: The text with acronyms expanded, or the input unchanged on error.
        """
        try:
            return self._expand_tokens(text, self._acronyms_dict)
        except Exception as e:
            print(f"An error occurred during acronym conversion: {e}")
            return text

    # Function to convert contractions in a text
    def _convert_contractions(self, text):
        """
        Convert contractions in the text.

        Example of contractions dictionary:
        {"I'm": "I am", "he's": "he is", "won't": "will not"}

        Args:
            text (str): The input text containing contractions.

        Returns:
            str: The text with contractions expanded, or the input unchanged
            on error.
        """
        try:
            return self._expand_tokens(text, self._contractions_dict)
        except Exception as e:
            print(f"An error occurred during contraction conversion: {e}")
            return text

    def _fix_letter_variations(self, query):
        """
        Replace accented letter variations with their plain counterparts.

        Args:
            query (str): The input query containing variations of letters.

        Returns:
            str: The query with each variation replaced (e.g. "é" -> "e",
            "ß" -> "ss"), or the input unchanged on error.
        """
        try:
            return query.translate(self._VARIATION_TABLE)
        except Exception as e:
            print(f"An error occurred during letter variation fixing: {e}")
            return query

    def _normalize_query(self, word: str):
        """
        Clean the input text by performing the following steps:

        1. Drop every character that is not a Latin letter, digit, whitespace,
           Arabic letter, German umlaut/eszett or one of ``- % * . $``.
        2. Remove non-letter runs sandwiched between two Latin letters.
        3. Remove non-Arabic runs sandwiched between two Arabic letters.
        4. Collapse runs of three or more identical non-digit characters to
           two (digits are left alone so numbers keep their value).
        5. Drop Arabic letters glued directly to a Latin word.
        6. Insert a space between digits and letters (123phone -> 123 phone).
        7. Collapse whitespace and strip.

        Args:
            word (str): The input text to be cleaned.

        Returns:
            str: The cleaned text, or the input unchanged on error.
        """
        try:
            # 1. Keep only the allowed characters
            cleaned = re.sub(
                r"[^A-Za-z\s\-%*.$\u0621-\u064A0-9\u00E4\u00F6\u00FC\u00C4\u00D6\u00DC\u00df]",
                "",
                word,
            )
            # 2. Remove anything but letters/whitespace between Latin letters
            cleaned = re.sub(r"(?<=[a-zA-Z])[^A-Za-z\s]+(?=[a-zA-Z])", "", cleaned)
            # 3. Remove anything but Arabic/whitespace between Arabic letters
            cleaned = re.sub(
                r"(?<=[\u0621-\u064A])[^\u0621-\u064A\s]+(?=[\u0621-\u064A])",
                "",
                cleaned,
            )
            # 4. Collapse 3+ repeats to two; digits excluded ("1000" stays "1000")
            cleaned = re.sub(r"(\D)\1{2,}", r"\1\1", cleaned)
            # 5a. Arabic run directly before a Latin word (e.g. صصphone -> phone)
            cleaned = re.sub(r"[\u0621-\u064A]+([a-zA-Z]+)", r"\1", cleaned)
            # 5b. Arabic run directly after a Latin word (e.g. phoneصص -> phone)
            cleaned = re.sub(r"([a-zA-Z]+)[\u0621-\u064A]+", r"\1", cleaned)
            # 6. Separate digits from adjacent letters, in both directions
            cleaned = re.sub(r"(\d+)([a-zA-Z\u0621-\u064A]+)", r"\1 \2", cleaned)
            cleaned = re.sub(r"([a-zA-Z\u0621-\u064A]+)(\d+)", r"\1 \2", cleaned)
            # 7. Remove extra spaces
            cleaned = re.sub(r"\s+", " ", cleaned)
            return cleaned.strip()
        except Exception as e:
            print(f"An error occurred during query normalization: {e}")
            return word

    def keep_one_char(self, word: str) -> str:
        """
        Keep only one occurrence of consecutive repeated characters.

        Args:
            word (str): The input word to modify.

        Returns:
            str: The word with every run of a repeated character reduced to a
            single character, or the input unchanged on error.
        """
        try:
            return re.sub(r"(.)\1+", r"\1", word)
        except Exception as e:
            print(f"An error occurred during character repetition removal: {e}")
            return word

    async def translate_text(self, text: str) -> str:
        """
        Translate the given text to English.

        Args:
            text (str): The text to translate.

        Returns:
            str: The lowercased, stripped translation; if translation fails,
            the lowercased, stripped original text.
        """
        try:
            translation = await self._translator.translate(text, dest="en")
            return translation.text.lower().strip()
        except Exception as e:
            print(f"Text Translation failed: {e}")
            # Use original text if translation fails
            return text.lower().strip()

    def check_spelling(self, query: str) -> str:
        """
        Check the spelling of the input query and return the corrected version.

        The spell checker is created for the detected language and falls back
        to English when that language is not available.

        Args:
            query (str): The input query to check.

        Returns:
            str: The corrected words joined by single spaces, or the original
            query if language detection or correction raises.
        """
        try:
            input_language = detect(query)
            try:
                spell_checker = SpellChecker(language=input_language)
            except Exception:
                # Detected language has no dictionary; use English
                spell_checker = SpellChecker(language="en")

            corrected_words = []
            # split() without an argument skips the empty tokens that repeated
            # spaces would otherwise feed to the spell checker.
            for word in query.split():
                corrected = spell_checker.correction(word)
                if corrected is None:
                    # Retry with repeated characters squeezed (e.g. "coool")
                    corrected = spell_checker.correction(self.keep_one_char(word))
                # Keep the original word when no correction is found
                corrected_words.append(word if corrected is None else corrected)
            return " ".join(corrected_words)
        except Exception as e:
            print(f"An error occurred during spelling check: {e}")
            return query

    def query_suggestions(self, query: str) -> str:
        """
        Get a suggestion for a query string from Google's Suggest API.

        Parameters:
            query (str): The query string for which suggestions are retrieved.

        Returns:
            str: The first suggestion, or the original query if none is returned.

        Raises:
            HTTPException: With the upstream status code when the API answers
            with a non-200 status, or with status 500 for any other failure
            (language detection, network error, timeout, malformed response).
        """
        try:
            # Only Arabic is special-cased; everything else is treated as English
            lang = "ar" if detect(query) == "ar" else "en"
            # NOTE(review): the language code is sent as `gl` and "sa" as `hl`;
            # these look swapped but are kept as in the original -- confirm.
            params = {"output": "firefox", "gl": lang, "hl": "sa", "q": query}
            # Random Chrome user-agent
            headers = {"user-agent": UserAgent().chrome}
            # `params` lets requests URL-encode the query; `timeout` prevents
            # the call from blocking forever on an unresponsive endpoint.
            response = requests.get(
                self._SUGGEST_URL,
                params=params,
                headers=headers,
                timeout=self._SUGGEST_TIMEOUT,
                verify=True,
            )
            if response.status_code != 200:
                raise HTTPException(
                    status_code=response.status_code,
                    detail=f"An error occurred during the request to Google Suggest API: {response.text}",
                )
            # Response shape: [query, [suggestion, ...], ...]
            suggestions = json.loads(response.text)
            if suggestions[1]:
                return suggestions[1][0]
            # If no suggestions are returned, return the original query
            return query
        except HTTPException:
            # Preserve the upstream status code instead of masking it as 500
            raise
        except Exception as e:
            raise HTTPException(
                status_code=500,
                detail=f"An error occurred during the request to Google Suggest API: {e}",
            ) from e

    def clean_text(self, text):
        """
        Normalize the input text.

        Lowercases and strips the text, flattens newlines, removes
        square-bracketed spans, links, HTML tags and emojis, folds accented
        letters and finally applies the query normalization rules.

        Args:
            text (str): The input text to be normalized.

        Returns:
            str: The normalized text; on an unexpected error, the text as
            processed up to the failing step.
        """
        try:
            text = self._convert_to_lowercase(text)
            text = self._remove_whitespace(text)
            # Convert text to one line
            text = re.sub(r"\n", " ", text)
            # Remove square brackets together with their content
            text = re.sub(r"\[.*?\]", "", text)
            text = self._remove_http(text)
            text = self._remove_html(text)
            text = self._remove_emoji(text)
            text = self._fix_letter_variations(text)
            text = self._normalize_query(text)
            return text
        except Exception as e:
            print(f"An error occurred during text cleaning: {e}")
            return text