import os
from datetime import datetime, timedelta
from typing import Optional, Tuple, TypedDict

import pandas as pd
from dotenv import load_dotenv

from next_place_ai.classes import AzureScore, DataPreparation, DatasetManager

load_dotenv()
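# REPO_ID and HF_TOKEN are expected in the environment (e.g. via a .env file);
# they point AzureScore and DatasetManager at the repository holding the model artifacts.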


class ProcessedSynapse(TypedDict):
    """One property listing as passed to run_inference; every field is optional (may be None)."""
    id: Optional[str]
    nextplace_id: Optional[str]
    property_id: Optional[str]
    listing_id: Optional[str]
    address: Optional[str]
    city: Optional[str]
    state: Optional[str]
    zip_code: Optional[str]
    price: Optional[float]
    beds: Optional[int]
    baths: Optional[float]
    sqft: Optional[int]
    lot_size: Optional[int]
    year_built: Optional[int]
    days_on_market: Optional[int]
    latitude: Optional[float]
    longitude: Optional[float]
    property_type: Optional[str]
    last_sale_date: Optional[str]
    hoa_dues: Optional[float]
    query_date: Optional[str]


class CustomNextPlaceModel:
    def __init__(self):
        self.repo_id = os.getenv('REPO_ID')
        self.hf_token = os.getenv('HF_TOKEN')
        self._load_model()

    def _load_model(self):
        """
        Load all required models for the prediction pipeline
        """
        try:
            # Model A scorer
            self.score_a = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='A',
                scored_labels='A'
            )
            # Model B scorers
            self.score_b_1 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_1',
                scored_labels='B'
            )
            self.score_b_2 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_2',
                scored_labels='B'
            )
            self.score_b_3 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='B_3',
                scored_labels='B'
            )
            # Model C (price) scorers, keyed by the B-class buckets they cover
            self.score_c_models = {
                '1': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[1]', scored_labels='price'),
                '2': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[2]', scored_labels='price'),
                '3_4': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[3, 4]', scored_labels='price'),
                '5_6': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[5, 6]', scored_labels='price'),
                '7': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_[7]', scored_labels='price'),
                '8_9': AzureScore(repo_id=self.repo_id, token=self.hf_token, model_filename='model_C_8_9', scored_labels='price')
            }
            # Time-on-market model
            self.score_t_1 = AzureScore(
                repo_id=self.repo_id,
                token=self.hf_token,
                model_filename='model_T_1',
                scored_labels='days'
            )
            # Data preparation module
            self.data_manager = DatasetManager(repo_id=self.repo_id, token=self.hf_token)
        except Exception as e:
            raise ValueError(f"Error loading models: {str(e)}") from e

    def predict(self, validators_data: pd.DataFrame) -> pd.DataFrame:
        """
        Main prediction pipeline for processing input data

        Args:
            validators_data (pd.DataFrame): Input validation dataset

        Returns:
            pd.DataFrame: Prediction results with 'price' and 'days' columns
        """
        # Ensure input is a DataFrame and has at least one row
        if not isinstance(validators_data, pd.DataFrame) or validators_data.empty:
            raise ValueError("Input must be a non-empty pandas DataFrame")

        # Prepare the initial dataset
        dp = DataPreparation(validators_data)
        dp.prepare_data()

        # Predict A scores
        score_A = self.score_a.predict_proba_dataset(dp.X)

        # Combine datasets and build convolution features for the B stage
        combined_dataset = dp.combine_datasets(score_A, dp.X)
        combined_dataset = combined_dataset.drop(columns=['0'])
        combined_dataset, _ = dp.create_convolution_features(combined_dataset, combined_dataset.columns.to_list(), 3)

        # Predict B scores for each A class; fall back to a zero row when a class
        # has no rows so the later concatenation keeps a consistent shape
        empty_b = pd.DataFrame(
            {'B_Probability_Class_0': [0], 'B_Probability_Class_1': [0], 'B_Probability_Class_2': [0]})
        b_scores = {
            '1': self.score_b_1.predict_proba_dataset(combined_dataset[combined_dataset['A'] == 1])
            if not combined_dataset[combined_dataset['A'] == 1].empty else empty_b,
            '2': self.score_b_2.predict_proba_dataset(combined_dataset[combined_dataset['A'] == 2])
            if not combined_dataset[combined_dataset['A'] == 2].empty else empty_b,
            '3': self.score_b_3.predict_proba_dataset(combined_dataset[combined_dataset['A'] == 3])
            if not combined_dataset[combined_dataset['A'] == 3].empty else empty_b,
        }

        # Concatenate B scores and drop rows with missing values
        df_B = pd.concat([b_scores['1'], b_scores['2'], b_scores['3']], ignore_index=True)
        df_B_ = df_B.dropna()

        # Rebuild the combined dataset with the B scores for the C stage
        combined_dataset = dp.combine_datasets(df_B_, dp.X)
        combined_dataset = combined_dataset.drop(columns=['0'])
        combined_dataset, _ = dp.create_convolution_features(combined_dataset, combined_dataset.columns.to_list(), 3)

        # Predict C (price) scores per B bucket; fall back to a zero-price row for empty buckets
        empty_c = pd.DataFrame({'price': [0]})
        c_scores = {
            '1': self.score_c_models['1'].predict_dataset(combined_dataset[combined_dataset['B'].isin([1])])
            if not combined_dataset[combined_dataset['B'].isin([1])].empty else empty_c,
            '2': self.score_c_models['2'].predict_dataset(combined_dataset[combined_dataset['B'].isin([2])])
            if not combined_dataset[combined_dataset['B'].isin([2])].empty else empty_c,
            '3_4': self.score_c_models['3_4'].predict_dataset(combined_dataset[combined_dataset['B'].isin([3, 4])])
            if not combined_dataset[combined_dataset['B'].isin([3, 4])].empty else empty_c,
            '5_6': self.score_c_models['5_6'].predict_dataset(combined_dataset[combined_dataset['B'].isin([5, 6])])
            if not combined_dataset[combined_dataset['B'].isin([5, 6])].empty else empty_c,
            '7': self.score_c_models['7'].predict_dataset(combined_dataset[combined_dataset['B'].isin([7])])
            if not combined_dataset[combined_dataset['B'].isin([7])].empty else empty_c,
            '8_9': self.score_c_models['8_9'].predict_dataset(combined_dataset[combined_dataset['B'].isin([8, 9])])
            if not combined_dataset[combined_dataset['B'].isin([8, 9])].empty else empty_c
        }

        # Collect all non-empty price predictions
        df_C = pd.concat(
            [c_scores[key][['price']] for key in c_scores
             if isinstance(c_scores[key], pd.DataFrame) and 'price' in c_scores[key].columns and not c_scores[key].empty],
            ignore_index=True
        )
        # Drop the zero-price placeholder rows
        df_C_ = df_C[df_C['price'] != 0].copy()

        # Attach the predicted prices to the feature set for the time-on-market model
        t_df_ = pd.concat([combined_dataset.reset_index(drop=True), df_C_.reset_index(drop=True)], axis=1)

        # Predict days on market
        score_t_1 = self.score_t_1.predict_dataset(t_df_).astype(int)

        # Final result: predicted sale price and predicted days on market
        result = pd.concat([df_C_.reset_index(drop=True), score_t_1.reset_index(drop=True)], axis=1)
        return result

    def run_inference(self, input_data: ProcessedSynapse) -> Tuple[float, str]:
        input_df = pd.DataFrame([input_data])
        result = self.predict(input_df)
        # The time model returns a number of days, which is converted to a date string below
        predicted_sale_price, predicted_days = result['price'].iloc[0], result['days'].iloc[0]
        # days_on_market is optional, so guard against a missing or None value
        current_days_on_market = input_df['days_on_market'].iloc[0] if 'days_on_market' in input_df else 0
        if pd.isna(current_days_on_market):
            current_days_on_market = 0
        # Estimate the date the property was listed
        date_listed = datetime.now() - timedelta(days=int(current_days_on_market))
        # Compute the predicted sale date
        predicted_sale_date = (date_listed + timedelta(days=int(predicted_days))).strftime('%Y-%m-%d')
        return float(predicted_sale_price), predicted_sale_date
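

if __name__ == "__main__":
    # Minimal usage sketch: builds one hypothetical listing with illustrative values
    # and runs it through run_inference. Requires REPO_ID / HF_TOKEN to be set and
    # network access so the scoring models can be downloaded; field values below are
    # made up for demonstration only.
    sample = {
        "nextplace_id": "example-1",
        "address": "123 Example St",
        "city": "Austin",
        "state": "TX",
        "zip_code": "78701",
        "price": 450000.0,
        "beds": 3,
        "baths": 2.0,
        "sqft": 1800,
        "lot_size": 6000,
        "year_built": 1995,
        "days_on_market": 14,
        "latitude": 30.27,
        "longitude": -97.74,
        "property_type": "1",
        "hoa_dues": 0.0,
        "query_date": datetime.now().strftime('%Y-%m-%d'),
    }
    model = CustomNextPlaceModel()
    sale_price, sale_date = model.run_inference(sample)
    print(f"Predicted sale price: {sale_price:.0f}, predicted sale date: {sale_date}")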