hackai24skfo / app.py
zaborshicov's picture
Update app.py
b99387a verified
import numpy as np
import pandas as pd
from catboost import CatBoostRegressor, CatBoostClassifier, Pool
from sklearn.model_selection import ShuffleSplit, train_test_split, GridSearchCV
from sklearn.neighbors import KNeighborsRegressor, NearestNeighbors
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
import gradio as gr
from gradio_rangeslider import RangeSlider
from utils import *
SEED = 8642
np.random.seed(SEED)
config = {'min_xval':55.55, 'max_xval':55.95, 'min_yval':37.3, 'max_yval':37.9, 'x_ngroups': 8, 'y_ngroups': 8}
df = pd.read_json('./train_data.json')
target = df.pop('value')
points = []
for i in df['points']:
for j in i:
points += [(float(j['lat']), float(j['lon']))]
points_set = set(points)
model_path = './catboost_for_polygons'
cb_model = CatBoostClassifier()
cb_model.load_model(model_path)
print('CB INIT: Done.')
def calculate_distance(coord1, coord2):
lat1, lon1 = coord1
lat2, lon2 = coord2
return ((lat1 - lat2) ** 2 + (lon1 - lon2) ** 2) ** 0.5
def find_nearest_points(target_coord, points, n):
distances = [(calculate_distance(target_coord, point), point) for point in points]
distances.sort()
return [point for dist, point in distances[:n]]
def cross_product(o, a, b):
return (a[0] - o[0]) * (b[1] - o[1]) - (a[1] - o[1]) * (b[0] - o[0])
def convex_hull(points):
# алгоритм Джарвиса
points = sorted(points)
if len(points) <= 2:
return points
lower = []
upper = []
for p in points:
while len(lower) >= 2 and cross_product(lower[-2], lower[-1], p) <= 0:
lower.pop()
lower.append(p)
for p in reversed(points):
while len(upper) >= 2 and cross_product(upper[-2], upper[-1], p) <= 0:
upper.pop()
upper.append(p)
return lower[:-1] + upper[:-1]
def process_income(income_categs: list) -> str:
if income_categs == ['a'] or income_categs == ['b']:
return 'ab'
elif income_categs == ['a', 'c']:
return 'abc'
else:
return ''.join(sorted(income_categs))
def sector_inference(cb_model: CatBoostClassifier, income: list, gender: str,
ageFrom: int, ageTo: int, value: float):
"""
income: NON-EMPTY list containing 'a', 'b', or 'c'
gender: one of ['male', 'female', 'all']
ageFrom: lower bound for target audience
ageTo: upper bound
value: campaign results
"""
needed_cols = ['value',
'ageFrom',
'ageTo',
'age_span',
'age_mean',
'all',
'female',
'male',
'ab',
'abc',
'bc',
'c']
str_income = process_income(income)
if str_income not in needed_cols:
str_income = 'ab'
if gender not in needed_cols:
gender = 'all'
x_intervals = split_on_intervals(config['min_xval'], config['max_xval'], config['x_ngroups'])
y_intervals = split_on_intervals(config['min_yval'], config['max_yval'], config['y_ngroups'])
my_df = dict()
my_df['value'] = [value]
my_df['ageFrom'] = [ageFrom]
my_df['ageTo'] = [ageTo]
my_df['age_span'] = [(np.array(my_df['ageTo']) - np.array(my_df['ageFrom'])) ** 1/2]
my_df['age_mean'] = [((np.array(my_df['ageTo']) + np.array(my_df['ageFrom'])) / 2) ** 1/2]
my_df['all'] = [gender == 'all']
my_df['male'] = [gender == 'male']
my_df['female'] = [gender == 'female']
my_df['ab'] = [str_income == 'ab']
my_df['abc'] = [str_income == 'abc']
my_df['bc'] = [str_income == 'bc']
my_df['c'] = [str_income == 'c']
test_df = pd.DataFrame.from_dict(my_df)
sec_id = cb_model.predict(test_df).flatten()[0]
x_coord = x_intervals[sec_id % (config['y_ngroups'] + 2)]
y_coord = y_intervals[sec_id // (config['x_ngroups'] + 2)]
return (x_coord, y_coord)
def get_point_set(df: pd.DataFrame):
points = []
for i in df['points']:
for j in i:
points += [(float(j['lat']), float(j['lon']))]
points_set = set(points)
return points_set
def run(n_neighbors: int, source_df: pd.DataFrame, income: list,
gender: str, ageFrom: int, ageTo: int, value: float, aprox: bool):
target_coord = sector_inference(cb_model, income=income, gender=gender,
ageFrom=ageFrom, ageTo=ageTo, value=value) ####
points_set = get_point_set(source_df)
nearest_points = find_nearest_points(target_coord, points_set, n_neighbors)
if aprox:
return convex_hull(nearest_points)
else:
return nearest_points
def gradio_runner(n_neighbors: float, income: list, gender: str, age: tuple, value: float, aprox: bool):
ageFrom, ageTo = age
errmsg = ''
if n_neighbors < 3:
errmsg += 'ERROR: You need at least 3 points. Change "n_neighbors" param.\n'
if n_neighbors % 1:
errmsg += 'ERROR: "n_neighbors" param must be INTEGER. Change this param.\n'
if ageFrom % 1 or ageTo % 1:
errmsg += 'ERROR: Age must be INTEGER. Edit "age" param.\n'
if not income:
errmsg += 'ERROR: You need "income" field. Change "income" param. \n'
if errmsg:
return errmsg[:-1]
return run(n_neighbors=n_neighbors,
source_df=df,
income=income,
gender=gender,
ageFrom=ageFrom,
ageTo=ageTo,
value=value,
aprox=aprox)
demo = gr.Interface(fn=gradio_runner, inputs=[gr.Number(3), gr.CheckboxGroup(['a', 'b', 'c'], value=['a','b','c']), gr.Radio(['all', 'male', 'female'], value='all'), RangeSlider(minimum=0, maximum=100, value=(0, 100)), gr.Slider(value=100), gr.Checkbox()], outputs="text")
demo.launch()