data_frame / app.py
LaoCzi's picture
Create app.py
408009a
raw
history blame
7.06 kB
import html
import os
import re
from functools import lru_cache

import gradio as gr
import requests
# Credentials for HTTP basic auth against the API, taken from the environment.
LOGIN = os.environ.get("LOGIN")
TOKEN = os.environ.get("TOKEN")

# Root URL that every endpoint path is appended to.
BASE_URL = 'https://nethunt.com/api/v1/zapier'

# Internal field alias -> field name as it is used in search queries and in
# the ``fields`` mapping of the records returned by the API.
FIELDS = dict(
    company_code='ЄДРПОУ',
    company_codes='Всі ЄДРПОУ',
    domain='Корпоративний домен',
    linkedin='LinkedIn',
    email='Email',
    phone='Телефон',
    company_name='Name',
    first_name='Имя',
    last_name='Фамилия',
    manager='Відповідальний',
)

# Internal folder alias -> folder name used to look up the folder id.
FOLDERS = dict(
    companies='Companies',
    contacts='Contacts',
)

# Maximum number of records requested per search and rendered in the output.
OUTPUT_LIMIT = 5
def _get_query(
    endpoint: str,
    params: dict[str, str] = None,
    json: dict[str, str] = None,
    login: str = LOGIN,
    token: str = TOKEN,
    base_url: str = BASE_URL,
    timeout: float = 30,
) -> list:
    """Send an authenticated GET request and return the decoded JSON body.

    Args:
        endpoint: Path appended verbatim to ``base_url``.
        params: Optional query-string parameters.
        json: Optional JSON payload sent with the request.
        login: User name for HTTP basic auth.
        token: Password/token for HTTP basic auth.
        base_url: Root URL of the API.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The parsed JSON response.

    Raises:
        requests.HTTPError: If the server answers with an error status.
        requests.Timeout: If the server does not answer within ``timeout``.
    """
    # Without an explicit timeout ``requests`` waits forever on a stalled
    # connection, which would freeze the UI callback.
    response = requests.get(
        url=f'{base_url}{endpoint}',
        params=params,
        json=json,
        auth=(login, token),
        timeout=timeout,
    )
    response.raise_for_status()
    return response.json()
@lru_cache(maxsize=None)
def _get_readable_folders(**kwargs) -> list[dict]:
    """Fetch the folder list from the API, memoized per keyword set.

    All keyword arguments are forwarded unchanged to ``_get_query``. Because
    the result is cached, every keyword value must be hashable.

    NOTE(review): the function name says "readable" while the endpoint is
    ``writable-folder`` - confirm which one is intended.
    """
    return _get_query('/triggers/writable-folder', **kwargs)
@lru_cache(maxsize=None)
def _get_folder_id(folder_name: str, **kwargs) -> str:
    """Return the id of the first folder whose name equals ``folder_name``.

    Keyword arguments are passed on to ``_get_readable_folders``. Successful
    lookups are memoized.

    Raises:
        ValueError: If no folder carries the requested name.
    """
    # A private sentinel keeps "no match" distinguishable from any real id.
    not_found = object()
    folder_id = next(
        (
            entry['id']
            for entry in _get_readable_folders(**kwargs)
            if entry['name'] == folder_name
        ),
        not_found,
    )
    if folder_id is not_found:
        raise ValueError('Wrong folder name.')
    return folder_id
def _get_nethunt_records(
    folder_name: str,
    query: str,
    limit: int = OUTPUT_LIMIT,
    **kwargs
) -> list[dict]:
    """Run a record search inside the folder named ``folder_name``.

    Args:
        folder_name: Folder name, resolved to an id via ``_get_folder_id``.
        query: Search expression sent as the ``query`` parameter.
        limit: Value sent as the ``limit`` parameter.
        **kwargs: Forwarded to ``_get_query`` only; the folder-id lookup
            always runs with its default settings.

    Returns:
        Whatever ``_get_query`` returns for the search endpoint.
    """
    search_endpoint = f'/searches/find-record/{_get_folder_id(folder_name)}'
    search_params = {'query': query, 'limit': limit}
    return _get_query(search_endpoint, search_params, **kwargs)
def _clean_found_records(
    folder_name: str,
    found_records: list[dict],
    check_type: str
) -> list:
    """Reduce raw search hits to the flat dicts used for display.

    Args:
        folder_name: Folder the records were found in; decides how the
            display name is assembled.
        found_records: Raw record dicts as returned by the search.
        check_type: Label describing which check produced the hits.

    Returns:
        One dict per record with the keys ``folder_name``,
        ``duplicate_type``, ``record_id``, ``link``, ``manager`` and
        ``name``. ``name`` is always present; it is an empty string for a
        folder that is neither the companies nor the contacts folder.
    """
    result_list = []
    for raw in found_records:
        # Look the field mapping up once instead of once per attribute.
        fields = raw.get('fields', {})

        if folder_name == FOLDERS['companies']:
            name = fields.get(FIELDS['company_name'], '')
        elif folder_name == FOLDERS['contacts']:
            first_name = fields.get(FIELDS['first_name'], '')
            last_name = fields.get(FIELDS['last_name'], '')
            # strip() removes the joining space when one part is missing.
            name = (first_name + ' ' + last_name).strip()
        else:
            # Keep the record shape uniform so consumers can rely on 'name'.
            name = ''

        result_list.append({
            'folder_name': folder_name,
            'duplicate_type': check_type,
            'record_id': raw.get('id'),
            'link': raw.get('link'),
            'manager': fields.get(FIELDS['manager'], ''),
            'name': name,
        })
    return result_list
def _check_field(
    folder_name: str,
    query: str,
    check_type: str
) -> list:
    """Search ``folder_name`` with ``query`` and return the cleaned hits.

    Returns an empty list when the search yields no records; otherwise the
    records are passed through ``_clean_found_records`` tagged with
    ``check_type``.
    """
    matches = _get_nethunt_records(folder_name, query=query)
    if len(matches) > 0:
        return _clean_found_records(folder_name, matches, check_type)
    return []
def _check_company_code(code: str) -> list:
    """Find companies whose single-code or all-codes field equals ``code``.

    Surrounding whitespace is removed from ``code`` before searching.
    """
    needle = code.strip()
    single_code_field = FIELDS['company_code']
    all_codes_field = FIELDS['company_codes']
    # The query starts with a line break and spans two lines.
    query = (
        '\n'
        f'("{single_code_field}":"{needle}")\n'
        f'OR ("{all_codes_field}":"{needle}")'
    )
    return _check_field(FOLDERS['companies'], query, 'company_code')
def _check_company_domain(domain: str) -> list:
    """Find companies whose corporate-domain field equals ``domain``.

    Every ``@`` is removed first, then surrounding whitespace is stripped.
    """
    without_at = domain.replace('@', '')
    needle = without_at.strip()
    query = '"{}":"{}"'.format(FIELDS['domain'], needle)
    return _check_field(FOLDERS['companies'], query, 'domain')
def _check_company_linkedin(linkedin: str) -> list:
    """Find companies whose LinkedIn field equals the stripped ``linkedin``."""
    needle = linkedin.strip()
    field_name = FIELDS['linkedin']
    query = f'"{field_name}":"{needle}"'
    return _check_field(FOLDERS['companies'], query, 'company_linkedin')
def _check_contact_email(email: str) -> list:
    """Find contacts whose email field equals ``email``.

    The address is lower-cased and stripped of surrounding whitespace first.
    """
    lowered = email.lower()
    needle = lowered.strip()
    query = '"{}":"{}"'.format(FIELDS['email'], needle)
    return _check_field(FOLDERS['contacts'], query, 'email')
def _check_contact_phone(phone: str) -> list:
    """Find contacts whose phone field equals the normalized ``phone``.

    Normalization drops every character that is neither a digit nor ``+``.
    """
    digits_and_plus = re.sub(pattern=r'[^\d+]', repl='', string=phone)
    field_name = FIELDS['phone']
    query = f'"{field_name}":"{digits_and_plus}"'
    return _check_field(FOLDERS['contacts'], query, 'phone')
def _check_contact_linkedin(linkedin: str) -> list:
    """Find contacts whose LinkedIn field equals the stripped ``linkedin``."""
    needle = linkedin.strip()
    query = '"{}":"{}"'.format(FIELDS['linkedin'], needle)
    return _check_field(FOLDERS['contacts'], query, 'contact_linkedin')
def check_record(
    company_code: str = None,
    domain: str = None,
    company_linkedin: str = None,
    email: str = None,
    phone: str = None,
    contact_linkedin: str = None
) -> str:
    """Run every check that has a value and render the hits as HTML.

    Each argument is optional. Arguments that are ``None``, empty or
    whitespace-only are skipped, so no search is sent for them.

    Args:
        company_code: Company code to look up among companies.
        domain: Corporate domain to look up among companies.
        company_linkedin: LinkedIn value to look up among companies.
        email: Email address to look up among contacts.
        phone: Phone number to look up among contacts.
        contact_linkedin: LinkedIn value to look up among contacts.

    Returns:
        An HTML fragment wrapped in ``<span>`` listing at most
        ``OUTPUT_LIMIT`` records, or a "Nothing found" heading.
    """
    # Pairs are evaluated in this order, which also fixes the output order.
    checks = (
        (_check_company_code, company_code),
        (_check_company_domain, domain),
        (_check_company_linkedin, company_linkedin),
        (_check_contact_email, email),
        (_check_contact_phone, phone),
        (_check_contact_linkedin, contact_linkedin),
    )
    found_records = []
    for check, value in checks:
        # A blank value would otherwise be sent as an empty search term.
        if value and value.strip():
            found_records.extend(check(value))
    return _render_records_html(found_records)


def _render_records_html(records: list) -> str:
    """Render cleaned record dicts as the HTML fragment shown in the UI."""
    icons = {
        FOLDERS['companies']: '🏢',
        FOLDERS['contacts']: '👨‍💼',
    }
    rows = []
    for item in records[:OUTPUT_LIMIT]:
        # Unknown folders get no icon instead of an unbound/stale one.
        icon = icons.get(item.get('folder_name'), '')
        # Record values come from the CRM and must not be trusted as markup;
        # str() guards against non-string field values before escaping.
        name = html.escape(str(item.get('name', '')))
        manager = html.escape(str(item.get('manager', '')))
        found_by = html.escape(str(item.get('duplicate_type', '')))
        link = html.escape(str(item.get('link')))
        record_id = html.escape(str(item.get('record_id')))
        rows.append(
            f"{icon} "
            f"<b>{name}</b> | "
            f"Manager: {manager} | "
            f"Found by: {found_by} | "
            f"Link: <a href=\"{link}\">"
            f"{record_id}</a><br><br>")
    body = ''.join(rows) if rows else "<h2>Nothing found</h2>"
    return f"<span>{body}</span>"
def generate(
        company_code, domain, company_linkedin,
        email, phone, contact_linkedin):
    """UI callback: pass the six form values to ``check_record`` by name."""
    form_values = {
        'company_code': company_code,
        'domain': domain,
        'company_linkedin': company_linkedin,
        'email': email,
        'phone': phone,
        'contact_linkedin': contact_linkedin,
    }
    return check_record(**form_values)
# Browser tab title of the app.
title = "NetHunt search"

# Hide the default footer of the page.
css = """
footer {visibility: hidden}
"""

with gr.Blocks(css=css, title=title) as demo:
    gr.Markdown("\n# NetHuntCrm Search\n")

    # Two columns of inputs: company fields on the left, contact fields on
    # the right. The list order below must match the parameters of generate().
    with gr.Row():
        with gr.Column():
            input_1 = gr.Textbox(label="Company Code")
            input_2 = gr.Textbox(label="Corp Domain")
            input_3 = gr.Textbox(label="Company Linkedin")
        with gr.Column():
            input_4 = gr.Textbox(label="Email")
            input_5 = gr.Textbox(label="Phone")
            input_6 = gr.Textbox(label="Contact LinkedIn")

    greet_btn = gr.Button("Search")
    # gr.HTML is the component to use inside Blocks; the legacy
    # gr.outputs namespace is deprecated.
    output = gr.HTML()

    # Named so that it does not shadow the builtin input().
    search_inputs = [input_1, input_2, input_3, input_4, input_5, input_6]
    greet_btn.click(fn=generate, inputs=search_inputs, outputs=output)

demo.launch(debug=True, share=False)