# LLMBB-Agent / qwen_server/database_server.py
# Repository page residue: author "vlff李飞飞", commit 2ab9e99 ("use token"),
# file size 4.93 kB (page links: raw, history, blame; scan status: no virus).
import json
import multiprocessing
import os
from pathlib import Path
import add_qwen_libs # NOQA
import jsonlines
import uvicorn
from fastapi import FastAPI, Request, HTTPException, Response
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from qwen_agent.log import logger
from qwen_agent.utils.utils import get_local_ip
from qwen_server.schema import GlobalConfig
from qwen_server.utils import extract_and_cache_document
from starlette.middleware.sessions import SessionMiddleware
# Load server_config.json from the directory containing this module and
# validate it through the GlobalConfig schema.
with open(Path(__file__).resolve().parent / 'server_config.json') as f:
    server_config = GlobalConfig(**json.load(f))

app = FastAPI()

logger.info(get_local_ip())

# Candidate browser origins for the workstation UI: loopback names, the
# wildcard address and this machine's local IP, all on the workstation port.
origins = [
    f'http://{host}:{server_config.server.workstation_port!s}'
    for host in ('127.0.0.1', 'localhost', '0.0.0.0', get_local_ip())
]

# NOTE: allow_origins is not passed to the middleware (it is left disabled
# below), so the `origins` list above is not used by this call.
app.add_middleware(
    CORSMiddleware,
    # allow_origins=origins,
    allow_credentials=True,
    allow_methods=['*'],
    allow_headers=['*'],
)

# Serve the code-interpreter workspace directory under /static.
app.mount(
    '/static',
    StaticFiles(directory=server_config.path.code_interpreter_ws),
    name='static',
)
def update_pop_url(data, cache_file_popup_url, access_token):
    """Append a popup-URL record to the JSON Lines cache file.

    The existing records are read, the new record is appended, and the whole
    file is rewritten.

    Args:
        data: Mapping that must contain a ``'url'`` key.
        cache_file_popup_url: Path of the JSON Lines file holding the records.
        access_token: Token stored alongside the URL in the new record.

    Returns:
        The string ``'Update URL'``.

    Raises:
        KeyError: If ``data`` has no ``'url'`` entry.
    """
    entry = {'url': data['url'], "access_token": access_token}

    records = []
    # A missing cache file simply means nothing has been recorded yet, so
    # start from an empty list instead of failing on the first call.
    if os.path.exists(cache_file_popup_url):
        # The context manager closes the reader (and its file handle).
        with jsonlines.open(cache_file_popup_url) as reader:
            records = list(reader)
    records.append(entry)

    with jsonlines.open(cache_file_popup_url, mode='w') as writer:
        writer.write_all(records)

    return 'Update URL'
def change_checkbox_state(text, cache_file, access_token):
    """Toggle the ``checked`` flag of matching records in the cache file.

    A record matches when its ``'url'`` equals ``text`` with the first three
    characters removed and its ``'access_token'`` equals ``access_token``.
    The whole file is rewritten afterwards, whether or not anything matched.

    Args:
        text: Checkbox identifier; only ``text[3:]`` is compared with the
            record URL (presumably a 3-character UI prefix is being dropped
            -- TODO confirm against the front end).
        cache_file: Path of the JSON Lines cache file.
        access_token: Token that a record must carry to be toggled.

    Returns:
        ``{'result': 'no file'}`` when ``cache_file`` does not exist,
        otherwise ``{'result': 'changed'}``.

    Raises:
        KeyError: If a record lacks ``'url'`` or ``'access_token'``, or a
            matching record lacks ``'checked'``.
    """
    if not os.path.exists(cache_file):
        return {'result': 'no file'}

    target_url = text[3:]

    # Read everything first; the context manager closes the reader before the
    # same path is reopened for writing.
    with jsonlines.open(cache_file) as reader:
        records = list(reader)

    for record in records:
        if record['url'] == target_url and record['access_token'] == access_token:
            record['checked'] = not record['checked']

    with jsonlines.open(cache_file, mode='w') as writer:
        writer.write_all(records)

    return {'result': 'changed'}
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Reject every request that does not present the configured access token.

    Despite its name, this middleware adds no header; it only authenticates.
    The token is taken from the first non-empty source among the
    ``Authorization`` header, the ``access_token`` query parameter and the
    ``access_token`` session entry, and must equal the ``ACCESS_TOKEN``
    environment variable.

    Args:
        request: Incoming request; ``request.session`` must be available.
        call_next: Callable that forwards the request to the next handler.

    Returns:
        A 401 response when the token is missing or wrong, otherwise the
        downstream response.
    """
    # Function-scope import keeps the file's import block untouched.
    import hmac

    # Log only the path: the full URL can carry the access token as a query
    # parameter, which must not end up in the logs.
    logger.info(f"Request URL path: {request.url.path}")

    access_token: str = (
        request.headers.get("Authorization")
        or request.query_params.get("access_token")
        or request.session.get("access_token")
    )
    expected_token = os.getenv("ACCESS_TOKEN")

    # An unset or empty ACCESS_TOKEN can never match a non-empty token, so it
    # is rejected up front. compare_digest avoids leaking, through timing,
    # how much of the token matched; bytes are used because it rejects
    # non-ASCII str arguments.
    if (
        not access_token
        or not expected_token
        or not hmac.compare_digest(
            access_token.encode("utf-8"), expected_token.encode("utf-8")
        )
    ):
        return Response(status_code=401, content="the token is not valid")

    # Remember the token so later requests in the same session are accepted
    # without resending it.
    request.session.setdefault("access_token", access_token)
    return await call_next(request)
@app.get('/healthz')
async def healthz(request: Request):
    """Health-check endpoint: always responds with ``{"healthz": true}``."""
    payload = {"healthz": True}
    return JSONResponse(payload)
@app.get('/cachedata/{file_name}')
async def cache_data(file_name: str):
    """Return all records of a JSON Lines cache file as a JSON array.

    Args:
        file_name: Name of a file directly inside the configured cache root.

    Returns:
        A JSONResponse holding the list of records read from the file.

    Raises:
        HTTPException: 404 when ``file_name`` is not a plain file name or the
            file does not exist in the cache root.
    """
    # file_name comes straight from the URL; accept only a bare file name so
    # the joined path cannot leave the cache root.
    if file_name in ('', '.', '..') or os.path.basename(file_name) != file_name:
        raise HTTPException(status_code=404, detail='cache file not found')

    cache_file = os.path.join(server_config.path.cache_root, file_name)
    if not os.path.isfile(cache_file):
        # Report a missing file as 404 rather than an unhandled error.
        raise HTTPException(status_code=404, detail='cache file not found')

    # The context manager closes the reader (and its file handle).
    with jsonlines.open(cache_file) as reader:
        lines = list(reader)
    return JSONResponse(lines)
@app.post('/endpoint')
async def web_listening(request: Request):
    """Dispatch a JSON task message posted by the client.

    The request body must be a JSON object whose ``'task'`` field selects the
    action:

    * ``'change_checkbox'`` -- toggle the record identified by ``'ckid'`` in
      ``browse.jsonl``.
    * ``'cache'`` -- start ``extract_and_cache_document`` in a separate
      process and answer ``'caching'`` without waiting for it.
    * ``'pop_url'`` -- record ``data['url']`` in ``popup_url.jsonl``.

    Returns:
        A JSONResponse wrapping the result of the selected action.

    Raises:
        HTTPException: 400 when ``'task'`` is missing or not one of the
            supported values.
    """
    data = await request.json()
    msg_type = data.get('task') if isinstance(data, dict) else None
    access_token = request.session.get("access_token")

    cache_root = server_config.path.cache_root
    cache_file_popup_url = os.path.join(cache_root, 'popup_url.jsonl')
    cache_file = os.path.join(cache_root, 'browse.jsonl')

    if msg_type == 'change_checkbox':
        rsp = change_checkbox_state(data['ckid'], cache_file, access_token)
    elif msg_type == 'cache':
        # The worker process is started and not joined, so the response is
        # returned without waiting for the extraction to finish.
        cache_obj = multiprocessing.Process(
            target=extract_and_cache_document,
            args=(data, cache_file, cache_root, access_token),
        )
        cache_obj.start()
        rsp = 'caching'
    elif msg_type == 'pop_url':
        # Naming note: 'pop_url' means "add a URL coming from the pop-up UI",
        # not "pop a URL off a collection".
        rsp = update_pop_url(data, cache_file_popup_url, access_token)
    else:
        # An unsupported task is a client error, not a server failure.
        raise HTTPException(status_code=400,
                            detail=f'unsupported task: {msg_type!r}')
    return JSONResponse(content=rsp)
# These imports sit after the route definitions rather than at the top of the
# file. NOTE(review): presumably the assistant demo must be imported only once
# the FastAPI app above exists -- confirm before moving them.
import gradio as gr
# from qwen_server.workstation_server import demo as workstation_app
from qwen_server.assistant_server import demo as assistant_app
# app = gr.mount_gradio_app(app, workstation_app, path="/workstation")
# Mount the assistant Gradio demo at the root path; `app` is rebound to the
# object returned by mount_gradio_app.
app = gr.mount_gradio_app(app, assistant_app, path="/")
# Session support is required by add_process_time_header, which reads and
# writes request.session. The signing key comes from the SECRET_KEY
# environment variable (None when unset). max_age=25200 is 7 hours if the unit
# is seconds -- NOTE(review): confirm unit and middleware ordering against the
# Starlette documentation.
app.add_middleware(SessionMiddleware, secret_key=os.getenv("SECRET_KEY"), max_age=25200)
if __name__ == '__main__':
    # Run the app under uvicorn with the host and port taken from the loaded
    # server configuration; auto-reload is off and a single worker is used.
    server_settings = server_config.server
    uvicorn.run(
        app='database_server:app',
        host=server_settings.server_host,
        port=server_settings.fast_api_port,
        reload=False,
        workers=1,
    )