brain / Brain /src /firebase /firebase.py
Kotta
bugfix(#136): autogpt interface issue fixed for infinite loop and applied with finish command as well.
49be943
import json
import os
from typing import Any
import firebase_admin
from firebase_admin import db
from firebase_admin import credentials
from Brain.src.common.assembler import Assembler
from Brain.src.common.brain_exception import BrainException
from Brain.src.common.http_response_codes import responses
from Brain.src.common.utils import FIREBASE_STORAGE_BUCKET, FIREBASE_REALTIME_DATABASE
from Brain.src.logs import logger
from Brain.src.model.req_model import ReqModel
from Brain.src.model.requests.request_model import BasicReq
def initialize_app(setting: ReqModel) -> firebase_admin.App:
    """Return the Firebase Admin app for ``setting.uuid``, creating it if needed.

    The app name is derived from ``setting.uuid`` via
    ``get_firebase_admin_name``. If an app is already registered under that
    name it is reused as-is; otherwise a new one is initialized with the
    credentials from ``get_firebase_cred(setting)`` and the configured
    storage bucket / realtime database URL.

    Args:
        setting: Request settings; ``uuid`` selects the app name and the
            whole object is forwarded to ``get_firebase_cred``.

    Returns:
        The existing or newly initialized ``firebase_admin.App``.
    """
    app_name = get_firebase_admin_name(setting.uuid)
    try:
        # Reuse the app when one is already registered under this name.
        return firebase_admin.get_app(app_name)
    except ValueError:
        # get_app signals "no such app" with ValueError; anything else is a
        # genuine failure and should not be mistaken for a missing app.
        logger.warn(
            title="firebase init",
            message=f"this app name: {app_name} does not exist",
        )
    return firebase_admin.initialize_app(
        get_firebase_cred(setting),
        {
            "storageBucket": FIREBASE_STORAGE_BUCKET,
            "databaseURL": FIREBASE_REALTIME_DATABASE,
        },
        name=app_name,
    )
def get_firebase_admin_name(uuid: str = "") -> str:
    """Build the Firebase Admin app name for the given uuid.

    The name is the fixed prefix ``firebase_admin_`` followed by ``uuid``;
    with the default empty ``uuid`` only the prefix is returned.
    """
    return "firebase_admin_{}".format(uuid)
def firebase_admin_with_setting(data: BasicReq):
    """Convert request confs to settings and obtain the matching Firebase app.

    Args:
        data: Incoming request; its ``confs`` are converted to a request
            model with ``Assembler.to_req_model``.

    Returns:
        A ``(setting, firebase_app)`` tuple: the converted settings and the
        app returned by ``initialize_app`` for them.

    Raises:
        BrainException: With code 507 when ``initialize_app`` fails for any
            reason. The original error is attached as ``__cause__``.
    """
    # firebase admin init
    assembler = Assembler()
    setting = assembler.to_req_model(data.confs)
    try:
        firebase_app = initialize_app(setting)
    except Exception as ex:
        # Chain explicitly so the underlying failure stays visible in the
        # traceback instead of being hidden behind the generic 507 error.
        raise BrainException(code=507, message=responses[507]) from ex
    return setting, firebase_app
def get_firebase_cred(setting: ReqModel) -> credentials.Certificate:
    """Build the Firebase credential from a local file or from the settings.

    If ``Brain/firebase_cred.json`` exists (path relative to the current
    working directory) its JSON content is used; otherwise
    ``setting.firebase_key`` is parsed as a JSON string.

    Args:
        setting: Request settings; ``firebase_key`` is only read when the
            local credential file is absent.

    Returns:
        A ``credentials.Certificate`` built from the parsed JSON object.

    Raises:
        json.JSONDecodeError: If the chosen source is not valid JSON.
    """
    cred_path = "Brain/firebase_cred.json"
    if os.path.exists(cred_path):
        # The context manager closes the file even when parsing fails.
        with open(cred_path, encoding="utf-8") as cred_file:
            cred = json.load(cred_file)
    else:
        cred = json.loads(setting.firebase_key)
    return credentials.Certificate(cred)
def delete_data_from_realtime(
    reference_link: str, firebase_app: firebase_admin.App
) -> None:
    """Delete data from the Firebase Realtime Database using a reference link.

    Args:
        reference_link: Database path passed to ``db.reference``.
        firebase_app: Firebase app the reference is resolved against.
    """
    db.reference(reference_link, app=firebase_app).delete()