# GaSNet3/network.py
import tensorflow as tf
import os
import numpy as np
# packages needed only by the Spec_Checker class
try:
    from astropy.table import Table
    import matplotlib.pyplot as plt
    import pandas as pd
    import seaborn as sns
    found = True
except ImportError:
    found = False
    print('Error: optional packages for Spec_Checker not found (astropy, matplotlib, pandas, seaborn).')
# https://www.tensorflow.org/guide/keras/train_and_evaluate?hl=zh-cn
class Metric_Fun(tf.keras.metrics.Metric):
"""
A customized metric.
metric = accraacy - mae.
The larger it is, the better.
The ideal value is 1.0, where acc=1 and mae=0.
"""
def __init__(self,name="Metric_Fun", **kwargs):
super(Metric_Fun,self).__init__(name=name, **kwargs)
self.evalue = self.add_weight('evalue', initializer='zeros')
# https://www.tensorflow.org/api_docs/python/tf/keras/metrics/BinaryAccuracy
self.acc = tf.keras.metrics.BinaryAccuracy()
# https://www.tensorflow.org/api_docs/python/tf/keras/metrics/MeanAbsoluteError
self.mae = tf.keras.metrics.MeanAbsoluteError()
def update_state(self, y_true, y_pred, sample_weight=None):
y_true = tf.cast(y_true,dtype=tf.float32)
y_pred = tf.cast(y_pred, dtype=tf.float32)
        # columns 0:4 are the one-hot class labels; column 4 is the redshift
        self.mae.update_state(y_true[:, 4:5], y_pred[:, 4:5])
        abs_error = self.mae.result()
        self.acc.update_state(y_true[:, 0:4], y_pred[:, 0:4])
        accuracy = self.acc.result()
        # evalue = accuracy
        evalue = accuracy - abs_error
self.evalue.assign(evalue)
def result(self):
return self.evalue
    def reset_state(self):
        # The state of the metric is reset at the start of each epoch;
        # the internal accuracy and MAE trackers must be reset as well.
        self.acc.reset_state()
        self.mae.reset_state()
        self.evalue.assign(0.0)
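
# A minimal sketch (not part of the original file) of how Metric_Fun combines
# classification accuracy and redshift MAE on a toy batch. The column layout
# (four one-hot class columns followed by one redshift column) mirrors the
# slices hard-coded in update_state above; the toy values are made up.
def _metric_fun_example():
    y_true = tf.constant([[1., 0., 0., 0., 0.5],
                          [0., 1., 0., 0., 1.2]])
    y_pred = tf.constant([[0.9, 0.05, 0.03, 0.02, 0.6],
                          [0.1, 0.80, 0.05, 0.05, 1.0]])
    metric = Metric_Fun()
    metric.update_state(y_true, y_pred)
    # accuracy = 1.0, MAE = mean(|0.5-0.6|, |1.2-1.0|) = 0.15, so the result is 0.85
    return metric.result()
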
class GasNet3:
"""
Initialize, setting the input pixel, strat wavelength, end wavelength, and output channel
and the network name
"""
def __init__(self,Network_name,Output_channel):
#self.Input_pixel = 10000
#self.Start_wavelength = 4000
#self.End_wavelength = 9000
#self.Input_wavelength = np.linspace(self.Start_wavelength,self.End_wavelength,self.Input_pixel)
self.Network_name = Network_name
self.Input_wavelength = np.load('./test_data/wavelengths.npy')
self.Input_pixel = len(self.Input_wavelength)
self.Inpt = tf.keras.layers.Input(shape=(self.Input_pixel,1)) #shape of spectra
self.Output_channel = Output_channel
self.batch = 128 # training batch
self.redshift_range = [0,4]
self.class_names = {b'AGN':0,b'GALAXY':1,b'QSO':2,b'STAR':3}
self.lable_dim = len(self.class_names)
def Wavelength_Grid(self):
"""
        Return the input wavelength grid.
"""
return self.Input_wavelength
def Interpolate_Flux(self,wavelength,flux):
"""
        Interpolate the spectrum flux onto the network's input wavelength grid.
"""
if flux.ndim != 1:
Int_flux = [np.interp(self.Input_wavelength,wavelength[i],flux[i]) for i in range(len(flux))]
Int_flux = np.array(Int_flux)
else:
Int_flux = np.interp(self.Input_wavelength,wavelength,flux)
return Int_flux
def Append_Noise_Sample(self):
"""
        An extra blank-noise sample to be added during training (not yet implemented).
"""
pass
def Block_ResNet(self,x0,n):
"""
        One ResNet block that reduces the feature length (stride-2 convolution plus stride-2 max pooling) and doubles the channels.
"""
core_size = 5
x=tf.keras.layers.Conv1D(n,kernel_size=core_size,strides=2,padding='same')(x0)
x=tf.keras.layers.BatchNormalization()(x)
x=tf.keras.layers.Activation('relu')(x)
x=tf.keras.layers.Conv1D(2*n, kernel_size=core_size,padding='same')(x)
x=tf.keras.layers.BatchNormalization()(x)
ShortCut = tf.keras.layers.Conv1D(2*n,kernel_size=2,strides=2,padding='same')(x0)
x = tf.keras.layers.Add()([x,ShortCut])
x=tf.keras.layers.Activation('relu')(x)
x = tf.keras.layers.MaxPooling1D(pool_size=core_size,strides=2)(x)
return x
def Block_ResNet_2(self,x0,n):
"""
        One ResNet block that keeps the feature length (stride 1) but doubles the number of channels.
"""
core_size = 3
x=tf.keras.layers.Conv1D(n,kernel_size=core_size,strides=1,padding='same')(x0)
x=tf.keras.layers.BatchNormalization()(x)
x=tf.keras.layers.Activation('relu')(x)
x=tf.keras.layers.Conv1D(n,kernel_size=core_size,strides=1,padding='same')(x)
x=tf.keras.layers.BatchNormalization()(x)
x=tf.keras.layers.Activation('relu')(x)
x=tf.keras.layers.Conv1D(2*n, kernel_size=core_size,strides=1,padding='same')(x)
x=tf.keras.layers.BatchNormalization()(x)
ShortCut = tf.keras.layers.Conv1D(2*n,kernel_size=1,strides=1,padding='same')(x0)
x = tf.keras.layers.Add()([x,ShortCut])
x=tf.keras.layers.Activation('relu')(x)
return x
def ResNet(self,x):
"""
        The full network assembled from the ResNet blocks.
"""
x = self.Block_ResNet(x,16)
x = self.Block_ResNet(x,32)
x = self.Block_ResNet(x,64)
x = self.Block_ResNet(x,128)
x = self.Block_ResNet(x,256)
#x = self.Block_ResNet(x,512)
#x = self.Block_ResNet(x,1024)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(1024, activation='relu')(x)
x = tf.keras.layers.Dense(self.Output_channel,activation=None)(x)
        x0 = tf.keras.layers.Activation('softmax')(x[:, 0:self.lable_dim])  # class probabilities
        x1 = x[:, self.lable_dim:self.Output_channel]  # redshift output (linear)
x = tf.keras.layers.Concatenate(axis=-1)([x0, x1])
return x
def ResNet_test(self,x):
"""
        A deeper variant of the network, used for testing.
"""
x = self.Block_ResNet(x,16)
x = self.Block_ResNet(x,32)
x = self.Block_ResNet(x,64)
x = self.Block_ResNet(x,128)
x = self.Block_ResNet_2(x,256)
x = self.Block_ResNet_2(x,512)
x = self.Block_ResNet_2(x,1024)
x = tf.keras.layers.Flatten()(x)
x = tf.keras.layers.Dense(1024, activation='relu')(x)
#x = tf.keras.layers.Dropout(0.4)(x)
x = tf.keras.layers.Dense(self.Output_channel,activation=None)(x)
x0 = tf.keras.layers.Activation('softmax')(x[ : , 0: self.lable_dim])
x1 = x[ : , self.lable_dim : self.Output_channel]
x = tf.keras.layers.Concatenate(axis=-1)([x0, x1])
return x
def Built_Model(self,test=False):
"""
        Build and return the ResNet model.
"""
if test:
model = tf.keras.Model(inputs=self.Inpt,outputs=self.ResNet_test(self.Inpt),name=self.Network_name)
else:
model = tf.keras.Model(inputs=self.Inpt,outputs=self.ResNet(self.Inpt),name=self.Network_name)
model.summary()
return model
def Plot_Model(self,test=False):
"""
Plot the network architecture
"""
model = self.Built_Model(test)
tf.keras.utils.plot_model(model,to_file=model.name+'.pdf',show_shapes=True,show_layer_names=False)
def Data_Clip(self,label,redshift):
"""
        Convert the labels to one-hot codes.
        Clip the redshifts to the allowed range.
        Concatenate both into a single target vector.
"""
        # reshape the label and redshift arrays
label = np.array(label)
redshift = np.array(redshift)
redshift = redshift.reshape(len(redshift),1)
        # convert the class names to one-hot codes
value = np.vectorize(self.class_names.get)(label)
label = tf.keras.utils.to_categorical(value, num_classes=self.lable_dim)
redshift = np.clip(redshift, self.redshift_range[0]-1, self.redshift_range[1]+1)
redshift = tf.convert_to_tensor(redshift)
        vector = tf.concat([label, redshift], axis=-1)  # target vector: one-hot label concatenated with the redshift
return vector
def Preprocess(self,flux):
"""
        Preprocess the input flux (L2-normalize each spectrum).
"""
#flux = flux - np.mean(flux,-1)
flux = tf.keras.utils.normalize(flux,axis=-1) # flux/sum(flux**2)**0.5
# https://www.tensorflow.org/api_docs/python/tf/math/divide_no_nan
# flux = tf.math.divide_no_nan(flux, np.max(flux,axis=-1).reshape(flux.shape[0],1)) # Norm to 0-1
# flux = -np.log10(np.maximum(flux,0)+1e-26)
# flux = -np.log10(np.abs(flux)+1e-26)
# flux = np.clip(flux,0,4)
return flux
def Loss_Func(self,y_true,y_pred):
"""
The loss function of this models.
loss = absolute redshift error + label entroy
"""
# redshift_error
Huber = tf.keras.losses.Huber(0.01)
error = Huber(y_true[ : , self.lable_dim : self.Output_channel], y_pred[ : , self.lable_dim : self.Output_channel])
# entropy
Cce = tf.keras.losses.CategoricalCrossentropy()
crossentropy = Cce(y_true[ : , 0:self.lable_dim], y_pred[ : , 0:self.lable_dim])
#loss = crossentropy
loss = error + crossentropy
return loss
def Train_Model(self,data,lr=1e-3,epo=40,test=False):
"""
        Train the model on the given data dictionary
        ('train' and 'valid' splits, each with 'flux', 'label', and 'redshift').
"""
batch = self.batch
if os.path.exists(self.Network_name+'.h5'):
model = tf.keras.models.load_model(self.Network_name+'.h5',custom_objects={'Loss_Func':self.Loss_Func,'Metric_Fun':Metric_Fun()})
            print('loading the existing model')
else:
model = self.Built_Model(test)
optimizer = tf.keras.optimizers.Adam(learning_rate=lr) # Adam
        model.compile(optimizer, loss=self.Loss_Func, metrics=[Metric_Fun()])  # compile the model
# https://tensorflow.google.cn/api_docs/python/tf/keras/callbacks/ModelCheckpoint
checkPoint = tf.keras.callbacks.ModelCheckpoint(model.name+'.h5',monitor='val_Metric_Fun',mode='max',verbose=1,save_best_only=True,save_weights_only=False)# callback function
csvLogger = tf.keras.callbacks.CSVLogger(model.name+'.csv',append=True) # save training history
train_x, train_y = self.Preprocess(data['train']['flux']), self.Data_Clip(data['train']['label'],data['train']['redshift'])
valid_x, valid_y = self.Preprocess(data['valid']['flux']), self.Data_Clip(data['valid']['label'],data['valid']['redshift'])
model.fit(train_x,train_y,epochs=epo,batch_size=batch, validation_data=(valid_x,valid_y),callbacks=[checkPoint,csvLogger],shuffle=True)
def Prodiction(self,flux,lamb=[]):
"""
        Predict the classes and redshifts of the input spectra.
"""
if len(lamb) != 0:
flux = self.Interpolate_Flux(lamb,flux)
model = tf.keras.models.load_model(self.Network_name+'.h5',custom_objects={'Loss_Func':self.Loss_Func,'Metric_Fun':Metric_Fun()})
flux = self.Preprocess(flux)
        pred = model.predict(flux)  # gives class probabilities and redshift
pred_label,pred_redshift = np.hsplit(pred, [self.lable_dim])
pred_label = np.argmax(pred_label,axis=-1) # turn one-hot to integer value, get the max value index
        label_map = {v: k for k, v in self.class_names.items()}  # reverse key and value of the dict
        pred_label = np.vectorize(label_map.get)(pred_label)  # turn the integer values into class names
return pred_label,pred_redshift
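
# A minimal usage sketch (not part of the original file) of the intended
# training / prediction flow. The array contents are placeholder random data;
# only the dictionary layout expected by Train_Model ('train'/'valid' splits
# with 'flux', 'label', 'redshift') and the class names are taken from the
# code above. A real run also needs ./test_data/wavelengths.npy to exist.
def _gasnet3_usage_example():
    net = GasNet3('GaSNet3_demo', Output_channel=5)  # 4 class channels + 1 redshift channel
    data = {
        'train': {'flux': np.random.rand(32, net.Input_pixel),
                  'label': np.array([b'GALAXY'] * 32),
                  'redshift': np.random.rand(32)},
        'valid': {'flux': np.random.rand(8, net.Input_pixel),
                  'label': np.array([b'STAR'] * 8),
                  'redshift': np.random.rand(8)},
    }
    net.Train_Model(data, lr=1e-3, epo=1)  # trains and checkpoints 'GaSNet3_demo.h5'
    # predict on flux that is already on the network wavelength grid
    pred_label, pred_redshift = net.Prodiction(data['valid']['flux'])
    return pred_label, pred_redshift
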
class Spec_Checker():
    """
    Helper class to inspect spectra from different sources and to visualize
    the network inputs and outputs.
    """
    def __init__(self):
        self.gasnet = GasNet3('test_net', Output_channel=5)
def Show_spec(self,lamb,flux,name=''):
"""
        Show a spectrum before and after interpolation and preprocessing.
"""
plt.figure(figsize=(16,6),dpi=160)
int_flux = self.gasnet.Interpolate_Flux(lamb,flux)
plt.subplot(2,1,1)
        plt.title(name + ' - after interpolation')
plt.plot(lamb,flux,linewidth=0.5,label='original flux')
plt.plot(self.gasnet.Input_wavelength,int_flux,linewidth=0.5,label='interpolate flux')
plt.legend()
plt.subplot(2,1,2)
        plt.title(name + ' - after preprocessing')
plt.plot(lamb,self.gasnet.Preprocess(flux)[0],linewidth=0.5,label='original flux')
plt.plot(self.gasnet.Input_wavelength,self.gasnet.Preprocess(int_flux)[0],linewidth=0.5,label='interpolate flux')
plt.legend()
def SDSS_spec(self,file,plot=True):
"""
        Load a spectrum from an SDSS FITS file.
"""
data = Table.read(file)
flux, lamb = data['flux'], 10**data['loglam']
if plot:
self.Show_spec(lamb,flux, name='SDSS:' + file.rsplit('/')[-1])
spec_info = Table.read(file,2)
redshift, classes = spec_info['Z'][0], spec_info['CLASS'][0]
return {'wavelength':lamb,'flux':flux,'redshift':redshift,'label':classes}
def SDSS_spec_stack(self,num=0,plot=True):
"""
        Load the stacked validation spectra.
"""
wavelength = self.gasnet.Input_wavelength
data = Table.read('train_data/val.fits')
flux,label,redshift = data['int_flux'],data['train_label'],data['Z']
wavelength = np.repeat([wavelength], len(flux), axis=0)
if plot:
self.Show_spec(wavelength[num],flux[num], name='validation:' + str(num))
return {'wavelength':wavelength,'flux':flux,'redshift':redshift,'label':label}
def JK_spec(self,file):
"""
        Load a spectrum file from the JK mock sample.
"""
data = Table.read(file)
flux, lamb = data['FLUX'][0], data['WAVE'][0]
self.Show_spec(lamb,flux, name='JK:' + file.rsplit('/')[-1])
def npy_file(self,num=0,plot=True):
"""
        Load the spectra from the qcp test data (npy files).
"""
wavelength = np.load('./test_data/wavelengths.npy')
flux = np.load('./test_data/data.npy')
wavelength = np.repeat([wavelength], len(flux), axis=0)
if plot:
self.Show_spec(wavelength[num],flux[num], name='test npy:' + str(num))
label = np.load('./test_data/labels.npy')
        label_map = {v: k for k, v in self.gasnet.class_names.items()}  # reverse key and value of the dict
        label = np.vectorize(label_map.get)(label)  # turn the integer values into class names
return {'wavelength':wavelength,'flux':flux,'redshift':None,'label':label}
def Luke_spec(self,num=0,plot=True):
"""
        Load the spectra from the Luke mock sample.
"""
        spec_file = '../Luke_mock_spectra/Luke_spec.fits'  # the flux needs to be multiplied by 1e17
data = Table.read(spec_file)
wavelength = np.load('./test_data/wavelengths.npy')
wavelength = np.repeat([wavelength], len(data), axis=0)
flux = data['int_flux']
if plot:
            self.Show_spec(wavelength[num], flux[num], name='Luke: ' + str(num))
return {'wavelength':wavelength,'flux':flux,'redshift':data['Redshift'],'label':data['train_label']}
def JK_stack_spec(self,num=0,plot=True):
"""
        Load the spectra from the JK stacked mock.
"""
spec_file = './JK_stack_mock.fits'
data = Table.read(spec_file)
wavelength = Table.read('JK_mock_sample.fits')['WAVE'][0]
wavelength = np.repeat([wavelength], len(data), axis=0)
flux,label,redshift = data['FLUX'],data['train_type'],data['REDSHIFT']
if plot:
self.Show_spec(wavelength[num],flux[num], name='JK--num--'+str(num)+'--label--'+str(label[num])+'--redshift--'+str(redshift[num]))
return {'wavelength':wavelength,'flux':flux,'redshift':redshift,'label':label}
def Svae_Figure(self,data,name='test'):
"""
        Plot a series of spectra into a single PDF file.
"""
figfile = 'figure'
if not os.path.exists(figfile):
os.mkdir(figfile)
        fig, axes = plt.subplots(nrows=len(data['flux']), ncols=1, sharex=True, figsize=(8, 2*len(data['flux'])), dpi=50)
fig.suptitle(name)
plt.xlabel('wavelength')
plt.ylabel('flux')
for i in range(len(data['flux'])):
axe = axes[i]
axe.plot(data['wavelength'][i],data['flux'][i],linewidth=0.5,label=data['label'][i]+' z='+str(data['redshift'][i]))
axe.legend()
fname = os.path.join(figfile,str(name)+'.pdf')
plt.savefig(fname)
plt.close()
def Confusion_Matrix(self,pred,real):
"""
plot the confusion matrix
"""
data = {'Actual':np.array(real).flatten(),'Predicted':np.array(pred).flatten()}
df = pd.DataFrame(data)
plt.figure(figsize=(8,6),dpi=160)
confusion_matrix = pd.crosstab(df['Actual'], df['Predicted'], rownames=['Actual'], colnames=['Predicted'])
sns.heatmap(confusion_matrix,cmap="crest", annot=True)
def One2One(self,pred,real,label):
"""
        Plot the predicted redshift against the real redshift.
"""
data = {'pred_redshift':np.array(pred).flatten(),
'real_redshift':np.array(real).flatten(),
'label':np.array(label).flatten()}
df = pd.DataFrame(data)
# print(df.dtypes)
df['real_redshift'] = df['real_redshift'].astype('float32')
# https://seaborn.pydata.org/generated/seaborn.lmplot.html#seaborn.lmplot
sns.lmplot(data=df, x='pred_redshift', y='real_redshift', hue='label',col='label',
col_wrap=2, height=6, #plot size
line_kws={"alpha":0.1}, #ci=None, #line style
scatter_kws={"s":1,"alpha":1},sharex=False, sharey=False)