newvoice / app.py
AiDeveloper1's picture
Update app.py
3e41026 verified
from flask import Flask, render_template, request, jsonify
from googletrans import Translator
import io
import asyncio
from dotenv import load_dotenv
import os
import logging
from chatbot import process_uploaded_file, index_documents, rag_chatbot
# Set up logging: configure the root logger once at import time so every
# module logs at INFO and above with a "timestamp - level - message" layout.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Module-level logger used by the route handlers below.
logger = logging.getLogger(__name__)
# Load environment variables via python-dotenv before the app handles requests.
# NOTE(review): presumably supplies credentials consumed by the `chatbot`
# module — confirm against that module.
load_dotenv()
# The Flask application object; routes below register themselves on it.
app = Flask(__name__)
# Maps the human-readable language names accepted in the 'language' field of
# /api/process_text requests to the short language codes passed to the
# translator. A name missing from this table is rejected with HTTP 400.
LANGUAGE_MAP = {
"English (US)": "en",
"Hindi (India)": "hi",
"Spanish (Spain)": "es",
"French (France)": "fr",
"German (Germany)": "de",
"Arabic (Saudi Arabia)": "ar"
}
@app.route('/')
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    landing_page = render_template("index.html")
    return landing_page
@app.route('/api/upload_document', methods=['POST'])
def upload_document():
    """Accept an uploaded file, split it into chunks and index them.

    Expects a multipart form upload with the file under the ``file`` field.

    Returns:
        A JSON response with:
        * 200 and a ``message`` describing how many chunks were indexed;
        * 400 and an ``error`` when no file part is present or no file was
          selected;
        * 500 and an ``error`` carrying the exception text when processing
          or indexing fails.
    """
    try:
        if 'file' not in request.files:
            return jsonify({"error": "No file uploaded"}), 400

        file = request.files['file']
        # Browsers submit an empty filename when the form is posted without
        # choosing a file; the falsy test also guards against a missing
        # (None) filename.
        if not file.filename:
            return jsonify({"error": "No file selected"}), 400

        # Process the file entirely in memory; nothing is written to disk.
        file_stream = io.BytesIO(file.read())
        documents = process_uploaded_file(file_stream, file.filename)

        # Index the chunks in the vector store (Pinecone, per the original
        # author's note). The return value is not needed here.
        index_documents(documents)

        return jsonify({"message": f"Successfully processed and indexed {len(documents)} chunks from {file.filename}"})
    except Exception as e:
        # Top-level boundary for this endpoint: keep the traceback in the log
        # and report the failure to the client as JSON.
        logger.exception("Error in upload_document: %s", e)
        return jsonify({"error": str(e)}), 500
async def _translate_text(text, dest_lang):
    """Translate ``text`` into ``dest_lang`` and return the translated string.

    Depending on the installed ``googletrans`` release,
    ``Translator.translate`` either returns the result directly or returns a
    coroutine that must be awaited; both cases are handled here so the caller
    can always drive this helper with ``asyncio.run``.
    """
    translator = Translator()
    translated = translator.translate(text, dest=dest_lang)
    if asyncio.iscoroutine(translated):
        translated = await translated
    return translated.text


@app.route('/api/process_text', methods=['POST'])
def process_text():
    """Answer a user question through the RAG chatbot in the user's language.

    Expects a JSON body with ``text`` (the question) and ``language`` (one of
    the keys of ``LANGUAGE_MAP``). Non-English input is translated to English
    before it is passed to ``rag_chatbot``, and the answer is translated back
    to the original language.

    Returns:
        A JSON response with:
        * 200 and ``response`` / ``language`` on success;
        * 400 and an ``error`` when the payload is not valid JSON, lacks a
          required field, or names an unsupported language;
        * 500 and an ``error`` carrying the exception text when translation
          or the chatbot fails.
    """
    # silent=True makes a missing/invalid JSON body come back as None so it
    # falls into the TypeError branch below instead of aborting the request
    # before our own error message can be produced.
    data = request.get_json(silent=True)
    try:
        original_text = data['text']
        language_name = data['language']
    except (KeyError, TypeError):
        return jsonify({"error": "Missing 'text' or 'language' in JSON payload"}), 400

    # Map language name to language code. The isinstance guard keeps an
    # unhashable value (e.g. a JSON list) from raising inside the `in` test.
    if not isinstance(language_name, str) or language_name not in LANGUAGE_MAP:
        return jsonify({"error": f"Unsupported language: {language_name}"}), 400
    original_lang_code = LANGUAGE_MAP[language_name]

    logger.info("Original Text: %s", original_text)
    logger.info("Original Language: %s (%s)", language_name, original_lang_code)

    try:
        # Translate to English (the language the chatbot is queried in).
        if original_lang_code != "en":
            translated_text = asyncio.run(_translate_text(original_text, dest_lang="en"))
        else:
            translated_text = original_text
        logger.info("Translated to English: %s", translated_text)

        # Process with RAG
        response = rag_chatbot(translated_text)
        logger.info("English Response: %s", response)

        # Translate response back to original language
        if original_lang_code != "en":
            final_response = asyncio.run(_translate_text(response, dest_lang=original_lang_code))
        else:
            final_response = response
        logger.info("Final Response (in original language): %s", final_response)
    except Exception as e:
        # Top-level boundary for this endpoint, mirroring upload_document:
        # log the traceback and report the failure to the client as JSON.
        logger.exception("Error in process_text: %s", e)
        return jsonify({"error": str(e)}), 500

    # Return the final response
    return jsonify({"response": final_response, "language": language_name})
if __name__ == '__main__':
    # Script entry point: start Flask's built-in server, listening on every
    # network interface at port 7860.
    server_options = {"host": '0.0.0.0', "port": 7860}
    app.run(**server_options)