# Uploaded by lokesh341 - commit message: "Update app.py"
# Commit 15e9641 (verified); file size 7.98 kB
import torch
from flask import Flask, render_template, request, jsonify, session
import json
import os
from transformers import pipeline
from gtts import gTTS
from pydub import AudioSegment
from pydub.silence import detect_nonsilent
from transformers import AutoConfig # Import AutoConfig for the config object
import time
from waitress import serve
from simple_salesforce import Salesforce
import requests # Import requests for exception handling
app = Flask(__name__)
# Flask signs session cookies with this key, so a value committed to source
# lets anyone forge a session. Read it from the FLASK_SECRET_KEY environment
# variable; the original placeholder is kept only as a fallback so existing
# setups keep starting, and must not be relied on in a real deployment.
app.secret_key = os.environ.get('FLASK_SECRET_KEY', 'your_secret_key')
# Use whisper-small for faster processing and better speed
device = "cuda" if torch.cuda.is_available() else "cpu"
# Create config object to set timeout and other parameters.
# NOTE(review): neither `device` nor `config` is referenced again in the
# visible part of this file - confirm they are still needed.
config = AutoConfig.from_pretrained("openai/whisper-small")
config.update({"timeout": 60})  # Set timeout to 60 seconds
# Salesforce connection setup
# Bind `sf` up front so the name exists even when the login fails; code that
# runs later can test `sf is None` instead of hitting a NameError.
sf = None
try:
    print("Attempting to connect to Salesforce...")
    # Credentials are read from the environment so they never live in source
    # control. The values that used to be hard-coded here were exposed in the
    # file and should be rotated.
    sf = Salesforce(
        username=os.environ['SF_USERNAME'],
        password=os.environ['SF_PASSWORD'],
        security_token=os.environ['SF_SECURITY_TOKEN'],
    )
    print("Connected to Salesforce successfully!")
except KeyError as e:
    # Raised by os.environ[...] when one of the three variables is not set.
    print(f"Failed to connect to Salesforce: missing environment variable {e}")
except Exception as e:
    print(f"Failed to connect to Salesforce: {str(e)}")
# Function for Salesforce operations
def create_salesforce_record(name, email, phone_number):
    """Create a ``Customer_Login__c`` record in Salesforce.

    Args:
        name: Value stored in the ``Name`` field.
        email: Value stored in the ``Email__c`` field.
        phone_number: Value stored in the ``Phone_Number__c`` field.

    Returns:
        Whatever ``sf.Customer_Login__c.create`` returns.

    Raises:
        Exception: If the create call fails; the message starts with
            "Failed to create record: " followed by the original error text.
    """
    try:
        fields = dict(
            Name=name,
            Email__c=email,
            Phone_Number__c=phone_number,
        )
        return sf.Customer_Login__c.create(fields)
    except Exception as err:
        raise Exception(f"Failed to create record: {err!s}")
def get_menu_items():
    """Query Salesforce for ``Menu_Item__c`` rows.

    Returns:
        The ``records`` entry of the ``sf.query`` result; each row carries
        the selected ``Name``, ``Price__c``, ``Ingredients__c`` and
        ``Category__c`` fields.
    """
    soql = "SELECT Name, Price__c, Ingredients__c, Category__c FROM Menu_Item__c"
    return sf.query(soql)['records']
def create_salesforce_order(cart_items, total_price, customer_id):
    """Create an ``Order__c`` record in Salesforce.

    Args:
        cart_items: Cart contents; serialized with ``json.dumps`` into the
            ``Cart_Items__c`` field.
        total_price: Value stored in the ``Total_Price__c`` field.
        customer_id: Value stored in the ``Customer__c`` field, linking the
            order to a customer record.

    Returns:
        Whatever ``sf.Order__c.create`` returns.

    Raises:
        Exception: If serialization or the create call fails; the message
            starts with "Failed to create order: ".
    """
    try:
        payload = dict(
            Total_Price__c=total_price,
            Cart_Items__c=json.dumps(cart_items),  # cart stored as JSON text
            Customer__c=customer_id,
        )
        return sf.Order__c.create(payload)
    except Exception as err:
        raise Exception(f"Failed to create order: {err!s}")
# Voice-related functions
def generate_audio_prompt(text, filename, max_retries=3, retry_delay=5):
    """Convert *text* to speech with gTTS and save it under ``static/``.

    Args:
        text: Text to synthesize.
        filename: Name of the output file inside the ``static`` directory.
        max_retries: Number of additional attempts made after the first one
            fails with a gTTS error. Negative values are treated as 0.
        retry_delay: Seconds to wait between attempts.

    Raises:
        gTTSError: If the final attempt also fails.
    """
    # The module only does `from gtts import gTTS`, so the name `gtts` is not
    # bound; the exception class must be imported explicitly or the `except`
    # clause itself would raise NameError.
    from gtts.tts import gTTSError

    output_path = os.path.join("static", filename)
    total_attempts = max(0, max_retries) + 1
    # A bounded loop replaces the original unbounded recursive retry, which
    # would never terminate while the TTS service stayed unavailable.
    for attempt in range(1, total_attempts + 1):
        try:
            gTTS(text).save(output_path)
            return
        except gTTSError as e:
            print(f"Error: {e}")
            if attempt == total_attempts:
                raise
            print(f"Retrying after {retry_delay} seconds...")
            time.sleep(retry_delay)
# Utility functions
def convert_to_wav(input_path, output_path):
    """Re-encode an audio file as 16 kHz mono WAV.

    Args:
        input_path: Path of the audio file to read.
        output_path: Path the WAV file is written to.

    Raises:
        Exception: If loading, resampling or exporting fails; the message
            starts with "Audio conversion failed: ".
    """
    try:
        source = AudioSegment.from_file(input_path)
        resampled = source.set_frame_rate(16000)  # 16 kHz
        mono = resampled.set_channels(1)  # single channel
        mono.export(output_path, format="wav")
    except Exception as err:
        detail = str(err)
        print(f"Error: {detail}")
        raise Exception(f"Audio conversion failed: {detail}")
def is_silent_audio(audio_path):
    """Report whether a WAV file has no detected non-silent section.

    Args:
        audio_path: Path of the WAV file to inspect.

    Returns:
        True when ``detect_nonsilent`` finds no non-silent range, else False.
    """
    clip = AudioSegment.from_wav(audio_path)
    # Silence threshold is set 16 dB below the clip's own dBFS value.
    threshold = clip.dBFS - 16
    voiced_ranges = detect_nonsilent(clip, min_silence_len=500, silence_thresh=threshold)
    print(f"Detected nonsilent parts: {voiced_ranges}")
    return not voiced_ranges
# Routes and Views
@app.route("/")
def index():
    """Render ``index.html`` for the site root."""
    page = render_template("index.html")
    return page
@app.route("/dashboard", methods=["GET"])
def dashboard():
    """Render ``dashboard.html`` for GET requests to ``/dashboard``."""
    page = render_template("dashboard.html")
    return page
@app.route('/submit', methods=['POST'])
def submit():
    """Create a ``Customer_Login__c`` record from a JSON payload.

    Expects a JSON object with non-empty ``name``, ``email`` and ``phone``.

    Returns:
        A JSON response with a ``success`` flag and ``message``:
        200 on success; 400 when the body is missing, is not valid JSON, is
        not a JSON object, or lacks a required field; 500 when the
        Salesforce insert fails or any other error occurs.
    """
    try:
        # silent=True turns a malformed or non-JSON body into None so it is
        # answered with the 400 below, instead of raising into the catch-all
        # handler and being reported as a 500.
        data = request.get_json(silent=True)
        print("Received Data:", data)  # Debugging line
        # A JSON array or scalar has no .get(); treat it like an invalid body.
        if not data or not isinstance(data, dict):
            return jsonify({"success": False, "message": "Invalid or empty JSON received"}), 400

        name = data.get("name")
        email = data.get("email")
        phone = data.get("phone")
        if not name or not email or not phone:
            return jsonify({"success": False, "message": "Missing required fields"}), 400

        salesforce_data = {
            "Name": name,
            "Email__c": email,  # Ensure this is the correct field API name in Salesforce
            "Phone_Number__c": phone,  # Ensure this is the correct field API name in Salesforce
        }
        try:
            result = sf.Customer_Login__c.create(salesforce_data)
            print(f"Salesforce Response: {result}")  # Debugging line
            return jsonify({"success": True, "message": "Data submitted successfully"})
        except Exception as e:
            # Storage exhaustion gets its own user-facing message.
            if "STORAGE_LIMIT_EXCEEDED" in str(e):
                return jsonify({"success": False, "message": "Salesforce storage limit exceeded. Please clean up or contact support."}), 500
            print(f"Salesforce Insertion Error: {str(e)}")  # Log Salesforce errors
            return jsonify({"success": False, "message": "Salesforce submission failed", "error": str(e)}), 500
    except Exception as e:
        print(f"Server Error: {str(e)}")  # Log general errors
        return jsonify({"success": False, "message": "Internal server error", "error": str(e)}), 500
@app.route('/validate-login', methods=['POST'])
def validate_login():
    """Check an email / mobile pair against ``Customer_Login__c``.

    Expects a JSON object with non-empty ``email`` and ``mobile``.

    Returns:
        A JSON response: 200 with the customer's ``name`` when a matching
        record exists; 400 when the body is not a JSON object or a field is
        missing; 401 when no record matches; 500 on any other error.

    Side effects:
        On success the matching record's ``Id`` is stored in the session as
        ``customer_id``, which is the key ``place_order`` reads.
    """
    # Imported locally because the module only imports `Salesforce` from
    # simple_salesforce.
    from simple_salesforce import format_soql

    try:
        # silent=True: a malformed body becomes None rather than an exception.
        data = request.get_json(silent=True)
        if not isinstance(data, dict):
            return jsonify({"success": False, "message": "Invalid or empty JSON received"}), 400

        login_email = data.get("email")
        login_mobile = data.get("mobile")
        if not login_email or not login_mobile:
            return jsonify({"success": False, "message": "Missing email or mobile number"}), 400

        # format_soql quotes and escapes each value, so request data can no
        # longer break out of the string literal and alter the query.
        # str() keeps non-string JSON values quoted as text, as before.
        query = format_soql(
            "SELECT Id, Name, Email__c, Phone_Number__c FROM Customer_Login__c "
            "WHERE Email__c = {} AND Phone_Number__c = {}",
            str(login_email),
            str(login_mobile),
        )
        result = sf.query(query)
        if not result['records']:
            return jsonify({"success": False, "message": "Invalid email or mobile number"}), 401

        record = result['records'][0]
        # Nothing else in the visible file sets this key, so without it
        # place_order always answers "User not logged in".
        session['customer_id'] = record['Id']
        return jsonify({"success": True, "message": "Login successful", "name": record['Name']})
    except Exception as e:
        print(f"Error validating login: {str(e)}")
        return jsonify({"success": False, "message": "Internal server error", "error": str(e)}), 500
@app.route("/menu", methods=["GET"])
def menu_page():
    """Render ``menu_page.html`` with the menu items returned by Salesforce.

    Each Salesforce row is reduced to a dict with ``name``, ``price``,
    ``ingredients`` and ``category`` keys before it reaches the template.
    """
    menu_data = []
    for record in get_menu_items():
        menu_data.append({
            "name": record['Name'],
            "price": record['Price__c'],
            "ingredients": record['Ingredients__c'],
            "category": record['Category__c'],
        })
    return render_template("menu_page.html", menu_items=menu_data)
@app.route("/add_to_cart", methods=["POST"])
def add_to_cart():
    """Append an item to the cart kept in the session.

    Expects a JSON object with a non-empty ``item_name`` and a ``quantity``
    that converts to a positive integer.

    Returns:
        A JSON response: 200 with a confirmation message on success; 400
        when the body is not a JSON object, the item name is missing, or the
        quantity is not a positive whole number.
    """
    # silent=True: a missing or malformed body becomes None instead of an
    # unhandled error on attribute access.
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        return jsonify({"success": False, "message": "Invalid or empty JSON received"}), 400

    item_name = payload.get('item_name')
    if not item_name:
        return jsonify({"success": False, "message": "Missing item name"}), 400

    # place_order multiplies each stored quantity by a number, so only real
    # integers may enter the cart; None or non-numeric text is rejected here.
    try:
        quantity = int(payload.get('quantity'))
    except (TypeError, ValueError, OverflowError):
        quantity = 0
    if quantity <= 0:
        return jsonify({"success": False, "message": "Quantity must be a positive whole number"}), 400

    cart_items = session.get('cart_items', [])
    cart_items.append({"name": item_name, "quantity": quantity})
    session['cart_items'] = cart_items  # Reassign so the session records the change
    return jsonify({"success": True, "message": f"Added {item_name} to your cart."})
@app.route("/view_cart", methods=["GET"])
def view_cart():
    """Render ``cart_page.html`` with the session cart (an empty list if unset)."""
    return render_template("cart_page.html", cart_items=session.get('cart_items', []))
@app.route("/place_order", methods=["POST"])
def place_order():
    """Create an ``Order__c`` record from the session cart.

    Returns:
        A JSON response: 200 with the Salesforce result on success; 400 when
        no ``customer_id`` is in the session or the cart is empty; 500 when
        pricing the cart or creating the order fails.
    """
    # Check the login first so an anonymous request never touches the cart.
    customer_id = session.get('customer_id')
    if not customer_id:
        return jsonify({"message": "User not logged in"}), 400

    cart_items = session.get('cart_items', [])
    # An empty cart would otherwise create a zero-value order.
    if not cart_items:
        return jsonify({"message": "Cart is empty"}), 400

    try:
        # Placeholder price calculation: every unit is charged a flat 100.
        # Done inside the try so a malformed cart entry yields the JSON
        # error below rather than an unhandled exception.
        total_price = sum(item['quantity'] * 100 for item in cart_items)
        order_data = {
            "Total_Price__c": total_price,
            "Cart_Items__c": json.dumps(cart_items),
            "Customer__c": customer_id,
        }
        order = sf.Order__c.create(order_data)
        session.pop('cart_items', None)  # Clear the cart after placing the order
        return jsonify({"message": "Order placed successfully!", "order": order})
    except Exception as e:
        return jsonify({"message": f"Failed to create order: {str(e)}"}), 500
# Run the app
if __name__ == "__main__":
    # Debug mode turns on the Werkzeug interactive debugger, which permits
    # code execution from the browser. It is therefore opt-in through the
    # FLASK_DEBUG environment variable instead of being always enabled.
    debug_enabled = os.environ.get("FLASK_DEBUG", "").strip().lower() in {"1", "true", "yes", "on"}
    app.run(debug=debug_enabled)