from sentence_transformers import SentenceTransformer, util import json import time import pandas as pd import numpy as np import pickle import chromadb from chromadb.config import Settings from chromadb.utils import embedding_functions from chromadb.db.clickhouse import NoDatapointsException def prepare_cd(conceptDescriptions): df_cd = pd.DataFrame( columns=["SemanticId", "Definition", "PreferredName", "Datatype", "Unit"] ) # In den leeren DF werden alle Concept Descriptions eingelesen for cd in conceptDescriptions: semantic_id = cd["identification"]["id"] data_spec = cd["embeddedDataSpecifications"][0]["dataSpecificationContent"] preferred_name = data_spec["preferredName"] short_name = data_spec["shortName"] if len(preferred_name) > 1: for name_variant in preferred_name: if ( name_variant["language"] == "EN" or name_variant["language"] == "en" or name_variant["language"] == "EN?" ): name = name_variant["text"] elif len(preferred_name) == 1: name = preferred_name[0]["text"] elif len(preferred_name) == 0: short_name = data_spec["shortName"] if len(short_name) == 0: name = "NaN" else: name = short_name[0]["text"] definition = data_spec["definition"] if len(definition) > 1: for definition_variant in definition: if ( definition_variant["language"] == "EN" or definition_variant["language"] == "en" or definition_variant["language"] == "EN?" ): chosen_def = definition_variant["text"] elif len(definition) == 1: chosen_def = definition[0]["text"] elif len(definition) == 0: chosen_def = "NaN" if data_spec["dataType"] == "": datatype = "NaN" else: datatype = data_spec["dataType"] if data_spec["unit"] == "": unit = "NaN" else: unit = data_spec["unit"] new_entry = pd.DataFrame( { "SemanticId": semantic_id, "Definition": chosen_def, "PreferredName": name, "Datatype": datatype, "Unit": unit, }, index=[0], ) df_cd = pd.concat([df_cd, new_entry], ignore_index=True) return df_cd def get_values(submodel_element): # Auslesen der Submodel Element Werte se_type = submodel_element["modelType"]["name"] se_semantic_id = submodel_element["semanticId"]["keys"][0]["value"] se_semantic_id_local = submodel_element["semanticId"]["keys"][0]["local"] se_id_short = submodel_element["idShort"] value = [] se_value = submodel_element["value"] value.append(se_value) return se_type, se_semantic_id, se_semantic_id_local, se_id_short, value def get_concept_description(semantic_id, df_cd): cd_content = df_cd.loc[df_cd["SemanticId"] == semantic_id] if cd_content.empty: cd_content = pd.DataFrame( { "SemanticId": semantic_id, "Definition": "NaN", "PreferredName": "NaN", "Datatype": "NaN", "Unit": "NaN", }, index=[0], ) cd_content = cd_content.iloc[0] return cd_content def get_values_sec( df_cd, content, df, aas_id, aas_name, submodel_id, submodel_name, submodel_semantic_id, ): collection_values = content[0]["value"] for element in collection_values: content = [] content.append(element) se_type, se_semantic_id, se_semantic_id_local, se_id_short, value = get_values( element ) if se_type == "SubmodelElementCollection": if se_semantic_id_local == True: cd_content = get_concept_description(se_semantic_id, df_cd) definition = cd_content["Definition"] preferred_name = cd_content["PreferredName"] datatype = cd_content["Datatype"] unit = cd_content["Unit"] else: definition = "NaN" preferred_name = "NaN" datatype = "NaN" unit = "NaN" new_row = pd.DataFrame( { "AASId": aas_id, "AASIdShort": aas_name, "SubmodelId": submodel_id, "SubmodelName": submodel_name, "SubmodelSemanticId": submodel_semantic_id, "SEContent": content, "SESemanticId": se_semantic_id, "SEModelType": se_type, "SEIdShort": se_id_short, "SEValue": value, "Definition": definition, "PreferredName": preferred_name, "Datatype": datatype, "Unit": unit, } ) df = pd.concat([df, new_row], ignore_index=True) content = [] content.append(element) # Rekursive Funktion -> so oft durchlaufen bis unterste Ebene der Collections erreicht ist, so werden verschachteltet SECs bis zum Ende ausgelesen df = get_values_sec( df_cd, content, df, aas_id, aas_name, submodel_id, submodel_name, submodel_semantic_id, ) else: if se_semantic_id_local == True: cd_content = get_concept_description(se_semantic_id, df_cd) definition = cd_content["Definition"] preferred_name = cd_content["PreferredName"] datatype = cd_content["Datatype"] unit = cd_content["Unit"] else: definition = "NaN" preferred_name = "NaN" datatype = "NaN" unit = "NaN" new_row = pd.DataFrame( { "AASId": aas_id, "AASIdShort": aas_name, "SubmodelId": submodel_id, "SubmodelName": submodel_name, "SubmodelSemanticId": submodel_semantic_id, "SEContent": content, "SESemanticId": se_semantic_id, "SEModelType": se_type, "SEIdShort": se_id_short, "SEValue": value, "Definition": definition, "PreferredName": preferred_name, "Datatype": datatype, "Unit": unit, } ) df = pd.concat([df, new_row], ignore_index=True) return df def set_up_metadata(metalabel, df): datatype_mapping = { "boolean": "BOOLEAN", "string": "STRING", "string_translatable": "STRING", "translatable_string": "STRING", "non_translatable_string": "STRING", "date": "DATE", "data_time": "DATE", "uri": "URI", "int": "INT", "int_measure": "INT", "int_currency": "INT", "integer": "INT", "real": "REAL", "real_measure": "REAL", "real_currency": "REAL", "enum_code": "ENUM_CODE", "enum_int": "ENUM_CODE", "ENUM_REAL": "ENUM_CODE", "ENUM_RATIONAL": "ENUM_CODE", "ENUM_BOOLEAN": "ENUM_CODE", "ENUM_STRING": "ENUM_CODE", "enum_reference": "ENUM_CODE", "enum_instance": "ENUM_CODE", "set(b1,b2)": "SET", "constrained_set(b1,b2,cmn,cmx)": "SET", "set [0,?]": "SET", "set [1,?]": "SET", "set [1, ?]": "SET", "nan": "NaN", "media_type": "LARGE_OBJECT_TYPE", } unit_mapping = { "nan": "NaN", "hertz": "FREQUENCY", "hz": "FREQUENCY", "pa": "PRESSURE", "pascal": "PRESSURE", "n/m²": "PRESSURE", "bar": "PRESSURE", "%": "SCALARS_PERC", "w": "POWER", "watt": "POWER", "kw": "POWER", "kg/m³": "CHEMISTRY", "m²/s": "CHEMISTRY", "pa*s": "CHEMISTRY", "v": "ELECTRICAL", "volt": "ELECTRICAL", "db": "ACOUSTICS", "db(a)": "ACOUSTICS", "k": "TEMPERATURE", "°c": "TEMPERATURE", "n": "MECHANICS", "newton": "MECHANICS", "kg/s": "FLOW", "kg/h": "FLOW", "m³/s": "FLOW", "m³/h": "FLOW", "l/s": "FLOW", "l/h": "FLOW", "µm": "LENGTH", "mm": "LENGTH", "cm": "LENGTH", "dm": "LENGTH", "m": "LENGTH", "meter": "LENGTH", "m/s": "SPEED", "km/h": "SPEED", "s^(-1)": "FREQUENCY", "1/s": "FREQUENCY", "s": "TIME", "h": "TIME", "min": "TIME", "d": "TIME", "hours": "TIME", "a": "ELECTRICAL", "m³": "VOLUME", "m²": "AREA", "rpm": "FLOW", "nm": "MECHANICS", "m/m": "MECHANICS", "m³/m²s": "MECHANICS", "w(m²*K)": "HEAT_TRANSFER", "kwh": "ELECTRICAL", "kg/(s*m²)": "FLOW", "kg": "MASS", "w/(m*k)": "HEAT_TRANSFER", "m²*k/w": "HEAT_TRANSFER", "j/s": "POWER", } dataset = df dataset["unit_lowercase"] = dataset["Unit"] dataset["unit_lowercase"] = dataset["unit_lowercase"].str.lower() dataset["unit_categ"] = dataset["unit_lowercase"].map(unit_mapping) dataset["datatype_lowercase"] = dataset["Datatype"] dataset["datatype_lowercase"] = dataset["datatype_lowercase"].str.lower() dataset["datatype_categ"] = dataset["datatype_lowercase"].map(datatype_mapping) dataset = dataset.fillna("NaN") dataset["index"] = dataset.index # uni_datatype=dataset['datatype_categ'].unique() # uni_unit=dataset['unit_categ'].unique() unique_labels_set = set() dataset["Metalabel"] = "" for i in range(0, len(dataset["Metalabel"])): concat = (str(dataset["unit_categ"][i]), str(dataset["datatype_categ"][i])) keys = [k for k, v in metalabel.items() if v == concat] dataset["Metalabel"][i] = keys[0] unique_labels_set.add(keys[0]) unique_label = list(unique_labels_set) print(unique_label) return dataset def encode(aas_df, model): # Einsatz von Sentence Bert um Embeddings zu kreieren aas_df["PreferredName"] = "Name: " + aas_df["PreferredName"].astype(str) aas_df["Definition"] = "Description: " + aas_df["Definition"].astype(str) + "; " corpus_names = aas_df.loc[:, "PreferredName"] corpus_definitions = aas_df.loc[:, "Definition"] embeddings_definitions = model.encode(corpus_definitions, show_progress_bar=True) embeddings_names = model.encode(corpus_names, show_progress_bar=True) concat_name_def_emb = np.concatenate( (embeddings_definitions, embeddings_names), axis=1 ) # aas_df['EmbeddingDefinition'] = embeddings_definitions.tolist() # aas_df['EmbeddingName'] = embeddings_names.tolist() aas_df["EmbeddingNameDefinition"] = concat_name_def_emb.tolist() return aas_df def convert_to_list(aas_df): # Für die Datenbank werden teilweise Listen gebraucht aas_index = aas_df.index.tolist() aas_index_str = [str(r) for r in aas_index] se_content = aas_df["SEContent"].tolist() se_embedding_name_definition = aas_df["EmbeddingNameDefinition"].tolist() aas_df_dropped = aas_df.drop( ["EmbeddingNameDefinition", "SEContent", "SEValue"], axis=1 ) metadata = aas_df_dropped.to_dict("records") return metadata, aas_index_str, se_content, se_embedding_name_definition def set_up_chroma( metadata, aas_index_str, se_content, se_embedding_name_definition, aas_name, client ): aas_name = aas_name.lower() # Kein Großbuchstaben in Datenbank erlaubt print(aas_name) # client = chromadb.Client(Settings( # chroma_db_impl="duckdb+parquet", # persist_directory="./drive/My Drive/Colab/NLP/SemantischeInteroperabilität/Deployment" # Optional, defaults to .chromadb/ in the current directory # )) emb_fn = embedding_functions.SentenceTransformerEmbeddingFunction( model_name="gart-labor/eng-distilBERT-se-eclass" ) collection = client.get_or_create_collection( name=aas_name, embedding_function=emb_fn ) aas_content_string = [] # Umwandeln in Json damit es in db geschrieben werden kann for element in se_content: content = json.dumps(element) aas_content_string.append(content) items = collection.count() # returns the number of items in the collection print(collection) print("Datenbank erstellt, Anzahl Items:") print(items) if items == 0: # Hinzufügen der SE Inhalte, der Embeddings und weiterer Metadaten in collection der Datenbank collection.add( documents=aas_content_string, embeddings=se_embedding_name_definition, metadatas=metadata, ids=aas_index_str, ) items = collection.count() # returns the number of items in the collection print("------------") print("Datenbank befüllt, Anzahl items:") print(items) else: print("-----------") print("AAS schon vorhanden") return collection def read_aas(aas, submodels, assets, conceptDescriptions, submodels_ids, metalabel): df = pd.DataFrame( columns=[ "AASId", "AASIdShort", "SubmodelId", "SubmodelName", "SubmodelSemanticId", "SEContent", "SESemanticId", "SEModelType", "SEIdShort", "SEValue", "Definition", "PreferredName", "Datatype", "Unit", ] ) aas_id = aas[0]["identification"]["id"] aas_name = aas[0]["idShort"] # Aufbereiten aller Concept descriptions als pandas dataframe, damit diese nachher einfacher untersucht werden können df_cd = prepare_cd(conceptDescriptions) # Auslesen der Teilmodelle for submodel in submodels: submodel_name = submodel["idShort"] submodel_id = submodel["identification"]["id"] # Muss gemacht werden, da Anzahl der Teilmodelle innerhalb der AAS und des Env nicht immer übereisntimmen if submodel_id in submodels_ids: semantic_id_existing = submodel["semanticId"]["keys"] if not semantic_id_existing: submodel_semantic_id = "Not defined" else: submodel_semantic_id = semantic_id_existing[0]["value"] submodel_elements = submodel["submodelElements"] # Auslesen Submodel Elements for submodel_element in submodel_elements: content = [] content.append(submodel_element) ( se_type, se_semantic_id, se_semantic_id_local, se_id_short, value, ) = get_values(submodel_element) # When Concept Description local dann auslesen der Concept Description if se_semantic_id_local == True: cd_content = get_concept_description(se_semantic_id, df_cd) definition = cd_content["Definition"] preferred_name = cd_content["PreferredName"] datatype = cd_content["Datatype"] unit = cd_content["Unit"] else: definition = "NaN" preferred_name = "NaN" datatype = "NaN" unit = "NaN" new_row = pd.DataFrame( { "AASId": aas_id, "AASIdShort": aas_name, "SubmodelId": submodel_id, "SubmodelName": submodel_name, "SubmodelSemanticId": submodel_semantic_id, "SEContent": content, "SESemanticId": se_semantic_id, "SEModelType": se_type, "SEIdShort": se_id_short, "SEValue": value, "Definition": definition, "PreferredName": preferred_name, "Datatype": datatype, "Unit": unit, } ) df = pd.concat([df, new_row], ignore_index=True) # Wenn Submodel Element Collection dann diese Werte auch auslesen if se_type == "SubmodelElementCollection": df = get_values_sec( df_cd, content, df, aas_id, aas_name, submodel_id, submodel_name, submodel_semantic_id, ) else: continue df = set_up_metadata(metalabel, df) return df, aas_name def index_corpus(data, model, metalabel, client_chroma): # Start Punkt aas = data["assetAdministrationShells"] aas_submodels = aas[0]["submodels"] submodels_ids = [] for submodel in aas_submodels: submodels_ids.append(submodel["keys"][0]["value"]) submodels = data["submodels"] conceptDescriptions = data["conceptDescriptions"] assets = data["assets"] aas_df, aas_name = read_aas( aas, submodels, assets, conceptDescriptions, submodels_ids, metalabel ) # aas_df_embeddings = encode(aas_df, model) aas_df = encode(aas_df, model) metadata, aas_index_str, se_content, se_embedding_name_definition = convert_to_list( aas_df ) collection = set_up_chroma( metadata, aas_index_str, se_content, se_embedding_name_definition, aas_name, client_chroma, ) return collection # if __name__ == '__main__': # create_database = index_corpus(aas = 'festo_switch.json')