Spaces:
Sleeping
Sleeping
import re | |
import os | |
from functools import lru_cache | |
import gradio as gr | |
import requests | |
LOGIN = os.getenv("LOGIN") | |
TOKEN = os.getenv("TOKEN") | |
BASE_URL = 'https://nethunt.com/api/v1/zapier' | |
FIELDS = { | |
'company_code': 'ЄДРПОУ', | |
'company_codes': 'Всі ЄДРПОУ', | |
'domain': 'Корпоративний домен', | |
'linkedin': 'LinkedIn', | |
'email': 'Email', | |
'phone': 'Телефон', | |
'company_name': 'Name', | |
'first_name': 'Имя', | |
'last_name': 'Фамилия', | |
'manager': 'Відповідальний'} | |
FOLDERS = { | |
'companies': 'Companies', | |
'contacts': 'Contacts'} | |
OUTPUT_LIMIT = 5 | |
def _get_query( | |
endpoint: str, | |
params: dict[str, str] = None, | |
json: dict[str, str] = None, | |
login: str = LOGIN, | |
token: str = TOKEN, | |
base_url: str = BASE_URL | |
) -> list: | |
url = f'{base_url}{endpoint}' | |
responce = requests.get( | |
url=url, params=params, json=json, auth=(login, token)) | |
responce.raise_for_status() | |
return responce.json() | |
def _get_readable_folders(**kwargs) -> list[dict]: | |
endpoint = '/triggers/writable-folder' | |
return _get_query(endpoint, **kwargs) | |
def _get_folder_id(folder_name: str, **kwargs) -> str: | |
readable_folders = _get_readable_folders(**kwargs) | |
for folder in readable_folders: | |
if folder['name'] == folder_name: | |
return folder['id'] | |
raise ValueError('Wrong folder name.') | |
def _get_nethunt_records( | |
folder_name: str, | |
query: str, | |
limit: int = OUTPUT_LIMIT, | |
**kwargs | |
) -> list[dict]: | |
folder_id = _get_folder_id(folder_name) | |
endpoint = f'/searches/find-record/{folder_id}' | |
params = { | |
'query': query, | |
'limit': limit} | |
return _get_query(endpoint, params, **kwargs) | |
def _clean_found_records( | |
folder_name: str, | |
found_records: str, | |
check_type: str | |
) -> list: | |
result_list = [] | |
for v in found_records: | |
record = { | |
'folder_name': folder_name, | |
'duplicate_type': check_type, | |
'record_id': v.get('id'), | |
'link': v.get('link'), | |
'manager': v.get('fields', {}).get(FIELDS['manager'], '')} | |
if folder_name == FOLDERS['companies']: | |
company_name = v.get('fields', {}) \ | |
.get(FIELDS['company_name'], '') | |
record['name'] = company_name | |
if folder_name == FOLDERS['contacts']: | |
first_name = v.get('fields', {}) \ | |
.get(FIELDS['first_name'], '') | |
last_name = v.get('fields', {}) \ | |
.get(FIELDS['last_name'], '') | |
record['name'] = (first_name + ' ' + last_name).strip() | |
result_list.append(record) | |
return result_list | |
def _check_field( | |
folder_name: str, | |
query: str, | |
check_type: str | |
) -> list: | |
found_records = _get_nethunt_records(folder_name, query=query) | |
if len(found_records) == 0: | |
return [] | |
return _clean_found_records(folder_name, found_records, check_type) | |
def _check_company_code(code: str) -> list: | |
code = code.strip() | |
folder = FOLDERS['companies'] | |
query = f''' | |
("{FIELDS['company_code']}":"{code}") | |
OR ("{FIELDS['company_codes']}":"{code}")''' | |
return _check_field(folder, query, 'company_code') | |
def _check_company_domain(domain: str) -> list: | |
domain = domain.replace('@', '').strip() | |
folder = FOLDERS['companies'] | |
query = f'''"{FIELDS['domain']}":"{domain}"''' | |
return _check_field(folder, query, 'domain') | |
def _check_company_linkedin(linkedin: str) -> list: | |
linkedin = linkedin.strip() | |
folder = FOLDERS['companies'] | |
query = f'''"{FIELDS['linkedin']}":"{linkedin}"''' | |
return _check_field(folder, query, 'company_linkedin') | |
def _check_contact_email(email: str) -> list: | |
email = email.lower().strip() | |
folder = FOLDERS['contacts'] | |
query = f'''"{FIELDS['email']}":"{email}"''' | |
return _check_field(folder, query, 'email') | |
def _check_contact_phone(phone: str) -> list: | |
phone = re.sub(r'[^\d+]', '', phone) | |
folder = FOLDERS['contacts'] | |
query = f'''"{FIELDS['phone']}":"{phone}"''' | |
return _check_field(folder, query, 'phone') | |
def _check_contact_linkedin(linkedin: str) -> list: | |
linkedin = linkedin.strip() | |
folder = FOLDERS['contacts'] | |
query = f'''"{FIELDS['linkedin']}":"{linkedin}"''' | |
return _check_field(folder, query, 'contact_linkedin') | |
def check_record( | |
company_code: str = None, | |
domain: str = None, | |
company_linkedin: str = None, | |
email: str = None, | |
phone: str = None, | |
contact_linkedin: str = None | |
) -> list: | |
found_records = [] | |
checks = { | |
_check_company_code: company_code, | |
_check_company_domain: domain, | |
_check_company_linkedin: company_linkedin, | |
_check_contact_email: email, | |
_check_contact_phone: phone, | |
_check_contact_linkedin: contact_linkedin} | |
for check, value in checks.items(): | |
if value: | |
found_records.extend(check(value)) | |
# Апгрейд для вывода | |
result = '<span>' | |
count = 0 | |
for item in found_records: | |
if item['folder_name'] == 'Companies': | |
icon = '🏢' | |
elif item['folder_name'] == 'Contacts': | |
icon = '👨💼' | |
count = count + 1 | |
result += ( | |
f"{icon} " | |
f"<b>{item['name']}</b> | " | |
f"Manager: {item['manager']} | " | |
f"Found by: {item['duplicate_type']} | " | |
f"Link: <a href=\"{str(item['link'])}\">" | |
f"{item['record_id']}</a><br><br>") | |
if count == OUTPUT_LIMIT: | |
break | |
if count == 0: | |
result += "<h2>Nothing found</h2>" | |
result += "</span>" | |
return result | |
def generate( | |
company_code, domain, company_linkedin, | |
email, phone, contact_linkedin): | |
return check_record( | |
company_code=company_code, | |
domain=domain, | |
company_linkedin=company_linkedin, | |
email=email, | |
phone=phone, | |
contact_linkedin=contact_linkedin) | |
title = "NetHunt search" | |
css = """ | |
footer {visibility: hidden} | |
""" | |
with gr.Blocks(css=css, title=title) as demo: | |
gr.Markdown( | |
""" | |
# NetHuntCrm Search | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(): | |
input_1 = gr.Text(label="Сompany Code") | |
input_2 = gr.Textbox(label="Corp Domain") | |
input_3 = gr.Textbox(label="Company Linkedin") | |
with gr.Column(): | |
input_4 = gr.Textbox(label="Email") | |
input_5 = gr.Textbox(label="Phone") | |
input_6 = gr.Textbox(label="Contact LinkedIn") | |
greet_btn = gr.Button("Search") | |
output = gr.outputs.HTML() | |
input = [input_1, input_2, input_3, input_4, input_5, input_6] | |
greet_btn.click(fn=generate, inputs=input, outputs=output) | |
demo.launch(debug=True, share=False) | |