|
import tensorflow as tf |
|
import os |
|
import numpy as np |
|
|
|
|
|
try: |
|
from astropy.table import Table |
|
import matplotlib.pyplot as plt |
|
import numpy as np |
|
import pandas as pd |
|
import seaborn as sns |
|
found = True |
|
except ImportError: |
|
found = False |
|
print('error, pack not found####') |
|
|
|
|
|
|
|
class Metric_Fun(tf.keras.metrics.Metric):
    """
    A customized metric.

    metric = accuracy - mae.

    Columns 0-3 of ``y_true``/``y_pred`` are scored with ``BinaryAccuracy``
    and column 4 with ``MeanAbsoluteError``.

    The larger it is, the better.
    The ideal value is 1.0, where acc=1 and mae=0.
    """

    def __init__(self, name="Metric_Fun", **kwargs):
        super(Metric_Fun, self).__init__(name=name, **kwargs)
        # Pass the name by keyword: newer Keras versions take ``shape`` as the
        # first positional argument of ``add_weight``.
        self.evalue = self.add_weight(name='evalue', initializer='zeros')
        self.acc = tf.keras.metrics.BinaryAccuracy()
        self.mae = tf.keras.metrics.MeanAbsoluteError()

    def update_state(self, y_true, y_pred, sample_weight=None):
        """
        Accumulate one batch into both sub-metrics and refresh the score.

        ``sample_weight`` is accepted for API compatibility but is not used.
        """
        y_true = tf.cast(y_true, dtype=tf.float32)
        y_pred = tf.cast(y_pred, dtype=tf.float32)

        # Column 4 is compared with the mean absolute error.
        self.mae.update_state(y_true[:, 4:5], y_pred[:, 4:5])
        abs_error = self.mae.result()

        # Columns 0-3 are compared with the binary accuracy.
        self.acc.update_state(y_true[:, 0:4], y_pred[:, 0:4])
        accuracy = self.acc.result()

        self.evalue.assign(accuracy - abs_error)

    def result(self):
        """Return the current value of accuracy - mae."""
        return self.evalue

    def reset_state(self):
        """
        Clear the score and the running statistics of both sub-metrics.

        The sub-metrics must be reset too, otherwise their accumulators keep
        growing across epochs and between training and validation.
        """
        self.evalue.assign(0.0)
        self.acc.reset_state()
        self.mae.reset_state()
|
|
|
|
|
|
|
|
|
class GasNet3:
    """
    1-D ResNet that predicts a class (softmax over ``class_names``) and the
    remaining regression channel(s) from a spectrum.

    The input wavelength grid is loaded from './test_data/wavelengths.npy';
    its length sets the number of input pixels. ``Network_name`` is used as
    the Keras model name and as the stem of the '.h5' / '.csv' / '.pdf' files.
    ``Output_channel`` is the width of the output vector: the first
    ``lable_dim`` entries are class probabilities, the rest are left linear.
    """

    def __init__(self, Network_name, Output_channel):
        self.Network_name = Network_name
        self.Input_wavelength = np.load('./test_data/wavelengths.npy')
        self.Input_pixel = len(self.Input_wavelength)
        self.Inpt = tf.keras.layers.Input(shape=(self.Input_pixel, 1))
        self.Output_channel = Output_channel
        self.batch = 128
        self.redshift_range = [0, 4]
        self.class_names = {b'AGN': 0, b'GALAXY': 1, b'QSO': 2, b'STAR': 3}
        self.lable_dim = len(self.class_names)

    def Wavelength_Grid(self):
        """
        Return the grid of input wavelength
        """
        return self.Input_wavelength

    def Interpolate_Flux(self, wavelength, flux):
        """
        Interpolate the spectrum flux onto the network wavelength grid.

        A 1-D ``flux`` is interpolated directly with ``np.interp``; otherwise
        every row ``flux[i]`` is interpolated from its own ``wavelength[i]``
        and the rows are stacked into one array.
        """
        grid = self.Input_wavelength
        if flux.ndim == 1:
            return np.interp(grid, wavelength, flux)
        rows = [np.interp(grid, wavelength[i], row) for i, row in enumerate(flux)]
        return np.array(rows)

    def Append_Noise_Sample(self):
        """
        Placeholder: an extra blank-noise sample is meant to be added during
        training. Not implemented yet.
        """
        pass

    def Block_ResNet(self, x0, n):
        """
        One ResNet block that reduces the feature dimension.

        Main path: stride-2 conv (n filters) -> BN -> ReLU -> conv (2n
        filters) -> BN. The shortcut is a stride-2 conv with 2n filters. Both
        are added, passed through ReLU and max-pooled with stride 2.
        """
        core_size = 5
        x = tf.keras.layers.Conv1D(n, kernel_size=core_size, strides=2, padding='same')(x0)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Activation('relu')(x)
        x = tf.keras.layers.Conv1D(2 * n, kernel_size=core_size, padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        ShortCut = tf.keras.layers.Conv1D(2 * n, kernel_size=2, strides=2, padding='same')(x0)
        x = tf.keras.layers.Add()([x, ShortCut])
        x = tf.keras.layers.Activation('relu')(x)
        x = tf.keras.layers.MaxPooling1D(pool_size=core_size, strides=2)(x)
        return x

    def Block_ResNet_2(self, x0, n):
        """
        One ResNet block that keeps the feature dimension (all strides are 1)
        but extends the channels to 2n.
        """
        core_size = 3
        x = tf.keras.layers.Conv1D(n, kernel_size=core_size, strides=1, padding='same')(x0)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Activation('relu')(x)
        x = tf.keras.layers.Conv1D(n, kernel_size=core_size, strides=1, padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        x = tf.keras.layers.Activation('relu')(x)
        x = tf.keras.layers.Conv1D(2 * n, kernel_size=core_size, strides=1, padding='same')(x)
        x = tf.keras.layers.BatchNormalization()(x)
        ShortCut = tf.keras.layers.Conv1D(2 * n, kernel_size=1, strides=1, padding='same')(x0)
        x = tf.keras.layers.Add()([x, ShortCut])
        x = tf.keras.layers.Activation('relu')(x)
        return x

    def _Output_Head(self, x):
        """
        Shared head: Flatten -> Dense(1024, relu) -> Dense(Output_channel).

        Softmax is applied to the first ``lable_dim`` outputs only; the
        remaining outputs stay linear and are concatenated after them.
        """
        x = tf.keras.layers.Flatten()(x)
        x = tf.keras.layers.Dense(1024, activation='relu')(x)
        x = tf.keras.layers.Dense(self.Output_channel, activation=None)(x)
        x0 = tf.keras.layers.Activation('softmax')(x[:, 0:self.lable_dim])
        x1 = x[:, self.lable_dim:self.Output_channel]
        return tf.keras.layers.Concatenate(axis=-1)([x0, x1])

    def ResNet(self, x):
        """
        Networks made by Blocks
        """
        for n in (16, 32, 64, 128, 256):
            x = self.Block_ResNet(x, n)
        return self._Output_Head(x)

    def ResNet_test(self, x):
        """
        Networks for testing
        """
        for n in (16, 32, 64, 128):
            x = self.Block_ResNet(x, n)
        for n in (256, 512, 1024):
            x = self.Block_ResNet_2(x, n)
        return self._Output_Head(x)

    def Built_Model(self, test=False):
        """
        Build the ResNet model (the test variant when ``test`` is true),
        print its summary and return it.
        """
        network = self.ResNet_test if test else self.ResNet
        model = tf.keras.Model(inputs=self.Inpt, outputs=network(self.Inpt), name=self.Network_name)
        model.summary()
        return model

    def Plot_Model(self, test=False):
        """
        Plot the network architecture to '<model name>.pdf'
        """
        model = self.Built_Model(test)
        tf.keras.utils.plot_model(model, to_file=model.name + '.pdf', show_shapes=True, show_layer_names=False)

    def Data_Clip(self, label, redshift):
        """
        Convert the labels to one-hot codes, clip the redshifts to
        [redshift_range[0] - 1, redshift_range[1] + 1] and concatenate both
        into one target vector per sample.

        Labels are looked up in ``class_names`` (bytes keys); ``str`` labels
        are encoded to bytes first. Raises ValueError for unknown labels.
        """
        label = np.array(label)
        redshift = np.array(redshift)
        redshift = redshift.reshape(len(redshift), 1)

        # class_names is keyed by bytes; accept unicode labels as well.
        if label.dtype.kind == 'U':
            label = np.char.encode(label, 'utf-8')

        # Fail early with a clear message instead of letting ``dict.get``
        # silently produce None for labels missing from class_names.
        unknown = set(np.unique(label).tolist()) - set(self.class_names)
        if unknown:
            raise ValueError('unknown class label(s): %r' % (sorted(unknown, key=repr),))

        value = np.vectorize(self.class_names.get)(label)
        label = tf.keras.utils.to_categorical(value, num_classes=self.lable_dim)
        redshift = np.clip(redshift, self.redshift_range[0] - 1, self.redshift_range[1] + 1)
        redshift = tf.convert_to_tensor(redshift)
        vector = tf.concat([label, redshift], axis=-1)
        return vector

    def Preprocess(self, flux):
        """
        Preprocess the input flux: normalize it along the last axis with
        ``tf.keras.utils.normalize``.
        """
        flux = tf.keras.utils.normalize(flux, axis=-1)
        return flux

    def Loss_Func(self, y_true, y_pred):
        """
        The loss function of this model.
        loss = Huber redshift error (delta=0.01) + label cross-entropy
        """
        Huber = tf.keras.losses.Huber(delta=0.01)
        error = Huber(y_true[:, self.lable_dim:self.Output_channel],
                      y_pred[:, self.lable_dim:self.Output_channel])

        Cce = tf.keras.losses.CategoricalCrossentropy()
        crossentropy = Cce(y_true[:, 0:self.lable_dim], y_pred[:, 0:self.lable_dim])

        loss = error + crossentropy
        return loss

    def _Custom_Objects(self):
        """Custom objects needed to deserialize a saved model."""
        # load_model expects the metric *class*, not an instance.
        return {'Loss_Func': self.Loss_Func, 'Metric_Fun': Metric_Fun}

    def Train_Model(self, data, lr=1e-3, epo=40, test=False):
        """
        Train the model.

        ``data`` holds 'train' and 'valid' entries, each with 'flux', 'label'
        and 'redshift'. If '<Network_name>.h5' exists it is loaded and training
        continues from it; otherwise a new model is built and compiled with
        Adam at learning rate ``lr``. The best model according to
        'val_Metric_Fun' is checkpointed and the history is appended to
        '<model name>.csv'.
        """
        batch = self.batch
        model_file = self.Network_name + '.h5'
        if os.path.exists(model_file):
            model = tf.keras.models.load_model(model_file, custom_objects=self._Custom_Objects())
            print('loading the existed model')
        else:
            model = self.Built_Model(test)
            optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
            model.compile(optimizer, loss=self.Loss_Func, metrics=[Metric_Fun()])

        checkPoint = tf.keras.callbacks.ModelCheckpoint(
            model.name + '.h5', monitor='val_Metric_Fun', mode='max', verbose=1,
            save_best_only=True, save_weights_only=False)
        csvLogger = tf.keras.callbacks.CSVLogger(model.name + '.csv', append=True)

        train, valid = data['train'], data['valid']
        train_x = self.Preprocess(train['flux'])
        train_y = self.Data_Clip(train['label'], train['redshift'])
        valid_x = self.Preprocess(valid['flux'])
        valid_y = self.Data_Clip(valid['label'], valid['redshift'])

        model.fit(train_x, train_y, epochs=epo, batch_size=batch,
                  validation_data=(valid_x, valid_y),
                  callbacks=[checkPoint, csvLogger], shuffle=True)

    def Prodiction(self, flux, lamb=None):
        """
        Predict classes and redshift.

        If ``lamb`` (the wavelengths of ``flux``) is given and non-empty, the
        flux is first interpolated onto the network grid. The model is loaded
        from '<Network_name>.h5'. Returns ``(pred_label, pred_redshift)`` where
        the labels are the keys of ``class_names``.
        """
        if lamb is not None and len(lamb) != 0:
            flux = self.Interpolate_Flux(lamb, flux)
        model = tf.keras.models.load_model(self.Network_name + '.h5', custom_objects=self._Custom_Objects())
        flux = self.Preprocess(flux)
        pred = model.predict(flux)

        pred_label, pred_redshift = np.hsplit(pred, [self.lable_dim])
        pred_label = np.argmax(pred_label, axis=-1)
        # Invert class_names to map class indices back to label names.
        index_to_name = {v: k for k, v in self.class_names.items()}
        pred_label = np.vectorize(index_to_name.get)(pred_label)

        return pred_label, pred_redshift
|
|
|
|
|
|
|
class Spec_Checker:
    """
    Helpers to load spectra from several sources, inspect them after
    interpolation/preprocessing, and plot prediction diagnostics.

    Uses a ``GasNet3('test_net', Output_channel=5)`` instance for the
    wavelength grid, interpolation and preprocessing.
    """

    def __init__(self):
        self.gasnet = GasNet3('test_net', Output_channel=5)

    @staticmethod
    def _Legend_Text(label, redshift):
        """Build the legend entry '<label> z=<redshift>'; bytes labels are decoded."""
        if isinstance(label, bytes):
            label = label.decode()
        return str(label) + ' z=' + str(redshift)

    def Show_spec(self, lamb, flux, name=''):
        """
        show the detail of spectra after interpolated and preprocessed

        Top panel: original and interpolated flux. Bottom panel: the same two
        spectra after ``GasNet3.Preprocess``.
        """
        plt.figure(figsize=(16, 6), dpi=160)
        grid = self.gasnet.Input_wavelength
        int_flux = self.gasnet.Interpolate_Flux(lamb, flux)

        plt.subplot(2, 1, 1)
        plt.title(name + '-After interpolated')
        plt.plot(lamb, flux, linewidth=0.5, label='original flux')
        plt.plot(grid, int_flux, linewidth=0.5, label='interpolate flux')
        plt.legend()

        plt.subplot(2, 1, 2)
        plt.title(name + '-After preprocessed')
        # Preprocess output is indexed with [0] to get the spectrum to plot.
        plt.plot(lamb, self.gasnet.Preprocess(flux)[0], linewidth=0.5, label='original flux')
        plt.plot(grid, self.gasnet.Preprocess(int_flux)[0], linewidth=0.5, label='interpolate flux')
        plt.legend()

    def SDSS_spec(self, file, plot=True):
        """
        load the spectra from SDSS files

        Flux and wavelength (10**loglam) come from the default table; redshift
        ('Z') and class ('CLASS') are read from HDU 2.
        """
        data = Table.read(file)
        flux, lamb = data['flux'], 10**data['loglam']
        if plot:
            self.Show_spec(lamb, flux, name='SDSS:' + file.rsplit('/')[-1])
        spec_info = Table.read(file, hdu=2)
        redshift, classes = spec_info['Z'][0], spec_info['CLASS'][0]
        return {'wavelength': lamb, 'flux': flux, 'redshift': redshift, 'label': classes}

    def SDSS_spec_stack(self, num=0, plot=True):
        """
        load the spectra of validation from 'train_data/val.fits'
        """
        wavelength = self.gasnet.Input_wavelength
        data = Table.read('train_data/val.fits')
        flux, label, redshift = data['int_flux'], data['train_label'], data['Z']
        # One copy of the wavelength grid per spectrum.
        wavelength = np.repeat([wavelength], len(flux), axis=0)
        if plot:
            self.Show_spec(wavelength[num], flux[num], name='validation:' + str(num))
        return {'wavelength': wavelength, 'flux': flux, 'redshift': redshift, 'label': label}

    def JK_spec(self, file):
        """
        load the spectrum files from JK mock and plot the first spectrum
        """
        data = Table.read(file)
        flux, lamb = data['FLUX'][0], data['WAVE'][0]
        self.Show_spec(lamb, flux, name='JK:' + file.rsplit('/')[-1])

    def npy_file(self, num=0, plot=True):
        """
        load the spectrum files from qcp test data

        Integer labels are mapped back to the class names of the network.
        No redshift is available for this data set ('redshift' is None).
        """
        wavelength = np.load('./test_data/wavelengths.npy')
        flux = np.load('./test_data/data.npy')
        wavelength = np.repeat([wavelength], len(flux), axis=0)
        if plot:
            self.Show_spec(wavelength[num], flux[num], name='test npy:' + str(num))
        label = np.load('./test_data/labels.npy')
        index_to_name = {v: k for k, v in self.gasnet.class_names.items()}
        label = np.vectorize(index_to_name.get)(label)
        return {'wavelength': wavelength, 'flux': flux, 'redshift': None, 'label': label}

    def Luke_spec(self, num=0, plot=True):
        """
        load the spectrum files from Luke mock
        """
        spec_file = '../Luke_mock_spectra/Luke_spec.fits'
        data = Table.read(spec_file)
        wavelength = np.load('./test_data/wavelengths.npy')
        wavelength = np.repeat([wavelength], len(data), axis=0)
        flux = data['int_flux']
        if plot:
            self.Show_spec(wavelength[num], flux[num], name='Luck :' + str(num))
        return {'wavelength': wavelength, 'flux': flux, 'redshift': data['Redshift'], 'label': data['train_label']}

    def JK_stack_spec(self, num=0, plot=True):
        """
        load the stacked spectrum files from JK mock

        Spectra come from './JK_stack_mock.fits'; the wavelength grid is the
        first 'WAVE' row of 'JK_mock_sample.fits'.
        """
        spec_file = './JK_stack_mock.fits'
        data = Table.read(spec_file)
        wavelength = Table.read('JK_mock_sample.fits')['WAVE'][0]
        wavelength = np.repeat([wavelength], len(data), axis=0)
        flux, label, redshift = data['FLUX'], data['train_type'], data['REDSHIFT']
        if plot:
            title = 'JK--num--' + str(num) + '--label--' + str(label[num]) + '--redshift--' + str(redshift[num])
            self.Show_spec(wavelength[num], flux[num], name=title)
        return {'wavelength': wavelength, 'flux': flux, 'redshift': redshift, 'label': label}

    def Svae_Figure(self, data, name='test'):
        """
        plot a serial of spectra in one pdf file

        ``data`` is a dict with 'wavelength', 'flux', 'label' and 'redshift'
        ('redshift' may be None). One row is drawn per spectrum and the figure
        is saved to 'figure/<name>.pdf'.
        """
        figfile = 'figure'
        os.makedirs(figfile, exist_ok=True)

        n_spec = len(data['flux'])
        # Height scales with the number of spectra (not with the number of
        # dict keys); squeeze=False keeps ``axes`` 2-D even for one spectrum.
        fig, axes = plt.subplots(nrows=n_spec, ncols=1, sharex=True,
                                 figsize=(8, 2 * n_spec), dpi=50, squeeze=False)
        fig.suptitle(name)
        plt.xlabel('wavelength')
        plt.ylabel('flux')

        redshifts = data['redshift']
        for i, axe in enumerate(axes[:, 0]):
            redshift = None if redshifts is None else redshifts[i]
            axe.plot(data['wavelength'][i], data['flux'][i], linewidth=0.5,
                     label=self._Legend_Text(data['label'][i], redshift))
            axe.legend()

        fname = os.path.join(figfile, str(name) + '.pdf')
        fig.savefig(fname)
        plt.close(fig)

    def Confusion_Matrix(self, pred, real):
        """
        plot the confusion matrix of predicted vs. actual labels
        """
        data = {'Actual': np.array(real).flatten(), 'Predicted': np.array(pred).flatten()}
        df = pd.DataFrame(data)
        plt.figure(figsize=(8, 6), dpi=160)
        confusion_matrix = pd.crosstab(df['Actual'], df['Predicted'], rownames=['Actual'], colnames=['Predicted'])
        sns.heatmap(confusion_matrix, cmap="crest", annot=True)

    def One2One(self, pred, real, label):
        """
        plot the predicted redshift vs. real, one panel per label
        """
        data = {'pred_redshift': np.array(pred).flatten(),
                'real_redshift': np.array(real).flatten(),
                'label': np.array(label).flatten()}
        df = pd.DataFrame(data)

        df['real_redshift'] = df['real_redshift'].astype('float32')

        sns.lmplot(data=df, x='pred_redshift', y='real_redshift', hue='label', col='label',
                   col_wrap=2, height=6,
                   line_kws={"alpha": 0.1},
                   scatter_kws={"s": 1, "alpha": 1}, sharex=False, sharey=False)