def flatten(items, seqtypes=(list, tuple)): try: for i, x in enumerate(items): while isinstance(x, seqtypes): items[i:i+1] = x x = items[i] except IndexError: pass return items aliases = [ #('canonical name', ['aliases', ...]) ('почта россия', ['почта', 'почта рф', 'пр', 'gh']), ('почта россия трекинг', ['пр трекинг', 'почта трекинг', 'пр трэкинг', 'почта трэкинг']), ('реестр почта', ['реестр пр', 'реестр почта россии']), ('реестр пэк', []), ('реестры наложенных платежей', ['документы наложенных платежей']), ('реквизиты', []), ('пешкарики', []), ('импорт лидов директ', []), ('яндекс доставка экспресс', ['яндекс доставка express', 'яд экспресс', 'ядоставка экспресс']), ('яндекс доставка ndd', ['ндд', 'ndd', 'яд ндд', 'я доставка ндд', 'ядоставка ндд']), ('яндекс доставка', ['яд', 'я доставка', 'ядоставка']), ('яндекс метрика', ['яндекс метрика импорт']), ('альфабанк', ['альфа банк', 'alfabank', 'альфа']), ('импорт лидов facebook', ['импорт лидов fb', 'загрузка лидов fb', 'лиды фейсбук', 'импорт лидов фб', 'fb lead']), ('импорт лидов вк', ['импорт лидов вконтакте', 'загрузка лидов вк', 'лиды вконтакте', 'импорт лидов vk', 'vk lead']), ('маркетинговые расходы', ['расходы', 'загрузка расходов']), ('cloudpayments', ['клауд', 'клаудпеймент', 'клаудпейментс']), ('robokassa', ['робокасса', 'робокаса']), ('sipuni', ['сипуни', 'сипьюни']), ('mailchimp', ['майлчимп', 'мейлчим', 'мейлчимп']), ('unisender', ['юнисендер']), ('яндекс аудитории', ['экспорт аудитории', 'экспорт яндекс аудитории']), ('экспорт facebook', ['экспорт сегментов facebook', 'экспорт fb', 'экспорт фейсбук', 'экспорт аудиторий фб', 'fb экспорт']), ('экспорт вк', ['экспорт сегментов vkontakte', 'экспорт vk', 'экспорт контакте', 'экспорт сегментов вконтакте']), ('retailcrm', ['срм', 'ритейл', 'ритейл срм', 'ритейлсрм', 'retail crm', 'ритейлцрм', 'ритейл црм']), ('retailcrm services', [ 'retailcrmservices', 'ритейлцрм services', 'лк crm services', 'ритейлцрм сервисес', 'ритейлсрм сервисес', 'ритейлцрм сервисе', 'ритейлцрмсервисес', 'ритейлсрмсервисес', ]), ('digital pipeline', ['digital pipline']), ] vocab_raw = flatten([[k] + keywords for k, keywords in aliases]) import string import pymorphy3 morph = None def normalize_word(word): if word == 'в' or word == 'из': return '' if word == 'лид': return word if word in ['росии', 'росси']: return 'россия' global morph if morph is None: morph = pymorphy3.MorphAnalyzer() return morph.parse(word)[0].normal_form def tokenize_sentence(text): # remove punctuation text = text.translate(str.maketrans(string.punctuation, ' ' * len(string.punctuation))) # tokenize return list(filter(bool, [normalize_word(word) for word in text.split()])) def normalize_sentence(text): return " ".join(tokenize_sentence(text)) def canonical_keywords(keywords): """ replace keyword aliases with canonical keyword names """ result = [] for k in keywords: k = normalize_sentence(k) for canonical_name, alias_names in aliases: canonical_name = normalize_sentence(canonical_name) for a in alias_names: a = normalize_sentence(a) #print('a', a) if a == k: result.append(canonical_name) break else: continue break else: result.append(k) return result def merge_keywords(keywords): """ remove subkeywords """ result = [] sorted_keywords = sorted(keywords, key=len, reverse=True) for k in sorted_keywords: for rk in result: if rk.lower().startswith(k): break else: result.append(k) continue return result vectorizer = None kw_model = None def init_keyword_extractor(): global vectorizer global kw_model from keybert import KeyBERT import spacy from sklearn.feature_extraction.text import CountVectorizer import warnings warnings.filterwarnings("ignore", category=UserWarning) kw_model = KeyBERT(model=spacy.load("ru_core_news_sm", exclude=['tokenizer', 'tagger', 'parser', 'ner', 'attribute_ruler', 'lemmatizer'])) vocab = [" ".join(tokenize_sentence(s)) for s in vocab_raw] vectorizer = CountVectorizer(ngram_range=(1, 4), vocabulary=vocab, tokenizer=tokenize_sentence) def extract_keywords(text): global vectorizer global kw_model if vectorizer is None or kw_model is None: init_keyword_extractor() #print(normalize_sentence(text)) keywords = [k for k, score in kw_model.extract_keywords(normalize_sentence(text), vectorizer=vectorizer)] return merge_keywords(canonical_keywords(keywords)) def extract_keywords2(text): vocab = sorted([" ".join(tokenize_sentence(s)) for s in vocab_raw], key=len, reverse=True) text = normalize_sentence(text) keywords = [w for w in vocab if w in text] #for w in vocab: # if w in text: # keywords.append(w) for k in keywords: text = text.replace(k, '') return set(merge_keywords(canonical_keywords(keywords))), text