import json import pickle import requests import geopy.distance import numpy as np import pandas as pd from sklearn.metrics import mean_squared_error, mean_absolute_error from sklearn.model_selection import train_test_split from sklearn.pipeline import make_pipeline from sklearn.preprocessing import StandardScaler from sklearn.ensemble import RandomForestRegressor def get_data_data_gov(url): query_string = url resp = requests.get(query_string) data = json.loads(resp.content) # convert from json to python dict print('Number of records:', len(data.get('result').get('records'))) data = data['result']['records'] df = pd.DataFrame.from_dict(data).drop(['_id'], axis=1) # print(df.isnull().sum()) # print(df.dtypes) return df def get_data_singstat_price_index(url): query_string = url resp = requests.get(query_string) data = json.loads(resp.content) # convert from json to python dict print('Number of records:', len(data.get('Data').get('row')[0].get('columns'))) df = pd.DataFrame.from_dict(data.get('Data').get('row')[0].get('columns')) # print(df.isnull().sum()) # print(df.dtypes) return df def get_data_one_map(address, is_LAT_LONG_only=False): query_string = 'https://developers.onemap.sg/commonapi/search?searchVal='+str(address)+'&returnGeom=Y&getAddrDetails=Y' resp = requests.get(query_string) data = json.loads(resp.content) # convert from json to python dict if data['found'] != 0 and is_LAT_LONG_only: data = data['results'][0] data = (data['LATITUDE'], data['LONGITUDE']) elif data['found'] != 0: data = data['results'][0] else: data = None return data def distance_to_city(address): hdb_coordinates = get_data_one_map(address, is_LAT_LONG_only = True) return geopy.distance.great_circle((1.29293672227779, 103.852585580366), hdb_coordinates).km def distance_to_nearest_MRT_station(address): hdb_coordinates = get_data_one_map(address, is_LAT_LONG_only = True) df_MRT = pd.read_pickle('./df_MRT.pkl') MRT_coordinates = df_MRT.T.iloc[-1,:].tolist() dist = [] for coordinates in MRT_coordinates: dist.append(geopy.distance.great_circle(hdb_coordinates, coordinates).km) return min(dist) def month_to_quarter(x): year = int(x.split('-')[0]) month = int(x.split('-')[1]) if month <= 3: month = '1Q' elif month <= 6: month = '2Q' elif month <= 9: month = '3Q' else: month = '4Q' return (str(year) + '-' + str(month)) def get_update(data_year): ### DATA EXTRACTION AND PREPROCESSING ### df_raw_data = get_data_data_gov('https://data.gov.sg/api/action/datastore_search?resource_id=f1765b54-a209-4718-8d38-a39237f502b3&limit=1000000') df_raw_data['address'] = df_raw_data['block'] + ' ' + df_raw_data['street_name'] df_raw_data['quarter'] = df_raw_data['month'].apply(month_to_quarter) df_raw_data['year_sold'] = df_raw_data['month'].apply((lambda x: int(x.split('-')[0]))) df_raw_data['month_sold'] = df_raw_data['month'].apply((lambda x: int(x.split('-')[1]))) df_raw_data['remaining_lease_years'] = df_raw_data['remaining_lease'].apply(lambda x: int(x.split()[0])) df_raw_data['floor_area_sqft'] = round((df_raw_data['floor_area_sqm'].astype(float))*10.764) df_price_index = get_data_singstat_price_index('https://tablebuilder.singstat.gov.sg/api/table/tabledata/M212161?isTestApi=true') df_price_index = df_price_index.rename(columns = {'key':'quarter', 'value': 'index'}) df_price_index['quarter'] = df_price_index['quarter'].apply(lambda x : x.replace(' ', '-')) df_selected_year = df_raw_data[df_raw_data['year_sold']>=data_year] quarter_list = list(df_price_index['quarter']) df_selected_year['quarter'] = df_selected_year['quarter'].apply(lambda x : x if x in quarter_list else quarter_list[-1]) df_selected_year = pd.merge(df_selected_year, df_price_index, how='left', on='quarter') # convert to float df_selected_year['index'] = pd.to_numeric(df_selected_year['index']) df_selected_year['resale_price'] = pd.to_numeric(df_selected_year['resale_price']) # normalised to latest price index df_selected_year['normalised_resale_price'] = round(df_selected_year['resale_price']*float(df_price_index.tail(1)['index'])/df_selected_year['index'],0) df_selected_year['price_psf'] = round(df_selected_year['normalised_resale_price']/df_selected_year['floor_area_sqft']) df_selected_year['storey_range'] = df_selected_year['storey_range'] \ .str.replace('01 TO 03', 'Low Floor') \ .str.replace('04 TO 06', 'Mid Floor') \ .str.replace('07 TO 09', 'Mid Floor') \ .str.replace('10 TO 12', 'High Floor') \ .str.replace('13 TO 15', 'High Floor') \ .str.replace('16 TO 18', 'High Floor') \ .str.replace('19 TO 21', 'High Floor') \ .str.replace('22 TO 24', 'High Floor') \ .str.replace('25 TO 27', 'High Floor') \ .str.replace('25 TO 27', 'High Floor') \ .str.replace('25 TO 27', 'High Floor') \ .str.replace('28 TO 30', 'High Floor') \ .str.replace('31 TO 33', 'High Floor') \ .str.replace('34 TO 36', 'High Floor') \ .str.replace('37 TO 39', 'High Floor') \ .str.replace('40 TO 42', 'High Floor') \ .str.replace('43 TO 45', 'High Floor') \ .str.replace('46 TO 48', 'High Floor') \ .str.replace('49 TO 51', 'High Floor') df_selected_year = df_selected_year.drop(columns=['street_name', 'resale_price', 'remaining_lease', 'lease_commence_date', 'block']) HDB_address_list = df_selected_year['address'].unique().tolist() data_list = [] for i in range(0, len(HDB_address_list)): data = get_data_one_map(HDB_address_list[i]) if data is not None: data_list.append(data) df_HDB = pd.DataFrame.from_dict(data_list) #creating primary key for df_HDB for further table join df_HDB['LAT_LONG']= (df_HDB['LATITUDE'] +' '+ df_HDB['LONGITUDE']).apply(lambda x: tuple(x.split(' '))) tmp_df = pd.read_csv('./MRT_LRT_STATION.csv') MRT_list = tmp_df['Station'].unique().tolist() print('Number of MRT stations:', len(MRT_list)) data_list = [] for i in range(0, len(MRT_list)): data = get_data_one_map(MRT_list[i]) if data is not None: data_list.append(data) df_MRT = pd.DataFrame.from_dict(data_list) df_MRT['LAT_LONG'] = (df_MRT['LATITUDE'] +' '+ df_MRT['LONGITUDE']).apply(lambda x: tuple(x.split(' '))) df_MRT.to_pickle('df_MRT.pkl') MRT_coordinates = df_MRT.T.iloc[-1,:].tolist() df_HDB_coordinates = pd.DataFrame(df_HDB['LAT_LONG']) df_HDB_coordinates = pd.DataFrame(df_HDB['LAT_LONG']) for coordinates in MRT_coordinates: df_HDB_coordinates[coordinates]=df_HDB['LAT_LONG'].apply(lambda y: geopy.distance.great_circle(y, coordinates).km) # get the distance from each address to the nearest station df_HDB_coordinates['distance_to_nearest_MRT_station'] = df_HDB_coordinates.iloc[:,1:].apply(lambda x: min(x), axis=1) df_HDB_coordinates_with_MRT_distance = df_HDB_coordinates.iloc[:,[0,-1]] df_HDB = pd.merge(df_HDB, df_HDB_coordinates_with_MRT_distance, on='LAT_LONG', how='left') df_HDB = df_HDB.drop(columns=['SEARCHVAL', 'BUILDING', 'ADDRESS' ,'X', 'Y', 'LONGTITUDE']) # City Hall: 1.29293672227779 103.852585580366 df_HDB['distance_to_city'] = df_HDB['LAT_LONG'].apply(lambda x: geopy.distance.great_circle((1.29293672227779, 103.852585580366), x).km) df_HDB['address'] = df_HDB ['BLK_NO'] + ' ' + df_HDB ['ROAD_NAME'] df_HDB['address'] = df_HDB['address'] \ .str.replace('AVENUE', 'AVE') \ .str.replace('CRESCENT', 'CRES') \ .str.replace('ROAD', 'RD') \ .str.replace('STREET', 'ST') \ .str.replace('CENTRAL', 'CTRL') \ .str.replace('HEIGHTS', 'HTS') \ .str.replace('TERRACE', 'TER') \ .str.replace('JALAN', 'JLN') \ .str.replace('DRIVE', 'DR') \ .str.replace('PLACE', 'PL') \ .str.replace('CLOSE', 'CL') \ .str.replace('PARK', 'PK') \ .str.replace('GARDENS', 'GDNS') \ .str.replace('NORTH', 'NTH') \ .str.replace('SOUTH', 'STH') \ .str.replace('BUKIT', 'BT') \ .str.replace('UPPER', 'UPP}') \ .str.replace('COMMONWEALTH', "C'WEALTH") df_clean_data = pd.merge(df_selected_year, df_HDB, on='address', how='left') df_clean_data = df_clean_data.dropna(subset=['LAT_LONG']) ### FEATURE SELECTION ### features = [ 'flat_type', 'storey_range', 'floor_area_sqft', 'remaining_lease_years', 'distance_to_nearest_MRT_station', 'distance_to_city', 'price_psf'] df = df_clean_data[features] df = pd.get_dummies(df) ### MODEL TRAINING AND TESTING ### X = df.drop('price_psf', axis=1) y = df['price_psf'] print('Average price_per_sqm:', y.mean()) print('Min price_per_sqm:', y.min()) print('Max price_per_sqm:', y.max()) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2) model_RFR = make_pipeline(StandardScaler(),RandomForestRegressor()) model_RFR.fit(X_train, y_train) # apply scaling on training data model_RFR.score(X_test, y_test) print('Mean Absolute Error:', mean_absolute_error(y_test, model_RFR.predict(X_test))) print('Mean Squared Error:', mean_squared_error(y_test, model_RFR.predict(X_test))) print('Root Mean Squared Error:', np.sqrt(mean_squared_error(y_test, model_RFR.predict(X_test)))) f = open("model.log", "r") data = f.readline() model_score = float(data.split()[-1]) MSE = np.sqrt(mean_squared_error(y_test, model_RFR.predict(X_test))) if MSE < model_score: pickle.dump(model_RFR, open('model.sav', 'wb')) f = open("model.log", "w") f.write(f'model_score = {MSE}') f.close()