# NOTE(review): the text that stood here ("Spaces:" / "Runtime error" x2) was
# web-UI residue from the page this file was scraped from, not program code.
from sentence_transformers import SentenceTransformer, util | |
import json | |
import time | |
import pandas as pd | |
import numpy as np | |
import pickle | |
import chromadb | |
from chromadb.config import Settings | |
from chromadb.utils import embedding_functions | |
from chromadb.db.clickhouse import NoDatapointsException | |
def _pick_language_text(variants):
    """Return the text of the English entry of a LangString list.

    Falls back to the sole entry when exactly one exists, to the first entry
    when several exist but none is tagged English, and to None when the list
    is empty.
    """
    if not variants:
        return None
    if len(variants) == 1:
        return variants[0]["text"]
    for variant in variants:
        # The source data tags English variously as "EN", "en" or (typo) "EN?".
        if variant["language"] in ("EN", "en", "EN?"):
            return variant["text"]
    # Bug fix: the original kept a stale value from the previous concept
    # description here (or raised NameError on the very first one) when no
    # English variant existed; fall back to the first variant instead.
    return variants[0]["text"]


def prepare_cd(conceptDescriptions):
    """Flatten the environment's concept descriptions into a DataFrame.

    One row per concept description, holding its semantic id, English
    definition, preferred name, datatype and unit.  Missing pieces become
    the literal string "NaN" (the sentinel used throughout this module).
    """
    columns = ["SemanticId", "Definition", "PreferredName", "Datatype", "Unit"]
    rows = []
    for cd in conceptDescriptions:
        data_spec = cd["embeddedDataSpecifications"][0]["dataSpecificationContent"]
        name = _pick_language_text(data_spec["preferredName"])
        if name is None:
            # No preferred name at all -> fall back to the short name.
            short_name = data_spec["shortName"]
            name = short_name[0]["text"] if short_name else "NaN"
        chosen_def = _pick_language_text(data_spec["definition"])
        if chosen_def is None:
            chosen_def = "NaN"
        rows.append(
            {
                "SemanticId": cd["identification"]["id"],
                "Definition": chosen_def,
                "PreferredName": name,
                # Empty strings become the "NaN" sentinel, as before.
                "Datatype": data_spec["dataType"] or "NaN",
                "Unit": data_spec["unit"] or "NaN",
            }
        )
    # Building once from a list of dicts avoids the original's quadratic
    # pd.concat-per-row pattern.
    return pd.DataFrame(rows, columns=columns)
def get_values(submodel_element):
    """Extract the core fields of one submodel element.

    Returns a 5-tuple: (model type name, semantic id value, semantic id
    'local' flag, idShort, value wrapped in a one-element list).
    """
    first_key = submodel_element["semanticId"]["keys"][0]
    return (
        submodel_element["modelType"]["name"],
        first_key["value"],
        first_key["local"],
        submodel_element["idShort"],
        [submodel_element["value"]],
    )
def get_concept_description(semantic_id, df_cd):
    """Return the concept-description row for *semantic_id* as a Series.

    When the id is unknown, an all-"NaN" placeholder row (carrying the
    requested id) is returned instead.
    """
    matches = df_cd[df_cd["SemanticId"] == semantic_id]
    if matches.empty:
        placeholder = {
            "SemanticId": semantic_id,
            "Definition": "NaN",
            "PreferredName": "NaN",
            "Datatype": "NaN",
            "Unit": "NaN",
        }
        matches = pd.DataFrame([placeholder])
    return matches.iloc[0]
def get_values_sec(
    df_cd,
    content,
    df,
    aas_id,
    aas_name,
    submodel_id,
    submodel_name,
    submodel_semantic_id,
):
    """Recursively read the children of a SubmodelElementCollection.

    ``content`` is a one-element list wrapping the collection element whose
    children are to be read; every child becomes one row appended to ``df``.
    Nested collections are followed by recursing with the child as the new
    ``content``.  Returns the extended DataFrame.
    """
    collection_values = content[0]["value"]
    for element in collection_values:
        # Rebind ``content`` to wrap the current child (same one-element-list
        # convention used by the caller).
        content = []
        content.append(element)
        se_type, se_semantic_id, se_semantic_id_local, se_id_short, value = get_values(
            element
        )
        if se_type == "SubmodelElementCollection":
            # A local semantic id means a concept description exists in this
            # environment and can be resolved.
            if se_semantic_id_local == True:
                cd_content = get_concept_description(se_semantic_id, df_cd)
                definition = cd_content["Definition"]
                preferred_name = cd_content["PreferredName"]
                datatype = cd_content["Datatype"]
                unit = cd_content["Unit"]
            else:
                definition = "NaN"
                preferred_name = "NaN"
                datatype = "NaN"
                unit = "NaN"
            new_row = pd.DataFrame(
                {
                    "AASId": aas_id,
                    "AASIdShort": aas_name,
                    "SubmodelId": submodel_id,
                    "SubmodelName": submodel_name,
                    "SubmodelSemanticId": submodel_semantic_id,
                    "SEContent": content,
                    "SESemanticId": se_semantic_id,
                    "SEModelType": se_type,
                    "SEIdShort": se_id_short,
                    "SEValue": value,
                    "Definition": definition,
                    "PreferredName": preferred_name,
                    "Datatype": datatype,
                    "Unit": unit,
                }
            )
            df = pd.concat([df, new_row], ignore_index=True)
            content = []
            content.append(element)
            # Recursive call: repeat until the lowest collection level is
            # reached, so nested SECs are read all the way down.
            df = get_values_sec(
                df_cd,
                content,
                df,
                aas_id,
                aas_name,
                submodel_id,
                submodel_name,
                submodel_semantic_id,
            )
        else:
            # Plain submodel element: resolve its concept description (if
            # local) and append a single row.
            if se_semantic_id_local == True:
                cd_content = get_concept_description(se_semantic_id, df_cd)
                definition = cd_content["Definition"]
                preferred_name = cd_content["PreferredName"]
                datatype = cd_content["Datatype"]
                unit = cd_content["Unit"]
            else:
                definition = "NaN"
                preferred_name = "NaN"
                datatype = "NaN"
                unit = "NaN"
            new_row = pd.DataFrame(
                {
                    "AASId": aas_id,
                    "AASIdShort": aas_name,
                    "SubmodelId": submodel_id,
                    "SubmodelName": submodel_name,
                    "SubmodelSemanticId": submodel_semantic_id,
                    "SEContent": content,
                    "SESemanticId": se_semantic_id,
                    "SEModelType": se_type,
                    "SEIdShort": se_id_short,
                    "SEValue": value,
                    "Definition": definition,
                    "PreferredName": preferred_name,
                    "Datatype": datatype,
                    "Unit": unit,
                }
            )
            df = pd.concat([df, new_row], ignore_index=True)
    return df
def set_up_metadata(metalabel, df):
    """Categorise each row's unit and datatype and attach a metalabel.

    Maps the raw ``Unit``/``Datatype`` strings onto coarse categories
    (e.g. "mm" -> "LENGTH", "string" -> "STRING"), then reverse-looks-up the
    (unit_categ, datatype_categ) pair in *metalabel* to obtain the label.
    Mutates *df* in place (adds helper columns) and returns the resulting
    DataFrame.

    Raises:
        KeyError: when a category pair has no entry in *metalabel*
            (the original raised IndexError in that situation).
    """
    datatype_mapping = {
        "boolean": "BOOLEAN",
        "string": "STRING",
        "string_translatable": "STRING",
        "translatable_string": "STRING",
        "non_translatable_string": "STRING",
        "date": "DATE",
        "data_time": "DATE",
        "uri": "URI",
        "int": "INT",
        "int_measure": "INT",
        "int_currency": "INT",
        "integer": "INT",
        "real": "REAL",
        "real_measure": "REAL",
        "real_currency": "REAL",
        "enum_code": "ENUM_CODE",
        "enum_int": "ENUM_CODE",
        "ENUM_REAL": "ENUM_CODE",
        "ENUM_RATIONAL": "ENUM_CODE",
        "ENUM_BOOLEAN": "ENUM_CODE",
        "ENUM_STRING": "ENUM_CODE",
        "enum_reference": "ENUM_CODE",
        "enum_instance": "ENUM_CODE",
        "set(b1,b2)": "SET",
        "constrained_set(b1,b2,cmn,cmx)": "SET",
        "set [0,?]": "SET",
        "set [1,?]": "SET",
        "set [1, ?]": "SET",
        "nan": "NaN",
        "media_type": "LARGE_OBJECT_TYPE",
    }
    unit_mapping = {
        "nan": "NaN",
        "hertz": "FREQUENCY",
        "hz": "FREQUENCY",
        "pa": "PRESSURE",
        "pascal": "PRESSURE",
        "n/m²": "PRESSURE",
        "bar": "PRESSURE",
        "%": "SCALARS_PERC",
        "w": "POWER",
        "watt": "POWER",
        "kw": "POWER",
        "kg/m³": "CHEMISTRY",
        "m²/s": "CHEMISTRY",
        "pa*s": "CHEMISTRY",
        "v": "ELECTRICAL",
        "volt": "ELECTRICAL",
        "db": "ACOUSTICS",
        "db(a)": "ACOUSTICS",
        "k": "TEMPERATURE",
        "°c": "TEMPERATURE",
        "n": "MECHANICS",
        "newton": "MECHANICS",
        "kg/s": "FLOW",
        "kg/h": "FLOW",
        "m³/s": "FLOW",
        "m³/h": "FLOW",
        "l/s": "FLOW",
        "l/h": "FLOW",
        "µm": "LENGTH",
        "mm": "LENGTH",
        "cm": "LENGTH",
        "dm": "LENGTH",
        "m": "LENGTH",
        "meter": "LENGTH",
        "m/s": "SPEED",
        "km/h": "SPEED",
        "s^(-1)": "FREQUENCY",
        "1/s": "FREQUENCY",
        "s": "TIME",
        "h": "TIME",
        "min": "TIME",
        "d": "TIME",
        "hours": "TIME",
        "a": "ELECTRICAL",
        "m³": "VOLUME",
        "m²": "AREA",
        "rpm": "FLOW",
        "nm": "MECHANICS",
        "m/m": "MECHANICS",
        "m³/m²s": "MECHANICS",
        "w(m²*K)": "HEAT_TRANSFER",
        "kwh": "ELECTRICAL",
        "kg/(s*m²)": "FLOW",
        "kg": "MASS",
        "w/(m*k)": "HEAT_TRANSFER",
        "m²*k/w": "HEAT_TRANSFER",
        "j/s": "POWER",
    }
    dataset = df
    dataset["unit_lowercase"] = dataset["Unit"].str.lower()
    dataset["unit_categ"] = dataset["unit_lowercase"].map(unit_mapping)
    dataset["datatype_lowercase"] = dataset["Datatype"].str.lower()
    dataset["datatype_categ"] = dataset["datatype_lowercase"].map(datatype_mapping)
    # Unmapped units/datatypes become the "NaN" sentinel string.
    dataset = dataset.fillna("NaN")
    dataset["index"] = dataset.index
    # Build the reverse lookup once (the original scanned metalabel.items()
    # inside the row loop).  setdefault keeps the FIRST key for duplicated
    # category pairs, matching the original's keys[0].
    reverse_metalabel = {}
    for key, categ_pair in metalabel.items():
        reverse_metalabel.setdefault(categ_pair, key)
    unique_labels_set = set()
    dataset["Metalabel"] = ""
    for i in dataset.index:
        pair = (str(dataset["unit_categ"][i]), str(dataset["datatype_categ"][i]))
        label = reverse_metalabel[pair]
        # .at avoids the chained assignment (dataset["Metalabel"][i] = ...)
        # of the original, which triggers SettingWithCopyWarning and is a
        # no-op under pandas copy-on-write.
        dataset.at[i, "Metalabel"] = label
        unique_labels_set.add(label)
    unique_label = list(unique_labels_set)
    print(unique_label)
    return dataset
def encode(aas_df, model):
    """Create SentenceBERT embeddings for names and definitions.

    Prefixes the ``PreferredName``/``Definition`` columns in place, encodes
    both corpora with *model* and stores the concatenated
    (definition ++ name) vectors as lists in a new
    ``EmbeddingNameDefinition`` column.  Returns the mutated frame.
    """
    aas_df["PreferredName"] = "Name: " + aas_df["PreferredName"].astype(str)
    aas_df["Definition"] = "Description: " + aas_df["Definition"].astype(str) + "; "
    # Definitions are encoded first, then names, exactly as before.
    definition_vecs = model.encode(aas_df.loc[:, "Definition"], show_progress_bar=True)
    name_vecs = model.encode(aas_df.loc[:, "PreferredName"], show_progress_bar=True)
    combined = np.concatenate((definition_vecs, name_vecs), axis=1)
    aas_df["EmbeddingNameDefinition"] = combined.tolist()
    return aas_df
def convert_to_list(aas_df):
    """Split the frame into the pieces the vector database expects.

    Returns (metadata records, string ids, SE contents, embeddings); the
    metadata records exclude the embedding/content/value columns.
    """
    ids = [str(label) for label in aas_df.index.tolist()]
    contents = aas_df["SEContent"].tolist()
    embeddings = aas_df["EmbeddingNameDefinition"].tolist()
    remaining = aas_df.drop(["EmbeddingNameDefinition", "SEContent", "SEValue"], axis=1)
    metadata = remaining.to_dict("records")
    return metadata, ids, contents, embeddings
def set_up_chroma(
    metadata, aas_index_str, se_content, se_embedding_name_definition, aas_name, client
):
    """Create (or fetch) the chroma collection for one AAS and populate it.

    The collection is only filled when it is still empty, so indexing the
    same AAS twice is a no-op.  Returns the collection.
    """
    # Chroma does not allow upper-case characters in collection names.
    aas_name = aas_name.lower()
    print(aas_name)
    emb_fn = embedding_functions.SentenceTransformerEmbeddingFunction(
        model_name="gart-labor/eng-distilBERT-se-eclass"
    )
    collection = client.get_or_create_collection(
        name=aas_name, embedding_function=emb_fn
    )
    # Serialise each submodel-element dict to JSON so it can be written to
    # the database as a document.
    aas_content_string = [json.dumps(element) for element in se_content]
    items = collection.count()
    print(collection)
    print("Datenbank erstellt, Anzahl Items:")
    print(items)
    if items == 0:
        # Write SE contents, embeddings and remaining metadata into the
        # collection.
        collection.add(
            documents=aas_content_string,
            embeddings=se_embedding_name_definition,
            metadatas=metadata,
            ids=aas_index_str,
        )
        print("------------")
        print("Datenbank befüllt, Anzahl items:")
        print(collection.count())
    else:
        print("-----------")
        print("AAS schon vorhanden")
    return collection
def read_aas(aas, submodels, assets, conceptDescriptions, submodels_ids, metalabel):
    """Read one AAS environment into a flat DataFrame of submodel elements.

    Walks every submodel referenced by the (first) AAS, resolves concept
    descriptions for locally-defined semantic ids, appends one row per
    submodel element (descending into collections via get_values_sec) and
    finally attaches metalabels via set_up_metadata.

    Returns (dataframe, aas idShort).

    NOTE(review): the ``assets`` parameter is currently unused here.
    """
    df = pd.DataFrame(
        columns=[
            "AASId",
            "AASIdShort",
            "SubmodelId",
            "SubmodelName",
            "SubmodelSemanticId",
            "SEContent",
            "SESemanticId",
            "SEModelType",
            "SEIdShort",
            "SEValue",
            "Definition",
            "PreferredName",
            "Datatype",
            "Unit",
        ]
    )
    aas_id = aas[0]["identification"]["id"]
    aas_name = aas[0]["idShort"]
    # Prepare all concept descriptions as a pandas dataframe so they can be
    # examined more easily later on
    df_cd = prepare_cd(conceptDescriptions)
    # Read the submodels
    for submodel in submodels:
        submodel_name = submodel["idShort"]
        submodel_id = submodel["identification"]["id"]
        # Necessary because the number of submodels inside the AAS and in the
        # environment do not always match
        if submodel_id in submodels_ids:
            semantic_id_existing = submodel["semanticId"]["keys"]
            if not semantic_id_existing:
                submodel_semantic_id = "Not defined"
            else:
                submodel_semantic_id = semantic_id_existing[0]["value"]
            submodel_elements = submodel["submodelElements"]
            # Read the submodel elements
            for submodel_element in submodel_elements:
                content = []
                content.append(submodel_element)
                (
                    se_type,
                    se_semantic_id,
                    se_semantic_id_local,
                    se_id_short,
                    value,
                ) = get_values(submodel_element)
                # If the concept description is local, it can be resolved
                # from the environment
                if se_semantic_id_local == True:
                    cd_content = get_concept_description(se_semantic_id, df_cd)
                    definition = cd_content["Definition"]
                    preferred_name = cd_content["PreferredName"]
                    datatype = cd_content["Datatype"]
                    unit = cd_content["Unit"]
                else:
                    definition = "NaN"
                    preferred_name = "NaN"
                    datatype = "NaN"
                    unit = "NaN"
                new_row = pd.DataFrame(
                    {
                        "AASId": aas_id,
                        "AASIdShort": aas_name,
                        "SubmodelId": submodel_id,
                        "SubmodelName": submodel_name,
                        "SubmodelSemanticId": submodel_semantic_id,
                        "SEContent": content,
                        "SESemanticId": se_semantic_id,
                        "SEModelType": se_type,
                        "SEIdShort": se_id_short,
                        "SEValue": value,
                        "Definition": definition,
                        "PreferredName": preferred_name,
                        "Datatype": datatype,
                        "Unit": unit,
                    }
                )
                df = pd.concat([df, new_row], ignore_index=True)
                # If it is a SubmodelElementCollection, read its children too
                if se_type == "SubmodelElementCollection":
                    df = get_values_sec(
                        df_cd,
                        content,
                        df,
                        aas_id,
                        aas_name,
                        submodel_id,
                        submodel_name,
                        submodel_semantic_id,
                    )
        else:
            continue
    df = set_up_metadata(metalabel, df)
    return df, aas_name
def index_corpus(data, model, metalabel, client_chroma):
    """Entry point: parse an AAS environment dict and index it in chroma.

    Collects the submodel references of the first shell, flattens the
    environment into a DataFrame, embeds it and writes everything into a
    chroma collection, which is returned.
    """
    aas = data["assetAdministrationShells"]
    # Ids of the submodels actually referenced by the shell.
    submodels_ids = [ref["keys"][0]["value"] for ref in aas[0]["submodels"]]
    aas_df, aas_name = read_aas(
        aas,
        data["submodels"],
        data["assets"],
        data["conceptDescriptions"],
        submodels_ids,
        metalabel,
    )
    aas_df = encode(aas_df, model)
    metadata, aas_index_str, se_content, se_embedding_name_definition = convert_to_list(
        aas_df
    )
    return set_up_chroma(
        metadata,
        aas_index_str,
        se_content,
        se_embedding_name_definition,
        aas_name,
        client_chroma,
    )
# if __name__ == '__main__': | |
# create_database = index_corpus(aas = 'festo_switch.json') | |