import os
import json
import random
import string
import subprocess
import requests
from datetime import datetime
from flask import Flask, render_template, request, jsonify, send_file, Response, stream_with_context
from bs4 import BeautifulSoup
import markdown
import threading
from queue import Queue
import tempfile
app = Flask(__name__)
# Define directories
file_folder = os.path.dirname(os.path.abspath(__file__))
temp_audio_folder = os.path.join(file_folder, 'temp_audio')
chats_folder = os.path.join(file_folder, 'chats')
model_folder = None
piper_binary_path = os.path.join(file_folder, 'piper', 'piper')
# Create necessary directories
os.makedirs(temp_audio_folder, exist_ok=True)
os.makedirs(chats_folder, exist_ok=True)
# Fall back to the app's own directory as the voice-model folder if it contains .onnx files
default_user_folder = "./"
if os.path.exists(default_user_folder) and any(f.endswith('.onnx') for f in os.listdir(default_user_folder)):
model_folder = default_user_folder
# Global settings
DEFAULT_BASE_HOST = "http://localhost:11434"
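# Piper synthesis parameters; each key maps one-to-one onto a Piper CLI flag
# (--speaker, --noise_scale, --length_scale, --noise_w, --sentence_silence).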
SETTINGS = {
'speaker': 0,
'noise_scale': 0.667,
'length_scale': 1.0,
'noise_w': 0.8,
'sentence_silence': 0.2
}
def get_available_models():
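    """List the base names of the .onnx voice models found in model_folder."""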
if not model_folder:
return []
return [os.path.splitext(model)[0] for model in os.listdir(model_folder) if model.endswith('.onnx')]
def get_ollama_models(base_host=DEFAULT_BASE_HOST):
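    """Fetch the names of models installed on an Ollama server via GET /api/tags."""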
try:
        response = requests.get(f"{base_host}/api/tags", timeout=5)
if response.status_code == 200:
return [model['name'] for model in response.json().get('models', [])]
return []
    except requests.RequestException:
return []
def load_chat_history(chat_file):
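    """Return the message list from a saved chat file, or [] if unreadable."""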
try:
with open(os.path.join(chats_folder, chat_file), 'r', encoding='utf-8') as f:
return json.load(f).get('messages', [])
    except (OSError, json.JSONDecodeError):
return []
def save_chat(messages, chat_file=None):
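    """Save messages to a JSON file in chats_folder (creating a timestamped
    name if none is given) and return the filename used."""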
if not messages:
return None
if not chat_file:
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
chat_file = f'chat-{timestamp}.json'
filepath = os.path.join(chats_folder, chat_file)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump({
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'messages': messages
}, f, ensure_ascii=False, indent=2)
return chat_file
def remove_markdown(text):
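    """Flatten Markdown to plain text by rendering it to HTML and extracting the text."""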
html_content = markdown.markdown(text)
soup = BeautifulSoup(html_content, 'html.parser')
return soup.get_text().strip()
def convert_to_speech(text, model_name, remove_md=False):
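    """Render text to a .wav file with the named Piper voice; return the path, or None on failure."""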
if model_name not in get_available_models():
return None
if remove_md:
text = remove_markdown(text)
random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + '.wav'
output_file = os.path.join(temp_audio_folder, random_name)
# Clean old audio files
for file in os.listdir(temp_audio_folder):
if file.endswith('.wav'):
os.remove(os.path.join(temp_audio_folder, file))
model_path = os.path.join(model_folder, model_name + '.onnx')
    # Write the text to a temporary file and feed it to Piper on stdin.
    # tempfile.gettempdir() works on every platform, unlike os.getenv('TEMP'),
    # which is only set on Windows.
    temp_txt_path = os.path.join(tempfile.gettempdir(), "temp_text.txt")
    with open(temp_txt_path, "w", encoding="utf-8") as f:
        f.write(text)
    try:
        # Build an argument list instead of a shell pipeline ('type ... |' only
        # works in Windows cmd), so no shell is spawned and paths need no quoting.
        command = [piper_binary_path, '-m', model_path, '-f', output_file,
                   '--speaker', str(SETTINGS['speaker']),
                   '--noise_scale', str(SETTINGS['noise_scale']),
                   '--length_scale', str(SETTINGS['length_scale']),
                   '--noise_w', str(SETTINGS['noise_w']),
                   '--sentence_silence', str(SETTINGS['sentence_silence'])]
        with open(temp_txt_path, 'rb') as text_input:
            subprocess.run(command, stdin=text_input, check=True)
        if os.path.exists(output_file):
            return output_file
    except (subprocess.CalledProcessError, OSError):
        pass
    finally:
        if os.path.exists(temp_txt_path):
            os.remove(temp_txt_path)
    return None
def set_default_models():
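    """Return the preferred TTS and Ollama model names when installed, else None for each."""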
tts_models = get_available_models()
ollama_models = get_ollama_models()
default_tts_model = "RecomendacionesConMiau" if "RecomendacionesConMiau" in tts_models else None
default_ollama_model = "llama3.2:1b" if "llama3.2:1b" in ollama_models else None
return default_tts_model, default_ollama_model
@app.route('/')
def index():
tts_models = get_available_models()
chat_files = sorted([f for f in os.listdir(chats_folder) if f.endswith('.json')], reverse=True)
default_tts_model, default_ollama_model = set_default_models()
return render_template('index.html', tts_models=tts_models, chat_files=chat_files, default_tts_model=default_tts_model, default_ollama_model=default_ollama_model)
@app.route('/api/list_ollama_models')
def list_ollama_models():
base_host = request.args.get('base_host', DEFAULT_BASE_HOST)
return jsonify(models=get_ollama_models(base_host))
@app.route('/api/load_chat/<chat_file>')
def load_chat(chat_file):
messages = load_chat_history(chat_file)
return jsonify(messages=messages)
@app.route('/api/update_message', methods=['POST'])
def update_message():
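    """Edit a stored message in place; when a user message is edited, every
    later message is dropped so the assistant replies can be regenerated."""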
data = request.json
chat_file = data.get('chat_file')
message_index = data.get('message_index')
new_content = data.get('content')
is_user = data.get('is_user', False)
if not chat_file or message_index is None or not new_content:
return jsonify(error="Missing required parameters"), 400
messages = load_chat_history(chat_file)
    if message_index < 0 or message_index >= len(messages):
return jsonify(error="Invalid message index"), 400
# Update the message content
messages[message_index]['content'] = new_content
# If it's a user message, regenerate all subsequent responses
if is_user:
# Keep messages up to and including the edited message
messages = messages[:message_index + 1]
# Save the updated chat
save_chat(messages, chat_file)
return jsonify(success=True, messages=messages)
@app.route('/api/chat', methods=['POST'])
def chat():
data = request.json
base_host = data.get('base_host', DEFAULT_BASE_HOST)
model = data.get('model')
messages = data.get('messages', [])
chat_file = data.get('chat_file')
def generate():
queue = Queue()
thread = threading.Thread(
target=stream_ollama_response,
args=(base_host, model, messages, queue)
)
thread.start()
complete_response = ""
while True:
msg_type, content = queue.get()
if msg_type == "error":
yield f"data: {json.dumps({'error': content})}\n\n"
break
elif msg_type == "chunk":
complete_response = content
yield f"data: {json.dumps({'chunk': content})}\n\n"
elif msg_type == "done":
# Save chat history
messages.append({"role": "assistant", "content": complete_response})
save_chat(messages, chat_file)
yield f"data: {json.dumps({'done': complete_response})}\n\n"
break
return Response(stream_with_context(generate()), mimetype='text/event-stream')
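# Example client call (hypothetical model name; the server defaults to port 7860):
#   curl -N -X POST http://localhost:7860/api/chat \
#        -H 'Content-Type: application/json' \
#        -d '{"model": "llama3.2:1b", "messages": [{"role": "user", "content": "Hi"}]}'
# Each SSE "data:" line carries JSON with a "chunk" key (the accumulated text
# so far), followed by a final "done" or an "error" key.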
def stream_ollama_response(base_host, model, messages, queue):
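    """Relay a streaming Ollama chat completion into queue as ("chunk", text)
    items carrying the accumulated response, ending with ("done", text) or
    ("error", message)."""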
url = f"{base_host}/api/chat"
data = {
"model": model,
"messages": messages,
"stream": True
}
try:
with requests.post(url, json=data, stream=True) as response:
if response.status_code == 200:
complete_response = ""
for line in response.iter_lines():
if line:
try:
json_response = json.loads(line)
chunk = json_response.get("message", {}).get("content", "")
if chunk:
complete_response += chunk
queue.put(("chunk", complete_response))
except json.JSONDecodeError:
continue
queue.put(("done", complete_response))
else:
queue.put(("error", f"Error: {response.status_code}"))
except Exception as e:
queue.put(("error", f"Error: {str(e)}"))
@app.route('/api/tts', methods=['POST'])
def text_to_speech():
data = request.json
text = data.get('text', '')
model = data.get('model')
remove_md = data.get('remove_markdown', False)
if not text or not model:
return jsonify(error="Missing text or model"), 400
audio_file = convert_to_speech(text, model, remove_md)
if not audio_file:
return jsonify(error="Failed to convert text to speech"), 500
return jsonify(audio_file=os.path.basename(audio_file))
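# Example request (assumes a matching .onnx voice is installed):
#   curl -X POST http://localhost:7860/api/tts \
#        -H 'Content-Type: application/json' \
#        -d '{"text": "Hello there", "model": "RecomendacionesConMiau", "remove_markdown": true}'
# Returns {"audio_file": "<name>.wav"}; fetch the audio from /audio/<name>.wav.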
@app.route('/audio/<filename>')
def serve_audio(filename):
    # basename() strips any directory components, blocking path traversal
    return send_file(os.path.join(temp_audio_folder, os.path.basename(filename)))
if __name__ == '__main__':
app.run(debug=True, port=7860, host='0.0.0.0')