chgpt / modules /model.py
Miguel Diaz
Update assistant + google oauth + google search
9c5c050
import json
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi
from fastapi import Request, Response
from . import log_module, security, settings, chat_functions
from datetime import timezone, datetime, timedelta
from pydantic import BaseModel, Field, PrivateAttr
from typing import Any, List, Self
import tiktoken
import uuid
client: MongoClient = MongoClient(settings.DB_URI, server_api=ServerApi('1'))
class __DB:
user = client.ChatDB.users
sess = client.ChatDB.sessions
personalities = client.ChatDB.personalityCores
DB: __DB = __DB()
tz: timezone = timezone(timedelta(hours=-4))
encoding = tiktoken.encoding_for_model(settings.GPT_MODEL)
def count_tokens_on_message(args: list) -> int:
token_count = 0
for n in args:
if n: token_count += len(encoding.encode(n))
return token_count
def get_personality_core (personality):
found = DB.personalities.find_one({"name":personality})
return found["prompt"].replace(
"{date}", datetime.now().strftime("%Y-%m-%D")
)
def get_all_personality_cores():
return [x["name"] for x in DB.personalities.find({})]
class Configs(BaseModel):
temperature: float = 0.5
frequency_penalty: float = 0.0
presence_penalty: float = 0.0
useTool: bool = True
assistant: str = "Chatsito clasico"
assistantPrompt: str = get_personality_core("Chatsito clasico")
def __init__(self, *args, **kwargs):
super(Configs, self).__init__(*args, **kwargs)
self.assistantPrompt = get_personality_core(self.assistant)
class Message(BaseModel):
role: str
content: str = ""
_tokens: int = 1
_thread = None
_running:bool = False
def __init__(self, *args, **kwargs):
super(Message, self).__init__(*args, **kwargs)
self._tokens = count_tokens_on_message([self.content])
def consume_stream(self, stream):
self._running = True
for chunk in stream:
self._tokens += 1
choice = chunk.choices[0]
if choice.finish_reason == "stop" or not choice.delta.content:
return
self.content += choice.delta.content
self._running = False
def get_tokens(self):
return self._tokens, self._tokensOutput
class ToolCallsInputFunction(BaseModel):
name: str = ""
arguments: str = ""
class ToolCallsInput(BaseModel):
id: str
function: ToolCallsInputFunction
type: str = "function"
class ToolCallsOutput(BaseModel):
role: str
tool_call_id: str
name: str
content: str
_tokens: int = 0
def __init__(self, *args, **kwargs):
super(ToolCallsOutput, self).__init__(*args, **kwargs)
self._tokens = count_tokens_on_message([self.content])
class MessageTool(BaseModel):
role: str
content: str = ""
tool_calls: list[ToolCallsInput]
_tokens: int = 0
_outputs: list[ToolCallsOutput] = []
def __init__(self, **kwargs):
stream = kwargs.pop("stream")
chunk = kwargs.pop("chunk")
kwargs["tool_calls"] = []
super(MessageTool, self).__init__(**kwargs)
while True:
choice = chunk.choices[0]
self._tokens += 1
if choice.finish_reason:
break
if chunk.choices[0].delta.tool_calls == None:
chunk = next(stream)
continue
tool_call = chunk.choices[0].delta.tool_calls[0]
if tool_call.id:
self.tool_calls.append(
ToolCallsInput(
id=tool_call.id,
function=ToolCallsInputFunction(
**tool_call.function.model_dump()
)))
elif tool_call.function.arguments:
self.tool_calls[-1].function.arguments += tool_call.function.arguments
chunk = next(stream)
if not chunk:
self._tokens += sum
break
def exec(self, gid):
for func in self.tool_calls:
self._outputs.append(ToolCallsOutput(
role="tool",
tool_call_id=func.id,
name=func.function.name ,
content=chat_functions.function_callbacks[func.function.name](json.loads(func.function.arguments), gid)
))
class Chat(BaseModel):
messages: List[Message|MessageTool|ToolCallsOutput]
tokens: int = 3
def __init__(self: Self, *args: list, **kwargs: dict):
temp = []
for i in kwargs.pop("messages"):
temp.append(Message(**i))
kwargs["messages"] = temp
super(Chat, self).__init__(*args, **kwargs)
self.tokens += sum([x._tokens+3 for x in self.messages])
def append(self: Self, message: Message|MessageTool):
if isinstance(message, Message):
self.messages.append(message)
self.tokens += message._tokens
else:
self.messages.append(message)
self.tokens += message._tokens
for out in message._outputs:
self.tokens += out._tokens
self.messages.append(out)
@classmethod
def new_msg(cls: Self, role: str, stream):
message = Message(role=role)
message.consume_stream(stream)
return message
@classmethod
def new_func(cls: Self, role: str, stream, chunk):
return MessageTool(role=role, stream=stream, chunk=chunk)
class Session(BaseModel):
gid: str
fprint: str
hashed: str
guid: str
public_key: str = ""
challenge: str = str(uuid.uuid4())
data: dict = {}
configs: Configs | None = None
def __init__(self, **kwargs):
kwargs["guid"] = kwargs.get("guid", str(uuid.uuid4()))
kwargs["hashed"] = security.sha256(kwargs["guid"] + kwargs["fprint"])
super(Session, self).__init__(**kwargs)
def validate_signature(self: Self, signature:str):
valid = security.validate_signature(self.public_key, signature, self.challenge)
if not valid:
security.raise_401("Cannot validate Session signature")
return True
@classmethod
def find_from_data(cls:Self, request: Request, data:dict) -> Self:
cookie_data:dict = security.token_from_cookie(request)
if "gid" not in cookie_data or "guid" not in cookie_data:
log_module.logger().error("Cookie without session needed data")
security.raise_401("gid or guid not in cookie")
if not (public_key := cookie_data.get("public_key", None)): # FIX Vuln Code
if request.scope["path"] != "/getToken":
log_module.logger(cookie_data["gid"]).error(f"User without public key saved | {json.dumps(cookie_data)}")
security.raise_401("the user must have a public key saved in token")
else:
log_module.logger(cookie_data['gid']).info("API public key set for user")
public_key = data["public_key"]
else:
cls.check_challenge(data["fingerprint"], cookie_data["challenge"])
session: Self = cls(
gid = cookie_data["gid"],
fprint = data["fingerprint"],
guid = cookie_data["guid"],
public_key = public_key,
data = data,
configs = Configs(**cookie_data["configs"])
)
if session.hashed != cookie_data["fprint"]:
log_module.logger(session.gid).error(f"Fingerprint didnt match | {json.dumps(cookie_data)}")
security.raise_401("Fingerprint didnt match")
session.add_challenge()
return session
def create_cookie_token(self:Self):
return security.create_jwt_token({
"gid":self.gid,
"guid": self.guid,
"fprint": self.hashed,
"public_key": self.public_key,
"challenge": self.challenge,
"configs": self.configs.model_dump()
})
def create_cookie(self:Self, response: Response):
jwt = self.create_cookie_token()
security.set_cookie(response, "token", jwt, {"hours": 24})
def update_usage(self: Self, message:Message):
User.update_usage(self.gid, message)
def add_challenge(self: Self):
return True
self.challenge = str(uuid.uuid4())
DB.sess.insert_one(self.model_dump(include={"fprint", "challenge"}) )
@staticmethod
def check_challenge(fprint:str, challenge: str):
return True
found = DB.sess.find_one_and_delete({"fprint":fprint})
if not found or found["challenge"] != challenge:
security.raise_401("Check challenge failed")
class User(BaseModel):
name: str
tokens: dict = {}
created: datetime = datetime.now(tz)
approved: datetime | None = None
description: str = ""
email: str
gid: str
role: str = "on hold"
configs: Configs = Configs()
_session: Session | None = None
_data: dict | None = None
@classmethod
def find_or_create(cls: Self, data: dict, loginData: dict)-> Self:
found = DB.user.find_one({"gid":data["gid"]})
user:Self = cls(**found) if found else cls(**data)
if not found:
DB.user.insert_one(user.model_dump())
user._session = Session(gid=user.gid, fprint=loginData["fp"], public_key=loginData["pk"])
log_module.logger(user.gid).info(f"User {'logged' if found else'created'} | fp: {user._session.fprint}")
return user
@classmethod
def find_from_cookie(cls:Self, request: Request) -> Self:
cookie_data:dict = security.token_from_cookie(request)
if "gid" not in cookie_data or "guid" not in cookie_data:
log_module.logger().error("Cookie without needed data")
security.raise_307("gid or guid not in cookie")
found:dict = DB.user.find_one({"gid":cookie_data["gid"]})
if not found:
log_module.logger(cookie_data["gid"]).error("User not found on DB")
security.raise_307("User not found on DB")
user: Self = cls(**found)
user._session = Session(
gid = cookie_data["gid"],
guid = cookie_data["guid"],
fprint = cookie_data["fprint"],
configs = user.configs
)
return user
@classmethod
def find_from_data(cls:Self, request: Request, data:dict) -> Self:
session:Session = Session.find_from_data(request, data)
found:dict = DB.user.find_one({"gid":session.gid})
if not found:
log_module.logger(session.gid).error("User not found on DB")
security.raise_307("User not found on DB")
user: Self = cls(**found)
user._session = session
user._data = data
return user
def update_description(self: Self, message: str) -> None:
log_module.logger(self.gid).info("Description Updated")
DB.user.update_one(
{"gid":self.gid},
{"$set": { "description": message}}
)
self.description = message
def can_use(self: Self, activity: str):
return security.can_use(self.role, activity)
def update_user(self: Self) -> None:
log_module.logger(self.gid).info("User Updated")
DB.user.update_one({"gid": self.gid}, {"$set": self.model_dump()})
return self.configs.assistantPrompt
@staticmethod
def update_usage(gid:str, tokens:int):
inc_field = datetime.now().strftime("tokens.%y.%m.%d")
DB.user.update_one({"gid": gid}, {"$inc":{inc_field: tokens}})
def create_cookie(self:Self):
return security.create_jwt_token({
"gid":self._session.gid,
"guid": self._session.guid,
"fprint": self._session.hashed,
"public_key": self._session.public_key,
"challenge": self._session.challenge,
"configs": self.configs.model_dump()
})