import re
import string
from typing import List, Optional, Union, Dict, Any, Callable

import numpy as np
import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk import download as nltk_download
from nltk.stem import WordNetLemmatizer
import spacy
from gensim.models import KeyedVectors
from transformers import AutoTokenizer, AutoModel
import torch
import emoji

print('PREPROCESSING IMPORTED')

try:
    nltk_download('punkt', quiet=True)
    nltk_download('punkt_tab', quiet=True)  # required by word_tokenize on newer NLTK releases
    nltk_download('stopwords', quiet=True)
    nltk_download('wordnet', quiet=True)
except Exception as e:
    print(f"Warning: NLTK data download failed: {e}")

_SPACY_MODEL = None
_NLTK_LEMMATIZER = None
_BERT_TOKENIZER = None
_BERT_MODEL = None


def _load_spacy_model(lang: str = "en_core_web_sm"):
    global _SPACY_MODEL
    if _SPACY_MODEL is None:
        try:
            _SPACY_MODEL = spacy.load(lang)
        except OSError:
            raise ValueError(
                f"spaCy model '{lang}' not found. Please install it via: python -m spacy download {lang}"
            )
    return _SPACY_MODEL


def _load_nltk_lemmatizer():
    global _NLTK_LEMMATIZER
    if _NLTK_LEMMATIZER is None:
        _NLTK_LEMMATIZER = WordNetLemmatizer()
    return _NLTK_LEMMATIZER


def _load_bert_model(model_name: str = "bert-base-uncased"):
    global _BERT_TOKENIZER, _BERT_MODEL
    if _BERT_TOKENIZER is None or _BERT_MODEL is None:
        _BERT_TOKENIZER = AutoTokenizer.from_pretrained(model_name)
        _BERT_MODEL = AutoModel.from_pretrained(model_name)
    return _BERT_TOKENIZER, _BERT_MODEL


def clean_text(text: str) -> str:
    text = re.sub(r"<[^>]+>", "", text)
    text = re.sub(r"https?://\S+|www\.\S+", "", text)
    text = "".join(ch for ch in text if ch in string.printable)
    text = re.sub(r"\s+", " ", text).strip()
    return text
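
# Illustrative usage of clean_text (not part of the original module; the sample
# string is made up). HTML tags and URLs are removed and whitespace is collapsed:
#   clean_text("<p>Visit   https://example.com now!</p>")  ->  "Visit now!"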


def replace_emojis(text: str) -> str:
    return emoji.demojize(text, delimiters=(" ", " "))
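
# Illustrative usage of replace_emojis (sample string is made up). Emojis are
# converted to their textual names, which later cleaning steps turn into
# ordinary tokens, roughly:
#   replace_emojis("great talk 👍")  ->  "great talk  thumbs_up "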


def preprocess_text(
    text: str,
    lang: str = "en",
    remove_stopwords: bool = True,
    use_spacy: bool = True,
    lemmatize: bool = True,
    emoji_to_text: bool = True,
    lowercase: bool = True,
    spacy_model: Optional[str] = None,
    replace_entities: bool = False,  # new option: numbers/URLs/emails are NOT replaced by default
) -> List[str]:
    if emoji_to_text:
        text = replace_emojis(text)
    text = re.sub(r"<[^>]+>", "", text)
    if replace_entities:
        # Replace URLs, emails and numbers before punctuation is stripped;
        # otherwise these patterns could never match. The placeholders' angle
        # brackets are removed by the punctuation step below, leaving bare
        # URL / EMAIL / NUM tokens.
        text = re.sub(r"https?://\S+|www\.\S+", " <URL> ", text)
        text = re.sub(r"\S+@\S+", " <EMAIL> ", text)
        text = re.sub(r"\b\d+\b", " <NUM> ", text)
    text = re.sub(r"[^\w\s]", " ", text)  # replace non-word, non-space characters with a space
    text = re.sub(r"\s+", " ", text).strip()
    if lowercase:
        text = text.lower()
    if use_spacy:
        spacy_lang = spacy_model or ("en_core_web_sm" if lang == "en" else f"{lang}_core_news_sm")
        nlp = _load_spacy_model(spacy_lang)
        doc = nlp(text)
        if lemmatize:
            tokens = [token.lemma_ for token in doc if not token.is_space and not token.is_punct]
        else:
            tokens = [token.text for token in doc if not token.is_space and not token.is_punct]
        if remove_stopwords:
            tokens = [token for token in tokens if not nlp.vocab[token].is_stop]
    else:
        tokens = word_tokenize(text)
        if lemmatize:
            lemmatizer = _load_nltk_lemmatizer()
            tokens = [lemmatizer.lemmatize(token) for token in tokens]
        if remove_stopwords:
            # NLTK's stopword lists are keyed by full language names ("english"),
            # not ISO codes, so map the common codes before the lookup.
            lang_name = {"en": "english", "de": "german", "fr": "french",
                         "es": "spanish", "ru": "russian"}.get(lang, lang)
            stop_words = set(stopwords.words(lang_name)) if lang_name in stopwords.fileids() else set()
            tokens = [token for token in tokens if token not in stop_words]
    tokens = [token for token in tokens if token not in string.punctuation and len(token) > 0]
    return tokens
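
# Illustrative usage of preprocess_text (not part of the original module; the
# sample sentence is made up). With use_spacy=False only NLTK resources are
# needed, so no spaCy model has to be installed:
#   preprocess_text("The cats are running!!!", use_spacy=False)
#   # -> lower-cased, lemmatized tokens with stop words and punctuation removed,
#   #    e.g. ['cat', 'running']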


class TextVectorizer:
    def __init__(self):
        self.bow_vectorizer = None
        self.tfidf_vectorizer = None

    def bow(self, texts: List[str], **kwargs) -> np.ndarray:
        self.bow_vectorizer = CountVectorizer(**kwargs)
        return self.bow_vectorizer.fit_transform(texts).toarray()

    def tfidf(self, texts: List[str], max_features: int = 5000, **kwargs) -> np.ndarray:
        kwargs['max_features'] = max_features
        self.tfidf_vectorizer = TfidfVectorizer(lowercase=False, **kwargs)
        return self.tfidf_vectorizer.fit_transform(texts).toarray()

    def ngrams(self, texts: List[str], ngram_range: tuple = (1, 2), **kwargs) -> np.ndarray:
        kwargs.setdefault("ngram_range", ngram_range)
        return self.tfidf(texts, **kwargs)
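
# Illustrative usage of TextVectorizer (the corpus below is made up). The methods
# expect whitespace-joined strings, so token lists from preprocess_text would
# first be joined with " ".join(tokens); note that tfidf() disables lowercasing:
#   vec = TextVectorizer()
#   corpus = ["cat sit mat", "dog sit log"]
#   X_bow = vec.bow(corpus)                            # raw term counts
#   X_tfidf = vec.tfidf(corpus, max_features=100)      # TF-IDF weights
#   X_ngrams = vec.ngrams(corpus, ngram_range=(1, 2))  # unigrams + bigrams via TF-IDF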


class EmbeddingVectorizer:
    def __init__(self):
        self.word2vec_model = None
        self.fasttext_model = None
        self.glove_vectors = None

    def load_word2vec(self, path: str):
        self.word2vec_model = KeyedVectors.load_word2vec_format(path, binary=True)

    def load_fasttext(self, path: str):
        self.fasttext_model = KeyedVectors.load(path)

    def load_glove(self, glove_file: str, vocab_size: int = 400000, dim: int = 300):
        self.glove_vectors = {}
        with open(glove_file, "r", encoding="utf-8") as f:
            for i, line in enumerate(f):
                if i >= vocab_size:
                    break
                values = line.split()
                word = values[0]
                vector = np.array(values[1:], dtype="float32")
                self.glove_vectors[word] = vector

    def _get_word_vector(self, word: str, method: str = "word2vec") -> Optional[np.ndarray]:
        if method == "word2vec" and self.word2vec_model and word in self.word2vec_model:
            return self.word2vec_model[word]
        elif method == "fasttext" and self.fasttext_model and word in self.fasttext_model:
            return self.fasttext_model[word]
        elif method == "glove" and self.glove_vectors and word in self.glove_vectors:
            return self.glove_vectors[word]
        return None

    def _aggregate_vectors(
        self, vectors: List[np.ndarray], strategy: str = "mean"
    ) -> np.ndarray:
        if not vectors:
            return np.zeros(300)  # default dim
        if strategy == "mean":
            return np.mean(vectors, axis=0)
        elif strategy == "max":
            return np.max(vectors, axis=0)
        else:
            raise ValueError("Strategy must be 'mean' or 'max'")

    def get_embeddings(
        self,
        tokenized_texts: List[List[str]],
        method: str = "word2vec",
        aggregation: str = "mean",
    ) -> np.ndarray:
        embeddings = []
        for tokens in tokenized_texts:
            vectors = [
                self._get_word_vector(token, method=method) for token in tokens
            ]
            vectors = [v for v in vectors if v is not None]
            doc_vec = self._aggregate_vectors(vectors, strategy=aggregation)
            embeddings.append(doc_vec)
        return np.array(embeddings)
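
# Illustrative usage of EmbeddingVectorizer (the file path is a hypothetical
# placeholder; any word2vec-format binary such as the GoogleNews vectors works):
#   emb = EmbeddingVectorizer()
#   emb.load_word2vec("GoogleNews-vectors-negative300.bin")
#   docs = [["cat", "sits"], ["dog", "barks"]]
#   X = emb.get_embeddings(docs, method="word2vec", aggregation="mean")
#   # Out-of-vocabulary tokens are skipped; a document with no known tokens
#   # falls back to an all-zero 300-d vector.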


def get_contextual_embeddings(
    texts: List[str],
    model_name: str = "bert-base-uncased",
    aggregation: str = "mean",
    device: str = "cpu",
) -> np.ndarray:
    tokenizer, model = _load_bert_model(model_name)
    model.to(device)
    model.eval()
    embeddings = []
    with torch.no_grad():
        for text in texts:
            inputs = tokenizer(
                text,
                return_tensors="pt",
                truncation=True,
                padding=True,
                max_length=512,
            )
            inputs = {k: v.to(device) for k, v in inputs.items()}
            outputs = model(**inputs)
            token_embeddings = outputs.last_hidden_state[0].cpu().numpy()
            # Exclude [CLS] and [SEP] if needed (simple heuristic: skip first and last)
            if len(token_embeddings) > 2:
                token_embeddings = token_embeddings[1:-1]
            if aggregation == "mean":
                doc_emb = np.mean(token_embeddings, axis=0)
            elif aggregation == "max":
                doc_emb = np.max(token_embeddings, axis=0)
            else:
                raise ValueError("aggregation must be 'mean' or 'max'")
            embeddings.append(doc_emb)
    return np.array(embeddings)
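
# Illustrative usage of get_contextual_embeddings (sample texts are made up).
# The first call downloads bert-base-uncased if it is not cached; pass
# device="cuda" when a GPU is available:
#   X = get_contextual_embeddings(["I love NLP", "Transformers are powerful"])
#   # X.shape == (2, 768) for bert-base-uncased: token vectors are pooled
#   # (mean or max, excluding [CLS]/[SEP]) into one vector per text.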


def extract_meta_features(texts: Union[List[str], pd.Series]) -> pd.DataFrame:
    if isinstance(texts, pd.Series):
        texts = texts.tolist()
    features = []
    for text in texts:
        original_len = len(text)
        words = text.split()
        word_lengths = [len(w) for w in words] if words else [0]
        avg_word_len = np.mean(word_lengths)
        num_unique_words = len(set(words)) if words else 0
        num_punct = sum(1 for c in text if c in string.punctuation)
        num_upper = sum(1 for c in text if c.isupper())
        num_digits = sum(1 for c in text if c.isdigit())
        # Placeholder: a readability score (e.g. via a library such as textstat)
        # could be computed here; it is left as NaN in this version.
        flesch = np.nan
        features.append({
            "text_length": original_len,
            "avg_word_length": avg_word_len,
            "num_unique_words": num_unique_words,
            "punctuation_ratio": num_punct / original_len if original_len > 0 else 0,
            "uppercase_ratio": num_upper / original_len if original_len > 0 else 0,
            "digit_ratio": num_digits / original_len if original_len > 0 else 0,
            "flesch_reading_ease": flesch,
        })
    return pd.DataFrame(features)
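

# Minimal end-to-end sketch (not part of the original module; the sample texts
# are made up). It uses the NLTK fallback path so no spaCy model or pretrained
# embedding file is required, and it only runs when the file is executed directly.
if __name__ == "__main__":
    sample_texts = [
        "The cats are running!!! Visit https://example.com",
        "I LOVE this movie :) 10/10",
    ]
    token_lists = [preprocess_text(t, use_spacy=False) for t in sample_texts]
    joined = [" ".join(tokens) for tokens in token_lists]
    bow_matrix = TextVectorizer().bow(joined)       # bag-of-words over the cleaned texts
    meta = extract_meta_features(sample_texts)      # surface-level features of the raw texts
    print("tokens:", token_lists)
    print("bow shape:", bow_matrix.shape)
    print(meta.head())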