|
import logging |
|
|
|
import requests |
|
import os, shutil |
|
from flask import Flask, send_from_directory, request, jsonify |
|
|
|
# Flask application object; its static folder is online_log/static.
app = Flask(__name__, static_folder='online_log/static')

# Restrict both the application logger and the 'werkzeug' logger to
# ERROR-level records and above.
app.logger.setLevel(logging.ERROR)
log = logging.getLogger('werkzeug')
log.setLevel(logging.ERROR)

# In-memory store of message dicts: appended to by the /send_message handler
# and returned in full by the /get_messages handler.
messages = []
|
import threading |
|
from urllib.parse import parse_qs |
|
|
|
# Absolute path of the directory that contains this source file.
_THIS_FILE = os.path.abspath(__file__)
FILE_DIR = os.path.dirname(_THIS_FILE)

# Output directory: the "WareHouse" folder next to this file.
OUTPUT_DIR = os.path.join(FILE_DIR, "WareHouse")
|
def check_outdir():
    """Reset OUTPUT_DIR to a freshly created, empty directory.

    If something already exists at OUTPUT_DIR, the whole tree is removed
    first; the directory is then created in either case.
    """
    # Guard-clause form: delete the old tree when present, then always create.
    if os.path.exists(OUTPUT_DIR):
        shutil.rmtree(OUTPUT_DIR)
    os.mkdir(OUTPUT_DIR)
|
|
|
|
|
def zip_all_files():
    """Pack the contents of OUTPUT_DIR into ``online_log/static/Outputs.zip``.

    The archive location is anchored at FILE_DIR, the same way OUTPUT_DIR is,
    so the zip lands in the same place regardless of the working directory
    the process was started from. Flask resolves the relative
    ``"online_log/static"`` directory used by the routes in this file against
    the application's root path, not the working directory.

    Returns:
        None. The path returned by ``shutil.make_archive`` is discarded.
    """
    # make_archive appends the ".zip" suffix to the base name itself.
    archive_base = os.path.join(FILE_DIR, "online_log", "static", "Outputs")
    shutil.make_archive(archive_base, "zip", OUTPUT_DIR)
|
|
|
|
|
def clear_all_files():
    """Empty OUTPUT_DIR, leaving an empty directory in its place.

    Unlike an unconditional ``rmtree``, this does not raise
    ``FileNotFoundError`` when OUTPUT_DIR does not exist yet; the directory
    is simply created.
    """
    # Only remove the tree when there is one to remove.
    if os.path.exists(OUTPUT_DIR):
        shutil.rmtree(OUTPUT_DIR)
    os.mkdir(OUTPUT_DIR)
|
|
|
|
|
def send_msg(role, text):
    """POST one message to the local online-log server (best effort).

    Sends ``{"role": role, "text": text}`` as JSON to
    ``http://127.0.0.1:7860/send_message`` and prints whether the server
    answered with HTTP 200.

    Any ordinary failure is logged at INFO level and otherwise ignored, so
    callers are never interrupted when the server is unreachable.
    ``KeyboardInterrupt`` and ``SystemExit`` are deliberately NOT swallowed.

    Args:
        role: Value sent under the "role" key.
        text: Value sent under the "text" key.
    """
    payload = {"role": role, "text": text}
    try:
        response = requests.post("http://127.0.0.1:7860/send_message", json=payload)
        if response.status_code == 200:
            print("Message sent successfully!")
        else:
            print("Failed to send message.")
    except Exception:
        # Best effort only: `except Exception` (rather than a bare `except:`)
        # lets Ctrl-C and interpreter shutdown propagate.
        logging.info("flask app.py did not start for online log")
|
|
|
|
|
@app.route("/")
def index():
    """Serve ``index.html`` from the ``online_log/static`` directory."""
    static_dir = "online_log/static"
    page = "index.html"
    return send_from_directory(static_dir, page)
|
|
|
@app.route("/Outputs.zip")
def Outputs():
    """Serve ``Outputs.zip`` from the ``online_log/static`` directory."""
    static_dir = "online_log/static"
    archive_name = "Outputs.zip"
    return send_from_directory(static_dir, archive_name)
|
|
|
@app.route("/chain_visualizer")
def chain_visualizer():
    """Serve ``chain_visualizer.html`` from the ``online_log/static`` directory."""
    static_dir = "online_log/static"
    page = "chain_visualizer.html"
    return send_from_directory(static_dir, page)
|
|
|
@app.route("/replay")
def replay():
    """Serve ``replay.html`` from the ``online_log/static`` directory."""
    static_dir = "online_log/static"
    page = "replay.html"
    return send_from_directory(static_dir, page)
|
|
|
@app.route("/download")
def download():
    """Handle GET ``/download`` by serving ``index.html`` from ``online_log/static``."""
    static_dir = "online_log/static"
    page = "index.html"
    return send_from_directory(static_dir, page)
|
|
|
@app.route("/get_messages")
def get_messages():
    """Return the full contents of the module-level ``messages`` list as JSON."""
    response = jsonify(messages)
    return response
|
|
|
|
|
@app.route("/send_message", methods=["POST"])
def send_message():
    """Store one posted message and echo it back with its avatar URL.

    Expects a JSON object body with a string ``"role"`` and an optional
    ``"text"``. The stored and returned message has the keys ``"role"``,
    ``"text"`` and ``"avatarUrl"``.

    Returns:
        The stored message as JSON, or a JSON error with HTTP 400 when the
        body is not a JSON object or ``"role"`` is missing or not a string.
    """
    # silent=True yields None for an unparsable body instead of raising, so
    # all malformed input is reported through the single 400 branch below.
    data = request.get_json(silent=True)
    role = data.get("role") if isinstance(data, dict) else None
    if not isinstance(role, str):
        # find_avatar_url() performs string operations on the role, so a
        # missing or non-string role would otherwise end in a 500.
        return jsonify({"error": "JSON body must be an object with a string 'role'"}), 400

    text = data.get("text")
    avatar_url = find_avatar_url(role)

    message = {"role": role, "text": text, "avatarUrl": avatar_url}
    messages.append(message)
    return jsonify(message)
|
|
|
|
|
@app.post("/download")
def run():
    """Handle POST ``/download``: run a ChatDev job and zip its output.

    The raw request body is parsed as a URL-encoded query string and must
    contain non-empty ``task``, ``config`` and ``api_key`` fields. The
    handler then:

    1. stores ``api_key`` in the ``OPENAI_API_KEY`` environment variable,
    2. resets the output directory via ``check_outdir()``,
    3. calls ``runchatdev(task, config)``,
    4. zips the output via ``zip_all_files()``,
    5. serves ``index.html`` from ``online_log/static``.

    Returns:
        The ``index.html`` response, or a JSON error with HTTP 400 when a
        required field is missing.
    """
    body = request.get_data().decode('utf-8')
    query_params = parse_qs(body)

    # parse_qs drops blank values by default, so an empty field counts as
    # missing here as well.
    required = ("task", "config", "api_key")
    missing = [name for name in required if name not in query_params]
    if missing:
        return jsonify({"error": "missing form field(s): " + ", ".join(missing)}), 400

    # parse_qs has already turned '+' into spaces and decoded %XX escapes.
    # Replacing '+' again would destroy literal plus signs that were sent as
    # %2B (e.g. "C++"), so the value is used as-is.
    task = query_params["task"][0]
    config = query_params["config"][0]
    api_key = query_params["api_key"][0]

    os.environ["OPENAI_API_KEY"] = api_key
    check_outdir()

    # Imported here rather than at module level, after OPENAI_API_KEY is set.
    # NOTE(review): presumably the `run` module needs the key at import time;
    # confirm before moving this import.
    from run import runchatdev

    runchatdev(task, config)
    zip_all_files()
    return send_from_directory("online_log/static", "index.html")
|
|
|
def find_avatar_url(role):
    """Build the static URL of the avatar image for *role*.

    The role name is percent-encoded as a single URL path segment, so
    spaces become ``%20`` and reserved characters such as ``?``, ``#``,
    ``&``, ``%`` and ``/`` cannot break or redirect the resulting URL.

    Args:
        role: Role name as a string, e.g. ``"Chief Executive Officer"``.

    Returns:
        A URL of the form ``/static/avatars/<encoded role>.png``.
    """
    # Local import keeps this function self-contained.
    from urllib.parse import quote

    # safe="" also encodes "/", keeping the role inside one path segment.
    encoded_role = quote(role, safe="")
    avatar_filename = f"avatars/{encoded_role}.png"
    return f"/static/{avatar_filename}"
|
|
|
|
|
if __name__ == "__main__":
    # Start the server on all interfaces, port 7860 -- the same port that
    # send_msg() posts to on 127.0.0.1.
    app.run(port=7860, host="0.0.0.0")