Spaces:
Running
Running
"""Extracts users using the same logic as the supplier search."""
from __future__ import annotations

import re
import importlib
from typing import Any

from extractors.supplier_extractor import ExpenseSupplierExtractor, normalize_text
class ExpenseUserExtractor:
    """Finds the user mentioned in an expense text with the same fuzzy
    matcher used for supplier lookup.

    The extractor reuses ``ExpenseSupplierExtractor`` as a generic fuzzy
    matcher, but feeds it a filtered "candidate text": tokens that already
    belong to the matched supplier phrase, the date phrase, known supplier
    terms, or the matcher's noise terms are excluded before matching.
    """

    # Minimum lexical support a token needs to be kept as a user-name candidate.
    MIN_LEXICAL_SUPPORT = 0.40
    # Relaxed minimum applied when morphology tags the token as a person
    # name / surname / patronymic.
    MIN_LEXICAL_WITH_PERSON = 0.30

    def __init__(
        self,
        users: list[str],
        suppliers: list[str],
        threshold: float = 0.25,
    ) -> None:
        """Build the user matcher.

        Args:
            users: Known user names; these are what matches resolve to.
            suppliers: Known supplier names; their normalized forms are
                excluded from user-candidate tokens.
            threshold: Minimum raw score (or combined score) for a phrase
                to count as a user match.
        """
        self.threshold = threshold
        # Normalized supplier names, used as an exclusion list for tokens.
        self.supplier_terms = {normalize_text(supplier) for supplier in suppliers}
        # Reuse the supplier extractor as a generic fuzzy matcher over user names.
        self.user_matcher = ExpenseSupplierExtractor(suppliers=users)
        self.morph: Any = None
        # pymorphy3 is an optional dependency: without it the person-grammeme
        # relaxation in _looks_like_person_token is simply disabled.
        try:
            pymorphy3_module = importlib.import_module("pymorphy3")
            self.morph = pymorphy3_module.MorphAnalyzer()
        except Exception:
            self.morph = None

    def _looks_like_person_token(self, token: str) -> tuple[bool, float, bool]:
        """Decide whether *token* may be part of a person's name.

        Returns a ``(accepted, lexical_support, has_person_grammeme)`` tuple:
        ``accepted`` is True when lexical support against the user list is
        high enough, or when morphology marks the token as a name and support
        clears the relaxed threshold.
        """
        lexical = self.user_matcher.lexical_support(token)
        has_person_grammeme = False
        if self.morph is not None:
            parses = self.morph.parse(token)
            if parses:
                # Only the top parse is consulted; Name/Surn/Patr are
                # pymorphy's grammemes for first name / surname / patronymic.
                has_person_grammeme = bool(
                    {"Name", "Surn", "Patr"}.intersection(set(parses[0].tag.grammemes))
                )
        # Keep the low threshold for proper names, but do not let common
        # nouns slip through.
        accepted = lexical >= self.MIN_LEXICAL_SUPPORT or (
            has_person_grammeme and lexical >= self.MIN_LEXICAL_WITH_PERSON
        )
        return accepted, lexical, has_person_grammeme

    def _build_user_candidate_text(
        self,
        normalized_text: str,
        supplier_phrase: str | None,
        date_phrase: str | None,
        include_debug: bool = False,
    ) -> tuple[str, list[str], list[dict[str, Any]] | None]:
        """Filter *normalized_text* down to plausible user-name tokens.

        Tokens belonging to the supplier phrase, the date phrase, known
        supplier terms, or the matcher's noise terms are dropped, as are
        digits and single characters. Returns ``(candidate_text,
        candidate_tokens, per_token_debug_or_None)``.
        """
        excluded_tokens: set[str] = set(self.user_matcher.noise_terms)
        if supplier_phrase:
            excluded_tokens.update(normalize_text(supplier_phrase).split())
        if date_phrase:
            excluded_tokens.update(normalize_text(date_phrase).split())
        excluded_tokens.update(self.supplier_terms)
        candidate_tokens: list[str] = []
        candidate_debug: list[dict[str, Any]] | None = [] if include_debug else None
        for token in normalized_text.split():
            if token in excluded_tokens or token.isdigit() or len(token) <= 1:
                continue
            accepted, lexical, has_person_grammeme = self._looks_like_person_token(token)
            # Debug rows record every considered token, accepted or not.
            if candidate_debug is not None:
                candidate_debug.append({
                    "token": token,
                    "lexical_support": round(lexical, 4),
                    "has_person_grammeme": has_person_grammeme,
                    "accepted": accepted,
                })
            if accepted:
                candidate_tokens.append(token)
        return " ".join(candidate_tokens), candidate_tokens, candidate_debug

    def _match_user_from_candidates(
        self,
        candidate_tokens: list[str],
        include_debug: bool = False,
    ) -> tuple[dict[str, Any], dict[str, Any] | None]:
        """Score sliding-window phrases over *candidate_tokens* and pick the best user.

        Generates every unique phrase of 1..``max_words`` consecutive
        candidate tokens, scores each with the fuzzy matcher, and keeps the
        phrase with the highest combined score among those clearing the
        threshold. Returns ``(match_payload, debug_or_None)``.
        """
        phrases: list[str] = []
        seen: set[str] = set()
        max_words = self.user_matcher.max_words
        for i in range(len(candidate_tokens)):
            for j in range(i + 1, min(i + 1 + max_words, len(candidate_tokens) + 1)):
                phrase = " ".join(candidate_tokens[i:j])
                if phrase not in seen:
                    seen.add(phrase)
                    phrases.append(phrase)
        best_row: dict[str, Any] | None = None
        debug_rows: list[dict[str, Any]] = []
        for phrase in phrases:
            row = self.user_matcher.score_phrase(phrase)
            score = float(row.get("score", -1.0))
            support = self.user_matcher.lexical_support(phrase)
            # Blend matcher score with lexical support, weighted toward the score.
            combined = 0.75 * score + 0.25 * support
            if include_debug:
                debug_rows.append({
                    "phrase": phrase,
                    "supplier": row.get("supplier"),
                    "score": round(score, 4),
                    "support": round(support, 4),
                    "combined": round(combined, 4),
                })
            # Either metric clearing the single threshold makes the phrase eligible.
            if score >= self.threshold or combined >= self.threshold:
                enriched = {
                    "user": row.get("supplier"),
                    "user_score": round(score, 4) if score >= 0 else None,
                    "matched_user_phrase": phrase,
                    "combined": combined,
                }
                # Best candidate is chosen by combined score, not raw score.
                if best_row is None or combined > float(best_row.get("combined", -1.0)):
                    best_row = enriched
        if best_row is None:
            match_payload = {
                "user": None,
                "user_score": None,
                "matched_user_phrase": None,
            }
        else:
            match_payload = {
                "user": best_row.get("user"),
                "user_score": best_row.get("user_score"),
                "matched_user_phrase": best_row.get("matched_user_phrase"),
            }
        match_debug = None
        if include_debug:
            match_debug = {
                "phrases_count": len(phrases),
                "score_threshold": self.threshold,
                "combined_threshold": self.threshold,
                "top_candidates": sorted(debug_rows, key=lambda item: item["combined"], reverse=True)[:8],
            }
        return match_payload, match_debug

    def extract(
        self,
        text: str,
        supplier_phrase: str | None = None,
        date_phrase: str | None = None,
        debug: bool = False,
    ) -> dict[str, Any]:
        """Extract the user mentioned in *text*.

        A standalone pronoun "я" ("I") short-circuits to the special user
        "Я" with score 1.0; otherwise the candidate-token pipeline and fuzzy
        matcher are used.

        Args:
            text: Raw expense text.
            supplier_phrase: Already-matched supplier phrase to exclude.
            date_phrase: Already-matched date phrase to exclude.
            debug: When True, a ``user_debug`` key with diagnostics is added.

        Returns:
            Dict with ``user``, ``user_score`` and ``matched_user_phrase``
            (all None when nothing matched), plus ``user_debug`` if requested.
        """
        normalized_text = normalize_text(text)
        # \S-boundary lookarounds match "я" only as a standalone token.
        if re.search(r"(?<!\S)я(?!\S)", normalized_text, re.IGNORECASE):
            payload = {
                "user": "Я",
                "user_score": 1.0,
                "matched_user_phrase": "я",
            }
            if debug:
                payload["user_debug"] = {
                    "mode": "direct-pronoun",
                    "normalized_text": normalized_text,
                }
            return payload
        candidate_text, candidate_tokens, candidate_debug = self._build_user_candidate_text(
            normalized_text=normalized_text,
            supplier_phrase=supplier_phrase,
            date_phrase=date_phrase,
            include_debug=debug,
        )
        if not candidate_text:
            payload = {
                "user": None,
                "user_score": None,
                "matched_user_phrase": None,
            }
            match_debug = None
        else:
            payload, match_debug = self._match_user_from_candidates(candidate_tokens, include_debug=debug)
        if debug:
            payload["user_debug"] = {
                "mode": "user-matcher",
                "threshold": self.threshold,
                "rules": {
                    "min_lexical_support": self.MIN_LEXICAL_SUPPORT,
                    "min_lexical_with_person_grammeme": self.MIN_LEXICAL_WITH_PERSON,
                    "morph_enabled": self.morph is not None,
                },
                "excluded_supplier_phrase": supplier_phrase,
                "normalized_text": normalized_text,
                "candidate_text": candidate_text,
                "candidate_tokens": candidate_tokens,
                "candidate_token_debug": candidate_debug or [],
                "matcher_debug": match_debug,
            }
        return payload