# originbio-component1 / databaseengine.py
# Uploaded by Junaidb -- commit 073ab01 (verified): "Create databaseengine.py"
import logging
import os

from astrapy import DataAPIClient
# SECURITY(review): the endpoint and application token are committed here in
# plain text, so anyone who can read this file can use them. The token should
# be rotated and both literals deleted once every deployment provides the
# XFORCE_BCL_ENDPOINT / XFORCE_BCL_DB_TOKEN environment variables. Until then
# the literals remain as fallbacks so existing deployments behave exactly as
# before when the variables are unset.
xforce_bcl_endpoint = os.environ.get(
    "XFORCE_BCL_ENDPOINT",
    "https://b3cc3888-098f-45f7-9087-260ed43b742c-us-east-2.apps.astra.datastax.com",
)
xforce_bcl_db_token = os.environ.get(
    "XFORCE_BCL_DB_TOKEN",
    "AstraCS:dRZIxPSXJqqWPPgJjMpoqnrJ:999c021256da787c45f0309e589c9e99884c6559a9337d09de9fa15cd88843dc",
)

# Initialize the client and resolve the database handle once at import time;
# DatabaseEngine uses this module-level `db` by default.
client = DataAPIClient(xforce_bcl_db_token)
db = client.get_database_by_api_endpoint(xforce_bcl_endpoint)

# Collection names looked up through db.get_collection(...) by DatabaseEngine.
coll = "bcl"                                  # project / task documents
ie = "individual_experiments"                 # Insert_IE / Fetch_IE
app = "app"                                   # RegisterUser / UpdateUser
memory = "memory"                             # executed-operations memory
history = "history"                           # conversation messages
applayer = "applayer"                         # app-layer messages
filtered_candidates = "filtered_candidates"   # filtered candidate lists
ooda = "ooda"                                 # OODA loop entries
class DatabaseEngine():
    """Data-access helper for the collections named at module level.

    Each public method resolves one collection by name on the database
    handle and issues a single call against it (``insert_one``, ``find``,
    ``find_one`` or ``update_one``).

    Args:
        database: Optional object exposing ``get_collection(name)``. When
            omitted, the module-level ``db`` handle is used, so existing
            ``DatabaseEngine()`` callers are unaffected.
    """

    _logger = logging.getLogger(__name__)

    def __init__(self, database=None):
        # `db` is only looked up when no handle is injected.
        self._db = db if database is None else database

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    def _collection(self, name):
        """Return the collection called *name* from the database handle."""
        return self._db.get_collection(name)

    def _is_empty(self, name, bcl_id):
        """Return True when collection *name* has no document for *bcl_id*."""
        # find_one avoids pulling every matching document just to count them.
        return self._collection(name).find_one({"bcl_id": bcl_id}) is None

    def _push_each(self, name, bcl_id, field, items):
        """Append every element of *items* to array *field* of one document."""
        self._collection(name).update_one(
            {"bcl_id": bcl_id},
            {"$push": {field: {"$each": items}}},
        )

    # ------------------------------------------------------------------
    # Projects / tasks (collection `coll`)
    # ------------------------------------------------------------------
    def Insert(self, data):
        """Insert *data* into the `coll` collection and return the result."""
        return self._collection(coll).insert_one(data)

    def UpdateProject(self, projectid, plan):
        """Push *plan* onto the ``plans`` array of the matching project.

        NOTE(review): this writes to a top-level ``plans`` array while
        Update_Status addresses ``project.plans``; confirm which path the
        stored documents actually use.
        """
        self._collection(coll).update_one(
            {"project.project_id": projectid},
            {"$push": {"plans": plan}},
        )

    def CheckEmptyProjects(self, id):
        """Return True when `coll` holds no document with ``bcl_id == id``."""
        # `id` shadows the builtin but is kept for keyword-argument callers.
        return self._is_empty(coll, id)

    def Update_Status(self, project_id, bcl_id):
        """Set the matching plan's status to ``"done"``.

        The filter selects the project document and the plan inside
        ``project.plans`` carrying *bcl_id*; the positional ``$`` in the
        update path targets that matched plan.

        Returns:
            True when the update call completed without raising, False
            otherwise (the exception is logged).
        """
        try:
            self._collection(coll).update_one(
                {
                    "project.project_id": project_id,
                    "project.plans.bcl_id": bcl_id,
                },
                {"$set": {"project.plans.$.status": "done"}},
            )
        except Exception:
            self._logger.exception(
                "Update_Status failed for project_id=%r bcl_id=%r",
                project_id, bcl_id,
            )
            return False
        return True

    def Find_User(self, uid):
        """Return ``{"id", "status"}`` dicts for every document of user *uid*."""
        cursor = self._collection(coll).find({"user_id": uid})
        return [
            {"id": document.get("bcl_id"), "status": document.get("status")}
            for document in cursor
        ]

    def Find_Task(self, id):
        """Return all `coll` documents whose ``bcl_id`` equals *id*."""
        return list(self._collection(coll).find({"bcl_id": id}))

    def Fetch_Status(self, bcl_id):
        """Return ``{"status": ...}`` for the document with *bcl_id*.

        The value is None when no document matches or the document has no
        ``status`` field (previously both cases raised).
        """
        document = self._collection(coll).find_one({"bcl_id": bcl_id})
        if document is None:
            return {"status": None}
        return {"status": document.get("status")}

    def Fetch_Projects(self, user_id):
        """Return the project id of the first document found for *user_id*.

        Returns:
            A one-element list holding ``project["project_id"]`` (None if
            that key is absent), or an empty list when no document matches
            or the document has no ``project`` field.
        """
        document = self._collection(coll).find_one({"user_id": user_id})
        if not document or "project" not in document:
            return []
        return [document["project"].get("project_id")]

    # ------------------------------------------------------------------
    # Individual experiments (collection `ie`)
    # ------------------------------------------------------------------
    def Insert_IE(self, data):
        """Insert *data* into `ie`; return True on success, False on error."""
        try:
            self._collection(ie).insert_one(data)
        except Exception:
            self._logger.exception("Insert_IE failed")
            return False
        return True

    def Fetch_IE(self, bcl_id):
        """Return all `ie` documents whose ``bcl_id`` equals *bcl_id*."""
        return list(self._collection(ie).find({"bcl_id": bcl_id}))

    # ------------------------------------------------------------------
    # Memory (collection `memory`)
    # ------------------------------------------------------------------
    def InsertMemory(self, data):
        """Insert *data* into `memory`; return True on success, False on error."""
        try:
            self._collection(memory).insert_one(data)
        except Exception:
            self._logger.exception("InsertMemory failed")
            return False
        return True

    def FetchMemory(self, bcl_id):
        """Return the first `memory` document for *bcl_id*, or None."""
        return self._collection(memory).find_one({"bcl_id": bcl_id})

    def UpdateMemory(self, bcl_id, operation):
        """Push *operation* onto ``executed_operations`` for *bcl_id*."""
        self._collection(memory).update_one(
            {"bcl_id": bcl_id},
            {"$push": {"executed_operations": operation}},
        )

    def CheckEmptyOps(self, id):
        """Return True when `memory` holds no document with ``bcl_id == id``."""
        return self._is_empty(memory, id)

    # ------------------------------------------------------------------
    # Conversation history (collection `history`)
    # ------------------------------------------------------------------
    def CheckEmpty(self, id):
        """Return True when `history` holds no document with ``bcl_id == id``."""
        return self._is_empty(history, id)

    def Insert_Conversation(self, data):
        """Insert *data* into the `history` collection."""
        self._collection(history).insert_one(data)

    def Update_Conversation(self, bcl_id, data):
        """Append each element of *data* to ``messages`` for *bcl_id*."""
        self._push_each(history, bcl_id, "messages", data)

    def FetchConversation(self, id):
        """Return the first `history` document for *id*, or None."""
        return self._collection(history).find_one({"bcl_id": id})

    # ------------------------------------------------------------------
    # App layer (collection `applayer`)
    # ------------------------------------------------------------------
    def CheckEmptyAppLayer(self, id):
        """Return True when `applayer` holds no document with ``bcl_id == id``."""
        return self._is_empty(applayer, id)

    def Insert_AppLayer(self, data):
        """Insert *data* into the `applayer` collection."""
        self._collection(applayer).insert_one(data)

    def Update_AppLayer(self, bcl_id, data):
        """Append each element of *data* to ``messages`` for *bcl_id*."""
        self._push_each(applayer, bcl_id, "messages", data)

    def Fetch_AppLayer(self, id):
        """Return the first `applayer` document for *id*, or None."""
        return self._collection(applayer).find_one({"bcl_id": id})

    # ------------------------------------------------------------------
    # Users (collection `app`)
    # ------------------------------------------------------------------
    def RegisterUser(self, data):
        """Insert the user document *data* into the `app` collection."""
        self._collection(app).insert_one(data)

    def UpdateUser(self, uid, data):
        """Push *data* onto ``projects`` of the user whose ``username`` is *uid*.

        Returns:
            True when the update call completed without raising, False
            otherwise (the exception is logged).
        """
        try:
            self._collection(app).update_one(
                {"username": uid},
                {"$push": {"projects": data}},
            )
        except Exception:
            self._logger.exception("UpdateUser failed for username=%r", uid)
            return False
        return True

    # ------------------------------------------------------------------
    # Filtered candidates (collection `filtered_candidates`)
    # ------------------------------------------------------------------
    def InsertFiltered(self, data):
        """Insert *data* into the `filtered_candidates` collection."""
        self._collection(filtered_candidates).insert_one(data)

    def FetchFiltered(self, bcl_id):
        """Return the first `filtered_candidates` document for *bcl_id*, or None."""
        return self._collection(filtered_candidates).find_one({"bcl_id": bcl_id})

    def CheckEmptyFiltered(self, id):
        """Return True when `filtered_candidates` has no document for *id*."""
        return self._is_empty(filtered_candidates, id)

    def UpdateFiltered(self, bcl_id, data):
        """Append each element of *data* to ``candidates`` for *bcl_id*."""
        self._push_each(filtered_candidates, bcl_id, "candidates", data)

    # ------------------------------------------------------------------
    # OODA loop (collection `ooda`)
    # ------------------------------------------------------------------
    def Insert_OODA(self, data):
        """Insert *data* into the `ooda` collection."""
        self._collection(ooda).insert_one(data)

    def update_OODA(self, bcl_id, data):
        """Append each element of *data* to ``ooda_loop`` for *bcl_id*."""
        self._push_each(ooda, bcl_id, "ooda_loop", data)

    def CheckEmptyOODA(self, id):
        """Return True when `ooda` holds no document with ``bcl_id == id``."""
        return self._is_empty(ooda, id)

    def FetchOODA(self, bcl_id):
        """Return the first `ooda` document for *bcl_id*, or None."""
        return self._collection(ooda).find_one({"bcl_id": bcl_id})