import re

import numpy as np
import pandas as pd

# NOTE(review): SPLIT_PATTERN and main_keys are used below but not defined in
# this file, so they are expected to come from this star import.
from constants import *  # noqa: F401,F403


_INFO_TYPE_HELP = (
    "Info_type = 1 means measobjecttoaddmodlist, "
    "2 means reportconfigtoaddmodlist and 3 means measIds"
)

# info_type -> (regex for the section, length of the trailing terminator word
# that the lazy match consumes and that must be trimmed from the result).
_SECTION_PATTERNS = {
    1: (r"measobjecttoaddmodlist \[ (\d*) ][\d\W\w]*?reportconfig", len("reportconfig")),
    2: (r"reportconfigtoaddmodlist \[ (\d*) ][\d\W\w]*?measidto", len("measidto")),
    3: (r"measidtoaddmodlist \[ (\d*) ][\d\W\w]*?quant", len("quant")),
}

# Used for info_type 3 only, when no "quant" terminator follows the list.
_MEASID_FALLBACK_PATTERN = r"measidtoaddmodlist \[ (\d*) ][\d\W\w]*reportconfigid: \d*"

# info_type -> list header as it appears after clean_res() removed the spaces.
_LIST_HEADERS = {
    1: "measobjecttoaddmodlist",
    2: "reportconfigtoaddmodlist",
    3: "measidtoaddmodlist",
}

_MISSING_KEY_MSG = "New edge case detected. Please check code and actual values. k = {}"


def get_information(info_type, info_string):
    """Extract the raw text of one configuration list from a message.

    Edge case: sometimes the 4G and 5G RRC Reconfig are appended together,
    causing report id clashes.

    Args:
        info_type: 1 = measObjectToAddModList, 2 = reportConfigToAddModList,
            3 = measIdToAddModList.
        info_string: message text; it is lower-cased before matching.

    Returns:
        ``(section, span)`` where ``section`` is the lower-cased matched text
        with the trailing terminator word removed and ``span`` is the
        ``(start, end)`` of the full regex match (terminator included), or
        ``(None, (-1, -1))`` when nothing matches.
    """
    assert 1 <= info_type <= 3, _INFO_TYPE_HELP
    info_string = info_string.lower()

    pattern, neglen = _SECTION_PATTERNS[info_type]
    match = re.search(pattern, info_string)
    if match is None and info_type == 3:
        # The fallback pattern ends on the last "reportconfigid: N" itself,
        # so there is no terminator word to trim.
        match = re.search(_MEASID_FALLBACK_PATTERN, info_string)
        neglen = 0

    if match is None:
        return None, (-1, -1)
    start, end = match.span()
    return info_string[start:end - neglen], match.span()


def clean_res(cstring):
    """Normalise text returned by get_information for parsing.

    Removes every ". " sequence, then every space, then every "-." sequence,
    and lower-cases the result.
    """
    stripped = cstring.replace(". ", "").replace(" ", "").replace("-.", "")
    return stripped.lower()


def decode_rpt_type(info_type, info_string):
    """Decode one configuration-list section into a list of dictionaries.

    The cleaned text is split on ``SPLIT_PATTERN``. A segment containing the
    list header starts a new entry; every other segment is read as
    ``key:value`` and stored in the current entry (value is ``""`` when the
    segment has no colon; additional colons are replaced by spaces).

    Args:
        info_type: 1, 2 or 3, same meaning as in get_information.
        info_string: section text, normally the output of get_information.

    Returns:
        A list of single-item dicts ``{entry_index: {key: value, ...}}``.
        ``entry_index`` is the digit string found in ``header[<digits>`` or
        ``-1`` when the header carries no bracketed index.
    """
    assert 1 <= info_type <= 3, _INFO_TYPE_HELP
    header = _LIST_HEADERS[info_type]
    index_pattern = re.compile(header + r"\[(\d*)")

    entries = []
    entry_id = -1
    fields = {}
    # clean_res() also lower-cases, so no separate lower() call is needed.
    for segment in clean_res(info_string).split(SPLIT_PATTERN):
        if header in segment:
            if fields:
                entries.append({entry_id: fields})
                fields = {}  # reset the keys inside a measurement
            # The previous `re.findall(...) is not None` test was always true
            # and raised IndexError on a header without "[n]"; fall back to -1.
            found = index_pattern.search(segment)
            entry_id = found.group(1) if found else -1
        else:
            parts = segment.split(":")
            # join() of an empty tail yields "", covering the no-colon case.
            fields[parts[0]] = " ".join(parts[1:])

    if fields:
        entries.append({entry_id: fields})
    return entries


def get_rrc_info_main(info_type, info_string):
    """Locate and decode one type of configuration list.

    info_type = 1 : MeasObjectId (like earfcn and PCI...)
    info_type = 2 : ReportConfiguration (like event a5/a3 ....)
    info_type = 3 : MeasInfo (combines the above 2 to indicate type of measurement)

    Returns:
        ``(decoded_entries, span)`` as produced by decode_rpt_type and
        get_information, or ``(None, (-1, -1))`` when the list is not found.
    """
    section, span = get_information(info_type, info_string)
    if section is None:
        return None, (-1, -1)
    return decode_rpt_type(info_type, section), span


def _find_by_id(configs, id_key, cfg_number):
    """Return the first decoded entry whose ``id_key`` equals ``cfg_number``."""
    assert isinstance(cfg_number, str), "Please use the string type as report config as per the code"
    for entry in configs:
        for fields in entry.values():
            assert id_key in fields, _MISSING_KEY_MSG.format(fields)
            if fields[id_key] == cfg_number:
                return fields
    return None


def find_reportconfig(report_cfg, cfg_number):
    """Find the report configuration with the given ``reportconfigid``.

    Args:
        report_cfg: decoded entries as returned by decode_rpt_type.
        cfg_number: id to look for; must be a string.

    Returns:
        The matching field dictionary, or None when no entry matches.

    Raises:
        AssertionError: if ``cfg_number`` is not a string or an entry has no
            ``reportconfigid`` key.
    """
    return _find_by_id(report_cfg, 'reportconfigid', cfg_number)


def find_measobject(measobj_cfg, cfg_number):
    """Find the measurement object with the given ``measobjectid``.

    Args:
        measobj_cfg: decoded entries as returned by decode_rpt_type.
        cfg_number: id to look for; must be a string.

    Returns:
        The matching field dictionary, or None when no entry matches.

    Raises:
        AssertionError: if ``cfg_number`` is not a string or an entry has no
            ``measobjectid`` key.
    """
    return _find_by_id(measobj_cfg, 'measobjectid', cfg_number)


def process_main(current_text):
    """Build a table joining measIds with their report configs and objects.

    Progress and intermediate frames are printed to stdout.

    Args:
        current_text: message text to parse.

    Returns:
        ``(df, remaining_text)`` where ``df`` is a transposed DataFrame with
        one column per measId and ``remaining_text`` is ``current_text`` from
        the end of the furthest matched section onwards; ``(None, "")`` when
        any of the three lists is missing or no measId was decoded.

    Raises:
        AssertionError: if a measId entry lacks ``measid``, ``measobjectid``
            or ``reportconfigid``.
    """
    print("Processing. ")
    measobjectids, span1 = get_rrc_info_main(1, current_text)
    reportconfigs, span2 = get_rrc_info_main(2, current_text)
    measids, span3 = get_rrc_info_main(3, current_text)
    print(span1, span2, span3)

    span_ends = (span1[1], span2[1], span3[1])
    if min(span_ends) == -1:
        # At least one list was not found; nothing can be joined.
        return None, ""
    max_span = max(span_ends)

    ret = []
    for entry in measids:
        row = {}
        for meas_id, fields in entry.items():
            assert 'measid' in fields, _MISSING_KEY_MSG.format(fields)
            assert 'measobjectid' in fields, _MISSING_KEY_MSG.format(fields)
            # Was `assert 'reportconfigid', ...`, which could never fail.
            assert 'reportconfigid' in fields, _MISSING_KEY_MSG.format(fields)

            # Copy so the decoded measId entries are not mutated in place.
            row = dict(fields)
            row['measid'] = meas_id  # AAM - Rewrite the ids to save all
            linked = (
                find_reportconfig(reportconfigs, fields['reportconfigid']),
                find_measobject(measobjectids, fields['measobjectid']),
            )
            for extra in linked:
                # An id not configured in this message yields None; keep the
                # row with what is known instead of raising TypeError.
                if extra is not None:
                    row.update(extra)
        ret.append(row)
    print("Appended information together. Total length of measurement Ids is {}".format(len(ret)))

    ###########################################################################################
    # Display the results now.
    ###########################################################################################
    if not ret:
        return None, ""

    rows = [{x: y for x, y in item.items() if x in main_keys} for item in ret]
    df = pd.DataFrame(rows).fillna("")
    # Order columns as listed in main_keys, then any remaining ones.
    all_columns = [x for x in main_keys if x in df.columns]
    all_columns.extend([x for x in df.columns if x not in main_keys])
    df = df[all_columns]
    df = df.T
    print(df.head())
    print("*"*50)
    df.columns = df.loc['measid']
    print(df.head())
    print("*"*50)
    df = df[df.index != 'measid']
    print(df.head())
    print("*"*50)
    return df, current_text[max_span:]