import numpy as np import pandas as pd from tensorflow.data import Dataset from tensorflow import lite,float16,random,where,cast,float32,constant,reshape from sklearn.model_selection import train_test_split from os.path import isdir import os from keras.callbacks import EarlyStopping,ModelCheckpoint from keras.models import Model,load_model from keras.layers import Layer,Dense,Dropout,Input,Embedding,Concatenate from keras.optimizers import Adam from keras.losses import mean_absolute_error from keras.metrics import R2Score import sys User=sys.argv[1] print(sys.argv[0]) Data=pd.read_csv(f"./IndoorLocalization/Data/{User}/Data.csv") print(Data) PhonsId=np.sort(Data["PHONEID"].unique()) phoneidMap={phoneid:i for i,phoneid in enumerate(PhonsId)} def ReplaceId(id): if (np.random.randint(low=0,high=20,size=1)==3): return len(phoneidMap) return phoneidMap[id] Data["PHONEID"]=Data["PHONEID"].apply(ReplaceId) Data=Data.dropna() def CleanTrainData(df): target=df[["SPACEID","LONGITUDE","LATITUDE"]] df=df.drop(['LONGITUDE', 'LATITUDE',"SPACEID"], axis=1) return df, target DataX,TargetY=CleanTrainData(Data) LONGITUDEMax=TargetY["LONGITUDE"].max() LATITUDEMax=TargetY["LATITUDE"].max() LONGITUDEMin=TargetY["LONGITUDE"].min() LATITUDEMin=TargetY["LATITUDE"].min() BuildingWidth=20 BuildingLength=20 def LONGITUDE_min_max_newrange(item): return ((item-LONGITUDEMin)/(LONGITUDEMax-LONGITUDEMin))*BuildingWidth def LATITUDE_min_max_newrange(item): return ((item-LATITUDEMin)/(LATITUDEMax-LATITUDEMin))*BuildingLength TargetY["LONGITUDE"]=TargetY["LONGITUDE"].apply(LONGITUDE_min_max_newrange) TargetY["LATITUDE"]=TargetY["LATITUDE"].apply(LATITUDE_min_max_newrange) X_train, X_test, y_train, y_test = train_test_split(DataX.values,TargetY.values[:,1:], test_size=0.2, random_state=42,shuffle=True,stratify=TargetY.values[:,0]) SPACESGroups=TargetY.groupby("SPACEID") SPACESGroupsmean=SPACESGroups.mean() SPACEIDPosition={f"{SPACEID}":(SPACESGroupsmean.query(f"SPACEID=={SPACEID}")["LONGITUDE"].values[0],SPACESGroupsmean.query(f"SPACEID=={SPACEID}")["LATITUDE"].values[0]) for SPACEID in list(SPACESGroups.groups.keys()) } SPACEIDPositionArray=np.array([list(SPACEIDPosition[f"{i}"]) for i in SPACESGroups.groups.keys()]) PlacesNumber=len(np.unique(TargetY.iloc[:,0])) PhonesNumber=np.unique(DataX["PHONEID"]).size def ApplyNormalizationthenNois(X,Phoneid,Y): X=cast(X,dtype=float32) Y=cast(Y,dtype=float32) additem=np.random.choice([0,1,2]) Nuknow=np.random.randint(0,high=5,size=1) X=(X+100)/200 if additem ==1: if Nuknow==0: return (X,0),Y return (X,Phoneid),Y else: noise=random.normal(shape=X.shape,mean=0,stddev=0.1,dtype=float32) NoisedX=X+noise NoisedX=where(NoisedX<0,x=0.0,y=NoisedX) NoisedX=where(NoisedX>1,x=1.0,y=NoisedX) if Nuknow==0: return (NoisedX,0),Y return (NoisedX,Phoneid),Y def ApplyNormalizationOnly(X,Phoneid,Y): X=cast(X,dtype=float32) Y=cast(Y,dtype=float32) X=(X+100)/200 if Phoneid ==1: return (X,1),Y elif Phoneid ==2: return (X,2),Y else: return (X,0),Y TrainDataPipeline=Dataset.from_tensor_slices((X_train[:,:-1],X_train[:,-1],y_train)).map(ApplyNormalizationthenNois).batch(100) TestDataPipeline=Dataset.from_tensor_slices((X_test[:,:-1],X_test[:,-1],y_test)).map(ApplyNormalizationOnly).batch(10) class PositionAproxmator(Layer): def __init__(self,PlacesPosition,name="PositionAproxmator"): super(PositionAproxmator,self).__init__() self.PlacesPosition=constant(PlacesPosition,dtype=float32,name="PlacesPositions") def build(self,inputs_shape): self.W=self.add_weight(shape=(inputs_shape[1],2),trainable=True,dtype=float32,name="PlacesWeight") def call(self,Probilites): return Probilites@(self.PlacesPosition+self.W) def MakeModel(SPACEIDPosition,PhonesNumber): if isdir(f"./IndoorLocalization/IndoorModels/{User}/kerasModel"): return load_model(f"./IndoorLocalization/IndoorModels/{User}/kerasModel") WiFiReadings=Input(168) Phoneid=Input(1) Embeding=Embedding(PhonesNumber,64, embeddings_regularizer="l2")(Phoneid) X=Dense(128,activation="relu")(WiFiReadings) X=Dropout(0.2)(X) z=Dense(64,activation="relu")(X) X=z+reshape(Embeding,shape=(-1,64)) X=Concatenate()([z,X,reshape(Embeding,shape=(-1,64))]) X=Dropout(0.2)(X) X=Dense(100,activation="relu", kernel_regularizer="l2")(X) X=Dropout(0.1)(X) X=Dense(64,activation="relu")(X) X=X+z X=Dense(64,activation="relu")(X) X=Dropout(0.2)(X) X=Dense(PlacesNumber,activation="softmax")(X) X=PositionAproxmator(SPACEIDPosition)(X) return Model(inputs=[WiFiReadings,Phoneid],outputs=[X]) model=MakeModel(SPACEIDPositionArray,PhonesNumber) model.compile(optimizer=Adam(learning_rate=1e-4),loss=mean_absolute_error,metrics=[R2Score()]) hsitory=model.fit(TrainDataPipeline,validation_data=TestDataPipeline,epochs=5,callbacks=[EarlyStopping(patience=3),ModelCheckpoint(f"./IndoorLocalization/IndoorModels/{User}/kerasModel")]) # hsitory=model.fit(TrainDataPipeline,validation_data=TestDataPipeline,epochs=5,callbacks=[EarlyStopping(patience=3),ModelCheckpoint(r"C:\Users\mf\Desktop\AIProjects")]) converter=lite.TFLiteConverter.from_keras_model(model) converter.optimizations=[lite.Optimize.DEFAULT] converter.target_spec.supported_types=[float16] tflitemodel=converter.convert() with open(f"./IndoorLocalization/IndoorModels/{User}/FinalHistoryModel.tflite","wb") as file: file.write(tflitemodel)