db_query / queries /process_gsm.py
DavMelchi's picture
Adding City Adresse Commune and cercle to Physical DB
9d69630
import pandas as pd
from queries.process_mal import process_mal_data, process_mal_with_bts_name
from queries.process_trx import process_trx_data, process_trx_with_bts_name
from utils.config_band import config_band
from utils.convert_to_excel import convert_dfs, save_dataframe
from utils.kml_creator import generate_kml_from_df
from utils.utils_vars import GsmAnalysisData, UtilsVars, get_physical_db
# Columns kept from the "BTS" sheet, in output order: process_gsm_data()
# selects exactly these with df_bts[BTS_COLUMNS].
# Besides columns read from the sheet, the list contains the columns that
# process_gsm_data() derives before the selection (ID_BCF, ID_BTS, ID_MAL,
# code, Region, BSIC, SectorId2, Code_Sector, band_frequence, type_cellule,
# configuration_schema, band), so every name listed here must exist on the
# frame at that point.
BTS_COLUMNS = [
"ID_BCF",
"ID_BTS",
"ID_MAL",
"BSC",
"BCF",
"BTS",
"usedMobileAllocation",
"code",
"plmnPermitted",
"frequencyBandInUse",
"name",
"Region",
"adminState",
"allowIMSIAttachDetach",
"amrSegLoadDepTchRateLower",
"amrSegLoadDepTchRateUpper",
"btsSpLoadDepTchRateLower",
"btsSpLoadDepTchRateUpper",
"amrWbFrCodecModeSet",
"antennaHopping",
"bcchTrxPower",
"bsIdentityCodeBCC",
"bsIdentityCodeNCC",
"BSIC",
"cellId",
"dedicatedGPRScapacity",
"defaultGPRScapacity",
"fddQMin",
"fddQOffset",
"fddRscpMin",
"gprsEnabled",
"locationAreaIdLAC",
"rac",
"rachDropRxLevelThreshold",
"sectorId",
"SectorId2",
"segmentId",
"fastReturnToLTE",
"gsmPriority",
"segmentName",
"Code_Sector",
"band_frequence",
"type_cellule",
"configuration_schema",
"band",
]
# Columns kept from the "BCF" sheet by process_gsm_data().
# "ID_BCF" is the "<BSC>_<BCF>" key built there and used to merge with the BTS
# frame; "site_name" is the sheet's "name" column after it has been renamed.
BCF_COLUMNS = [
"ID_BCF",
"site_name",
]
# Columns of the merged GSM frame that process_gsm_data_to_kml() keeps before
# handing the data to generate_kml_from_df().
# NOTE(review): Longitude, Latitude, Azimut, Hauteur, BCCH, TRX_TCH and the
# number_trx_* columns are not created in this module; they presumably come
# from the physical DB / TRX / band frames merged in process_gsm_data() -
# confirm against those helpers.
GSM_KML_COLUMNS = [
"code",
"name",
"Longitude",
"Latitude",
"Azimut",
"Hauteur",
"BSIC",
"cellId",
"locationAreaIdLAC",
"band",
"BCCH",
"TRX_TCH",
"number_trx_per_cell",
"number_trx_per_bcf",
]
def compare_trx_tch_versus_mal(tch1, tch2):
    """Tell whether two comma-separated lists hold the same set of items.

    Order and repetition are ignored. Items are compared as raw text, so
    surrounding whitespace is significant.

    Args:
        tch1: First comma-separated string. Any non-string value (e.g. NaN
            or None) is treated as an empty set.
        tch2: Second comma-separated string, handled the same way.

    Returns:
        bool: True when both values reduce to the same set of items. Two
        non-string values therefore compare equal.
    """

    def _items(value):
        # Anything that is not a string counts as "no items".
        if not isinstance(value, str):
            return set()
        return set(value.split(","))

    return _items(tch1) == _items(tch2)
def _join_columns_as_str(frame: pd.DataFrame, columns: list, sep: str = "_") -> pd.Series:
    """Concatenate ``columns`` of ``frame`` as strings, separated by ``sep``."""
    joined = frame[columns[0]].astype(str)
    for column in columns[1:]:
        joined = joined + sep + frame[column].astype(str)
    return joined


def process_gsm_data(file_path: str) -> pd.DataFrame:
    """
    Build the merged GSM table from the "BTS" and "BCF" sheets of a dump file.

    The BTS sheet is enriched with derived keys and band information, joined
    to the BCF sheet, then left-merged with the TRX, band-configuration, MAL
    and physical-database frames produced by the project helpers.

    Args:
        file_path (str): The path to the Excel file. The first row of each
            sheet is skipped when reading.

    Returns:
        pd.DataFrame: One merged frame, starting from the BCF rows (all
        merges are left joins), with an extra boolean column
        "TRX_TCH_VS_MAL" comparing "TRX_TCH" against "MAL_TCH".
    """
    dfs = pd.read_excel(
        file_path,
        sheet_name=["BTS", "BCF"],
        engine="calamine",
        skiprows=[0],
    )

    # Process BTS data
    df_bts = dfs["BTS"]
    # Column headers may contain spaces; strip them so names match BTS_COLUMNS.
    df_bts.columns = df_bts.columns.str.replace(r"[ ]", "", regex=True)

    # "name" is split on "_": first part is the numeric code, second the region.
    name_parts = df_bts["name"].str.split("_")
    # Codes that are not numeric become 0.
    df_bts["code"] = (
        pd.to_numeric(name_parts.str[0], errors="coerce").fillna(0).astype(int)
    )
    df_bts["Region"] = name_parts.str[1]

    # Join keys used by the merges below.
    df_bts["ID_BCF"] = _join_columns_as_str(df_bts, ["BSC", "BCF"])
    df_bts["ID_BTS"] = _join_columns_as_str(df_bts, ["BSC", "BCF", "BTS"])
    df_bts["ID_MAL"] = _join_columns_as_str(df_bts, ["BSC", "usedMobileAllocation"])
    df_bts["BSIC"] = _join_columns_as_str(
        df_bts, ["bsIdentityCodeNCC", "bsIdentityCodeBCC"], sep=""
    )

    # Sector ids without a mapping entry keep their original value.
    df_bts["SectorId2"] = (
        df_bts["sectorId"].map(UtilsVars.sector_mapping).fillna(df_bts["sectorId"])
    )

    # Band-related lookups, all keyed on frequencyBandInUse.
    band_in_use = df_bts["frequencyBandInUse"]
    df_bts["band_frequence"] = band_in_use.map(UtilsVars.oml_band_frequence).fillna(
        "not found"
    )
    df_bts["type_cellule"] = band_in_use.map(UtilsVars.type_cellule).fillna(
        "not found"
    )
    df_bts["band"] = band_in_use.map(UtilsVars.gsm_band).fillna("not found")
    df_bts["configuration_schema"] = band_in_use.map(
        UtilsVars.configuration_schema
    ).fillna("not found")

    # Strip only a trailing ".0" (float formatting of the sector id). The
    # pattern is anchored and regex is explicit so the result does not depend
    # on the pandas default for `regex` and never touches the middle of a key.
    df_bts["Code_Sector"] = _join_columns_as_str(
        df_bts, ["code", "SectorId2"]
    ).str.replace(r"\.0$", "", regex=True)

    df_bts = df_bts[BTS_COLUMNS]

    # Process BCF data
    df_bcf = dfs["BCF"]
    df_bcf.columns = df_bcf.columns.str.replace(r"[ ]", "", regex=True)
    df_bcf["ID_BCF"] = _join_columns_as_str(df_bcf, ["BSC", "BCF"])
    df_bcf.rename(columns={"name": "site_name"}, inplace=True)
    df_bcf = df_bcf[BCF_COLUMNS]

    # Frames produced by the project helpers from the same file.
    df_trx = process_trx_data(file_path)
    df_mal = process_mal_data(file_path)
    df_band = config_band(df_bts)

    # Merge dataframes (left joins, starting from the BCF rows)
    df_2g = pd.merge(df_bcf, df_bts, on="ID_BCF", how="left")
    df_2g = pd.merge(df_2g, df_trx, on="ID_BTS", how="left")
    df_2g = pd.merge(df_2g, df_band, on="code", how="left")
    df_2g = pd.merge(df_2g, df_mal, on="ID_MAL", how="left")

    # Pairwise comparison without building a Series per row.
    df_2g["TRX_TCH_VS_MAL"] = [
        compare_trx_tch_versus_mal(trx_tch, mal_tch)
        for trx_tch, mal_tch in zip(df_2g["TRX_TCH"], df_2g["MAL_TCH"])
    ]

    df_physical_db = get_physical_db()
    df_2g = pd.merge(df_2g, df_physical_db, on="Code_Sector", how="left")

    return df_2g
def combined_gsm_database(file_path: str):
    """
    Build the GSM, MAL and TRX frames for a dump file and register them.

    The three frames are appended to ``UtilsVars.all_db_dfs`` and
    ``UtilsVars.gsm_dfs``, and their labels "GSM", "MAL", "TRX" to
    ``UtilsVars.all_db_dfs_names``.

    NOTE(review): the shared lists are extended, never reset, so calling this
    more than once accumulates entries - confirm callers clear them.

    Args:
        file_path (str): The path to the file.

    Returns:
        list: ``[gsm_df, mal_df, trx_df]`` in that order.
    """
    frames = [
        process_gsm_data(file_path),
        process_mal_with_bts_name(file_path),
        process_trx_with_bts_name(file_path),
    ]
    labels = ["GSM", "MAL", "TRX"]

    UtilsVars.all_db_dfs.extend(frames)
    UtilsVars.gsm_dfs.extend(frames)
    UtilsVars.all_db_dfs_names.extend(labels)
    return frames
def process_gsm_data_to_excel(file_path: str):
    """
    Build the GSM, MAL and TRX frames and convert them for Excel export.

    The frames come from ``combined_gsm_database`` (which also registers them
    on ``UtilsVars``). The value returned by ``convert_dfs`` is stored in
    ``UtilsVars.final_gsm_database``; nothing is returned.
    NOTE(review): presumably that value is the Excel workbook content -
    confirm against ``convert_dfs``.

    Args:
        file_path (str): The path to the file.
    """
    labels = ["GSM", "MAL", "TRX"]
    frames = combined_gsm_database(file_path)
    UtilsVars.final_gsm_database = convert_dfs(frames, labels)
############################# KML CREATION #################################
def process_gsm_data_to_kml(file_path: str):
    """
    Build the GSM table for a dump file and turn it into a KML document.

    Only the columns in ``GSM_KML_COLUMNS`` are kept. "color" and "size"
    columns are added by looking up "band" in ``UtilsVars.color_mapping`` and
    ``UtilsVars.size_mapping``. Rows without "Longitude", "Latitude" or
    "Azimut" are dropped. The result of ``generate_kml_from_df`` is stored in
    ``UtilsVars.gsm_kml_file``; nothing is returned.

    Args:
        file_path (str): The path to the file.
    """
    # Take an explicit copy: the columns added below must be written to an
    # independent frame, not to a selection of the merged table (which makes
    # pandas emit SettingWithCopyWarning).
    gsm_kml_df = process_gsm_data(file_path)[GSM_KML_COLUMNS].copy()

    # Add colors and size columns based on the "band" column
    gsm_kml_df["color"] = gsm_kml_df["band"].map(UtilsVars.color_mapping)
    gsm_kml_df["size"] = gsm_kml_df["band"].map(UtilsVars.size_mapping)

    # Remove rows missing any of these three values
    gsm_kml_df = gsm_kml_df.dropna(subset=["Longitude", "Latitude", "Azimut"])

    # Generate kml
    UtilsVars.gsm_kml_file = generate_kml_from_df(gsm_kml_df)
#############################GSM ANALYSIS#################################
def _count_rows_per_bsc_lac(frame: pd.DataFrame) -> pd.DataFrame:
    """Count rows of ``frame`` per (BSC, locationAreaIdLAC) pair.

    Returns a frame with the columns "BSC_NAME_ID" (BSC name from
    ``UtilsVars.bsc_name``, or "" when unmapped, joined to the BSC id with
    "_"), "LAC" (the LAC prefixed with "LAC_") and "count".
    """
    counts = (
        frame.groupby(["BSC", "locationAreaIdLAC"]).size().reset_index(name="count")
    )
    bsc_name = counts["BSC"].map(UtilsVars.bsc_name).fillna("").astype(str)

    # Ids read as floats render as e.g. "123.0"; drop only that trailing ".0".
    # Anchored, explicit regex: independent of the pandas `regex` default and
    # never alters the middle of a label.
    counts["LAC"] = ("LAC_" + counts["locationAreaIdLAC"].astype(str)).str.replace(
        r"\.0$", "", regex=True
    )
    counts["BSC_NAME_ID"] = (bsc_name + "_" + counts["BSC"].astype(str)).str.replace(
        r"\.0$", "", regex=True
    )
    return counts[["BSC_NAME_ID", "LAC", "count"]]


def gsm_analaysis(file_path: str):
    """
    Compute summary statistics of the GSM database into ``GsmAnalysisData``.

    The input frames are read from ``UtilsVars.gsm_dfs`` (index 0: GSM frame,
    index 2: TRX frame), i.e. the order in which ``combined_gsm_database``
    appends them - assuming the list held no earlier entries. Nothing is
    returned; every result is stored as an attribute of ``GsmAnalysisData``.

    Args:
        file_path (str): Not read by this function; kept so existing callers
            keep working.
    """
    gsm_df: pd.DataFrame = UtilsVars.gsm_dfs[0]
    trx_df: pd.DataFrame = UtilsVars.gsm_dfs[2]

    # One row per site code, and one row per (site code, LAC) pair.
    sites_per_bsc = gsm_df[["BSC", "code"]].drop_duplicates(
        subset=["code"], keep="first"
    )
    sites_per_lac = gsm_df[["BSC", "locationAreaIdLAC", "code"]].drop_duplicates(
        subset=["code", "locationAreaIdLAC"], keep="first"
    )

    # Distinct counts; nunique() ignores missing values.
    GsmAnalysisData.total_number_of_bsc = gsm_df["BSC"].nunique()
    GsmAnalysisData.total_number_of_cell = gsm_df["ID_BTS"].nunique()
    GsmAnalysisData.number_of_site = gsm_df["site_name"].nunique()

    GsmAnalysisData.number_of_cell_per_bsc = gsm_df["BSC"].value_counts()
    GsmAnalysisData.number_of_site_per_bsc = sites_per_bsc["BSC"].value_counts()

    # Missing-value counts
    GsmAnalysisData.number_of_bts_name_empty = gsm_df["name"].isna().sum()
    GsmAnalysisData.number_of_bcf_name_empty = gsm_df["site_name"].isna().sum()
    GsmAnalysisData.number_of_bcch_empty = gsm_df["BCCH"].isna().sum()

    # adminState distributions
    GsmAnalysisData.bts_administate_distribution = gsm_df["adminState"].value_counts()
    GsmAnalysisData.trx_administate_distribution = trx_df["adminState"].value_counts()

    # Number of trx per bsc
    GsmAnalysisData.number_of_trx_per_bsc = trx_df["BSC"].value_counts()

    # Number of cells and of sites per (BSC, LAC)
    GsmAnalysisData.number_of_cell_per_lac = _count_rows_per_bsc_lac(gsm_df)
    GsmAnalysisData.number_of_site_per_lac = _count_rows_per_bsc_lac(sites_per_lac)

    # Distribution of initialFrequency over the TRX rows
    GsmAnalysisData.trx_frequency_distribution = trx_df[
        "initialFrequency"
    ].value_counts()