Spaces:
Running
Running
from fastapi import FastAPI, Request, Depends, HTTPException, status, Response | |
from fastapi.responses import HTMLResponse, RedirectResponse, StreamingResponse | |
from fastapi.security import HTTPBasic, HTTPBasicCredentials | |
from base64 import b64decode | |
import time, json | |
from hashlib import sha256 | |
from modules import model, oauth, security, log_module, llm, chat_functions, settings | |
from modules.model import User, Session | |
from fastapi.staticfiles import StaticFiles | |
from fastapi.templating import Jinja2Templates | |
from datetime import datetime | |
from typing import Annotated | |
####################### APP SETUP ######################## | |
# app = FastAPI(docs_url=None, redoc_url=None) | |
app = FastAPI() | |
app.mount("/static", StaticFiles(directory="static"), name="static") | |
templates = Jinja2Templates(directory="templates") | |
fecha_unix = str(int(time.time())) | |
log_module.logger("system").info("iniciado") | |
########################################################## | |
######################## ROUTE: / ######################## | |
async def main_page(request: Request): | |
tools = [{"name": k, "desc": v} for k, v in chat_functions.function_user_text.items()] | |
asistants = model.get_all_personality_cores() | |
response = templates.TemplateResponse( | |
"main.html", { | |
"request": request, | |
"version": fecha_unix, | |
"tools": tools, | |
"asistants": asistants | |
}) | |
return response | |
########################################################## | |
#################### SECURITY (OAUTH) #################### | |
async def login(request: Request): | |
# Shows the Start "session with google" button, and deletes token cookie | |
ret = templates.TemplateResponse("login.html", {"request": request, "redirecturi": settings.OAUTH_REDIRECT}) | |
ret.delete_cookie("token") | |
return ret | |
async def validate_oauth(request: Request, response: Response): | |
# Get the oauth get params, | |
# look for the google validation and info, | |
# set the session cookie and save user in DB | |
# Extract the Get params | |
params = dict(request.query_params) | |
# Client browser fingerprint bypassed with google oauth as "state" | |
data = params["state"].split("=",1)[1] | |
data = b64decode(data) | |
data = json.loads(data) | |
# Get the google user info | |
google_userinfo = oauth.validate_redirect(params) | |
# Create the user model with the google info | |
user:User = model.User.find_or_create(google_userinfo, data) | |
# Prepare redirect response and set the session cookie | |
token = user.create_cookie() | |
if not user.can_use("chat"): | |
response = RedirectResponse(url='/hold') | |
security.set_cookie(response, key="token", value=token) | |
return response | |
# Saves the user | |
user.update_user() | |
security.set_cookie(response, key="token", value=token) | |
return {"success": True} | |
########################################################## | |
###################### APIs Configs ###################### | |
# Get data and structures from its models | |
User_find_from_data = Annotated[User, Depends(User.find_from_data)] | |
Session_find_from_data = Annotated[Session, Depends(Session.find_from_data)] | |
async def get_configs(response: Response, user: User_find_from_data): | |
if not user.can_use("chat"): | |
response = RedirectResponse(url='/hold') | |
return response | |
# Get llm tokens used | |
year_, month_ = datetime.now().strftime("%y-%m").split("-") | |
month = sum(user.tokens.get(year_, {}).get(month_, {"_":0}).values()) | |
total = sum([z for x in user.tokens.values() for y in x.values() for z in y.values()]) | |
tokens = {"month": month, "total": total} | |
# Create cookies and answer | |
user._session.create_cookie(response) | |
return user.configs.model_dump() | {"tokens": tokens, "challenge": user._session.challenge} | |
async def set_configs(response: Response, user: User_find_from_data): | |
if not user.can_use("chat"): | |
response = RedirectResponse(url='/hold') | |
return response | |
# Create config model | |
user.configs = model.Configs(**user._data) | |
user._session.configs = user.configs | |
assisntat_prompt = user.update_user() | |
# Create cookies and answer | |
user._session.create_cookie(response) | |
return {"success":True, "challenge": user._session.challenge, "assistantPrompt": assisntat_prompt} | |
async def get_token(response: Response, user: User_find_from_data): | |
# Generate api token | |
user.create_cookie() | |
if not user.can_use("chat"): | |
return {"success":False, "redirect": "/hold"} | |
# Create cookies and answer | |
return {"success":True, "challenge": user._session.challenge} | |
########################################################## | |
async def chat_async(session: Session_find_from_data): | |
chat = model.Chat(messages = session.data["messages"], personality=session.configs.assistant) | |
if(len(chat.messages) < 1 or chat.messages[-1].content==""): | |
log_module.logger(session.gid).warning("Empty message") | |
raise HTTPException( | |
status_code=status.HTTP_418_IM_A_TEAPOT, | |
detail= "Nope" | |
) | |
return StreamingResponse(llm.streamer(chat, session), media_type="application/json") | |
########################## Static Pages ########################## | |
async def privacy_policy(request: Request): | |
return templates.TemplateResponse("PrivacyPolicy.html", {"request": request}) | |
async def on_hold(request: Request, user: User = Depends(User.find_from_cookie)): | |
if user.can_use("chat"): | |
return RedirectResponse(url='/') | |
return templates.TemplateResponse( | |
"no_access.html", { | |
"request": request, | |
"description": bool(user.description) | |
}) | |
async def on_hold(request: Request, user: User = Depends(User.find_from_cookie)): | |
if not user.description: | |
form = await request.form() | |
if message := form["message"].strip(): | |
user.update_description(message) | |
return templates.TemplateResponse( | |
"no_access.html", { | |
"request": request, | |
"description": bool(user.description) | |
}) | |
########################## Other ########################## | |
# @app.get("/read_log", response_class=HTMLResponse) | |
# async def read_log(request: Request, credentials: HTTPBasicCredentials = Depends(httpsecurity)): | |
# if sha256(credentials.username.encode()).hexdigest()=="bc1c32d709aef061bbde4fc848421cdb933e8a9f391c3a089f2861ac0772c168" and security.authenticate_user(credentials): | |
# log_module.log_write(credentials.username, "Log Accesado", "") | |
# with open("logs/eventos.log", "r") as f: | |
# return HTMLResponse(f.read()) | |
# log_module.log_write(credentials.username, "Intento acceder logs", f"{request.client.host}:{request.client.port} - {str(dict(request.headers))}") | |
# raise HTTPException(status_code=404) | |