# ---
# jupyter:
#   jupytext:
#     text_representation:
#       extension: .py
#       format_name: light
#       format_version: '1.5'
#     jupytext_version: 1.16.2
#   kernelspec:
#     display_name: temps
#     language: python
#     name: temps
# ---

# # DOMAIN ADAPTATION INTUITION

# %load_ext autoreload
# %autoreload 2

import pandas as pd
import numpy as np
import os
from astropy.io import fits
from astropy.table import Table
import torch
from pathlib import Path
import seaborn as sns

# matplotlib settings
from matplotlib import rcParams
import matplotlib.pyplot as plt
rcParams["mathtext.fontset"] = "stix"
rcParams["font.family"] = "STIXGeneral"

from temps.archive import Archive
from temps.utils import nmad
from temps.temps_arch import EncoderPhotometry, MeasureZ
from temps.temps import TempsModule

# ## LOAD DATA

# define here the directory containing the photometric catalogues
parent_dir = Path('/data/astro/scratch/lcabayol/insight/data/Euclid_EXT_MER_PHZ_DC2_v1.5')
modules_dir = Path('../data/models/')

filename_calib = 'euclid_cosmos_DC2_S1_v2.1_calib_clean.fits'
filename_valid = 'euclid_cosmos_DC2_S1_v2.1_valid_matched.fits'

# +
hdu_list = fits.open(parent_dir / filename_valid)
cat = Table(hdu_list[1].data).to_pandas()

cat = cat[cat['FLAG_PHOT'] == 0]
cat = cat[cat['mu_class_L07'] == 1]

cat['SNR_VIS'] = cat.FLUX_VIS / cat.FLUXERR_VIS
#cat = cat[cat.SNR_VIS>10]
# -

ztarget = [cat['z_spec_S15'].values[ii] if cat['z_spec_S15'].values[ii] > 0
           else cat['photo_z_L15'].values[ii] for ii in range(len(cat))]
specz_or_photo = [0 if cat['z_spec_S15'].values[ii] > 0 else 1 for ii in range(len(cat))]

ID = cat['ID']
VISmag = cat['MAG_VIS']
zsflag = cat['reliable_S15']

cat['ztarget'] = ztarget
cat['specz_or_photo'] = specz_or_photo
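
# The two list comprehensions above can also be written in vectorised form. The
# check below is an optional sketch (the name `ztarget_vec` is illustrative and
# not used later) verifying the equivalent np.where construction.
ztarget_vec = np.where(cat['z_spec_S15'].values > 0, cat['z_spec_S15'].values, cat['photo_z_L15'].values)
assert np.allclose(ztarget_vec, cat['ztarget'].values, equal_nan=True)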

# ### EXTRACT PHOTOMETRY

photoz_archive = Archive(path_calib=parent_dir / filename_calib,
                         path_valid=parent_dir / filename_valid,
                         only_zspec=False)
f = photoz_archive._extract_fluxes(catalogue=cat)
col = photoz_archive._to_colors(f)
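
# Optional sanity check, a minimal sketch assuming `_extract_fluxes` and
# `_to_colors` return NumPy arrays: inspect the shapes and confirm that the
# colours fed to the networks contain no non-finite entries.
print('fluxes:', np.shape(f), 'colours:', np.shape(col))
print('non-finite colour entries:', int(np.sum(~np.isfinite(col))))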

# ### MEASURE FEATURES

features_all = np.zeros((3, len(cat), 10))
for il, lab in enumerate(['z', 'L15', 'DA']):
    nn_features = EncoderPhotometry()
    nn_features.load_state_dict(torch.load(modules_dir / f'modelF_{lab}.pt',
                                           map_location=torch.device('cpu')))
    features = nn_features(torch.Tensor(col))
    features = features.detach().cpu().numpy()
    features_all[il] = features
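
# Quick optional check: the stacked array should have shape
# (3 models, N galaxies, 10 features) and contain no NaNs.
print(features_all.shape)
print('NaNs:', int(np.isnan(features_all).sum()))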

# ### TRAIN AUTOENCODER TO REDUCE TO 2 DIMENSIONS

from torch import nn


# +
class Autoencoder(nn.Module):
    def __init__(self, input_dim, latent_dim):
        super(Autoencoder, self).__init__()

        # Encoder layers
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 100),
            nn.ReLU(),
            nn.Linear(100, 50),
            nn.ReLU(),
            nn.Linear(50, latent_dim)
        )

        # Decoder layers
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, 50),
            nn.ReLU(),
            nn.Linear(50, 100),
            nn.ReLU(),
            nn.Linear(100, input_dim),
        )

    def forward(self, x):
        x = self.encoder(x)
        y = self.decoder(x)
        return y, x
# -
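
# Optional shape check, a minimal sketch: run an (untrained) autoencoder on a
# dummy batch to confirm that forward() returns the reconstruction and the 2-D
# latent vector. The names `_tmp_ae`, `_recon` and `_latent` are throwaway,
# illustrative names.
_tmp_ae = Autoencoder(input_dim=10, latent_dim=2)
_recon, _latent = _tmp_ae(torch.randn(4, 10))
print(_recon.shape, _latent.shape)  # expected: torch.Size([4, 10]) torch.Size([4, 2])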

# +
from torch.utils.data import DataLoader, TensorDataset

ds = TensorDataset(torch.Tensor(features_all[0]))
train_loader = DataLoader(ds, batch_size=100, shuffle=True, drop_last=False)
# -

import torch.optim as optim

autoencoder = Autoencoder(input_dim=10, latent_dim=2)
criterion = nn.L1Loss()
optimizer = optim.Adam(autoencoder.parameters(), lr=0.0001)

# +
# Define the number of epochs
num_epochs = 100

for epoch in range(num_epochs):
    running_loss = 0.0
    for data in train_loader:
        # Forward pass
        outputs, f1 = autoencoder(data[0])
        loss_autoencoder = criterion(outputs, data[0])

        optimizer.zero_grad()

        # Backward pass
        loss_autoencoder.backward()

        # Update the weights
        optimizer.step()

        # Accumulate the loss
        running_loss += loss_autoencoder.item()

    # Print the average loss for the epoch
    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}')

print('Training finished')
# -

# #### EVALUATE AUTOENCODER

# cat.to_csv('features_cat.csv', header=True, sep=',')

indexes_specz = np.where((cat.specz_or_photo == 0) & (cat.reliable_S15 > 0))[0]

features_all_reduced = np.zeros(shape=(3, len(cat), 2))
for i in range(3):
    _, features = autoencoder(torch.Tensor(features_all[i]))
    features_all_reduced[i] = features.detach().cpu().numpy()
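
# Optional diagnostic, a sketch using the same L1 criterion as in training: mean
# reconstruction error for each feature set. The autoencoder was trained on the
# 'z' features only, so the 'L15' and 'DA' sets are out-of-sample here. `recon`
# and `err` are illustrative names.
with torch.no_grad():
    for il, lab in enumerate(['z', 'L15', 'DA']):
        recon, _ = autoencoder(torch.Tensor(features_all[il]))
        err = criterion(recon, torch.Tensor(features_all[il])).item()
        print(f'{lab}: mean |x - x_hat| = {err:.4f}')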

# ### Plot the features

start = 0
end = len(cat)
all_values = set(range(start, end))
values_not_in_indexes_specz = all_values - set(indexes_specz)
indexes_nospecz = sorted(values_not_in_indexes_specz)
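
# The same complement can be obtained in one line; this optional check verifies
# the equivalent np.setdiff1d construction against the set-based version above.
assert np.array_equal(np.setdiff1d(np.arange(len(cat)), indexes_specz), indexes_nospecz)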

# +
# Create subplots with three panels
fig, axs = plt.subplots(1, 3, figsize=(15, 5))

# Set style for all subplots
sns.set_style("white")

# First subplot
sns.kdeplot(x=features_all_reduced[0, indexes_nospecz, 0],
            y=features_all_reduced[0, indexes_nospecz, 1],
            clip=(-150, 150),
            ax=axs[0],
            color='salmon')
sns.kdeplot(x=features_all_reduced[0, indexes_specz, 0],
            y=features_all_reduced[0, indexes_specz, 1],
            clip=(-150, 150),
            ax=axs[0],
            color='lightskyblue')
axs[0].set_xlim(-150, 150)
axs[0].set_ylim(-150, 150)
axs[0].set_title(r'Trained on $z_{\rm s}$')

# Second subplot
sns.kdeplot(x=features_all_reduced[1, indexes_nospecz, 0],
            y=features_all_reduced[1, indexes_nospecz, 1],
            clip=(-50, 50),
            ax=axs[1],
            color='salmon')
sns.kdeplot(x=features_all_reduced[1, indexes_specz, 0],
            y=features_all_reduced[1, indexes_specz, 1],
            clip=(-50, 50),
            ax=axs[1],
            color='lightskyblue')
axs[1].set_xlim(-50, 50)
axs[1].set_ylim(-50, 50)
axs[1].set_title('Trained on L15')

# Third subplot
features_all_reduced_nospecz = pd.DataFrame(features_all_reduced[2, indexes_nospecz, :]).drop_duplicates().values
sns.kdeplot(x=features_all_reduced[2, indexes_nospecz, 0],
            y=features_all_reduced[2, indexes_nospecz, 1],
            clip=(-1, 5),
            ax=axs[2],
            color='salmon',
            label='Wide-field sample')
sns.kdeplot(x=features_all_reduced[2, indexes_specz, 0],
            y=features_all_reduced[2, indexes_specz, 1],
            clip=(-1, 5),
            ax=axs[2],
            color='lightskyblue',
            label=r'$z_{\rm s}$ sample')
axs[2].set_xlim(-2, 5)
axs[2].set_ylim(-2, 5)
axs[2].set_title('TEMPS')

axs[0].set_xlabel('Feature 1')
axs[1].set_xlabel('Feature 1')
axs[2].set_xlabel('Feature 1')
axs[0].set_ylabel('Feature 2')

# Create custom legend with desired colors
legend_labels = ['Wide-field sample', r'$z_{\rm s}$ sample']
legend_handles = [plt.Line2D([0], [0], color='salmon', lw=2),
                  plt.Line2D([0], [0], color='lightskyblue', lw=2)]
axs[2].legend(legend_handles, legend_labels, loc='upper right', fontsize=16)

# Adjust layout
plt.tight_layout()
#plt.savefig('Contourplot.pdf', bbox_inches='tight')
plt.show()
# -