|
import os
import time
import logging
import json
import tempfile
import threading

import requests
import torch
from flask import Flask, render_template, request, jsonify, session
from flask_session import Session
from simple_salesforce import Salesforce
from simple_salesforce.format import format_soql
from transformers import pipeline, AutoConfig
from gtts import gTTS
from gtts.tts import gTTSError
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
from waitress import serve
|
|
|
# Flask application with server-side (filesystem) sessions.
app = Flask(__name__)

# NOTE(review): the fallback secret is hard-coded and is the same string that
# is passed as the Salesforce security token elsewhere in this file; set
# SECRET_KEY in the environment and consider dropping the literal.
app.secret_key = os.getenv("SECRET_KEY", "sSSjyhInIsUohKpG8sHzty2q")

# Session cookie settings are applied in one call before Session() reads them.
app.config.update(
    SESSION_TYPE="filesystem",
    SESSION_COOKIE_NAME="my_session",
    SESSION_COOKIE_SECURE=True,
    SESSION_COOKIE_SAMESITE="None",
)
Session(app)

# Emit INFO-level (and above) log records.
logging.basicConfig(level=logging.INFO)
|
|
|
|
|
# Connect to Salesforce once at import time. ``sf`` is bound to None up front
# so that a failed connection leaves a defined (if unusable) global instead of
# an undefined name for the request handlers to trip over.
# NOTE(review): the fallback credentials below are committed to source; rotate
# them and supply SF_USERNAME / SF_PASSWORD / SF_SECURITY_TOKEN through the
# environment so the literals can be removed.
sf = None
try:
    print("Attempting to connect to Salesforce...")
    sf = Salesforce(
        username=os.getenv("SF_USERNAME", "diggavalli98@gmail.com"),
        password=os.getenv("SF_PASSWORD", "Sati@1020"),
        security_token=os.getenv("SF_SECURITY_TOKEN", "sSSjyhInIsUohKpG8sHzty2q"),
    )
    print("Connected to Salesforce successfully!")
except Exception as e:
    # Best effort: the app still starts, and handlers report the failure per request.
    print(f"Failed to connect to Salesforce: {str(e)}")
|
|
|
|
|
# Prefer the GPU when CUDA is available; otherwise run on the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"

# Load the Whisper-small model configuration and attach a 60-valued
# ``timeout`` entry to it.
config = AutoConfig.from_pretrained("openai/whisper-small")
config.update(dict(timeout=60))
|
|
|
|
|
# Spoken prompts, keyed by the base name of the audio file generated for each.
prompts = dict(
    welcome="Welcome to Biryani Hub.",
    ask_name="Tell me your name.",
    ask_email="Please provide your email address.",
    thank_you="Thank you for registration.",
)
|
|
|
def generate_audio_prompt(text, filename, retries=3, delay=5):
    """Synthesize ``text`` with gTTS and save it as ``static/<filename>``.

    A failed synthesis is retried after ``delay`` seconds, up to ``retries``
    additional times. Once the retries are used up the last error is
    re-raised instead of retrying forever.

    Args:
        text: Text to convert to speech.
        filename: Name of the output file inside the ``static`` directory.
        retries: Number of retries after a gTTSError (default 3).
        delay: Seconds to wait between attempts (default 5).

    Raises:
        gTTSError: If every attempt fails.
    """
    # Make sure the output directory exists before gTTS writes into it.
    os.makedirs("static", exist_ok=True)
    target = os.path.join("static", filename)

    attempts = max(retries, 0) + 1
    for attempt in range(attempts):
        try:
            gTTS(text).save(target)
            return
        except gTTSError as e:
            # The exception class is referenced by its imported name; the
            # previous ``gtts.tts.gTTSError`` lookup failed with NameError
            # because the ``gtts`` package itself was never bound.
            if attempt == attempts - 1:
                raise
            print(f"Error: {e}")
            print(f"Retrying after {delay} seconds...")
            time.sleep(delay)
|
|
|
# Pre-render every prompt to static/<key>.mp3 when the module is loaded.
for prompt_key, prompt_text in prompts.items():
    generate_audio_prompt(prompt_text, f"{prompt_key}.mp3")
|
|
|
|
|
def convert_to_wav(input_path, output_path):
    """Re-encode an audio file as 16 kHz mono WAV.

    Args:
        input_path: Path of the audio file to read.
        output_path: Path the converted WAV file is written to.

    Raises:
        Exception: If loading, resampling or exporting fails. The underlying
            error is attached as ``__cause__``.
    """
    try:
        audio = AudioSegment.from_file(input_path)
        audio = audio.set_frame_rate(16000).set_channels(1)
        # export() hands back the open output file; close it explicitly so the
        # handle is not left for the garbage collector.
        exported = audio.export(output_path, format="wav")
        exported.close()
    except Exception as e:
        raise Exception(f"Audio conversion failed: {str(e)}") from e
|
|
|
|
|
def is_silent_audio(audio_path):
    """Return True when the WAV file at ``audio_path`` has no non-silent range.

    The silence threshold is 16 dB below the clip's own ``dBFS`` value, and
    ``min_silence_len`` is set to 500.
    """
    clip = AudioSegment.from_wav(audio_path)
    threshold = clip.dBFS - 16
    loud_ranges = detect_nonsilent(
        clip,
        min_silence_len=500,
        silence_thresh=threshold,
    )
    return not loud_ranges
|
|
|
@app.route("/")
def index():
    """Serve the landing page rendered from the ``index.html`` template."""
    page = render_template("index.html")
    return page
|
|
|
|
|
@app.route('/login', methods=['POST'])
def login():
    """Log a customer in by matching e-mail and phone number in Salesforce.

    Expects a JSON object with ``email`` and ``phone_number``. On a match the
    record's ``Id`` and ``Name`` are stored in the session.

    Returns:
        200 with ``success``, ``message``, ``user_id`` and ``name`` on a match;
        400 when a field is missing or the body is not a JSON object;
        401 when no record matches;
        500 when the Salesforce call fails.
    """
    # silent=True returns None for a missing or malformed JSON body, so the
    # request falls through to the 400 below instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Coerce before stripping: a missing key (None) or a numeric value would
    # otherwise raise AttributeError before validation runs.
    email = str(data.get('email') or '').strip().lower()
    phone_number = str(data.get('phone_number') or '').strip()

    if not email or not phone_number:
        return jsonify({'error': 'Missing email or phone number'}), 400

    try:
        logging.info("Checking login for Email: %s", email)

        # format_soql quotes and escapes each value, so request data cannot
        # break out of the string literal and alter the query.
        # NOTE(review): LOWER() is kept from the original query - confirm that
        # SOQL accepts it.
        query = format_soql(
            "SELECT Id, Name FROM Customer_Login__c "
            "WHERE LOWER(Email__c) = {} AND Phone_Number__c = {} LIMIT 1",
            email,
            phone_number,
        )
        result = sf.query(query)

        if result['totalSize'] == 0:
            logging.info("No matching records found!")
            return jsonify({'error': 'Invalid email or phone number. User not found'}), 401

        user_data = result['records'][0]
        session['user_id'] = user_data['Id']
        session['name'] = user_data['Name']
        logging.info("User found: %s", user_data['Id'])

        return jsonify({
            'success': True,
            'message': 'Login successful',
            'user_id': user_data['Id'],
            'name': user_data['Name'],
        }), 200

    except requests.exceptions.RequestException as req_error:
        logging.exception("Salesforce Connection Error")
        return jsonify({'error': f'Salesforce connection error: {str(req_error)}'}), 500
    except Exception as e:
        logging.exception("Unexpected Error")
        return jsonify({'error': f'Unexpected error: {str(e)}'}), 500
|
|
|
|
|
@app.route("/register", methods=["POST"])
def register():
    """Create a ``Customer_Login__c`` record for a new customer.

    Expects a JSON object with ``name``, ``email`` and ``phone``.

    Returns:
        200 with ``success`` and ``user_id`` when the record is created;
        400 when a field is missing or the body is not a JSON object;
        409 when a record with the same e-mail and phone already exists;
        500 when Salesforce reports a failure or raises.
    """
    # silent=True returns None for a missing or malformed JSON body, so the
    # request falls through to the 400 below instead of raising.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        data = {}

    # Coerce before stripping: a missing key (None) or a numeric value would
    # otherwise raise AttributeError before validation runs.
    name = str(data.get('name') or '').strip()
    email = str(data.get('email') or '').strip().lower()
    phone = str(data.get('phone') or '').strip()

    if not name or not email or not phone:
        return jsonify({'error': 'Missing data'}), 400

    try:
        # format_soql quotes and escapes each value, so request data cannot
        # break out of the string literal and alter the query.
        # NOTE(review): LOWER() is kept from the original query - confirm that
        # SOQL accepts it.
        query = format_soql(
            "SELECT Id FROM Customer_Login__c "
            "WHERE LOWER(Email__c) = {} AND Phone_Number__c = {} LIMIT 1",
            email,
            phone,
        )
        existing_user = sf.query(query)

        if existing_user['totalSize'] > 0:
            return jsonify({'error': 'User already exists'}), 409

        customer_login = sf.Customer_Login__c.create({
            'Name': name,
            'Email__c': email,
            'Phone_Number__c': phone,
        })

        if not customer_login.get('id'):
            return jsonify({'error': 'Failed to create record'}), 500
        return jsonify({'success': True, 'user_id': customer_login['id']}), 200

    except Exception as e:
        logging.exception("Registration failed")
        return jsonify({'error': str(e)}), 500
|
|
|
|
|
# The Whisper pipeline is expensive to construct, so it is built once on first
# use and then shared. The lock serializes both construction and inference,
# since the shared pipeline may be reached from several request threads.
_asr_lock = threading.Lock()
_asr_pipeline = None


def _run_speech_recognition(wav_path):
    """Run the shared Whisper pipeline on ``wav_path`` and return its output."""
    global _asr_pipeline
    with _asr_lock:
        if _asr_pipeline is None:
            _asr_pipeline = pipeline(
                "automatic-speech-recognition",
                model="openai/whisper-small",
                device=0 if torch.cuda.is_available() else -1,
                config=config,
            )
        return _asr_pipeline(wav_path)


@app.route("/transcribe", methods=["POST"])
def transcribe():
    """Transcribe the uploaded ``audio`` file to text.

    The upload is converted to 16 kHz mono WAV, rejected if no speech is
    detected, and otherwise passed to the Whisper pipeline.

    Returns:
        200 with ``text`` (stripped and capitalized) on success;
        400 when no ``audio`` file is supplied or the clip is silent;
        500 when saving, conversion or recognition fails.
    """
    if "audio" not in request.files:
        return jsonify({"error": "No audio file provided"}), 400

    audio_file = request.files["audio"]

    try:
        # A per-request scratch directory keeps concurrent uploads from
        # overwriting each other and keeps them out of the public static
        # folder. It is deleted when the ``with`` block exits.
        with tempfile.TemporaryDirectory() as workdir:
            # The original file names are kept so extension-based format
            # handling in the audio loader is unchanged.
            input_audio_path = os.path.join(workdir, "temp_input.wav")
            output_audio_path = os.path.join(workdir, "temp.wav")
            audio_file.save(input_audio_path)

            convert_to_wav(input_audio_path, output_audio_path)

            if is_silent_audio(output_audio_path):
                return jsonify({"error": "No speech detected. Please try again."}), 400

            result = _run_speech_recognition(output_audio_path)
            transcribed_text = result["text"].strip().capitalize()

        return jsonify({"text": transcribed_text})

    except Exception as e:
        return jsonify({"error": f"Speech recognition error: {str(e)}"}), 500
|
|
|
|
|
if __name__ == "__main__":
    # Run the app under waitress, bound to 0.0.0.0 on port 7860.
    serve(app, port=7860, host="0.0.0.0")
|
|
|
|