Spaces:
Sleeping
Sleeping
import torch | |
from flask import Flask, render_template, request, jsonify | |
import json | |
import os | |
from transformers import pipeline | |
from gtts import gTTS | |
from pydub import AudioSegment | |
from pydub.silence import detect_nonsilent | |
from transformers import AutoConfig # Import AutoConfig for the config object | |
import time | |
from waitress import serve | |
from simple_salesforce import Salesforce | |
import requests # Import requests for exception handling | |
app = Flask(__name__) | |
# Use whisper-small for faster processing and better speed | |
device = "cuda" if torch.cuda.is_available() else "cpu" | |
# Create config object to set timeout and other parameters | |
config = AutoConfig.from_pretrained("openai/whisper-small") | |
config.update({"timeout": 60}) # Set timeout to 60 seconds | |
# Your function where you generate and save the audio | |
def generate_audio_prompt(text, filename): | |
try: | |
tts = gTTS(text) | |
tts.save(os.path.join("static", filename)) | |
except gtts.tts.gTTSError as e: | |
print(f"Error: {e}") | |
print("Retrying after 5 seconds...") | |
time.sleep(5) # Wait for 5 seconds before retrying | |
generate_audio_prompt(text, filename) | |
# Generate required voice prompts | |
prompts = { | |
"welcome": "Welcome to Biryani Hub.", | |
"ask_name": "Tell me your name.", | |
"ask_email": "Please provide your email address.", | |
"thank_you": "Thank you for registration." | |
} | |
for key, text in prompts.items(): | |
generate_audio_prompt(text, f"{key}.mp3") | |
# Symbol mapping for proper recognition | |
SYMBOL_MAPPING = { | |
"at the rate": "@", | |
"at": "@", | |
"dot": ".", | |
"underscore": "_", | |
"hash": "#", | |
"plus": "+", | |
"dash": "-", | |
"comma": ",", | |
"space": " " | |
} | |
# Function to convert audio to WAV format | |
def convert_to_wav(input_path, output_path): | |
try: | |
audio = AudioSegment.from_file(input_path) | |
audio = audio.set_frame_rate(16000).set_channels(1) # Convert to 16kHz, mono | |
audio.export(output_path, format="wav") | |
except Exception as e: | |
print(f"Error: {str(e)}") | |
raise Exception(f"Audio conversion failed: {str(e)}") | |
# Function to check if audio contains actual speech | |
def is_silent_audio(audio_path): | |
audio = AudioSegment.from_wav(audio_path) | |
nonsilent_parts = detect_nonsilent(audio, min_silence_len=500, silence_thresh=audio.dBFS-16) # Reduced silence duration | |
print(f"Detected nonsilent parts: {nonsilent_parts}") | |
return len(nonsilent_parts) == 0 # If no speech detected | |
# Set the secret key to handle sessions securely | |
app.secret_key = os.getenv("SECRET_KEY", "sSSjyhInIsUohKpG8sHzty2q") | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
# Salesforce connection setup | |
sf = Salesforce(username='diggavalli98@gmail.com', password='Sati@1020', security_token='sSSjyhInIsUohKpG8sHzty2q') | |
def index(): | |
return render_template("index.html") | |
def validate_login(): | |
try: | |
# Get the email and mobile number from the request | |
data = request.json | |
email = data.get("email") | |
mobile = data.get("mobile") | |
# Salesforce query to check if the email and mobile exist | |
query = f"SELECT Id, Name FROM Customer_Login__c WHERE Email__c = '{email}' AND Phone_Number__c = '{mobile}'" | |
result = sf.query(query) | |
if result['totalSize'] > 0: | |
return jsonify({'success': True, 'message': 'User authenticated successfully.'}), 200 | |
else: | |
return jsonify({'success': False, 'error': 'Invalid email or mobile number.'}), 400 | |
except Exception as e: | |
logging.error(f"Error: {str(e)}") | |
return jsonify({'error': 'Something went wrong. Please try again later.'}), 500 | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", port=7860, debug=True) | |
# Initialize Flask app | |
app = Flask(__name__) | |
# Set the secret key to handle sessions securely | |
app.secret_key = os.getenv("SECRET_KEY", "sSSjyhInIsUohKpG8sHzty2q") # Replace with a secure key | |
# Configure the session type | |
app.config["SESSION_TYPE"] = "filesystem" # Use filesystem for session storage | |
app.config["SESSION_COOKIE_NAME"] = "my_session" # Optional: Change session cookie name | |
app.config["SESSION_COOKIE_SECURE"] = True # Ensure cookies are sent over HTTPS | |
app.config["SESSION_COOKIE_SAMESITE"] = "None" # Allow cross-site cookies | |
# Initialize the session | |
Session(app) | |
# Set up logging | |
logging.basicConfig(level=logging.INFO) | |
def index(): | |
# Serve the HTML page for the voice-based login | |
return render_template("index.html") | |
def capture_email_and_mobile(): | |
try: | |
# Get the voice captured email and mobile number from the request | |
data = request.json | |
email = data.get("email") | |
mobile = data.get("mobile") | |
# Validate the captured email and mobile number | |
if not email or not mobile: | |
return jsonify({"error": "Email or mobile number is missing."}), 400 | |
# Log the captured data for now (you can replace it with actual processing logic) | |
logging.info(f"Captured Email: {email}, Mobile: {mobile}") | |
# For simplicity, we'll assume the capture was successful. | |
return jsonify({"success": True, "message": "Email and mobile captured successfully."}), 200 | |
except Exception as e: | |
logging.error(f"Error: {str(e)}") | |
return jsonify({"error": "Something went wrong while processing."}), 500 | |
# Start Production Server | |
if __name__ == "__main__": | |
app.run(host="0.0.0.0", port=7860, debug=True) | |