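"""Flask web UI that pairs an Ollama chat backend with Piper text-to-speech.

Assistant replies are streamed from an Ollama server as server-sent events,
conversations are persisted as JSON files under ./chats, and messages can be
voiced through a local Piper binary using any .onnx voice model found in the
working directory.
"""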
import os
import json
import random
import string
import subprocess
import requests
from datetime import datetime
from flask import Flask, render_template, request, jsonify, send_file, Response, stream_with_context
from bs4 import BeautifulSoup
import markdown
import threading
from queue import Queue
app = Flask(__name__)
# Define directories
file_folder = os.path.dirname(os.path.abspath(__file__))
temp_audio_folder = os.path.join(file_folder, 'temp_audio')
chats_folder = os.path.join(file_folder, 'chats')
model_folder = None
piper_binary_path = os.path.join(file_folder, 'piper')
# Create necessary directories
os.makedirs(temp_audio_folder, exist_ok=True)
os.makedirs(chats_folder, exist_ok=True)
# Use the current working directory as the voice-model folder if it contains .onnx files
default_user_folder = "./"
if os.path.exists(default_user_folder) and any(f.endswith('.onnx') for f in os.listdir(default_user_folder)):
model_folder = default_user_folder
# Global settings
DEFAULT_BASE_HOST = "http://localhost:11434"
SETTINGS = {
'speaker': 0,
'noise_scale': 0.667,
'length_scale': 1.0,
'noise_w': 0.8,
'sentence_silence': 0.2
}
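# For reference, these settings become a Piper invocation along the lines of
# (a sketch; the real paths are filled in by convert_to_speech below):
#   ./piper -m voice.onnx -f out.wav --speaker 0 --noise_scale 0.667 \
#       --length_scale 1.0 --noise_w 0.8 --sentence_silence 0.2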
def get_available_models():
if not model_folder:
return []
return [os.path.splitext(model)[0] for model in os.listdir(model_folder) if model.endswith('.onnx')]
def get_ollama_models(base_host=DEFAULT_BASE_HOST):
    try:
        # /api/tags is Ollama's endpoint for listing locally installed models;
        # a short timeout keeps an unreachable server from hanging the request
        response = requests.get(f"{base_host}/api/tags", timeout=5)
        if response.status_code == 200:
            return [model['name'] for model in response.json().get('models', [])]
        return []
    except requests.RequestException:
        return []
def load_chat_history(chat_file):
    try:
        # basename() guards against path traversal in the client-supplied name
        with open(os.path.join(chats_folder, os.path.basename(chat_file)), 'r', encoding='utf-8') as f:
            return json.load(f).get('messages', [])
    except (OSError, json.JSONDecodeError):
        return []
def save_chat(messages, chat_file=None):
if not messages:
return None
if not chat_file:
timestamp = datetime.now().strftime('%Y%m%d-%H%M%S')
chat_file = f'chat-{timestamp}.json'
filepath = os.path.join(chats_folder, chat_file)
with open(filepath, 'w', encoding='utf-8') as f:
json.dump({
'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
'messages': messages
}, f, ensure_ascii=False, indent=2)
return chat_file
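# Strips Markdown by rendering it to HTML and extracting the plain text,
# e.g. remove_markdown("**bold** and `code`") -> "bold and code"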
def remove_markdown(text):
html_content = markdown.markdown(text)
soup = BeautifulSoup(html_content, 'html.parser')
return soup.get_text().strip()
def convert_to_speech(text, model_name, remove_md=False):
if model_name not in get_available_models():
return None
if remove_md:
text = remove_markdown(text)
random_name = ''.join(random.choices(string.ascii_letters + string.digits, k=8)) + '.wav'
output_file = os.path.join(temp_audio_folder, random_name)
    # Remove previously generated clips so the temp folder only holds the latest file
    for file in os.listdir(temp_audio_folder):
        if file.endswith('.wav'):
            os.remove(os.path.join(temp_audio_folder, file))
model_path = os.path.join(model_folder, model_name + '.onnx')
    try:
        # Build the Piper invocation as an argument list; avoiding shell=True
        # sidesteps quoting problems with paths containing spaces or quotes
        command = [
            piper_binary_path, '-m', model_path, '-f', output_file,
            '--speaker', str(SETTINGS['speaker']),
            '--noise_scale', str(SETTINGS['noise_scale']),
            '--length_scale', str(SETTINGS['length_scale']),
            '--noise_w', str(SETTINGS['noise_w']),
            '--sentence_silence', str(SETTINGS['sentence_silence']),
        ]
        # Piper reads the text to synthesize from stdin
        subprocess.run(command, input=text.encode('utf-8'), check=True)
        if os.path.exists(output_file):
            return output_file
    except Exception as e:
        print(f"Error during text-to-speech conversion: {e}")
    return None
def set_default_models():
    # Hardcoded preferred defaults; each falls back to None when not installed
tts_models = get_available_models()
ollama_models = get_ollama_models()
default_tts_model = "RecomendacionesConMiau" if "RecomendacionesConMiau" in tts_models else None
default_ollama_model = "llama3.2:1b" if "llama3.2:1b" in ollama_models else None
return default_tts_model, default_ollama_model
@app.route('/')
def index():
tts_models = get_available_models()
chat_files = sorted([f for f in os.listdir(chats_folder) if f.endswith('.json')], reverse=True)
default_tts_model, default_ollama_model = set_default_models()
return render_template('index.html', tts_models=tts_models, chat_files=chat_files, default_tts_model=default_tts_model, default_ollama_model=default_ollama_model)
@app.route('/api/list_ollama_models')
def list_ollama_models():
base_host = request.args.get('base_host', DEFAULT_BASE_HOST)
return jsonify(models=get_ollama_models(base_host))
@app.route('/api/load_chat/<chat_file>')
def load_chat(chat_file):
messages = load_chat_history(chat_file)
return jsonify(messages=messages)
@app.route('/api/update_message', methods=['POST'])
def update_message():
data = request.json
chat_file = data.get('chat_file')
message_index = data.get('message_index')
new_content = data.get('content')
is_user = data.get('is_user', False)
if not chat_file or message_index is None or not new_content:
return jsonify(error="Missing required parameters"), 400
messages = load_chat_history(chat_file)
    if not (0 <= message_index < len(messages)):
        return jsonify(error="Invalid message index"), 400
    # Update the message content
    messages[message_index]['content'] = new_content
    # If a user message was edited, keep only the messages up to and including
    # it; the client then regenerates the assistant responses that followed
    if is_user:
        messages = messages[:message_index + 1]
# Save the updated chat
save_chat(messages, chat_file)
return jsonify(success=True, messages=messages)
@app.route('/api/chat', methods=['POST'])
def chat():
data = request.json
base_host = data.get('base_host', DEFAULT_BASE_HOST)
model = data.get('model')
messages = data.get('messages', [])
chat_file = data.get('chat_file')
def generate():
queue = Queue()
thread = threading.Thread(
target=stream_ollama_response,
args=(base_host, model, messages, queue)
)
thread.start()
complete_response = ""
while True:
msg_type, content = queue.get()
if msg_type == "error":
yield f"data: {json.dumps({'error': content})}\n\n"
break
            elif msg_type == "chunk":
                # Each "chunk" already carries the full accumulated response
                complete_response = content
                yield f"data: {json.dumps({'chunk': content})}\n\n"
elif msg_type == "done":
# Save chat history
messages.append({"role": "assistant", "content": complete_response})
save_chat(messages, chat_file)
yield f"data: {json.dumps({'done': complete_response})}\n\n"
break
return Response(stream_with_context(generate()), mimetype='text/event-stream')
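# Example client call (a sketch; model name and prompt are placeholders):
#   curl -N -X POST http://localhost:7860/api/chat \
#     -H 'Content-Type: application/json' \
#     -d '{"model": "llama3.2:1b", "messages": [{"role": "user", "content": "Hi"}]}'
# The reply is a text/event-stream of "data: {...}" lines carrying 'chunk'
# updates and a final 'done' payload.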
def stream_ollama_response(base_host, model, messages, queue):
url = f"{base_host}/api/chat"
data = {
"model": model,
"messages": messages,
"stream": True
}
try:
with requests.post(url, json=data, stream=True) as response:
if response.status_code == 200:
complete_response = ""
                # Ollama streams newline-delimited JSON objects, each carrying
                # an incremental "message.content" fragment
                for line in response.iter_lines():
if line:
try:
json_response = json.loads(line)
chunk = json_response.get("message", {}).get("content", "")
if chunk:
complete_response += chunk
queue.put(("chunk", complete_response))
except json.JSONDecodeError:
continue
queue.put(("done", complete_response))
else:
queue.put(("error", f"Error: {response.status_code}"))
except Exception as e:
queue.put(("error", f"Error: {str(e)}"))
@app.route('/api/tts', methods=['POST'])
def text_to_speech():
data = request.json
text = data.get('text', '')
model = data.get('model')
remove_md = data.get('remove_markdown', False)
if not text or not model:
return jsonify(error="Missing text or model"), 400
audio_file = convert_to_speech(text, model, remove_md)
if not audio_file:
return jsonify(error="Failed to convert text to speech"), 500
return jsonify(audio_file=os.path.basename(audio_file))
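# Example client call (a sketch; the voice must match an installed .onnx model):
#   curl -X POST http://localhost:7860/api/tts \
#     -H 'Content-Type: application/json' \
#     -d '{"text": "Hello there", "model": "RecomendacionesConMiau", "remove_markdown": true}'
# On success the JSON response names a .wav file served at /audio/<filename>.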
@app.route('/audio/<filename>')
def serve_audio(filename):
    # basename() prevents path traversal outside the temp audio folder
    return send_file(os.path.join(temp_audio_folder, os.path.basename(filename)))
if __name__ == '__main__':
    # 7860 is the conventional Hugging Face Spaces port; debug=True is for development only
    app.run(debug=True, port=7860, host='0.0.0.0')