Spaces:
Sleeping
Sleeping
| from astrapy import DataAPIClient | |
| xforce_bcl_endpoint="https://b3cc3888-098f-45f7-9087-260ed43b742c-us-east-2.apps.astra.datastax.com" | |
| xforce_bcl_db_token="AstraCS:dRZIxPSXJqqWPPgJjMpoqnrJ:999c021256da787c45f0309e589c9e99884c6559a9337d09de9fa15cd88843dc" | |
| #Initialize the client | |
| client = DataAPIClient(xforce_bcl_db_token) | |
| db = client.get_database_by_api_endpoint(xforce_bcl_endpoint) | |
| coll="bcl" | |
| ie="individual_experiments" | |
| app="app" | |
| memory="memory" | |
| history="history" | |
| applayer="applayer" | |
| filtered_candidates="filtered_candidates" | |
| ooda="ooda" | |
| class DatabaseEngine(): | |
| def __init__(self): | |
| pass | |
| def Insert(self,data): | |
| collection=db.get_collection(coll) | |
| result = collection.insert_one(data) | |
| return result | |
| def Insert_IE(self,data): | |
| try: | |
| collection=db.get_collection(ie) | |
| collection.insert_one(data) | |
| return True | |
| except Exception as e: | |
| return False | |
| def InsertMemory(self,data): | |
| try: | |
| collection=db.get_collection(memory) | |
| collection.insert_one(data) | |
| return True | |
| except Exception as e: | |
| return False | |
| def Fetch_IE(self,bcl_id): | |
| collection=db.get_collection(ie) | |
| documents=collection.find({"bcl_id":bcl_id}) | |
| released_docs=[] | |
| for doc in documents: | |
| released_docs.append(doc) | |
| return released_docs | |
| def FetchMemory(self,bcl_id): | |
| collection=db.get_collection(memory) | |
| document=collection.find_one({"bcl_id":bcl_id}) | |
| return document | |
| def UpdateMemory(self,bcl_id,operation): | |
| collection=db.get_collection(memory) | |
| update_result_a = collection.update_one( | |
| {"bcl_id": bcl_id}, | |
| {"$push": {"executed_operations":operation}} | |
| ) | |
| #UPDATED | |
| def UpdateProject(self,projectid,plan): | |
| collection=db.get_collection(coll) | |
| update_result_a = collection.update_one( | |
| {"project.project_id": projectid}, | |
| {"$push": {"plans":plan}} | |
| ) | |
| def CheckEmpty(self,id): | |
| collection=db.get_collection(history) | |
| cursor=collection.find({"bcl_id":id}) | |
| data=list(cursor) | |
| if not data: | |
| return True | |
| else: | |
| return False | |
| def CheckEmptyAppLayer(self,id): | |
| collection=db.get_collection(applayer) | |
| cursor=collection.find({"bcl_id":id}) | |
| data=list(cursor) | |
| if not data: | |
| return True | |
| else: | |
| return False | |
| def CheckEmptyOps(self,id): | |
| collection=db.get_collection(memory) | |
| cursor=collection.find({"bcl_id":id}) | |
| data=list(cursor) | |
| if not data: | |
| return True | |
| else: | |
| return False | |
| def CheckEmptyProjects(self,id): | |
| collection=db.get_collection(coll) | |
| cursor=collection.find({"bcl_id":id}) | |
| data=list(cursor) | |
| if not data: | |
| return True | |
| else: | |
| return False | |
| def Insert_Conversation(self,data): | |
| collection=db.get_collection(history) | |
| collection.insert_one(data) | |
| def Update_Conversation(self,bcl_id,data): | |
| collection=db.get_collection(history) | |
| update_result_a = collection.update_one( | |
| {"bcl_id": bcl_id}, | |
| {"$push": {"messages": {"$each": data}}} | |
| ) | |
| def FetchConversation(self,id): | |
| collection=db.get_collection(history) | |
| document=collection.find_one({"bcl_id":id}) | |
| return document | |
| def Insert_AppLayer(self,data): | |
| collection=db.get_collection(applayer) | |
| collection.insert_one(data) | |
| def Update_AppLayer(self,bcl_id,data): | |
| collection=db.get_collection(applayer) | |
| update_result_a = collection.update_one( | |
| {"bcl_id": bcl_id}, | |
| {"$push": {"messages": {"$each": data}}} | |
| ) | |
| def Fetch_AppLayer(self,id): | |
| collection=db.get_collection(applayer) | |
| document=collection.find_one({"bcl_id":id}) | |
| return document | |
| def Update_Status(self,project_id,bcl_id): | |
| try: | |
| collection=db.get_collection(coll) | |
| # This is the query to find the correct document AND the specific plan. | |
| filter_query = { | |
| "project.project_id": project_id, | |
| "project.plans.bcl_id": bcl_id | |
| } | |
| update_operation = { | |
| "$set": { | |
| "project.plans.$.status": "done" | |
| } | |
| } | |
| collection.update_one(filter_query, update_operation) | |
| #collection.update_one(filter_query, update_operation) | |
| return True | |
| except Exception as e: | |
| return False | |
| def Find_User(self,uid): | |
| collection = db.get_collection(coll) | |
| cursor = collection.find({"user_id":uid}) | |
| documents=[] | |
| for document in cursor: | |
| bcl_id=document.get("bcl_id") | |
| status=document.get("status") | |
| newdoc={"id":bcl_id,"status":status} | |
| documents.append(newdoc) | |
| return documents | |
| def Find_Task(self,id): | |
| collection = db.get_collection(coll) | |
| cursor = collection.find({"bcl_id":id}) | |
| documents=[] | |
| for document in cursor: | |
| documents.append(document) | |
| return documents | |
| def Fetch_Status(self,bcl_id): | |
| collection=db.get_collection(coll) | |
| document=collection.find_one({"bcl_id":bcl_id}) | |
| return {"status":document["status"]} | |
| ''' | |
| def Fetch_Projects(self,user_id): | |
| collection=db.get_collection(coll) | |
| cursor=collection.find({"user_id":user_id}) | |
| documents=[] | |
| for document in cursor: | |
| projectid=document.get("project_id") | |
| status=document.get("status") | |
| documents.append({"project":projectid,"status":status}) | |
| return documents | |
| ''' | |
| def Fetch_Projects(self, user_id): | |
| #Get the correct collection from the database | |
| collection = db.get_collection(coll) | |
| #Find the document for the given user_id | |
| document = collection.find_one({"user_id": user_id}) | |
| #Initialize a list to hold the project details | |
| projects = [] | |
| #Check if a document was found and if it contains a 'project' field | |
| if document and "project" in document: | |
| # Get the project dictionary from the document | |
| project_data = document["project"] | |
| # Get the project_id from the project dictionary | |
| project_id = project_data.get("project_id") | |
| # Get the list of plans from the project data | |
| #plans = project_data.get("plans", []) | |
| # Iterate over each plan to extract its status and include the project_id | |
| #for plan in plans: | |
| projects.append(project_id) | |
| return projects | |
| def RegisterUser(self,data): | |
| collection=db.get_collection(app) | |
| collection.insert_one(data) | |
| def UpdateUser(self,uid,data): | |
| try: | |
| collection=db.get_collection(app) | |
| collection.update_one( | |
| {"username": uid}, | |
| {"$push": {"projects": data}} | |
| ) | |
| return True | |
| except Exception as e: | |
| return False | |
| def InsertFiltered(self,data): | |
| collection=db.get_collection(filtered_candidates) | |
| collection.insert_one(data) | |
| def FetchFiltered(self,bcl_id): | |
| collection=db.get_collection(filtered_candidates) | |
| document=collection.find_one({"bcl_id":bcl_id}) | |
| return document | |
| def CheckEmptyFiltered(self,id): | |
| collection=db.get_collection(filtered_candidates) | |
| cursor=collection.find({"bcl_id":id}) | |
| data=list(cursor) | |
| if not data: | |
| return True | |
| else: | |
| return False | |
| def UpdateFiltered(self,bcl_id,data): | |
| collection=db.get_collection(filtered_candidates) | |
| update_result_a = collection.update_one( | |
| {"bcl_id": bcl_id}, | |
| {"$push": {"candidates": {"$each": data}}} | |
| ) | |
| def Insert_OODA(self,data): | |
| collection=db.get_collection(ooda) | |
| collection.insert_one(data) | |
| def update_OODA(self,bcl_id,data): | |
| collection=db.get_collection(ooda) | |
| update_result_a=collection.update_one( | |
| {"bcl_id":bcl_id}, | |
| {"$push":{"ooda_loop":{"$each":data}}} | |
| ) | |
| def CheckEmptyOODA(self,id): | |
| collection=db.get_collection(ooda) | |
| cursor=collection.find({"bcl_id":id}) | |
| data=list(cursor) | |
| if not data: | |
| return True | |
| else: | |
| return False | |
| def FetchOODA(self,bcl_id): | |
| collection=db.get_collection(ooda) | |
| document=collection.find_one({"bcl_id":bcl_id}) | |
| return document |