| """ | |
| Экстрактор поставщиков из текста. | |
| Использует комбинацию методов: | |
| - TF-IDF для символьных n-грамм | |
| - Фонетическое сравнение | |
| - Выравнивание токенов | |
| - Расстояние Левенштейна | |
| """ | |
from __future__ import annotations

import re
import unicodedata
from typing import Any

import iuliia
from rapidfuzz import fuzz
from rapidfuzz.distance import Levenshtein
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

from extractors.date_extractor import UniversalDateParser


def normalize_text(text: str) -> str:
    """Normalize text: lowercase, strip diacritics and punctuation."""
    text = unicodedata.normalize("NFKD", text.lower())
    text = "".join(ch for ch in text if not unicodedata.combining(ch))
    return re.sub(r"[^\w\s]", "", text).strip()


def variants(text: str) -> list[str]:
    """Generate variants of the text (transliteration)."""
    base = normalize_text(text)
    result = [base]
    for schema in (iuliia.WIKIPEDIA, iuliia.MOSMETRO, iuliia.ALA_LC):
        try:
            v = normalize_text(schema.translate(base))
            if v and v not in result:
                result.append(v)
        except Exception:
            pass
    for v in list(result):
        core = " ".join(w for w in v.split() if len(w) > 1 and any(ch.isalpha() for ch in w))
        core = normalize_text(core)
        if core and core not in result:
            result.insert(0, core)
    return result
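
# Illustrative example (hypothetical input; the exact list depends on the
# installed iuliia schemas): variants("Линелла") would typically return
# ["линелла", "linella"]: the normalized original plus de-duplicated
# transliterations.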


def token_alignment_score(phrase_variant: str, candidate_tokens: list[str]) -> float:
    """Average each phrase token's best Levenshtein similarity among the candidate tokens."""
    phrase_tokens = [t for t in phrase_variant.split() if len(t) > 2]
    if not phrase_tokens or not candidate_tokens:
        return 0.0
    best_scores = []
    for pt in phrase_tokens:
        best = 0.0
        for ct in candidate_tokens:
            sim = Levenshtein.normalized_similarity(pt, ct)
            if sim > best:
                best = sim
        best_scores.append(best)
    return sum(best_scores) / len(best_scores)


def length_penalty(phrase_len: int, candidate_len: int) -> float:
    """Penalty for the length difference between phrase and candidate."""
    if phrase_len == 0 or candidate_len == 0:
        return 0.0
    ratio = min(phrase_len, candidate_len) / max(phrase_len, candidate_len)
    if ratio >= 0.80:
        return 1.0
    if ratio >= 0.60:
        return 0.90
    if ratio >= 0.40:
        return 0.70
    return 0.50
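
# Worked example: length_penalty(5, 10) -> 0.70, since the length ratio
# min(5, 10) / max(5, 10) = 0.5 falls into the [0.40, 0.60) band.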


def canonicalize_for_similarity(text: str) -> str:
    """Canonicalize text for phonetic comparison."""
    t = normalize_text(text).replace(" ", "")
    replacements = (
        ("sch", "sh"),
        ("tch", "ch"),
        ("dzh", "j"),
        ("zh", "j"),
        ("sh", "s"),
        ("ch", "c"),
        ("kh", "h"),
        ("ph", "f"),
        ("ck", "k"),
        ("qu", "k"),
        ("q", "k"),
        ("w", "v"),
        ("x", "ks"),
        ("ts", "z"),
        ("tz", "z"),
    )
    for src, dst in replacements:
        t = t.replace(src, dst)
    # Collapse runs of the same character ("nn" -> "n").
    return re.sub(r"(.)\1+", r"\1", t)


def phonetic_similarity(left: str, right: str) -> float:
    """Compute phonetic similarity between two strings."""
    l = canonicalize_for_similarity(left)
    r = canonicalize_for_similarity(right)
    if not l or not r:
        return 0.0
    char = fuzz.ratio(l, r) / 100.0
    lev = Levenshtein.normalized_similarity(l, r)
    return 0.50 * char + 0.50 * lev
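
# Worked example using the rules above: phonetic_similarity("Philips", "filips")
# returns 1.0, because "ph" canonicalizes to "f" and both sides collapse to the
# same string "filips".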


class ExpenseSupplierExtractor:
    """
    Extracts suppliers from text.
    Finds the most similar supplier in a list of known ones.
    """

    def __init__(self, suppliers: list[str]) -> None:
        self.suppliers = suppliers
        self.sup_norm = [normalize_text(s) for s in suppliers]
        self.sup_tokens = [s.split() for s in self.sup_norm]
        self.sup_num_sets = [self.numeric_tokens(s) for s in self.sup_norm]
        self.sup_number_tokens = {num for nums in self.sup_num_sets for num in nums}
        self.supplier_lexicon = [
            token
            for token in sorted({tok for tokens in self.sup_tokens for tok in tokens})
            if token and not token.isdigit()
        ]
        self.tfidf = TfidfVectorizer(analyzer="char_wb", ngram_range=(3, 5))
        self.sup_mat = self.tfidf.fit_transform(self.sup_norm)
        self.max_words = max(len(s.split()) for s in self.sup_norm)
        self.variant_cache: dict[str, list[str]] = {}
        self.lexical_token_cache: dict[str, float] = {}
        self.phrase_support_cache: dict[str, float] = {}
        # Russian noise words are kept as literals: they must match the input text.
        self.noise_terms = {
            "для", "под", "над", "при", "без", "или",
            "купил", "купила", "купили", "покупка", "заказал", "заказала", "заказали",
            "оплатил", "оплатила", "оплатили", "заплатил", "заплатила", "заплатили",
            "был", "была", "было", "были", "утром", "днем", "днём", "вечером", "ночью",
            "товар", "товары", "продукт", "продукты", "десерт", "еда",
            "лей", "лея", "леи", "целых", "сотых", "сом", "сомов", "руб", "рублей", "грн", "usd", "eur",
        }
        self.noise_terms.update(UniversalDateParser.temporal_vocabulary())

    @staticmethod
    def numeric_tokens(text: str) -> set[str]:
        """Extract numeric tokens."""
        return set(re.findall(r"\d+", text))

    def cached_variants(self, text: str) -> list[str]:
        """Return cached text variants."""
        key = normalize_text(text)
        cached = self.variant_cache.get(key)
        if cached is None:
            cached = variants(key)
            self.variant_cache[key] = cached
        return cached

    @staticmethod
    def split_words(text: str) -> list[str]:
        """Split text into words."""
        return [w for w in normalize_text(text).split() if w]

    @classmethod
    def is_supplier_extension(cls, base_supplier: str, extended_supplier: str) -> bool:
        """Check whether one supplier name is a token-wise extension of another."""
        base_tokens = cls.split_words(base_supplier)
        extended_tokens = cls.split_words(extended_supplier)
        return len(base_tokens) < len(extended_tokens) and extended_tokens[:len(base_tokens)] == base_tokens

    @classmethod
    def phrase_token_count(cls, phrase: str | None) -> int:
        """Count the tokens in a phrase."""
        return len(cls.split_words(phrase or ""))

    @classmethod
    def resolve_overlapping_suppliers(cls, ranking: list[dict[str, Any]]) -> dict[str, Any]:
        """Resolve conflicts between overlapping supplier matches."""
        if not ranking:
            return {"supplier": None, "score": -1.0, "phrase": None}
        best = ranking[0]
        best_combined = float(best.get("combined", best.get("score", -1.0)))
        best_phrase_len = cls.phrase_token_count(best.get("phrase"))
        for alt in ranking[1:]:
            if not cls.is_supplier_extension(str(best.get("supplier") or ""), str(alt.get("supplier") or "")):
                continue
            alt_combined = float(alt.get("combined", alt.get("score", -1.0)))
            alt_phrase_len = cls.phrase_token_count(alt.get("phrase"))
            # Prefer the longer supplier name when its score is within 0.15 of the leader.
            if alt_phrase_len > best_phrase_len and alt_combined >= best_combined - 0.15:
                best = alt
                best_combined = alt_combined
                best_phrase_len = alt_phrase_len
        return best

    @staticmethod
    def numeric_compatibility_multiplier(phrase_nums: set[str], candidate_nums: set[str]) -> float:
        """Compatibility multiplier for numeric tokens."""
        if not phrase_nums and not candidate_nums:
            return 1.0
        if phrase_nums == candidate_nums:
            return 1.08
        if phrase_nums and candidate_nums:
            return 1.03 if phrase_nums & candidate_nums else 0.80
        return 0.82
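
    # Worked examples straight from the branches above:
    #   numeric_compatibility_multiplier({"24"}, {"24"}) -> 1.08 (exact match)
    #   numeric_compatibility_multiplier({"24"}, set()) -> 0.82 (numbers on one side only)
    #   numeric_compatibility_multiplier(set(), set()) -> 1.0 (no numbers at all)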

    def lexical_support(self, phrase: str) -> float:
        """Compute the lexical support of a phrase against the supplier lexicon."""
        tokens = [token for token in normalize_text(phrase).split() if token and not token.isdigit()]
        if not tokens or not self.supplier_lexicon:
            return 0.0
        support_scores: list[float] = []
        for token in tokens:
            cached = self.lexical_token_cache.get(token)
            if cached is not None:
                support_scores.append(cached)
                continue
            best = 0.0
            for token_variant in self.cached_variants(token):
                for lex in self.supplier_lexicon:
                    lev = Levenshtein.normalized_similarity(token_variant, lex)
                    phon = phonetic_similarity(token_variant, lex)
                    sim = max(lev, phon)
                    if sim > best:
                        best = sim
            self.lexical_token_cache[token] = best
            support_scores.append(best)
        return sum(support_scores) / len(support_scores)

    def score_phrase(self, phrase: str) -> dict[str, Any]:
        """Score a phrase against every known supplier."""
        vs = self.cached_variants(phrase)
        q = self.tfidf.transform(vs)
        tf = cosine_similarity(q, self.sup_mat)
        best: dict[str, Any] = {"supplier": None, "score": -1.0, "phrase": phrase, "variant": ""}
        for i, cand in enumerate(self.sup_norm):
            local = -1.0
            local_variant = ""
            candidate_nums = self.sup_num_sets[i]
            for j, v in enumerate(vs):
                char = fuzz.ratio(v, cand) / 100.0
                tf_val = float(tf[j, i])
                penalty = length_penalty(len(v), len(cand))
                phon = phonetic_similarity(v, cand)
                phrase_nums = self.numeric_tokens(v)
                if len(v.split()) == 1 and len(cand.split()) == 1:
                    lev = Levenshtein.normalized_similarity(v, cand)
                    val = (0.45 * lev + 0.25 * char + 0.10 * tf_val + 0.20 * phon) * penalty
                else:
                    align = token_alignment_score(v, self.sup_tokens[i])
                    tok = fuzz.token_set_ratio(v, cand) / 100.0
                    val = (0.30 * char + 0.20 * tok + 0.10 * tf_val + 0.20 * align + 0.20 * phon) * penalty
                # Space-insensitive comparison catches merged or split spellings.
                compact_v = v.replace(" ", "")
                compact_cand = cand.replace(" ", "")
                compact_char = fuzz.ratio(compact_v, compact_cand) / 100.0
                compact_lev = Levenshtein.normalized_similarity(compact_v, compact_cand)
                compact_phon = phonetic_similarity(compact_v, compact_cand)
                compact = max(compact_char, compact_lev, compact_phon)
                if compact > 0.55:
                    val = max(val, compact * penalty)
                val *= self.numeric_compatibility_multiplier(phrase_nums, candidate_nums)
                if val > local:
                    local = val
                    local_variant = v
            if local > best["score"]:
                best = {"supplier": self.suppliers[i], "score": local, "phrase": phrase, "variant": local_variant}
        return best

    def extract(
        self,
        text: str,
        date_phrase: str | None = None,
        excluded_phrases: list[str] | None = None,
        debug: bool = False,
        score_threshold: float = 0.50,
        combined_threshold: float = 0.48,
    ) -> dict[str, Any]:
        """
        Extract a supplier from the text.

        Args:
            text: Text to analyze
            date_phrase: Date phrase to exclude
            excluded_phrases: Additional phrases to exclude
            debug: Include debug information
            score_threshold: Minimum raw score to accept a match
            combined_threshold: Minimum combined score to accept a match

        Returns:
            Dict with supplier, supplier_score, matched_supplier_phrase
        """
        excluded_tokens: set[str] = set()
        if date_phrase:
            excluded_tokens.update(normalize_text(date_phrase).split())
        if excluded_phrases:
            for phrase in excluded_phrases:
                if phrase:
                    excluded_tokens.update(normalize_text(phrase).split())
        excluded_tokens.update(self.noise_terms)
        raw_tokens = normalize_text(text).split()
        tokens: list[str] = []
        for token in raw_tokens:
            if token in excluded_tokens:
                continue
            # Keep digit tokens only when a known supplier name contains them.
            if token.isdigit():
                if token in self.sup_number_tokens:
                    tokens.append(token)
                continue
            if len(token) > 1:
                tokens.append(token)
        # Build every candidate phrase up to the longest supplier name.
        phrases: list[str] = []
        seen: set[str] = set()
        for i in range(len(tokens)):
            for j in range(i + 1, min(i + 1 + self.max_words, len(tokens) + 1)):
                p = " ".join(tokens[i:j])
                if p not in seen:
                    seen.add(p)
                    phrases.append(p)
        results = [self.score_phrase(p) for p in phrases]
        candidate_rows: list[dict[str, Any]] = []
        best_by_supplier: dict[str, dict[str, Any]] = {}
        for row in results:
            supplier = row["supplier"]
            score = float(row.get("score", -1.0))
            phrase = str(row.get("phrase") or "")
            support = self.phrase_support_cache.get(phrase)
            if support is None:
                support = self.lexical_support(phrase)
                self.phrase_support_cache[phrase] = support
            combined = 0.75 * score + 0.25 * support
            if debug:
                candidate_rows.append({
                    "supplier": supplier,
                    "phrase": phrase,
                    "score": round(score, 4),
                    "support": round(support, 4),
                    "combined": round(combined, 4),
                })
            enriched = {**row, "combined": combined}
            passes = score >= score_threshold or combined >= combined_threshold
            if passes and (supplier not in best_by_supplier or combined > float(best_by_supplier[supplier].get("combined", -1.0))):
                best_by_supplier[supplier] = enriched
        if not best_by_supplier and results:
            def support_for_phrase(phrase: str) -> float:
                cached_support = self.phrase_support_cache.get(phrase)
                if cached_support is None:
                    cached_support = self.lexical_support(phrase)
                    self.phrase_support_cache[phrase] = cached_support
                return cached_support

            # Relaxed fallback: accept the best candidate under looser thresholds.
            fallback = max(
                results,
                key=lambda item: 0.75 * float(item.get("score", -1.0)) + 0.25 * support_for_phrase(str(item.get("phrase") or "")),
            )
            fallback_score = float(fallback.get("score", -1.0))
            fallback_phrase = str(fallback.get("phrase") or "")
            fallback_support = support_for_phrase(fallback_phrase)
            fallback_combined = 0.75 * fallback_score + 0.25 * fallback_support
            if fallback_score >= 0.40 and fallback_support >= 0.43 and fallback_combined >= 0.43:
                best_by_supplier[fallback["supplier"]] = {**fallback, "combined": fallback_combined}
        supplier_ranking = sorted(best_by_supplier.values(), key=lambda x: float(x.get("combined", x["score"])), reverse=True)
        best = self.resolve_overlapping_suppliers(supplier_ranking)
        payload = {
            "supplier": best["supplier"],
            "supplier_score": round(best["score"], 4) if best["score"] >= 0 else None,
            "matched_supplier_phrase": best.get("phrase"),
        }
        if debug:
            top_candidates = sorted(candidate_rows, key=lambda item: item["combined"], reverse=True)[:8]
            payload["supplier_debug"] = {
                "tokens": tokens,
                "phrases_count": len(phrases),
                "excluded_tokens": sorted(excluded_tokens)[:80],
                "score_threshold": score_threshold,
                "combined_threshold": combined_threshold,
                "top_candidates": top_candidates,
            }
        return payload
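

if __name__ == "__main__":
    # Minimal usage sketch. The supplier names and the input sentence here are
    # hypothetical; in the real application both come from the caller.
    extractor = ExpenseSupplierExtractor(["Linella", "Kaufland", "Metro Cash & Carry"])
    result = extractor.extract("вчера купил продукты в линелле", debug=True)
    print(result["supplier"], result["supplier_score"], result["matched_supplier_phrase"])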