from pymongo import MongoClient from pydantic import BaseModel, HttpUrl from datetime import datetime from typing import Optional, List, Dict, Union class MongoService: def __init__(self, mongo_url:str, database:str): self.mongo_url = mongo_url self.mongo_client = MongoClient(self.mongo_url) self.mongo_database = self.mongo_client[database] def insert(self, collection:str, data:Dict): inserted = self.mongo_database[collection].insert_one(data) return def get(self, collection:str, filter:Dict, fields_to_retrieve:List=[]): fields = {} if fields_to_retrieve != []: for field in fields_to_retrieve: fields[field] = 1 retrieved_data = list(self.mongo_database[collection].find(filter,fields)) return retrieved_data def update(self, collection:str, filter:Dict, update_value:Dict, many=False): #myquery = { "address": { "$regex": "^S" } } #newvalues = { "$set": { "name": "Minnie" } } if many == True: updated = self.mongo_database[collection].update_many(filter, update_value) else: updated = self.mongo_database[collection].update_one(filter, update_value) return