import tensorflow as tf import os import numpy as np # the package needed when use Spec_Checker class try: from astropy.table import Table import matplotlib.pyplot as plt import numpy as np import pandas as pd import seaborn as sns found = True except ImportError: found = False print('error, pack not found####') # https://www.tensorflow.org/guide/keras/train_and_evaluate?hl=zh-cn class Metric_Fun(tf.keras.metrics.Metric): """ A customized metric. metric = accraacy - mae. The larger it is, the better. The ideal value is 1.0, where acc=1 and mae=0. """ def __init__(self,name="Metric_Fun", **kwargs): super(Metric_Fun,self).__init__(name=name, **kwargs) self.evalue = self.add_weight('evalue', initializer='zeros') # https://www.tensorflow.org/api_docs/python/tf/keras/metrics/BinaryAccuracy self.acc = tf.keras.metrics.BinaryAccuracy() # https://www.tensorflow.org/api_docs/python/tf/keras/metrics/MeanAbsoluteError self.mae = tf.keras.metrics.MeanAbsoluteError() def update_state(self, y_true, y_pred, sample_weight=None): y_true = tf.cast(y_true,dtype=tf.float32) y_pred = tf.cast(y_pred, dtype=tf.float32) self.mae.update_state(y_true[:,4:5], y_pred[:,4:5]) abs_error = self.mae.result() self.acc.update_state(y_true[ : , 0:4], y_pred[ : , 0:4]) accracy = self.acc.result() #evalue = accracy evalue = accracy - abs_error self.evalue.assign(evalue) def result(self): return self.evalue def reset_state(self): # The state of the metric will be reset at the start of each epoch. self.evalue.assign(0.0) class GasNet3: """ Initialize, setting the input pixel, strat wavelength, end wavelength, and output channel and the network name """ def __init__(self,Network_name,Output_channel): #self.Input_pixel = 10000 #self.Start_wavelength = 4000 #self.End_wavelength = 9000 #self.Input_wavelength = np.linspace(self.Start_wavelength,self.End_wavelength,self.Input_pixel) self.Network_name = Network_name self.Input_wavelength = np.load('./test_data/wavelengths.npy') self.Input_pixel = len(self.Input_wavelength) self.Inpt = tf.keras.layers.Input(shape=(self.Input_pixel,1)) #shape of spectra self.Output_channel = Output_channel self.batch = 128 # training batch self.redshift_range = [0,4] self.class_names = {b'AGN':0,b'GALAXY':1,b'QSO':2,b'STAR':3} self.lable_dim = len(self.class_names) def Wavelength_Grid(self): """ Return the grid of input wavelength """ return self.Input_wavelength def Interpolate_Flux(self,wavelength,flux): """ Interpolate the specturm flux into a suitable shape """ if flux.ndim != 1: Int_flux = [np.interp(self.Input_wavelength,wavelength[i],flux[i]) for i in range(len(flux))] Int_flux = np.array(Int_flux) else: Int_flux = np.interp(self.Input_wavelength,wavelength,flux) return Int_flux def Append_Noise_Sample(self): """ a extra blank noise will add during training """ pass def Block_ResNet(self,x0,n): """ one ResNet Block, to reduce feature dimension """ core_size = 5 x=tf.keras.layers.Conv1D(n,kernel_size=core_size,strides=2,padding='same')(x0) x=tf.keras.layers.BatchNormalization()(x) x=tf.keras.layers.Activation('relu')(x) x=tf.keras.layers.Conv1D(2*n, kernel_size=core_size,padding='same')(x) x=tf.keras.layers.BatchNormalization()(x) ShortCut = tf.keras.layers.Conv1D(2*n,kernel_size=2,strides=2,padding='same')(x0) x = tf.keras.layers.Add()([x,ShortCut]) x=tf.keras.layers.Activation('relu')(x) x = tf.keras.layers.MaxPooling1D(pool_size=core_size,strides=2)(x) return x def Block_ResNet_2(self,x0,n): """ one ResNet Block, to not reduce feature dimension, but extend channels. """ core_size = 3 x=tf.keras.layers.Conv1D(n,kernel_size=core_size,strides=1,padding='same')(x0) x=tf.keras.layers.BatchNormalization()(x) x=tf.keras.layers.Activation('relu')(x) x=tf.keras.layers.Conv1D(n,kernel_size=core_size,strides=1,padding='same')(x) x=tf.keras.layers.BatchNormalization()(x) x=tf.keras.layers.Activation('relu')(x) x=tf.keras.layers.Conv1D(2*n, kernel_size=core_size,strides=1,padding='same')(x) x=tf.keras.layers.BatchNormalization()(x) ShortCut = tf.keras.layers.Conv1D(2*n,kernel_size=1,strides=1,padding='same')(x0) x = tf.keras.layers.Add()([x,ShortCut]) x=tf.keras.layers.Activation('relu')(x) return x def ResNet(self,x): """ Networks made by Blocks """ x = self.Block_ResNet(x,16) x = self.Block_ResNet(x,32) x = self.Block_ResNet(x,64) x = self.Block_ResNet(x,128) x = self.Block_ResNet(x,256) #x = self.Block_ResNet(x,512) #x = self.Block_ResNet(x,1024) x = tf.keras.layers.Flatten()(x) x = tf.keras.layers.Dense(1024, activation='relu')(x) x = tf.keras.layers.Dense(self.Output_channel,activation=None)(x) x0 = tf.keras.layers.Activation('softmax')(x[ : , 0: self.lable_dim]) x1 = x[ : , self.lable_dim : self.Output_channel] x = tf.keras.layers.Concatenate(axis=-1)([x0, x1]) return x def ResNet_test(self,x): """ Networks for testing """ x = self.Block_ResNet(x,16) x = self.Block_ResNet(x,32) x = self.Block_ResNet(x,64) x = self.Block_ResNet(x,128) x = self.Block_ResNet_2(x,256) x = self.Block_ResNet_2(x,512) x = self.Block_ResNet_2(x,1024) x = tf.keras.layers.Flatten()(x) x = tf.keras.layers.Dense(1024, activation='relu')(x) #x = tf.keras.layers.Dropout(0.4)(x) x = tf.keras.layers.Dense(self.Output_channel,activation=None)(x) x0 = tf.keras.layers.Activation('softmax')(x[ : , 0: self.lable_dim]) x1 = x[ : , self.lable_dim : self.Output_channel] x = tf.keras.layers.Concatenate(axis=-1)([x0, x1]) return x def Built_Model(self,test=False): """ Return the ResNet mdoels """ if test: model = tf.keras.Model(inputs=self.Inpt,outputs=self.ResNet_test(self.Inpt),name=self.Network_name) else: model = tf.keras.Model(inputs=self.Inpt,outputs=self.ResNet(self.Inpt),name=self.Network_name) model.summary() return model def Plot_Model(self,test=False): """ Plot the network architecture """ model = self.Built_Model(test) tf.keras.utils.plot_model(model,to_file=model.name+'.pdf',show_shapes=True,show_layer_names=False) def Data_Clip(self,label,redshift): """ Conevrt the label to one-hot code. Redshfit are set on a range. Contact them into a vector. """ # reshape the label and reshift array label = np.array(label) redshift = np.array(redshift) redshift = redshift.reshape(len(redshift),1) # convert to one-hot coded value = np.vectorize(self.class_names.get)(label) label = tf.keras.utils.to_categorical(value, num_classes=self.lable_dim) redshift = np.clip(redshift, self.redshift_range[0]-1, self.redshift_range[1]+1) redshift = tf.convert_to_tensor(redshift) vector = tf.concat([label,redshift],axis=-1) # a veter made by label and redshift concation return vector def Preprocess(self,flux): """ The input flux and label should be propocess """ #flux = flux - np.mean(flux,-1) flux = tf.keras.utils.normalize(flux,axis=-1) # flux/sum(flux**2)**0.5 # https://www.tensorflow.org/api_docs/python/tf/math/divide_no_nan # flux = tf.math.divide_no_nan(flux, np.max(flux,axis=-1).reshape(flux.shape[0],1)) # Norm to 0-1 # flux = -np.log10(np.maximum(flux,0)+1e-26) # flux = -np.log10(np.abs(flux)+1e-26) # flux = np.clip(flux,0,4) return flux def Loss_Func(self,y_true,y_pred): """ The loss function of this models. loss = absolute redshift error + label entroy """ # redshift_error Huber = tf.keras.losses.Huber(0.01) error = Huber(y_true[ : , self.lable_dim : self.Output_channel], y_pred[ : , self.lable_dim : self.Output_channel]) # entropy Cce = tf.keras.losses.CategoricalCrossentropy() crossentropy = Cce(y_true[ : , 0:self.lable_dim], y_pred[ : , 0:self.lable_dim]) #loss = crossentropy loss = error + crossentropy return loss def Train_Model(self,data,lr=1e-3,epo=40,test=False): """ Training the model. Input training data. """ batch = self.batch if os.path.exists(self.Network_name+'.h5'): model = tf.keras.models.load_model(self.Network_name+'.h5',custom_objects={'Loss_Func':self.Loss_Func,'Metric_Fun':Metric_Fun()}) print('loading the existed model') else: model = self.Built_Model(test) optimizer = tf.keras.optimizers.Adam(learning_rate=lr) # Adam model.compile(optimizer,loss=self.Loss_Func,metrics=Metric_Fun()) # complize model # https://tensorflow.google.cn/api_docs/python/tf/keras/callbacks/ModelCheckpoint checkPoint = tf.keras.callbacks.ModelCheckpoint(model.name+'.h5',monitor='val_Metric_Fun',mode='max',verbose=1,save_best_only=True,save_weights_only=False)# callback function csvLogger = tf.keras.callbacks.CSVLogger(model.name+'.csv',append=True) # save training history train_x, train_y = self.Preprocess(data['train']['flux']), self.Data_Clip(data['train']['label'],data['train']['redshift']) valid_x, valid_y = self.Preprocess(data['valid']['flux']), self.Data_Clip(data['valid']['label'],data['valid']['redshift']) model.fit(train_x,train_y,epochs=epo,batch_size=batch, validation_data=(valid_x,valid_y),callbacks=[checkPoint,csvLogger],shuffle=True) def Prodiction(self,flux,lamb=[]): """ Predition, classes and redshift. """ if len(lamb) != 0: flux = self.Interpolate_Flux(lamb,flux) model = tf.keras.models.load_model(self.Network_name+'.h5',custom_objects={'Loss_Func':self.Loss_Func,'Metric_Fun':Metric_Fun()}) flux = self.Preprocess(flux) pred = model.predict(flux) # give classes and reshift pred_label,pred_redshift = np.hsplit(pred, [self.lable_dim]) pred_label = np.argmax(pred_label,axis=-1) # turn one-hot to integer value, get the max value index dict = {v:k for k, v in self.class_names.items()} # reverse key and value of a dict pred_label = np.vectorize(dict.get)(pred_label) # turn integer value to its name return pred_label,pred_redshift class Spec_Checker(): def __init__(self): self.gasnet = GasNet3('test_net',Output_channel=5) def Show_spec(self,lamb,flux,name=''): """ show the detail of spectra after interpolated and preprocessed """ plt.figure(figsize=(16,6),dpi=160) int_flux = self.gasnet.Interpolate_Flux(lamb,flux) plt.subplot(2,1,1) plt.title(name + '-After interpolated') plt.plot(lamb,flux,linewidth=0.5,label='original flux') plt.plot(self.gasnet.Input_wavelength,int_flux,linewidth=0.5,label='interpolate flux') plt.legend() plt.subplot(2,1,2) plt.title(name + '-After preprocessed') plt.plot(lamb,self.gasnet.Preprocess(flux)[0],linewidth=0.5,label='original flux') plt.plot(self.gasnet.Input_wavelength,self.gasnet.Preprocess(int_flux)[0],linewidth=0.5,label='interpolate flux') plt.legend() def SDSS_spec(self,file,plot=True): """ load the spectra from SDSS files """ data = Table.read(file) flux, lamb = data['flux'], 10**data['loglam'] if plot: self.Show_spec(lamb,flux, name='SDSS:' + file.rsplit('/')[-1]) spec_info = Table.read(file,2) redshift, classes = spec_info['Z'][0], spec_info['CLASS'][0] return {'wavelength':lamb,'flux':flux,'redshift':redshift,'label':classes} def SDSS_spec_stack(self,num=0,plot=True): """ load the spectra of validation """ wavelength = self.gasnet.Input_wavelength data = Table.read('train_data/val.fits') flux,label,redshift = data['int_flux'],data['train_label'],data['Z'] wavelength = np.repeat([wavelength], len(flux), axis=0) if plot: self.Show_spec(wavelength[num],flux[num], name='validation:' + str(num)) return {'wavelength':wavelength,'flux':flux,'redshift':redshift,'label':label} def JK_spec(self,file): """ load the spectrum files from JK mock """ data = Table.read(file) flux, lamb = data['FLUX'][0], data['WAVE'][0] self.Show_spec(lamb,flux, name='JK:' + file.rsplit('/')[-1]) def npy_file(self,num=0,plot=True): """ load the spectrum files from qcp test data """ wavelength = np.load('./test_data/wavelengths.npy') flux = np.load('./test_data/data.npy') wavelength = np.repeat([wavelength], len(flux), axis=0) if plot: self.Show_spec(wavelength[num],flux[num], name='test npy:' + str(num)) label = np.load('./test_data/labels.npy') dict = {v:k for k, v in self.gasnet.class_names.items()} # reverse key and value of a dict label = np.vectorize(dict.get)(label) # turn integer value to its name return {'wavelength':wavelength,'flux':flux,'redshift':None,'label':label} def Luke_spec(self,num=0,plot=True): """ load the spectrum files from Luck mock """ spec_file = '../Luke_mock_spectra/Luke_spec.fits' # flux need multiply 1e17 data = Table.read(spec_file) wavelength = np.load('./test_data/wavelengths.npy') wavelength = np.repeat([wavelength], len(data), axis=0) flux = data['int_flux'] if plot: self.Show_spec(wavelength[num],flux[num], name='Luck :' + str(num)) return {'wavelength':wavelength,'flux':flux,'redshift':data['Redshift'],'label':data['train_label']} def JK_stack_spec(self,num=0,plot=True): """ load the spectrum files from Luck mock """ spec_file = './JK_stack_mock.fits' data = Table.read(spec_file) wavelength = Table.read('JK_mock_sample.fits')['WAVE'][0] wavelength = np.repeat([wavelength], len(data), axis=0) flux,label,redshift = data['FLUX'],data['train_type'],data['REDSHIFT'] if plot: self.Show_spec(wavelength[num],flux[num], name='JK--num--'+str(num)+'--label--'+str(label[num])+'--redshift--'+str(redshift[num])) return {'wavelength':wavelength,'flux':flux,'redshift':redshift,'label':label} def Svae_Figure(self,data,name='test'): """ plot a serial of spectra in one pdf file """ figfile = 'figure' if not os.path.exists(figfile): os.mkdir(figfile) fig, axes = plt.subplots(nrows=len(data['flux']),ncols=1,sharex=True,figsize=(8,2*len(data)),dpi=50) fig.suptitle(name) plt.xlabel('wavelength') plt.ylabel('flux') for i in range(len(data['flux'])): axe = axes[i] axe.plot(data['wavelength'][i],data['flux'][i],linewidth=0.5,label=data['label'][i]+' z='+str(data['redshift'][i])) axe.legend() fname = os.path.join(figfile,str(name)+'.pdf') plt.savefig(fname) plt.close() def Confusion_Matrix(self,pred,real): """ plot the confusion matrix """ data = {'Actual':np.array(real).flatten(),'Predicted':np.array(pred).flatten()} df = pd.DataFrame(data) plt.figure(figsize=(8,6),dpi=160) confusion_matrix = pd.crosstab(df['Actual'], df['Predicted'], rownames=['Actual'], colnames=['Predicted']) sns.heatmap(confusion_matrix,cmap="crest", annot=True) def One2One(self,pred,real,label): """ plot the redicted redshift vs. real """ data = {'pred_redshift':np.array(pred).flatten(), 'real_redshift':np.array(real).flatten(), 'label':np.array(label).flatten()} df = pd.DataFrame(data) # print(df.dtypes) df['real_redshift'] = df['real_redshift'].astype('float32') # https://seaborn.pydata.org/generated/seaborn.lmplot.html#seaborn.lmplot sns.lmplot(data=df, x='pred_redshift', y='real_redshift', hue='label',col='label', col_wrap=2, height=6, #plot size line_kws={"alpha":0.1}, #ci=None, #line style scatter_kws={"s":1,"alpha":1},sharex=False, sharey=False)