"""
Module: gene_mapper.py

This module provides utilities for mapping gene identifiers between human and mouse datasets,
as well as handling orthology relationships. It is designed to process gene expression data
and map gene IDs to standardized formats for downstream analysis.

Main Features:
- Map human and mouse gene IDs to a common reference format.
- Handle orthology relationships to convert mouse gene symbols to human gene symbols.
- Combine mapping results from multiple sources and flag discrepancies.
- Transform wide-format gene data into long format for easier processing.
- Categorize gene mappings based on their relationships (e.g., one-to-one, one-to-many).

Dependencies:
- pandas: For data manipulation.
- numpy: For numerical operations.
- warnings: For handling warnings during processing.

Usage:
- Import the functions and use them to map gene IDs or process gene data.
- Run the script directly to execute test cases for the implemented functions.

Why:
- This module is essential for harmonizing gene identifiers across datasets, enabling
  consistent analysis of gene expression data from different species or sources.
"""

import warnings

import numpy as np
import pandas as pd


def map_mouse_human(data_frame, query_column, human_map_db, mouse_map_db, orthology_db, verbose=False):
    """
    Maps gene IDs from a dataset to human and mouse reference databases, and resolves orthology relationships.

    Args:
        data_frame (pd.DataFrame): Input data containing gene IDs to map.
        query_column (str): Column name in the input data containing gene IDs.
        human_map_db (pd.DataFrame): Reference database for human gene mapping.
        mouse_map_db (pd.DataFrame): Reference database for mouse gene mapping.
        orthology_db (pd.DataFrame): Database containing orthology relationships between mouse and human genes.
        verbose (bool): Whether to print detailed logs during processing.

    Returns:
        pd.DataFrame: A combined mapping result with discrepancies flagged.
    """
    if verbose:
        print("------------ map human gene ids ------------")
    mapped_hsap = map_genes(
        expr_mat=data_frame,
        expr_ids=query_column,
        annot_mat=human_map_db,
        annot_from="id",
        annot_to="reference_id",
        return_unmapped=True,
        keep_prev_ids=True,
        verbose=verbose,
    )

    if verbose:
        print("------------ map mouse gene ids ------------")
    mapped_mus = map_genes(
        expr_mat=data_frame,
        expr_ids=query_column,
        annot_mat=mouse_map_db,
        annot_from="id",
        annot_to="reference_id",
        return_unmapped=True,
        keep_prev_ids=True,
        verbose=verbose,
    )

    if verbose:
        print("------------ mouse to human orthologs ------------")
    mouse_hsap = orthologs_to_human(
        mouse_df=mapped_mus,
        mouse_col="reference_id",
        orthology_df=orthology_db,
        ortho_mouse_col="mouse_gene_symbol",
        ortho_human_col="human_gene_symbol",
        ortho_type_col="mouse_homology_type",
        orthology_type="ortholog_one2one",
    )

    mouse_hsap = mouse_hsap.loc[:, ["previous_ids", "human_gene_symbol"]].drop_duplicates()
    mouse_hsap = mouse_hsap.rename(columns={"human_gene_symbol": "reference_id"})

    if verbose:
        print("------------ combine results ------------")
    both_mapped = combine_dataframe_columns(
        df1=mapped_hsap, df2=mouse_hsap, id_column="previous_ids", reference_id_column="reference_id", verbose=verbose
    )
    both_mapped = both_mapped.loc[:, ["previous_ids", "reference_id", "discrepancy_flag"]].drop_duplicates()

    return both_mapped
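

# The following demo is a minimal, hedged sketch of how map_mouse_human might be called
# end to end. The toy identifiers and the frame/column names ("gene", "id",
# "reference_id", "mouse_gene_symbol", "human_gene_symbol", "mouse_homology_type") are
# illustrative assumptions, not real reference databases. It can be invoked alongside
# the existing tests from the __main__ guard if desired.
def test_map_mouse_human_function():
    data_frame = pd.DataFrame(
        {"gene": ["TP53", "ENSG00000141510", "Trp53", "ENSMUSG00000059552", "FAKE1"]}
    )
    human_map_db = pd.DataFrame({"id": ["ENSG00000141510"], "reference_id": ["TP53"]})
    mouse_map_db = pd.DataFrame({"id": ["ENSMUSG00000059552"], "reference_id": ["Trp53"]})
    orthology_db = pd.DataFrame(
        {
            "mouse_gene_symbol": ["Trp53"],
            "human_gene_symbol": ["TP53"],
            "mouse_homology_type": ["ortholog_one2one"],
        }
    )

    both_mapped = map_mouse_human(
        data_frame, "gene", human_map_db, mouse_map_db, orthology_db, verbose=False
    )
    result = dict(zip(both_mapped["previous_ids"], both_mapped["reference_id"]))

    # Human and mouse IDs should converge on the same human reference symbol;
    # unmapped IDs are kept with an empty reference_id.
    assert result["ENSG00000141510"] == "TP53"
    assert result["ENSMUSG00000059552"] == "TP53"
    assert result["FAKE1"] == ""

    print("test_map_mouse_human_function\t- passed")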


def map_mouse_human2(data_frame, query_column, human_map_db, mouse_map_db, orthology_db, verbose=False):
    """
    Variant of map_mouse_human that filters the orthology results to mouse Ensembl IDs
    or genes with a known mouse symbol, keeps human symbols only for one-to-one
    orthologs, and falls back to the original identifier for mouse genes without a
    human ortholog. Arguments and return value match map_mouse_human.
    """
    if verbose:
        print("------------ map human gene ids ------------")
    mapped_hsap = map_genes(
        expr_mat=data_frame,
        expr_ids=query_column,
        annot_mat=human_map_db,
        annot_from="id",
        annot_to="reference_id",
        return_unmapped=True,
        keep_prev_ids=True,
        verbose=verbose,
    )

    if verbose:
        print("------------ map mouse gene ids ------------")
    mapped_mus = map_genes(
        expr_mat=data_frame,
        expr_ids=query_column,
        annot_mat=mouse_map_db,
        annot_from="id",
        annot_to="reference_id",
        return_unmapped=True,
        keep_prev_ids=True,
        verbose=verbose,
    )

    if verbose:
        print("------------ mouse to human orthologs ------------")
    mouse_hsap = orthologs_to_human(
        mouse_df=mapped_mus,
        mouse_col="reference_id",
        orthology_df=orthology_db,
        ortho_mouse_col="mouse_gene_symbol",
        ortho_human_col="human_gene_symbol",
        ortho_type_col="mouse_homology_type",
        orthology_type="ortholog_one2one",
    )

    # Keep mouse Ensembl IDs and genes with a known mouse symbol.
    if verbose:
        print(mouse_hsap.shape)
    mouse_hsap_filt = mouse_hsap.loc[
        (mouse_hsap.previous_ids.str.contains("ENSMUS")) | (~mouse_hsap.mouse_gene_symbol.isnull()), :
    ]

    if verbose:
        print(mouse_hsap_filt.shape)

    mouse_hsap = mouse_hsap_filt

    # Keep human symbols only for one-to-one orthologs.
    mouse_hsap.loc[mouse_hsap["mouse_homology_type"] != "ortholog_one2one", "human_gene_symbol"] = pd.NA

    if verbose:
        print("\n=========\tcount missing\t=========")
        print(sum(mouse_hsap.human_gene_symbol.isnull()))

    # Fall back to the original identifier where no human ortholog was found.
    mouse_hsap["human_gene_symbol"] = mouse_hsap["human_gene_symbol"].fillna(mouse_hsap["previous_ids"])

    if verbose:
        print(sum(mouse_hsap.human_gene_symbol.str.contains("ENSMUSG")))

    if verbose:
        print("\n=========\tdoes not contain ENSMUSG\t=========")
        print(mouse_hsap["previous_ids"][~mouse_hsap["previous_ids"].str.contains("ENSMUSG")].shape)
        print(mouse_hsap["human_gene_symbol"][~mouse_hsap["human_gene_symbol"].str.contains("ENSMUSG")].shape)

        print("\n=========\tcount missing\t=========")
        print(sum(mouse_hsap.human_gene_symbol.isnull()))

    mouse_hsap = mouse_hsap.loc[:, ["previous_ids", "human_gene_symbol"]].drop_duplicates()
    mouse_hsap = mouse_hsap.rename(columns={"human_gene_symbol": "reference_id"})

    if verbose:
        print("------------ combine results ------------")
    both_mapped = combine_dataframe_columns(
        df1=mapped_hsap, df2=mouse_hsap, id_column="previous_ids", reference_id_column="reference_id", verbose=verbose
    )
    both_mapped = both_mapped.loc[:, ["previous_ids", "reference_id", "discrepancy_flag"]].drop_duplicates()

    return both_mapped
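

# A minimal, hedged sketch of the map_mouse_human2 fallback behaviour: mouse genes
# without a one-to-one human ortholog keep their original identifier. All identifiers,
# column names, and orthology entries are toy assumptions, not real reference data.
# It can be invoked alongside the existing tests from the __main__ guard if desired.
def test_map_mouse_human2_function():
    data_frame = pd.DataFrame(
        {"gene": ["TP53", "Trp53", "ENSMUSG00000059552", "ENSMUSG00000099999"]}
    )
    human_map_db = pd.DataFrame({"id": ["ENSG00000141510"], "reference_id": ["TP53"]})
    mouse_map_db = pd.DataFrame(
        {"id": ["ENSMUSG00000059552", "ENSMUSG00000099999"], "reference_id": ["Trp53", "Gm00000"]}
    )
    orthology_db = pd.DataFrame(
        {
            "mouse_gene_symbol": ["Trp53"],
            "human_gene_symbol": ["TP53"],
            "mouse_homology_type": ["ortholog_one2one"],
        }
    )

    both_mapped = map_mouse_human2(
        data_frame, "gene", human_map_db, mouse_map_db, orthology_db, verbose=False
    )
    result = dict(zip(both_mapped["previous_ids"], both_mapped["reference_id"]))

    # The orthologous mouse gene is converted to its human symbol, while the gene
    # without an ortholog falls back to its original Ensembl identifier.
    assert result["ENSMUSG00000059552"] == "TP53"
    assert result["ENSMUSG00000099999"] == "ENSMUSG00000099999"

    print("test_map_mouse_human2_function\t- passed")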


def combine_dataframe_columns(df1, df2, id_column, reference_id_column, verbose=True):
    """
    Combines two dataframes by merging on a common ID column and flags discrepancies in reference IDs.

    Args:
        df1 (pd.DataFrame): First dataframe to merge.
        df2 (pd.DataFrame): Second dataframe to merge.
        id_column (str): Column name to merge on.
        reference_id_column (str): Column name containing reference IDs.
        verbose (bool): Whether to print detailed logs during processing.

    Returns:
        pd.DataFrame: A merged dataframe with discrepancies flagged.
    """
    # Treat empty strings as missing values.
    df1[reference_id_column] = df1[reference_id_column].replace("", pd.NA)
    df2[reference_id_column] = df2[reference_id_column].replace("", pd.NA)

    if verbose:
        missing_df1 = df1[reference_id_column].isna().sum()
        missing_df2 = df2[reference_id_column].isna().sum()
        print(f"Missing values in {reference_id_column} of df1: {missing_df1}")
        print(f"Missing values in {reference_id_column} of df2: {missing_df2}")

    merged_df = pd.merge(df1, df2, on=id_column, how="outer", suffixes=("_df1", "_df2"))

    # Flag rows where both sources provide a reference ID but disagree.
    merged_df["discrepancy_flag"] = np.where(
        (merged_df[f"{reference_id_column}_df1"].notna())
        & (merged_df[f"{reference_id_column}_df2"].notna())
        & (merged_df[f"{reference_id_column}_df1"] != merged_df[f"{reference_id_column}_df2"]),
        True,
        False,
    )

    # Prefer the reference ID from df1, falling back to df2.
    merged_df[reference_id_column] = np.where(
        merged_df[f"{reference_id_column}_df1"].notna(),
        merged_df[f"{reference_id_column}_df1"],
        merged_df[f"{reference_id_column}_df2"],
    )

    final_df = merged_df[
        [id_column, reference_id_column, f"{reference_id_column}_df1", f"{reference_id_column}_df2", "discrepancy_flag"]
    ].fillna("")

    if verbose:
        missing_final = final_df[reference_id_column].isna().sum()
        print(f"Missing values in final merged {reference_id_column}: {missing_final}")

    if final_df["discrepancy_flag"].any():
        print("Warning: There are discrepancies in the reference IDs between the two dataframes.")

    return final_df
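

# A minimal, hedged usage sketch for combine_dataframe_columns. The toy frames and the
# column names "previous_ids"/"reference_id" are illustrative assumptions that mirror
# the conventions used elsewhere in this module. It can be invoked alongside the
# existing tests from the __main__ guard if desired.
def test_combine_function():
    df1 = pd.DataFrame({"previous_ids": ["g1", "g2"], "reference_id": ["TP53", "BRCA1"]})
    df2 = pd.DataFrame({"previous_ids": ["g1", "g2"], "reference_id": ["TP73", "BRCA1"]})

    combined = combine_dataframe_columns(
        df1=df1, df2=df2, id_column="previous_ids", reference_id_column="reference_id", verbose=False
    )
    flags = dict(zip(combined["previous_ids"], combined["discrepancy_flag"]))

    # g1 is flagged because the two sources disagree (TP53 vs TP73); g2 is not.
    # (The function itself prints a discrepancy warning here, which is expected.)
    assert flags["g1"]
    assert not flags["g2"]

    print("test_combine_function\t\t- passed")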


def orthologs_to_human(
    mouse_df,
    orthology_df,
    mouse_col,
    ortho_mouse_col,
    ortho_human_col,
    ortho_type_col,
    orthology_type="ortholog_one2one",
):
    """
    Merges a mouse data frame with an orthology data frame to convert mouse gene symbols to human gene symbols.

    Parameters:
    - mouse_df: pd.DataFrame - The data frame containing mouse gene symbols.
    - orthology_df: pd.DataFrame - The data frame containing orthology information.
    - mouse_col: str - The column name in mouse_df that contains mouse gene symbols.
    - ortho_mouse_col: str - The column name in orthology_df that contains mouse gene symbols.
    - ortho_human_col: str - The column name in orthology_df that contains human gene symbols.
    - ortho_type_col: str - The column name in orthology_df that contains the orthology type.
    - orthology_type: str - The type of orthology to keep (default is 'ortholog_one2one').

    Returns:
    - merged_df: pd.DataFrame - The merged data frame with human gene symbols included.
    """
    unique_ortho_types = orthology_df[ortho_type_col].unique()

    if orthology_type not in unique_ortho_types:
        print(f"Error: Specified orthology type '{orthology_type}' not found.")
        print("Available orthology types are:", unique_ortho_types)
        return None

    # Keep only the requested orthology type before merging.
    filtered_orthology_df = orthology_df[orthology_df[ortho_type_col] == orthology_type]

    merged_df = mouse_df.merge(
        filtered_orthology_df[[ortho_mouse_col, ortho_human_col, ortho_type_col]],
        left_on=mouse_col,
        right_on=ortho_mouse_col,
        how="left",
    )

    return merged_df
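

# A minimal, hedged usage sketch for orthologs_to_human on toy frames. The gene symbols
# and column names mirror those used by map_mouse_human and are assumptions for
# illustration only. It can be invoked alongside the existing tests from the __main__
# guard if desired.
def test_orthologs_function():
    mouse = pd.DataFrame({"reference_id": ["Trp53", "Gm12345"]})
    orthology = pd.DataFrame(
        {
            "mouse_gene_symbol": ["Trp53"],
            "human_gene_symbol": ["TP53"],
            "mouse_homology_type": ["ortholog_one2one"],
        }
    )

    merged = orthologs_to_human(
        mouse_df=mouse,
        orthology_df=orthology,
        mouse_col="reference_id",
        ortho_mouse_col="mouse_gene_symbol",
        ortho_human_col="human_gene_symbol",
        ortho_type_col="mouse_homology_type",
        orthology_type="ortholog_one2one",
    )

    # Trp53 is converted to its human symbol; the gene without an ortholog stays unmapped.
    assert merged.loc[merged["reference_id"] == "Trp53", "human_gene_symbol"].iloc[0] == "TP53"
    assert merged.loc[merged["reference_id"] == "Gm12345", "human_gene_symbol"].isna().all()

    print("test_orthologs_function\t\t- passed")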


def preprocess_wide_to_long(df, reference_id, sep="|", keep_id_type=True):
    """
    Transforms the given DataFrame into a long format table where one specified column represents reference IDs
    and all the entries from the other columns, including the specified column, are put into the second column.
    Entries separated by a specified separator are split into individual values. Removes any duplicate values.
    Handles NaN values appropriately by skipping them and removes rows with NaN in the reference_id column.

    Args:
        df (pd.DataFrame): The input DataFrame with gene information.
        reference_id (str): The column name to be used as the reference identifier.
        sep (str): The separator used to split entries in the ID columns.
        keep_id_type (bool): Whether to keep the id_type column in the final output.

    Returns:
        pd.DataFrame: The transformed long format DataFrame with split values.
    """
    if df.columns.duplicated().any():
        raise ValueError("Duplicate column names detected in the DataFrame.")

    initial_row_count = df.shape[0]
    df = df.dropna(subset=[reference_id])
    final_row_count = df.shape[0]

    if initial_row_count != final_row_count:
        print(
            f"Removed {initial_row_count - final_row_count} rows with NaN in '{reference_id}'. {final_row_count} rows remain."
        )
    else:
        print("No rows with NaN in the reference_id were found.")

    if df[reference_id].duplicated().any():
        print(
            f"Warning: Duplicate values found in the '{reference_id}' column. This may cause issues with the transformation."
        )

    long_format_data = []

    for col in df.columns:
        if col != reference_id:
            if pd.api.types.is_numeric_dtype(df[col]):
                df[col] = df[col].astype(str)

            exploded_df = df[[reference_id, col]].dropna().assign(**{col: df[col].str.split(sep)})
            exploded_df = exploded_df.explode(col)
            exploded_df["id_type"] = col
            exploded_df = exploded_df.rename(columns={col: "id"})
            long_format_data.append(exploded_df)

    long_df = pd.concat(long_format_data)

    reference_id_df = df[[reference_id]].dropna()
    reference_id_df["id_type"] = reference_id
    reference_id_df["id"] = reference_id_df[reference_id]
    long_df = pd.concat([long_df, reference_id_df], ignore_index=True)

    long_df = long_df.rename(columns={reference_id: "reference_id"})

    long_df.drop_duplicates(inplace=True)

    if not keep_id_type:
        long_df = long_df.drop(columns=["id_type"]).drop_duplicates()

    columns_order = ["id", "reference_id"] if not keep_id_type else ["id", "id_type", "reference_id"]
    long_df = long_df[columns_order]

    return long_df


def categorise_mapping(df, ids_from_col, ids_to_col):
    """
    Annotates each mapping row with its relationship type (one2one, one2many,
    many2one, or many2many) based on how often each ID and each gene occurs.
    """
    id_counts = df[ids_from_col].value_counts()
    gene_counts = df[ids_to_col].value_counts()

    df["id_count"] = df[ids_from_col].map(id_counts)
    df["gene_count"] = df[ids_to_col].map(gene_counts)

    conditions = [(df["id_count"] > 1) & (df["gene_count"] > 1), (df["id_count"] > 1), (df["gene_count"] > 1)]
    choices = ["many2many", "one2many", "many2one"]
    df["match_type"] = np.select(conditions, choices, default="one2one")

    df.drop(columns=["id_count", "gene_count"], inplace=True)

    return df


def remove_whitespace(series):
    """Strips leading and trailing whitespace after casting the series to string."""
    return series.astype(str).str.strip()


def unlist(nested_list):
    """
    Recursively flattens a nested list.

    Args:
        nested_list (list): A list that may contain nested lists.

    Returns:
        list: A flattened list.
    """
    flattened = []
    for item in nested_list:
        if isinstance(item, list):
            flattened.extend(unlist(item))
        else:
            flattened.append(item)
    return flattened
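

# A small, hedged sketch exercising the two helpers above on toy inputs; the values are
# illustrative only. It can be invoked alongside the existing tests from the __main__
# guard if desired.
def test_helpers_function():
    # remove_whitespace strips leading/trailing spaces after casting to string.
    cleaned = remove_whitespace(pd.Series([" TP53 ", "BRCA1"]))
    assert cleaned.tolist() == ["TP53", "BRCA1"]

    # unlist flattens arbitrarily nested lists into a single flat list.
    assert unlist([["a", ["b"]], "c"]) == ["a", "b", "c"]

    print("test_helpers_function\t\t- passed")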


def map_genes(
    expr_mat,
    expr_ids=None,
    annot_mat=None,
    annot_from="id",
    annot_to="hgnc_symbol",
    return_unmapped=False,
    verbose=True,
    error=False,
    keep_prev_ids=False,
):
    """
    Maps the identifiers in expr_mat (taken from the expr_ids column, or from the index
    if expr_ids is None) to the annot_to column of annot_mat, matching on annot_from.
    IDs already in the target format are kept as-is, unmatched IDs are retried with a
    lowercase match, and if return_unmapped=True the remaining IDs are returned with an
    empty mapping.

    TODO: The code currently breaks when expr_mat already has a column called
    reference_id, because the pd.merge(...) call does not merge the reference_id columns.
    """
    if expr_ids is not None:
        expr_mat = expr_mat.rename(columns={expr_ids: "previous_ids"})
        expr_ids = "previous_ids"

    if expr_ids is None:
        expr_ids = "previous_ids"
        expr_mat[expr_ids] = expr_mat.index

    with warnings.catch_warnings():
        warnings.simplefilter(action="ignore", category=pd.errors.SettingWithCopyWarning)
        expr_mat[expr_ids] = remove_whitespace(expr_mat[expr_ids])

    if verbose:
        print("\n [ gene ID mapping ] \n")
        print(
            f"\tdataset contains : {len(expr_mat['previous_ids'])} ids, of which unique: {len(expr_mat['previous_ids'].unique())} - {round(len(expr_mat['previous_ids'].unique()) / len(expr_mat['previous_ids']) * 100, 1)}%"
        )

    # Drop rows with missing identifiers.
    missing_genes = expr_mat[expr_mat[expr_ids].isin([None, "", "nan"])]
    if not missing_genes.empty:
        if verbose:
            print(f"\tfound {len(missing_genes)} missing ids", list(missing_genes[expr_ids].unique())[:5])
        expr_mat = expr_mat[~expr_mat[expr_ids].isin([None, "", "nan"])]

    # IDs that are already in the target format map to themselves.
    premapped = expr_mat[expr_mat["previous_ids"].isin(annot_mat[annot_to])]
    premapped.loc[:, annot_to] = premapped["previous_ids"]

    if verbose:
        print(
            f'\n\texpr_mat - of {len(expr_mat["previous_ids"].unique())} ids {len(premapped["previous_ids"].unique())} - {round(len(premapped["previous_ids"].unique()) / len(expr_mat["previous_ids"].unique()) * 100, 3)}% directly map to annot_mat[{annot_to}]\n'
        )

    unmapped_hgnc = expr_mat[~expr_mat["previous_ids"].isin(premapped["previous_ids"])]
    if unmapped_hgnc.empty:
        if keep_prev_ids:
            return premapped.drop_duplicates()
        return premapped.drop(columns=["previous_ids"], errors="ignore").drop_duplicates()

    mapped = pd.merge(
        expr_mat[~expr_mat["previous_ids"].isin(premapped["previous_ids"])],
        annot_mat[[annot_from, annot_to]].drop_duplicates(),
        left_on="previous_ids",
        right_on=annot_from,
        how="inner",
    )

    mapped = pd.concat([mapped, premapped if not premapped.empty else None])

    # Retry the remaining IDs with a case-insensitive match.
    remap = expr_mat[~expr_mat["previous_ids"].isin(mapped["previous_ids"])]
    remap.loc[:, "previous_ids"] = remap["previous_ids"].str.lower()

    reannot = annot_mat[[annot_from, annot_to]].drop_duplicates()
    reannot[annot_from] = reannot[annot_from].str.lower()

    remap = pd.merge(remap, reannot, left_on="previous_ids", right_on=annot_from, how="inner")

    mapped = pd.concat([mapped, remap]).drop_duplicates()

    dups = mapped[mapped.duplicated(subset=[annot_to], keep=False)][annot_to].unique()
    uniq = mapped[~mapped[annot_to].isin(dups)][annot_to].unique()

    if verbose:
        print(f'\tone2one: {len(uniq)}\t{", ".join(uniq[:5])}')
        print(f'\tmany2one: {len(dups)}\t{", ".join(dups[:5])}')

    unmapped = expr_mat["previous_ids"][
        ~expr_mat["previous_ids"].str.lower().isin(mapped["previous_ids"].str.lower())
    ].unique()

    if verbose:
        print(f'\n\tunmapped genes: {len(unmapped)}\t:: {", ".join(unmapped[:5])}\n')
        print("\n\n")

    result = mapped

    if return_unmapped:
        unmapped_expr_mat = expr_mat[expr_mat["previous_ids"].isin(unmapped)]
        if not unmapped_expr_mat.empty:
            unmapped_expr_mat.loc[:, annot_to] = ""
            result = pd.concat([result, unmapped_expr_mat])

    result = result.loc[:, result.columns.isin(unlist([list(expr_mat.columns.values), annot_to]))]

    if keep_prev_ids:
        return result.drop_duplicates()
    return result.drop(columns=["previous_ids"], errors="ignore").drop_duplicates()
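

# A minimal, hedged sketch of map_genes on toy inputs. The annotation frame and its
# "id"/"reference_id" columns are illustrative assumptions matching the defaults used by
# map_mouse_human; they are not a real annotation resource. It can be invoked alongside
# the existing tests from the __main__ guard if desired.
def test_map_genes_function():
    expr = pd.DataFrame({"gene": ["TP53", "ENSG00000141510", "FAKE1"]})
    annot = pd.DataFrame({"id": ["ENSG00000141510"], "reference_id": ["TP53"]})

    mapped = map_genes(
        expr_mat=expr,
        expr_ids="gene",
        annot_mat=annot,
        annot_from="id",
        annot_to="reference_id",
        return_unmapped=True,
        keep_prev_ids=True,
        verbose=False,
    )
    lookup = dict(zip(mapped["previous_ids"], mapped["reference_id"]))

    # IDs already in the reference format map to themselves, annotated IDs are
    # translated, and unmapped IDs are returned with an empty reference_id.
    assert lookup["TP53"] == "TP53"
    assert lookup["ENSG00000141510"] == "TP53"
    assert lookup["FAKE1"] == ""

    print("test_map_genes_function\t\t- passed")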


def test_transform_function():
    """
    Test case for the preprocess_wide_to_long function using a toy example.
    """
    data = {
        "Gene stable ID": ["ID1|ID2", "ID3", "ID4|ID5"],
        "Gene stable ID version": ["ID1.1", "ID3.1", None],
        "Gene Synonym": ["Syn1", None, "Syn4"],
        "Gene name": ["GeneA", "GeneB", "GeneC"],
    }

    df = pd.DataFrame(data)

    expected_data = {
        "id": ["ID1", "ID2", "ID1.1", "Syn1", "GeneA", "ID3", "ID3.1", "GeneB", "ID4", "ID5", "Syn4", "GeneC"],
        "id_type": [
            "Gene stable ID",
            "Gene stable ID",
            "Gene stable ID version",
            "Gene Synonym",
            "Gene name",
            "Gene stable ID",
            "Gene stable ID version",
            "Gene name",
            "Gene stable ID",
            "Gene stable ID",
            "Gene Synonym",
            "Gene name",
        ],
        "reference_id": [
            "GeneA",
            "GeneA",
            "GeneA",
            "GeneA",
            "GeneA",
            "GeneB",
            "GeneB",
            "GeneB",
            "GeneC",
            "GeneC",
            "GeneC",
            "GeneC",
        ],
    }

    expected_df = pd.DataFrame(expected_data)

    long_df = preprocess_wide_to_long(df, "Gene name")

    # Sort both frames so row order does not affect the comparison.
    long_df = long_df.sort_values(by=["id", "id_type", "reference_id"]).reset_index(drop=True)
    expected_df = expected_df.sort_values(by=["id", "id_type", "reference_id"]).reset_index(drop=True)

    assert long_df.equals(expected_df), "test_transform_function\t\t- did not produce expected result"

    print("test_transform_function\t\t- passed")


def test_categorise_function():
    mapping_test_data = {
        "ids": ["id1", "id2", "id3", "id4", "id1", "id5"],
        "gene_names": ["gene1", "gene2", "gene3", "gene3", "gene4", "gene5"],
        "expected_match_type": ["one2many", "one2one", "many2one", "many2one", "one2many", "one2one"],
    }

    mapping_test_data = pd.DataFrame(mapping_test_data)

    test_data = {
        "ids": ["id1", "id2", "id3", "id4", "id1", "id5"],
        "gene_names": ["gene1", "gene2", "gene3", "gene3", "gene4", "gene5"],
    }

    df_test = pd.DataFrame(test_data)

    print("\nRunning optimized version:")
    annotated_df_optimized = categorise_mapping(df_test.copy(), "ids", "gene_names")
    print(annotated_df_optimized)

    assert (
        annotated_df_optimized["match_type"].tolist() == mapping_test_data["expected_match_type"].tolist()
    ), "Optimized version failed"

    print("\ntest_categorise_function\t\t- passed")


if __name__ == "__main__":
    test_transform_function()
    test_categorise_function()