TAPAS_WTQ_Chunking / weaviate_utils.py
jskinner215's picture
Update weaviate_utils.py
340cc83
raw
history blame
2.15 kB
import logging
import uuid

import weaviate
from weaviate import Client
from weaviate.embedded import EmbeddedOptions
def initialize_weaviate_client():
    """Build a Weaviate client that runs against an embedded instance.

    Returns:
        The ``weaviate.Client`` constructed with default ``EmbeddedOptions``.
    """
    embedded_config = EmbeddedOptions()
    client = weaviate.Client(embedded_options=embedded_config)
    return client
def class_exists(client, class_name):
    """Report whether looking up ``class_name`` in the schema succeeds.

    Args:
        client: Object exposing ``schema.get_class``.
        class_name: Name of the class to look up.

    Returns:
        True when ``client.schema.get_class(class_name)`` completes without
        raising, False when it raises any ``Exception``.
    """
    try:
        client.schema.get_class(class_name)
    except Exception:
        # Best-effort probe: any ordinary failure is reported as "absent".
        # Catching ``Exception`` instead of using a bare ``except`` lets
        # KeyboardInterrupt and SystemExit propagate rather than being
        # silently converted into False.
        # NOTE(review): confirm the installed client really provides
        # ``schema.get_class``; if it does not, the AttributeError is
        # swallowed here and this function always returns False.
        return False
    return True
def map_dtype_to_weaviate(dtype):
    """Translate a dataframe dtype into a Weaviate property data type name.

    The decision is made by substring matching on the lowercased
    ``str(dtype)``. Lowercasing matters: pandas nullable extension dtypes
    are spelled with a capital letter ("Int64", "UInt8", "Float64") and
    would otherwise fall through to "string".

    Args:
        dtype: Any object whose ``str()`` is a dtype name (e.g. a NumPy or
            pandas dtype, or the name itself).

    Returns:
        "int", "number" or "boolean" when the name contains "int", "float"
        or "bool" respectively (checked in that order); otherwise "string".
    """
    dtype_name = str(dtype).lower()
    if "int" in dtype_name:
        return "int"
    if "float" in dtype_name:
        return "number"
    if "bool" in dtype_name:
        return "boolean"
    return "string"
def ingest_data_to_weaviate(client, dataframe, class_name, class_description):
    """Create a Weaviate class for ``dataframe`` and store each row as an object.

    Steps, in order:
      1. Create the class with an empty property list.
      2. Add one property per dataframe column, with the data type chosen by
         ``map_dtype_to_weaviate`` from the column dtype.
      3. Create one data object per row, keyed by a UUID derived from the
         class name and the row's index label.

    Args:
        client: Weaviate client exposing ``schema.create``,
            ``schema.property.create`` and ``data_object.create``.
        dataframe: pandas DataFrame; its ``columns``, ``dtypes`` and
            ``iterrows()`` are used.
        class_name: Name of the Weaviate class to create and fill.
        class_description: Description stored on the class.

    Returns:
        None.
    """
    logger = logging.getLogger(__name__)

    # An already-existing class or property is rejected by the server with an
    # HTTP error (UnexpectedStatusCodeException) rather than by local schema
    # validation, so both are tolerated to keep re-runs best-effort.
    tolerated_errors = (
        weaviate.exceptions.SchemaValidationException,
        weaviate.exceptions.UnexpectedStatusCodeException,
    )

    # Create the class without properties first; they are added one by one
    # below so that an existing class can still gain missing properties.
    class_schema = {
        "class": class_name,
        "description": class_description,
        "properties": [],
    }
    try:
        client.schema.create({"classes": [class_schema]})
    except tolerated_errors as exc:
        logger.debug("Class %s not created (may already exist): %s", class_name, exc)

    for column_name, data_type in zip(dataframe.columns, dataframe.dtypes):
        property_schema = {
            "name": column_name,
            "description": f"Property for {column_name}",
            "dataType": [map_dtype_to_weaviate(data_type)],
        }
        try:
            client.schema.property.create(class_name, property_schema)
        except tolerated_errors as exc:
            logger.debug(
                "Property %s.%s not created (may already exist): %s",
                class_name,
                column_name,
                exc,
            )

    for index, row in dataframe.iterrows():
        # The client takes the property dict, the class name and the id as
        # separate arguments, and the id must be a valid UUID. A raw index
        # label such as "0" is not one, so derive a deterministic UUID.
        object_id = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"{class_name}:{index}"))
        client.data_object.create(
            data_object=row.to_dict(),
            class_name=class_name,
            uuid=object_id,
        )

    # ``log_debug_info`` is neither defined nor imported in this module, so
    # the standard logging module is used for the completion message.
    logger.debug("Data ingested into Weaviate for class: %s", class_name)
def get_class_schema(client, class_name):
    """Return the schema entry of ``class_name``, or None when it is absent.

    Args:
        client: Object whose ``schema.get()`` returns a mapping that may
            hold a "classes" list of class definitions.
        class_name: Value to match against each definition's "class" key.

    Returns:
        The first class definition whose "class" equals ``class_name``;
        None when there is no match or the schema lists no classes.
    """
    # ``or []`` covers a payload with no "classes" entry (or a None value),
    # which would otherwise raise instead of reporting "not found".
    class_definitions = client.schema.get().get("classes") or []
    return next(
        (
            definition
            for definition in class_definitions
            if definition["class"] == class_name
        ),
        None,
    )