#!/usr/bin/env python
# coding: utf-8

import warnings
import os
import json

import numpy as np
import pandas as pd
import seaborn as sb
import matplotlib.pyplot as plt
import plotly.express as px
import panel as pn
import holoviews as hv
import hvplot.pandas

from bokeh.plotting import figure
from bokeh.io import push_notebook, show, curdoc
from bokeh.io.export import export_png
from bokeh.resources import INLINE
from bokeh.embed import file_html
from bokeh.models import ColumnDataSource, Button, Span, Label

from my_modules import *
from datasets import load_dataset
os.getcwd()

# Silence FutureWarnings & UserWarnings
warnings.filterwarnings('ignore', category=FutureWarning)
warnings.filterwarnings('ignore', category=UserWarning)

#present_dir = os.path.dirname(os.path.realpath(__file__))
#input_path = os.path.join(present_dir, 'wetransfer_data-zip_2024-05-17_1431')
base_dir = '/code/wetransfer_data-zip_2024-05-17_1431'
set_path = 'test'

selected_metadata_files = ['Slide_B_DD1s1.one_1.tif.csv', 'Slide_B_DD1s1.one_2.tif.csv']
ls_samples = ['DD3S1.csv', 'DD3S2.csv', 'DD3S3.csv', 'TMA.csv']

pn.extension()
update_button = pn.widgets.Button(name='CSV Files', button_type='primary')

def update_samples(event):
    with open('stored_variables.json', 'r') as file:
        stored_vars = json.load(file)
        # ls_samples = stored_vars['ls_samples']
        print(ls_samples)

update_button.on_click(update_samples)

csv_files_button = pn.widgets.Button(icon="clipboard", button_type="primary")
indicator = pn.indicators.LoadingSpinner(value=False, size=25)

def handle_click(clicks):
    with open('stored_variables.json', 'r') as file:
        stored_vars = json.load(file)
        # ls_samples = stored_vars['ls_samples']
    return f'CSV Files Selected: {ls_samples}'

pn.Row(
    csv_files_button,
    pn.bind(handle_click, csv_files_button.param.clicks),
)
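
# Note: pn.bind re-runs handle_click whenever the button's click counter changes, so the
# Row above always reflects the current ls_samples selection.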
# ## I.2. DIRECTORIES

set_path = 'test'

# Set base directory
directorio_actual = os.getcwd()
print(directorio_actual)

##### MAC WORKSTATION #####
#base_dir = r'/Volumes/LaboLabrie/Projets/OC_TMA_Pejovic/Temp/Zoe/CyCIF_pipeline/'
###########################

##### WINDOWS WORKSTATION #####
#base_dir = r'C:\Users\LaboLabrie\gerz2701\cyCIF-pipeline\Set_B'
###############################

input_path = base_dir

##### LOCAL WORKSTATION #####
#base_dir = r'/Users/harshithakolipaka/Downloads/wetransfer_data-zip_2024-05-17_1431/'
base_dir = input_path
print(base_dir)
#############################

#set_name = 'Set_A'
#set_name = 'test'
set_name = set_path

project_name = set_name            # Project name
step_suffix = 'qc_eda'             # Current step (here, part I)
previous_step_suffix_long = ""     # Previous step (here, empty)
# Initial input data directory
input_data_dir = os.path.join(base_dir, project_name + "_data")

# QC/EDA output directories
output_data_dir = os.path.join(base_dir, project_name + "_" + step_suffix)
# Images subdirectory
output_images_dir = os.path.join(output_data_dir, "images")

# Data and metadata directories
metadata_dir = os.path.join(base_dir, project_name + "_metadata")
# Images subdirectory
metadata_images_dir = os.path.join(metadata_dir, "images")

# Create directories if they don't already exist
for d in [base_dir, input_data_dir, output_data_dir, output_images_dir, metadata_dir, metadata_images_dir]:
    if not os.path.exists(d):
        print("Creating the", d, "directory...")
        os.makedirs(d)
    else:
        print("The", d, "directory already exists.")
os.chdir(input_data_dir)

with open('stored_variables.json', 'r') as file:
    stored_vars = json.load(file)
    # ls_samples = stored_vars['ls_samples']
    selected_metadata_files = stored_vars['selected_metadata_files']

directories = [base_dir, input_data_dir, output_data_dir, output_images_dir, metadata_dir, metadata_images_dir]
directories
def print_directories(directories):
    label_path = []
    labels = [
        "base_dir",
        "input_data_dir",
        "output_data_dir",
        "output_images_dir",
        "metadata_dir",
        "metadata_images_dir"
    ]
    for label, path in zip(labels, directories):
        label_path.append(f"{label} : {path}")
    return label_path

print_directories(directories)
# Verify paths
print('base_dir :', base_dir)
print('input_data_dir :', input_data_dir)
print('output_data_dir :', output_data_dir)
print('output_images_dir :', output_images_dir)
print('metadata_dir :', metadata_dir)
print('metadata_images_dir :', metadata_images_dir)

# ## I.3. FILES

# List all the .csv files in the data directory.
# Don't forget to move the CSV files into the proj_data directory;
# if the data directory is empty, this will not work.
#ls_samples = [sample for sample in os.listdir(input_data_dir) if sample.endswith(".csv")]
print("The following CSV files were detected:\n\n", ls_samples, "\n\nin", input_data_dir, "directory.")
# In[26]:

def combine_and_save_metadata_files(metadata_dir, selected_metadata_files):
    if not selected_metadata_files:
        warnings.warn("No Ashlar file uploaded. Please upload a valid file.", UserWarning)
        return
    elif len(selected_metadata_files) > 1:
        combined_metadata_df = pd.DataFrame()
        for file in selected_metadata_files:
            file_path = os.path.join(metadata_dir, file)
            df = pd.read_csv(file_path)
            combined_metadata_df = pd.concat([combined_metadata_df, df], ignore_index=True)
        combined_metadata_df.to_csv(os.path.join(metadata_dir, "combined_metadata.csv"), index=False)
        print(f"Combined metadata file saved as 'combined_metadata.csv' in {metadata_dir}")
        return combined_metadata_df
    else:
        single_file_path = os.path.join(metadata_dir, selected_metadata_files[0])
        single_file_df = pd.read_csv(single_file_path)
        print(f"Only one file selected: {selected_metadata_files[0]}")
        return single_file_df
# In[27]:
print(combine_and_save_metadata_files(metadata_dir, selected_metadata_files))

# In[28]:
ls_samples

# In[29]:
path = os.path.join(input_data_dir, ls_samples[0])
#df = load_dataset('csv', data_files=path)
df = pd.read_csv(path, index_col=0, nrows=1)
df.head(10)

# In[30]:
# First gather information on expected headers using the first file in ls_samples
# Read in the first row of the file corresponding to the first sample (index = 0) in ls_samples
df = pd.read_csv(os.path.join(input_data_dir, ls_samples[0]), index_col=0, nrows=1)

# Make sure the file was imported correctly
print("df :\n", df.head(), "\n")
print("df's columns :\n", df.columns, "\n")
print("df's index :\n", df.index, "\n")
print("df's index name :\n", df.index.name)

# In[31]:
df.head()
# In[32]:

# Verify that the ID column in the input file became the index
# Verify that the index name is "ID"; if not, rename it
if df.index.name != "ID":
    print("Expected the first column in input file (index_col = 0) to be 'ID'. \n"
          "This column will be used to set the index names (cell number for each sample). \n"
          "It appears that the column '" + df.index.name + "' was actually imported as the index column.")
    #df.index.name = 'ID'
    print("A new index name (first column) will be given ('ID') to replace the current one '" + df.index.name + "'\n")

# Apply the changes to the headers as specified with the apply_header_changes() function (in my_modules.py)
# Apply the changes to the dataframe rows as specified with the apply_df_changes() function (in my_modules.py)
#df = apply_header_changes(df)
print(df.index)
df.index = df.index.str.replace(r'@1$', '', regex=True)
df = apply_df_changes(df)

# Set variable to hold default header values
expected_headers = df.columns.values
expected_header = True
print(expected_header)

initial_dataframe = df

# Make sure the file is now formatted correctly
print("\ndf :\n", df.head(), "\n")
print("df's columns :\n", df.columns, "\n")
print("df's index :\n", df.index, "\n")
print("df's index name :\n", df.index.name)
# In[33]:
df.head()

# In[34]:
df.head()

# In[35]:
print("Used " + ls_samples[0] + " to determine the expected and corrected headers for all files.\n")
print("These headers are: \n" + ", ".join(expected_headers))
corrected_headers = True

# In[36]:
for sample in ls_samples:
    file_path = os.path.join(input_data_dir, sample)
    print(file_path)
# In[37]:

# Import all the other files
dfs = {}

###############################
# !! This may take a while !! #
###############################
errors = []

# Iterate over a copy of ls_samples so that removing a bad sample mid-loop is safe
for sample in list(ls_samples):
    file_path = os.path.join(input_data_dir, sample)
    try:
        # Read the CSV file
        #df = load_dataset("csv", data_files=file_path)
        df = pd.read_csv(file_path, index_col=0)
        # Check if the DataFrame is empty; if so, don't continue trying to process it
        if not df.empty:
            # Manipulations necessary for concatenation
            df = apply_header_changes(df)
            df = apply_df_changes(df)
            # Reorder the columns to match the expected headers list
            #df = df.reindex(columns=expected_headers)
            print(df.head(1))
            print(sample, "file is processed !\n")
            # Compare df's headers against what is expected
            compare_headers(expected_headers, df.columns.values, sample)
            # Add a new column to identify the csv file (sample) the df comes from
            df['Sample_ID'] = sample
    except pd.errors.EmptyDataError:
        errors.append(f'\nEmpty data error in {sample} file. Removing from analysis...')
        print(f'\nEmpty data error in {sample} file. Removing from analysis...')
        ls_samples.remove(sample)
        continue
    # Add df to dfs
    dfs[sample] = df

print(dfs)
dfs.values()
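
# A minimal sketch of what compare_headers() in my_modules.py is assumed to do:
# report any difference between the expected and observed header sets for a file.
# Illustrative only; the real implementation may differ.
def sketch_compare_headers(expected, observed, name):
    missing = set(expected) - set(observed)
    extra = set(observed) - set(expected)
    if missing or extra:
        print(f"{name}: missing columns {sorted(missing)}; unexpected columns {sorted(extra)}")
    else:
        print(f"{name}: headers match the expected list.")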
# Merge dfs into one df
df = pd.concat(dfs.values(), ignore_index=False, sort=False)
del dfs
merge = True
merged_dataframe = df
df.head()

# Set index to Sample_ID + cell number:
# create a new custom index for df based on the sample names and integer cell numbers,
# then remove the temporary columns 'level_0' and 'index' introduced along the way.

# Create a copy of df and reset its index without creating a new column for the old index.
# This removes the old index column and replaces it with a default integer index.
df = df.copy().reset_index(drop=True)
#print(df)

# Initialize an empty list to store the new index labels for the DataFrame
index = []

for sample in ls_samples:
    # Extract the subset of df where 'Sample_ID' matches the current sample name
    df_chunk = df.loc[df['Sample_ID'] == sample, :].copy()
    old_index = df_chunk.index
    # Reset the chunk's index, replacing the old index with a default integer index
    df_chunk = df_chunk.reset_index(drop=True)
    # Build the new index by combining the sample name with 'Cell_' and the integer index,
    # giving labels like 'SampleName_Cell_0', 'SampleName_Cell_1', and so on.
    sample = sample.split('.')[0]
    df_chunk = df_chunk.set_index(f'{sample}_Cell_' + df_chunk.index.astype(str))
    # Collect the chunk's index values
    index = index + df_chunk.index.values.tolist()

# After processing all samples, assign the collected labels as the new index of df
df.index = index

# Remove the 'level_0' and 'index' columns from df, if present
df = df.loc[:, ~df.columns.isin(['level_0', 'index'])]
assigned_new_index = True
df.head()
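
# Example of the resulting index labels (illustrative sample name and counts): the rows of
# sample 'DD3S1.csv' become 'DD3S1_Cell_0', 'DD3S1_Cell_1', ..., and those of 'TMA.csv'
# become 'TMA_Cell_0', 'TMA_Cell_1', ...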
# ### I.3.2. NOT_INTENSITIES

# not_intensities is the list of the columns unrelated to marker fluorescence intensities.
# It can include items that aren't in a given header.
#not_intensities = ['Nuc_X', 'Nuc_X_Inv', 'Nuc_Y', 'Nuc_Y_Inv', 'Nucleus_Roundness', 'Nucleus_Size', 'Cell_Size',
#                   'ROI_index', 'Sample_ID', 'replicate_ID', 'Cell_ID', 'cell_type', 'cell_subtype', 'cluster', 'ID',
#                   'Cytoplasm_Size', 'immune_checkpoint', 'Unique_ROI_index', 'Patient', 'Primary_chem(1)_vs_surg(0)']
# Get all column names
all_columns = df.columns.tolist()

# Create lists to store non-intensity and intensity column names
not_intensities = []
intensity_columns = []

# Iterate over each column name
for column in all_columns:
    # Check if the column name contains 'Intensity_Average'
    if 'Intensity_Average' not in column:
        not_intensities.append(column)
    else:
        intensity_columns.append(column)

# Create a new DataFrame with the non-intensity column names
not_intensities_df = pd.DataFrame(not_intensities)
print("Non-intensity columns:")
print(not_intensities)
print("Non-intensity DataFrame:")
not_intensities
#print(len(intensity_columns))
pd.DataFrame(not_intensities)
path_not_intensities = os.path.join(metadata_dir, "not_intensities.csv")

# If this file already exists, add only the not_intensities items not already present in the file
if os.path.exists(path_not_intensities):
    print("'not_intensities.csv' already exists.")
    print("Reconciling file and Jupyter notebook lists.")
    with open(path_not_intensities, "r") as file_not_intensities:
        file_ni = file_not_intensities.read().splitlines()
    # Set difference to identify items not already in the file
    to_add = set(not_intensities) - set(file_ni)
    # We want not_intensities to be a complete list
    not_intensities = list(set(file_ni) | set(not_intensities))
    with open(path_not_intensities, "a") as file_not_intensities:
        for item in to_add:
            file_not_intensities.write(item + "\n")
else:
    # The file does not yet exist
    print("Could not find " + path_not_intensities + ". Creating now.")
    with open(path_not_intensities, "w") as file_not_intensities:
        for item in not_intensities:
            file_not_intensities.write(item + "\n")
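
# Worked example of the reconciliation above (illustrative values): if the file holds
# ['Sample_ID', 'Nuc_X'] and the notebook list is ['Nuc_X', 'Cell_Size'], then
# to_add == {'Cell_Size'} (appended to the file) and not_intensities becomes the union
# {'Sample_ID', 'Nuc_X', 'Cell_Size'} (order not guaranteed, since sets are unordered).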
# In[46]:
not_intensities_df = pd.read_csv(path_not_intensities)
not_intensities_df

# In[47]:
# Columns we want to keep: not_intensities, plus any intensity column that contains
# 'Intensity_Average' (drop any intensity marker column that is not a mean intensity)
to_keep = not_intensities + [x for x in df.columns.values[~df.columns.isin(not_intensities)] if 'Intensity_Average' in x]
to_keep

# In[48]:
print(len(to_keep) - 1)

# In[49]:
# However, our to_keep list contains items that might not be in our df headers!
# These items come from the not_intensities list, so keep only those items of to_keep
# that are actually found in df's headers (columns). This avoids any issues with
# non-existent column names; the result is a df containing only the specified columns.
df = df[[x for x in to_keep if x in df.columns.values]]
df.head()
# In[50]:

# Get all column names
all_columns = df.columns.tolist()

# Create an empty list to store intensity markers
intensity_marker = []

# Iterate over each column name
for column in all_columns:
    # Check if the column name contains 'Intensity_Average'
    if 'Intensity_Average' in column:
        # The marker is the part of the column name before the first underscore
        marker = column.split('_')[0]
        intensity_marker.append(marker)

# Remove duplicates from the intensity_marker list
intensity_marker = list(set(intensity_marker))

print("Intensity Markers:")
print(intensity_marker)
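
# Example: 'CKs_Cytoplasm_Intensity_Average'.split('_')[0] == 'CKs', so each marker is
# recorded once in intensity_marker no matter how many compartments it appears in.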
# Create a callback function to update the intensities array
def update_intensities(event):
    global intensities
    global intensities_df
    new_intensities = []
    selected_columns = []
    for marker, cell, cytoplasm, nucleus in zip(marker_options_df['Marker'], marker_options_df['Cell'], marker_options_df['Cytoplasm'], marker_options_df['Nucleus']):
        if cell:
            new_intensities.append(f"{marker}_Cell_Intensity_Average")
            selected_columns.append(f"{marker}_Cell_Intensity_Average")
        if cytoplasm:
            new_intensities.append(f"{marker}_Cytoplasm_Intensity_Average")
            selected_columns.append(f"{marker}_Cytoplasm_Intensity_Average")
        if nucleus:
            new_intensities.append(f"{marker}_Nucleus_Intensity_Average")
            selected_columns.append(f"{marker}_Nucleus_Intensity_Average")
    intensities = new_intensities
    if selected_columns:
        intensities_df = merged_dataframe[selected_columns]
    else:
        intensities_df = pd.DataFrame()
    print("Updated intensities DataFrame:")
    print(intensities_df)
# In[54]:

tabulator_formatters = {
    'bool': {'type': 'tickCross'}
}

# Create a DataFrame with the intensity markers and default values
marker_options_df = pd.DataFrame({
    'Marker': intensity_marker,
    'Cell': [False] * len(intensity_marker),
    'Cytoplasm': [False] * len(intensity_marker),
    'Nucleus': [False] * len(intensity_marker)
})

# Create the Tabulator widget and link the callback function
tabulator = pn.widgets.Tabulator(marker_options_df, formatters=tabulator_formatters, sizing_mode='stretch_width')
tabulator.param.watch(update_intensities, 'value')

# Create a Panel layout with the Tabulator widget
marker_options_layout = pn.Column(tabulator, sizing_mode="stretch_width")
# Initialize the Panel extension with Tabulator
pn.extension('tabulator')

# Create a DataFrame with the intensity markers and default values
marker_options_df = pd.DataFrame({
    'Marker': intensity_marker,
    'Cell': [True] * len(intensity_marker),
    'Cytoplasm': [False] * len(intensity_marker),
    'Nucleus': [False] * len(intensity_marker)
})

# Define formatters for the Tabulator widget
tabulator_formatters = {
    'Cell': {'type': 'tickCross'},
    'Cytoplasm': {'type': 'tickCross'},
    'Nucleus': {'type': 'tickCross'}
}

# Create the Tabulator widget
tabulator = pn.widgets.Tabulator(marker_options_df, formatters=tabulator_formatters, sizing_mode='stretch_width')

# Create a DataFrame to store the initial intensities
new_data = [{'Description': f"{marker}_Cell_Intensity_Average"} for marker in intensity_marker]
new_data_df = pd.DataFrame(new_data)

# Create a widget to display the new data as a DataFrame
new_data_table = pn.widgets.Tabulator(new_data_df, name='New Data Table', sizing_mode='stretch_width')

# Create a button to start the update process
run_button = pn.widgets.Button(name="Save Selection", button_type='primary')

# Define the update_intensities function
def update_intensities():
    global new_data, new_data_df
    new_data = []
    for _, row in tabulator.value.iterrows():
        marker = row['Marker']
        if row['Cell']:
            new_data.append({'Description': f"{marker}_Cell_Intensity_Average"})
        if row['Cytoplasm']:
            new_data.append({'Description': f"{marker}_Cytoplasm_Intensity_Average"})
        if row['Nucleus']:
            new_data.append({'Description': f"{marker}_Nucleus_Intensity_Average"})
    new_data_df = pd.DataFrame(new_data)
    new_data_table.value = new_data_df

# Define the runner function
def runner(event):
    update_intensities()

# Bind the runner function to the button
run_button.on_click(runner)

# Layout
updated_intensities = pn.Column(tabulator, run_button, new_data_table, sizing_mode="stretch_width")
pn.extension()

# Serve the layout
#updated_intensities.servable()

intensities_df = new_data_table.value
intensities_df
intensities_df = pn.pane.DataFrame(intensities_df)
intensities_df
print(intensities_df)
# ## I.4. QC CHECKS

def quality_check_results(check_index, check_shape, check_no_null, check_zero_intensities):
    results = [
        f"Check Index: {check_index}",
        f"Check Shape: {check_shape}",
        f"Check No Null: {check_no_null}",
        f"Check Zero Intensities: {check_zero_intensities}"
    ]
    return pn.Column(*[pn.Row(result) for result in results], sizing_mode="stretch_width")

print(ls_samples)
def check_index_format(index_str, ls_samples):
    """
    Checks if the given index string follows the specified format.

    Args:
        index_str (str): The index string to be checked.
        ls_samples (list): A list of valid sample names.

    Returns:
        bool: True if the index string follows the format, False otherwise.
    """
    # Split the index string into parts
    parts = index_str.split('_')

    # Check if there are exactly 3 parts
    if len(parts) != 3:
        print(len(parts))
        return False

    # Check if the first part is in ls_samples
    sample_name = parts[0]
    if f'{sample_name}.csv' not in ls_samples:
        print(sample_name)
        return False

    # Check if the second part is one of ['Cell', 'Cytoplasm', 'Nucleus']
    location = parts[1]
    valid_locations = ['Cell', 'Cytoplasm', 'Nucleus']
    if location not in valid_locations:
        print(location)
        return False

    # Check if the third part is a number
    try:
        index = int(parts[2])
    except ValueError:
        print(parts[2])
        return False

    # If all checks pass, return True
    return True
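
# Quick illustration of the expected index format (hypothetical labels):
#   check_index_format('DD3S1_Cell_0', ls_samples)  -> True
#   check_index_format('DD3S1_cell_0', ls_samples)  -> False (location must be capitalized)
#   check_index_format('DD3S1_Cell_x', ls_samples)  -> False (third part must be an integer)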
# In[70]:

# Let's take a look at a few features to make sure our dataframe is as expected
df.index

def check_format_ofindex(index):
    for ix in index:
        check_index = check_index_format(ix, ls_samples)
        if check_index is False:
            index_format = "Bad"
            return index_format
    index_format = "Good"
    return index_format

print(check_format_ofindex(df.index))
# In[71]:
df.shape
check_index = df.index
check_shape = df.shape
print(check_shape)

# In[72]:
# Check for NaN entries (should not be any unless columns do not align)
# False means no NaN entries; True means NaN entries
df.isnull().any().any()
check_no_null = df.isnull().any().any()

# In[73]:
# Check that all expected files were imported into the final dataframe
if sorted(df.Sample_ID.unique()) == sorted(ls_samples):
    print("All expected filenames are present in big df Sample_ID column.")
    check_all_expected_files_present = "All expected filenames are present in big df Sample_ID column."
else:
    compare_headers(['no samples'], df.Sample_ID.unique(), "big df Sample_ID column")
    check_all_expected_files_present = compare_headers(['no samples'], df.Sample_ID.unique(), "big df Sample_ID column")

print(df.Sample_ID)
# In[74]:

# Delete rows that have 0-value mean intensities for the intensity columns
print("df.shape before removing 0 mean values: ", df.shape)

# Calculate the mean intensity for each row, excluding the columns listed in
# not_intensities (which are not to be considered for mean intensity calculations).

###############################
# !! This may take a while !! #
###############################

# Calculate mean intensity excluding 'not_intensities' columns
mean_intensity = df.loc[:, ~df.columns.isin(not_intensities)].mean(axis=1)

# Check if there are any 0 mean intensity values
if (mean_intensity == 0).any():
    df = df.loc[mean_intensity > 0, :]
    print("Shape after removing 0 mean values: ", df.shape)
    check_zero_intensities = f'df.shape after removing 0 mean values: {df.shape}'
else:
    print("No zero intensity values.")
    check_zero_intensities = "No zero intensity values found in the DataFrame."

# Get quantiles (5th, 50th, 95th)
# List of nucleus size percentiles to extract
#qs = [0.05, 0.50, 0.95]
#df["Nucleus_Size"].quantile(q=qs)

quality_control_df = df
quality_control_df.head()
# Function to perform quality checks
def perform_quality_checks(df, ls_samples, not_intensities):
    results = {}
    errors = []
    # Check index
    results['index'] = df.index
    # Check shape
    results['shape'] = df.shape
    # Check for NaN entries
    results['nan_entries'] = df.isnull().any().any()
    # Remove rows with 0 mean intensity values
    mean_intensity = df.loc[:, ~df.columns.isin(not_intensities)].mean(axis=1)
    if (mean_intensity == 0).any():
        df = df.loc[mean_intensity > 0, :]
        results['zero_intensity_removal'] = f"Zero intensity entries were found and removed. Shape after removing: {df.shape}"
    else:
        results['zero_intensity_removal'] = "No zero intensity values found in the DataFrame."
    return results

# Example usage of the function
quality_check_results = perform_quality_checks(df, ls_samples, not_intensities)

# Print results
for key, value in quality_check_results.items():
    print(f"{key}: {value}")
# In[80]:

def quality_check(file, not_intensities):
    # Load the output file
    df = file

    # Check index
    check_index = check_format_ofindex(df.index)

    # Check shape
    check_shape = df.shape

    # Check for NaN entries
    check_no_null = df.isnull().any().any()

    mean_intensity = df.loc[:, ~df.columns.isin(not_intensities)].mean(axis=1)
    if (mean_intensity == 0).any():
        df = df.loc[mean_intensity > 0, :]
        print("df.shape after removing 0 mean values: ", df.shape)
        check_zero_intensities = f'df.shape after removing 0 mean values: {df.shape}'
    else:
        print("No zero intensity values found in the DataFrame.")
        check_zero_intensities = "No zero intensities."

    # Create a quality check results table
    quality_check_results_table = pd.DataFrame({
        'Check': ['Index', 'Shape', 'Check for NaN Entries', 'Check for Zero Intensities'],
        'Result': [str(check_index), str(check_shape), str(check_no_null), check_zero_intensities]
    })

    # Create a quality check results component
    quality_check_results_component = pn.Card(
        pn.pane.DataFrame(quality_check_results_table),
        title="Quality Control Results",
        header_background="#2196f3",
        header_color="white",
    )

    return quality_check_results_component
quantile_slider = pn.widgets.FloatSlider(name='Quantile', start=0.01, end=0.99, step=0.01, value=0.05)

# Function to calculate quantile values
def calculate_quantiles(quantile):
    quantile_value_intensity = df["AF555_Cell_Intensity_Average"].quantile(q=[quantile, 0.50, 1 - quantile])
    return quantile_value_intensity

# Function to create the Panel app
def create_app(quantile=quantile_slider.param.value):
    quantiles = calculate_quantiles(quantile)
    output = pd.DataFrame(quantiles)
    # Create a DataFrame pane to display the output
    output_widget = pn.pane.DataFrame(output)
    return output_widget

# Bind the create_app function to the quantile slider
quantile_output_app = pn.bind(create_app, quantile_slider.param.value)
#pn.Column(quantile_slider, quantile_output_app).servable()
# Function to create the line graph plot using Bokeh
def create_line_graph2(quantile):
    # Calculate histogram
    hist, edges = np.histogram(df['Nucleus_Size'], bins=30)

    # Calculate the midpoints of bins for plotting
    midpoints = (edges[:-1] + edges[1:]) / 2

    # Calculate quantiles
    qs = [quantile, 0.50, 1.00 - quantile]
    quantiles = df['Nucleus_Size'].quantile(q=qs).values

    # Create Bokeh line graph plot
    p = figure(title='Frequency vs. Nucleus_Size',
               x_axis_label='Nucleus_Size',
               y_axis_label='Frequency',
               width=800, height=400)

    # Plot histogram
    p.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
           fill_color='skyblue', line_color='black', alpha=0.6)

    # Plot line graph
    p.line(midpoints, hist, line_width=2, color='blue', alpha=0.7)

    # Add quantile lines
    for q in quantiles:
        span = Span(location=q, dimension='height', line_color='red', line_dash='dashed', line_width=2)
        p.add_layout(span)
        p.add_layout(Label(x=q, y=max(hist), text=f'{q:.1f}', text_color='red'))

    return p

# Bind the create_line_graph function to the quantile slider
nucleus_size_line_graph_with_histogram = pn.bind(create_line_graph2, quantile=quantile_slider.param.value)
# Clean the 'Nucleus_Size' column by removing NaN and infinite values
df = df[np.isfinite(df['Nucleus_Size'])]  # Keep only finite values

# Check if the DataFrame is not empty after cleaning
if df.empty:
    raise ValueError("No valid data available after cleaning.")
else:
    # Calculate the histogram
    hist, edges = np.histogram(df['Nucleus_Size'], bins=30)
    print("Histogram calculated successfully.")
    print("Histogram:", hist)
    print("Edges:", edges)

plot1 = pn.Column(quantile_slider, pn.pane.Bokeh(nucleus_size_line_graph_with_histogram))

# Removing cells based on nucleus size
quantile = quantile_slider.value
qs = [quantile, 0.50, 1.00 - quantile]
quantiles = df['Nucleus_Size'].quantile(q=qs).values
threshold = quantiles[2]
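
# Worked example: with the default slider value quantile = 0.05, qs = [0.05, 0.50, 0.95],
# so threshold = quantiles[2] is the 95th percentile of Nucleus_Size, i.e. the upper
# cutoff applied in the filtering below.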
# In[89]:
print(threshold)

# In[90]:

# Define the quantile slider
#quantile_slider = pn.widgets.FloatSlider(name='Quantile', start=0.01, end=0.99, step=0.01, value=0.05)

# Function to update the threshold and display the number of cells removed
def update_threshold_and_display(quantile):
    qs = [quantile, 0.50, 1.00 - quantile]
    quantiles = df['Nucleus_Size'].quantile(q=qs).values
    threshold = quantiles[2]

    # Filter the DataFrame based on the new threshold
    df_filtered = df.loc[(df['Nucleus_Size'] > 42) & (df['Nucleus_Size'] < threshold)]

    # Calculate the number of cells removed
    cells_before_filter = df.shape[0]
    cells_after_filter = df_filtered.shape[0]
    cells_removed = cells_before_filter - cells_after_filter

    # Display the results
    results = pn.Column(
        f"Number of cells before filtering: {cells_before_filter}",
        f"Number of cells after filtering on nucleus size: {cells_after_filter}",
        f"Number of cells removed: {cells_removed}"
    )
    return results

# Bind the update function to the quantile slider
results_display = pn.bind(update_threshold_and_display, quantile_slider)

# Layout the components in a Panel app
layout2 = results_display
# In[91]:
print("Number of cells before filtering :", df.shape[0])
cells_before_filter = f"Number of cells before filtering :{df.shape[0]}"

# Delete small cells and objects with high AF555 signal (RBCs)
# We usually use the 95th percentile calculated during QC_EDA
df = df.loc[(df['Nucleus_Size'] > 42)]
df = df.loc[(df['Nucleus_Size'] < threshold)]
cells_after_filter_nucleus_shape = df.shape[0]
print("Number of cells after filtering on nucleus size:", df.shape[0])

df = df.loc[(df['AF555_Cell_Intensity_Average'] < 2000)]
print("Number of cells after filtering on AF555_Cell_Intensity_Average:", df.shape[0])
cells_after_filter_intensity_shape = df.shape[0]

cells_after_filter_nucleus = f"Number of cells after filtering on nucleus size: {cells_after_filter_nucleus_shape}"
cells_after_filter_intensity = f"Number of cells after filtering on AF555_Cell_Intensity_Average: {cells_after_filter_intensity_shape}"

num_of_cell_removal_intensity = cells_after_filter_intensity
print(num_of_cell_removal_intensity)

num_of_cell_removal = pn.Column(cells_before_filter, cells_after_filter_nucleus)
# Using the DataFrame 'df' with the intensity columns
intensities = df.filter(like='Intensity').columns.tolist()

# Create a ColumnDataSource from the DataFrame
source = ColumnDataSource(df)

# Function to calculate quantile values
def calculate_quantiles(column, quantile):
    quantiles = df[column].quantile(q=[quantile, 0.50, 1 - quantile]).values
    return quantiles

# Create the dropdown menu
column_dropdown = pn.widgets.Select(name='Select Column', options=intensities)
quantile_slider = pn.widgets.FloatSlider(name='Quantile', start=0.01, end=0.99, step=0.01, value=0.05)

# Function to create the Bokeh plot
def create_intensity_plot(column, quantile):
    quantiles = calculate_quantiles(column, quantile)
    hist, edges = np.histogram(df[column], bins=30)

    # Calculate the midpoints of bins for plotting
    midpoints = (edges[:-1] + edges[1:]) / 2

    # Create Bokeh plot
    p = figure(title=f'Distribution of {column} with Quantiles',
               x_axis_label=f'{column} Values',
               y_axis_label='Frequency',
               width=800, height=400)

    p.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
           fill_color='skyblue', line_color='black', alpha=0.7)

    # Plot line graph
    p.line(midpoints, hist, line_width=2, color='blue', alpha=0.7)

    # Add quantile lines
    for q in quantiles:
        span = Span(location=q, dimension='height', line_color='red', line_dash='dashed', line_width=2)
        p.add_layout(span)
        p.add_layout(Label(x=q, y=max(hist), text=f'{q:.1f}', text_color='red'))

    return p

# Bind create_intensity_plot to the column dropdown and quantile slider
marker_intensity_with_histogram = pn.bind(create_intensity_plot, column_dropdown.param.value, quantile_slider.param.value)

# Create the button (a Panel button, so the on_click handler receives an event)
generate_plot_button = pn.widgets.Button(name='Generate Plot', button_type='primary')

def update_plot(column, quantile):
    plot = create_intensity_plot(column, quantile)
    plot.renderers[0].data_source = source  # Update the data source for the renderer
    return plot

# Display the dropdown menu, quantile slider, button, and plot
#plot = update_plot(column_dropdown.param.value, quantile_slider.param.value)

def generate_plot(event):
    updated_plot = update_plot(column_dropdown.param.value, quantile_slider.param.value)
    #pn.Column(pn.Row(column_dropdown, generate_plot_button), quantile_slider, updated_plot).servable()

generate_plot_button.on_click(generate_plot)

selected_marker_plot = pn.Column(pn.Row(pn.Column(column_dropdown, marker_intensity_with_histogram)))
#pn.Column(pn.Row(pn.Column(column_dropdown, marker_intensity_with_histogram), generate_plot_button)).servable()
# In[105]:
quantile_slider = pn.widgets.FloatSlider(name='Quantile', start=0.01, end=0.99, step=0.01, value=0.05)

# Bind the create_line_graph function to the quantile slider
#nucleus_size_line_graph = pn.bind(create_line_graph, quantile=quantile_slider.param.value)

# Layout the components in a Panel app
#nucleus_size_graph = pn.Column(nucleus_size_line_graph)

# In[106]:
#df["CKs_Cytoplasm_Intensity_Average"].quantile(q=qs)

# In[107]:
len(intensities)
if 'CKs_Cytoplasm_Intensity_Average' in intensities:
    print(1)

# In[108]:
df
# In[109]:

def calculate_cytoplasm_quantiles(column, quantile):
    # Print the columns of the DataFrame
    print("DataFrame columns:", df.columns)
    # Check if the column exists in the DataFrame
    if column not in df.columns:
        raise KeyError(f"Column '{column}' does not exist in the DataFrame.")
    quantiles = df[column].quantile(q=[quantile, 0.50, 1 - quantile])
    return quantiles

def create_cytoplasm_intensity_df(column, quantile):
    quantiles = calculate_cytoplasm_quantiles(column, quantile)
    output = pd.DataFrame(quantiles)
    # Create a DataFrame pane to display the output
    return pn.pane.DataFrame(output)

# Bind the create_cytoplasm_intensity_df function to the quantile slider
cytoplasm_quantile_output_app = pn.bind(create_cytoplasm_intensity_df, column='CKs_Cytoplasm_Intensity_Average', quantile=quantile_slider.param.value)
pn.Column(quantile_slider, cytoplasm_quantile_output_app)
# ## I.5. COLUMNS OF INTEREST

# In[111]:
# Remove columns containing "DAPI"
df = df[[x for x in df.columns.values if 'DAPI' not in x]]
print("Columns are now...")
print([c for c in df.columns.values])

# In[112]:
# Create dictionaries of full names and shortened names to use in plotting
full_to_short_names, short_to_full_names = \
    shorten_feature_names(df.columns.values[~df.columns.isin(not_intensities)])
short_to_full_names
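
# A minimal sketch of what a helper like shorten_feature_names() (from my_modules.py)
# is assumed to do: map full column names such as 'CKs_Cytoplasm_Intensity_Average' to
# short plotting labels such as 'CKs_Cyto'. Illustrative only; the real mapping lives
# in my_modules.py and may differ.
def sketch_shorten_feature_names(columns):
    loc_abbrev = {'Cytoplasm': 'Cyto', 'Nucleus': 'Nuc', 'Cell': 'Cell'}
    full_to_short, short_to_full = {}, {}
    for col in columns:
        parts = col.split('_')
        short = parts[0] + '_' + loc_abbrev.get(parts[1], parts[1]) if len(parts) > 1 else col
        full_to_short[col] = short
        short_to_full[short] = col
    return full_to_short, short_to_full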
# In[113]:
# Save this data to a metadata file
filename = os.path.join(metadata_dir, "full_to_short_column_names.csv")
with open(filename, "w") as fh:
    fh.write("full_name,short_name\n")
    for k, v in full_to_short_names.items():
        fh.write(k + "," + v + "\n")
print("The full_to_short_column_names.csv file was created !")

# In[114]:
# Save this data to a metadata file
filename = os.path.join(metadata_dir, "short_to_full_column_names.csv")
with open(filename, "w") as fh:
    fh.write("short_name,full_name\n")
    for k, v in short_to_full_names.items():
        fh.write(k + "," + v + "\n")
print("The short_to_full_column_names.csv file was created !")
# ## I.6. EXPOSURE TIME

# In[115]:
# Import the Ashlar analysis file
file_path = os.path.join(metadata_dir, 'combined_metadata.csv')
ashlar_analysis = pd.read_csv(file_path)
ashlar_analysis

# In[116]:
# Extract and rename columns
new_df = ashlar_analysis[['Name', 'Cycle', 'ChannelIndex', 'ExposureTime']].copy()
new_df.rename(columns={
    'Name': 'Target',
    'Cycle': 'Round',
    'ChannelIndex': 'Channel'
}, inplace=True)

# Apply prefixes to the Round and Channel values
new_df['Round'] = 'R' + new_df['Round'].astype(str)
new_df['Channel'] = 'c' + new_df['Channel'].astype(str)

# Save to CSV
new_df.to_csv('Ashlar_Exposure_Time.csv', index=False)

# Print the new dataframe
print(new_df)
# In[117]:

# Here we build a data structure holding metadata on each intensity marker column in the
# big dataframe in an easy-to-use format. This includes the full name of the intensity
# marker columns, the corresponding round and channel, the target protein (e.g., CD45),
# and the segmentation localization (cell, cytoplasm, nucleus). We can use this structure
# to assign unique colors to all channels and rounds for later visualizations.

# Exposure_Time file from the ASHLAR analysis
filename = "Exposure_Time.csv"
filename = os.path.join(metadata_dir, filename)
exp_df = pd.read_csv(filename)
print(exp_df)

# Verify the file imported correctly
# File length
print("df's shape: ", exp_df.shape)
# Headers
expected_headers = ['Round', 'Target', 'Exp', 'Channel']
compare_headers(expected_headers, exp_df.columns.values, "Imported metadata file")
# Missingness
if exp_df.isnull().any().any():
    print("\nexp_df has null value(s) in row(s):")
    print(exp_df[exp_df.isna().any(axis=1)])
else:
    print("\nNo null values detected.")

# In[118]:
if len(exp_df['Target']) > len(exp_df['Target'].unique()):
    print("One or more non-unique Target values in exp_df. Currently not supported.")
    exp_df = exp_df.drop_duplicates(subset='Target').reset_index(drop=True)
# In[119]:
# Sort exp_df by the values in the 'Target' column in ascending order and view the first rows
exp_df.sort_values(by=['Target']).head()

# In[120]:
# Create lowercase version of target
exp_df['target_lower'] = exp_df['Target'].str.lower()
exp_df.head()

# In[121]:
# Create df that contains the marker intensity columns of df that aren't in not_intensities
intensities = pd.DataFrame({'full_column': df.columns.values[~df.columns.isin(not_intensities)]})
intensities

# In[122]:
# Extract the marker information from 'full_column', which corresponds to the full column
# name in the big dataframe. The regex ([^\W_]+) captures the leading run of alphanumeric
# characters, i.e., everything before the first underscore.
intensities['marker'] = intensities['full_column'].str.extract(r'([^\W_]+)')

# Convert to lowercase
intensities['marker_lower'] = intensities['marker'].str.lower()
intensities
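
# Example: 'AF555_Cell_Intensity_Average' yields marker 'AF555' and marker_lower 'af555'.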
# In[123]:
# Subset the intensities df to exclude any column pertaining to DAPI
intensities = intensities.loc[intensities['marker_lower'] != 'dapi']
intensities.head()

# In[124]:
# Merge the intensities and exp_df together to create metadata
metadata = pd.merge(exp_df, intensities, how='left', left_on='target_lower', right_on='marker_lower')
metadata = metadata.drop(columns=['marker_lower'])
metadata = metadata.dropna()

# Target is the capitalization from the Exposure_Time.csv
# target_lower is Target in small caps
# marker is the extracted first component of the full column in segmentation data, with corresponding capitalization
metadata

# In[125]:
# Add a column to signify marker target localisation.
# Use a lambda to determine the segmented location of each intensity marker column,
# via the add_metadata_location() function in my_modules.py
metadata['localisation'] = metadata.apply(
    lambda row: add_metadata_location(row), axis=1)

# In[126]:
mlid = metadata

# In[127]:
# Save this data structure to the metadata folder
# We don't want to add color here because color is better treated the same for round, channel, and sample
filename = "marker_intensity_metadata.csv"
filename = os.path.join(metadata_dir, filename)
metadata.to_csv(filename, index=False)
print("The marker_intensity_metadata.csv file was created !")
# ## I.7. COLORS WORKFLOW

# ### I.7.1. CHANNELS COLORS

# We want colors that are categorical, since Channel is a non-ordered category
# (yes, they are numbered, but arbitrarily). A categorical color palette will have
# dissimilar colors.

# Get those unique colors
if len(metadata.Channel.unique()) > 10:
    print("WARNING: There are more unique channel values than there are colors to choose from. Select a different palette, e.g., continuous palette 'husl'.")
channel_color_values = sb.color_palette("bright", n_colors=len(metadata.Channel.unique()))
# Chose 'bright' because it is categorical and we're unlikely to have > 10 channels

# You can customize the colors for each channel here
custom_colors = {
    'c2': 'lightgreen',
    'c3': 'tomato',
    'c4': 'pink',
    'c5': 'turquoise'
}
sb.palplot(sb.color_palette([custom_colors.get(ch, 'blue') for ch in metadata.Channel.unique()]))

# Display those unique custom colors
print("Unique channels are:", metadata.Channel.unique())
sb.palplot(sb.color_palette(channel_color_values))
# In[131]:

# Function to create a palette plot with custom colors
def create_palette_plot():
    # Get unique channels
    unique_channels = metadata.Channel.unique()
    # Define custom colors for each channel
    custom_colors = {
        'c2': 'lightgreen',
        'c3': 'tomato',
        'c4': 'pink',
        'c5': 'turquoise'
    }
    # Get custom colors for each channel
    colors = [custom_colors.get(ch, 'blue') for ch in unique_channels]
    # Create a palette plot (palplot)
    palette_plot = sb.palplot(sb.color_palette(colors))
    channel_color_values = sb.color_palette("bright", n_colors=len(unique_channels))
    channel_color_values = sb.palplot(channel_color_values)
    return palette_plot, channel_color_values

# Create the palette plot directly
palette_plot = create_palette_plot()

# Define the Panel app layout
app_palette_plot = pn.Column(
    pn.pane.Markdown("### Custom Color Palette"),
    palette_plot,
)

# Function to create a palette plot with custom colors
def create_palette_plot(custom_colors):
    # Get unique channels
    unique_channels = metadata.Channel.unique()
    # Get custom colors for each channel
    colors = [custom_colors.get(ch, 'blue') for ch in unique_channels]
    # Create a palette plot (palplot)
    palette_plot = sb.palplot(sb.color_palette(colors))
    return palette_plot

# Define custom colors for each channel
custom_colors = {
    'c2': 'lightgreen',
    'c3': 'tomato',
    'c4': 'pink',
    'c5': 'turquoise'
}

# Display those unique custom colors
print("Unique channels are:", metadata.Channel.unique())

# Create the palette plot with the custom colors
app_palette_plot = create_palette_plot(custom_colors)
#app_palette_plot.servable()
# In[133]:

# Store in a dictionary
channel_color_dict = dict(zip(metadata.Channel.unique(), channel_color_values))
channel_color_dict
for k, v in channel_color_dict.items():
    channel_color_dict[k] = np.array(v, dtype=np.float64)
channel_color_dict

# In[134]:
color_df_channel = color_dict_to_df(channel_color_dict, "Channel")

# Save to file in the metadata directory
filename = "channel_color_data.csv"
filename = os.path.join(metadata_dir, filename)
color_df_channel.to_csv(filename, index=False)
color_df_channel
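
# A minimal sketch of what color_dict_to_df() in my_modules.py is assumed to do:
# flatten a {category: (R, G, B)} dictionary into a tidy DataFrame with one row per
# category and one column per color component. Illustrative only; the real
# implementation may differ.
def sketch_color_dict_to_df(color_dict, column_name):
    rows = [{column_name: key, 'R': rgb[0], 'G': rgb[1], 'B': rgb[2]}
            for key, rgb in color_dict.items()]
    return pd.DataFrame(rows)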
# In[135]:

# Legend of channel info only
g = plt.figure(figsize=(1, 1)).add_subplot(111)
g.axis('off')
handles = []
for item in channel_color_dict.keys():
    h = g.bar(0, 0, color=channel_color_dict[item],
              label=item, linewidth=0)
    handles.append(h)
first_legend = plt.legend(handles=handles, loc='upper right', title='Channel')
                          # bbox_to_anchor=(10,10),
                          # bbox_transform=plt.gcf().transFigure)

filename = "Channel_legend.png"
filename = os.path.join(metadata_images_dir, filename)
plt.savefig(filename, bbox_inches='tight')
# ### I.7.2. ROUNDS COLORS

# We want colors that are sequential, since Round is an ordered category.
# We can still generate colors that are easy to distinguish. Also, many of the categorical
# palettes cap at about 10 unique colors and repeat from there. We do not want any repeats!
round_color_values = sb.cubehelix_palette(
    len(metadata.Round.unique()), start=1, rot=-0.75, dark=0.19, light=.85, reverse=True)
# round_color_values = sb.color_palette("cubehelix", n_colors=len(metadata.Round.unique()))
# Chose 'cubehelix' because it is sequential, and round is a continuous process.
# Each color value is a tuple of three values: (R, G, B)
print(metadata.Round.unique())

sb.palplot(sb.color_palette(round_color_values))
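
# What these cubehelix_palette parameters mean, per the seaborn docs: `start` is the hue
# at the start of the helix, `rot` the number of rotations around the hue wheel over the
# range of the palette, `dark`/`light` the intensity of the darkest and lightest colors,
# and `reverse=True` flips the order of the palette.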
# In[137]:

# Store in a dictionary
round_color_dict = dict(zip(metadata.Round.unique(), round_color_values))
for k, v in round_color_dict.items():
    round_color_dict[k] = np.array(v, dtype=np.float64)
round_color_dict

# In[138]:
color_df_round = color_dict_to_df(round_color_dict, "Round")

# Save to file in the metadata directory
filename = "round_color_data.csv"
filename = os.path.join(metadata_dir, filename)
color_df_round.to_csv(filename, index=False)
color_df_round

# Legend of round info only
round_legend = plt.figure(figsize=(1, 1)).add_subplot(111)
round_legend.axis('off')
handles = []
for item in round_color_dict.keys():
    h = round_legend.bar(0, 0, color=round_color_dict[item],
                         label=item, linewidth=0)
    handles.append(h)
first_legend = plt.legend(handles=handles, loc='upper right', title='Round')
                          # bbox_to_anchor=(10,10),
                          # bbox_transform=plt.gcf().transFigure)

filename = "Round_legend.png"
filename = os.path.join(metadata_images_dir, filename)
plt.savefig(filename, bbox_inches='tight')
# ### I.7.3. SAMPLES COLORS

# In[140]:

# We want colors that are neither sequential nor categorical. Categorical would be ideal
# if we could generate an arbitrary number of colors, but I do not think that we can.
# Hence, we will choose `n` colors from a continuous palette. First we will generate the
# right number of colors. Later, we will assign TMA samples to gray.

# Get those unique colors
color_values = sb.color_palette("husl", n_colors=len(ls_samples))  # 'HLS'
# Each color value is a tuple of three values: (R, G, B)

# Display those unique colors
sb.palplot(sb.color_palette(color_values))

# In[141]:
TMA_samples = [s for s in df.Sample_ID.unique() if 'TMA' in s]
TMA_color_values = sb.color_palette(n_colors=len(TMA_samples), palette="gray")
sb.palplot(sb.color_palette(TMA_color_values))
# In[142]:

# Store in a dictionary
color_dict = dict(zip(df.Sample_ID.unique(), color_values))

# Replace all TMA samples' colors with gray
i = 0
for key in color_dict.keys():
    if 'TMA' in key:
        color_dict[key] = TMA_color_values[i]
        i += 1
color_dict

color_df_sample = color_dict_to_df(color_dict, "Sample_ID")

# Save to file in the metadata directory
filename = "sample_color_data.csv"
filename = os.path.join(metadata_dir, filename)
color_df_sample.to_csv(filename, index=False)
color_df_sample

# Legend of sample info only
g = plt.figure(figsize=(1, 1)).add_subplot(111)
g.axis('off')
handles = []
for item in color_dict.keys():
    h = g.bar(0, 0, color=color_dict[item],
              label=item, linewidth=0)
    handles.append(h)
first_legend = plt.legend(handles=handles, loc='upper right', title='Sample')

filename = "Sample_legend.png"
filename = os.path.join(metadata_images_dir, filename)
plt.savefig(filename, bbox_inches='tight')
# ### I.7.4. CLUSTERS COLORS

'''if 'cluster' in df.columns:
    cluster_color_values = sb.color_palette("hls", n_colors=len(df.cluster.unique()))
    #print(sorted(test_df.cluster.unique()))
    # Display those unique colors
    sb.palplot(sb.color_palette(cluster_color_values))

    cluster_color_dict = dict(zip(sorted(test_df.cluster.unique()), cluster_color_values))
    print(cluster_color_dict)

    # Create dataframe
    cluster_color_df = color_dict_to_df(cluster_color_dict, "cluster")
    cluster_color_df.head()

    # Save to file in the metadata directory
    filename = "cluster_color_data.csv"
    filename = os.path.join(metadata_dir, filename)
    cluster_color_df.to_csv(filename, index=False)

# Legend of cluster info only
if 'cluster' in df.columns:
    g = plt.figure(figsize=(1, 1)).add_subplot(111)
    g.axis('off')
    handles = []
    for item in sorted(cluster_color_dict.keys()):
        h = g.bar(0, 0, color=cluster_color_dict[item],
                  label=item, linewidth=0)
        handles.append(h)
    first_legend = plt.legend(handles=handles, loc='upper right', title='Cluster')

    filename = "Clustertype_legend.png"
    filename = os.path.join(metadata_images_dir, filename)
    plt.savefig(filename, bbox_inches='tight')'''
mlid.head()

metadata

import io

pn.extension()

file_input = pn.widgets.FileInput()
file_input

def transform_data(variable, window, sigma):
    """Calculate the rolling average and identify outliers."""
    avg = metadata[variable].rolling(window=window).mean()
    residual = metadata[variable] - avg
    std = residual.rolling(window=window).std()
    outliers = np.abs(residual) > std * sigma
    return avg, avg[outliers]

def get_plot(variable="Exp", window=30, sigma=10):
    """Plot the rolling average and the outliers."""
    avg, highlight = transform_data(variable, window, sigma)
    return avg.hvplot(
        height=300, legend=False,
    ) * highlight.hvplot.scatter(padding=0.1, legend=False)

variable_widget = pn.widgets.Select(name="Target", value="Exp", options=list(metadata.columns))
window_widget = pn.widgets.IntSlider(name="window", value=30, start=1, end=60)
sigma_widget = pn.widgets.IntSlider(name="sigma", value=10, start=0, end=20)
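
# Worked example of the outlier rule above (illustrative numbers): with window=30 and
# sigma=10, a point is flagged only when |value - rolling_mean| > 10 * rolling_std,
# i.e., only very extreme Exp values are highlighted in the scatter overlay.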
app = pn.template.GoldenTemplate(
    site="Cyc-IF",
    title="Quality Control",
    main=[
        pn.Tabs(
            ("Dataframes", pn.Column(
                pn.Row(csv_files_button, pn.bind(handle_click, csv_files_button.param.clicks)),
                pn.pane.Markdown("### The Dataframe uploaded:"), pn.pane.DataFrame(initial_dataframe),
                #pn.pane.Markdown("### The Exposure time DataFrame is :"), pn.pane.DataFrame(exp_df.head()),
                pn.pane.Markdown("### The DataFrame after merging CycIF data x metadata :"), pn.pane.DataFrame(merged_dataframe.head()),
            )),
            ("Quality Control", pn.Column(
                quality_check(quality_control_df, not_intensities)
                #pn.pane.Markdown("### The Quality check results are:"), quality_check_results(check_index, check_shape, check_no_null, check_zero_intensities)
            )),
            ("Intensities", pn.Column(
                pn.pane.Markdown("### The Not Intensities DataFrame after processing is :"), pn.pane.DataFrame(not_intensities_df, height=250),
                pn.pane.Markdown("### Select Intensities to be included"), updated_intensities,
                #pn.pane.Markdown("### The Intensities DataFrame"), intensities_df,
                #pn.pane.Markdown("### The metadata obtained that specifies the localisation:"), pn.pane.DataFrame(mlid.head())
            )),
            ("Plots", pn.Column(
                #pn.pane.Markdown(" ### Nucleus Size Distribution: "), pn.Row(nucleus_size_line_graph_with_histogram, num_of_cell_removal),
                #pn.pane.Markdown(" ### Nucleus Size Distribution: "), pn.Row(plot1, layout2),
                #pn.pane.Markdown("### Nucleus Distribution Plot:"), pn.Column(nucleus_size_plot, nucleus_size_graph),
                pn.pane.Markdown(" ### Intensity Average Plot:"), pn.Row(selected_marker_plot, num_of_cell_removal_intensity),
                #pn.Column(pn.Column(column_dropdown, generate_plot_button), quantile_slider, plot),
                #pn.pane.Markdown("### Cytoplasm Intensity Plot:"), cytoplasm_intensity_plot,
                #pn.pane.Markdown("### AF555_Cell_Intensity_Average:"), quantile_output_app,
                #pn.pane.Markdown("### Distribution of AF555_Cell_Intensity_Average with Quantiles:"), quantile_intensity_plot
            )),
        ),
    ],
)

app.servable()

if __name__ == "__main__":
    pn.serve(app, port=5007)