# NOTE: removed a non-Python scrape artifact here (Hugging Face "Spaces" page
# header and "Runtime error" banner text) that broke the file's syntax.
import base64
import datetime
import json
import os

import gradio as gr
import whisper
from dotenv import load_dotenv
from google.cloud import firestore
from transformers import pipeline
# Sentiment labels that count as "positive" and earn the agent a point.
# A frozenset: the only use in this file is membership testing (see is_positive),
# so O(1) lookup and immutability are both wins over the original list.
positive_sentiments = frozenset([
    "approval",
    "realization",
    "joy",
    "caring",
    "relief",
    "desire",
    "admiration",
    "optimism",
    "love",
    "excitement",
    "curiosity",
    "amusement",
    "gratitude",
    "pride",
])
# ===== Google Cloud credentials =====
# GOOGLE_APPLICATION_CREDENTIALS is expected to hold a base64-encoded
# service-account JSON blob (loaded from .env when running locally).
load_dotenv()
base64_credentials = os.environ.get('GOOGLE_APPLICATION_CREDENTIALS')
if not base64_credentials:
    # BUG FIX: b64decode(None) raised an opaque TypeError when the variable
    # was missing; fail fast with an actionable message instead.
    raise RuntimeError(
        "GOOGLE_APPLICATION_CREDENTIALS is not set; expected base64-encoded "
        "service-account JSON (see .env)."
    )
decoded_credentials = base64.b64decode(base64_credentials).decode()
credentials_json = json.loads(decoded_credentials)
# Firestore client used by all persistence helpers below.
db = firestore.Client.from_service_account_info(credentials_json)
# ===== Authentication =====
def authenticate(new_username, new_pw):
    """Log an existing user in, or create a new account on first login.

    Returns [username, password, score, welcome-HTML update, sign-in visibility
    update] for the bound Gradio outputs. Raises gr.Error on a wrong password.
    """
    # Empty submission: keep state and UI unchanged.
    if new_username == '' or new_pw == '':
        return [None, None, 0, gr.update(), gr.update()]
    users_ref = db.collection('Users')
    doc_ref = users_ref.document(new_username)
    doc = doc_ref.get()
    new_score = 0
    if doc.exists:
        # User exists in Firestore.
        user_data = doc.to_dict()
        # Defensive .get: a malformed record without these keys previously
        # raised KeyError.
        new_score = user_data.get('score', 0)
        # SECURITY: passwords are stored and compared in plaintext here —
        # they should be salted and hashed before storage.
        if user_data.get('password') != new_pw:
            raise gr.Error("Incorrect password")
    else:
        # First login: create the user record with a zero score.
        doc_ref.set({"username": new_username, "password": new_pw, "score": new_score})
    gr.Info(f"Welcome, {new_username}!")
    show_welcome = gr.update(visible=True, value=f'<div style=\'height:190px; display:flex; justify-content:center; align-items:center;\'><h1 style=\'text-align:center\'>Hello {new_username}! 👋</h1></div>')
    hide_signin = gr.update(visible=False)
    return [new_username, new_pw, new_score, show_welcome, hide_signin]
def get_user_transcripts(username):
    """Return the user's saved transcripts as [date, transcription, sentiment] rows."""
    arr = []
    if username is None:
        # Not logged in: emit a Gradio update that clears the table.
        return [gr.update(value=arr)]
    # Fetch the user's transcript records from Firestore.
    user_transcripts = db.collection(f'Users/{username}/Transcripts').stream()
    for trans in user_transcripts:
        trans_dict = trans.to_dict()
        arr.append([trans_dict['date'], trans_dict['transcription'], trans_dict['sentiment_output']])
    if not arr:
        # BUG FIX: the Dataframe expects a list of rows; the original returned
        # a flat ['', '', ''] (one list of cells), not a single empty row.
        arr = [['', '', '']]
    return arr
def get_user_score(username):
    """Return HTML showing the user's score, or an empty heading if unknown."""
    doc = db.document(f'Users/{username}').get()
    if doc.exists:
        # User exists in Firestore; default to 0 if the score field is absent.
        user_data = doc.to_dict()
        return [f"""
        <p align="center">Earn points by making customers happy!</p>
        <br/>
        <h1 align="center" style='font-size:56px;'>{user_data.get("score", 0)}</h1>
        """]
    # No record found (e.g. not logged in): render nothing.
    return ['<h1 align="center"></h1>']
# ===== Loading Whisper =====
# Load the multilingual "base" Whisper speech-recognition model once at startup.
model = whisper.load_model("base")
# GoEmotions classifier (PyTorch backend) used to score transcripts per emotion.
sentiment_analysis = pipeline("sentiment-analysis", framework="pt", model="SamLowe/roberta-base-go_emotions")
def analyze_sentiment(text):
    """Run the emotion classifier on *text* and map each label to its score."""
    scores = {}
    for item in sentiment_analysis(text):
        scores[item['label']] = item['score']
    return scores
def is_positive(result):
    """Return True when the first word of *result* names a positive sentiment."""
    first_label = result.split(' ')[0]
    return first_label in positive_sentiments
def get_sentiment_emoji(sentiment):
    """Map a GoEmotions label to its display emoji; '' for unknown labels."""
    # One entry per label the classifier can emit, alphabetized for scanning.
    label_to_emoji = {
        "admiration": "😌",
        "amusement": "😄",
        "anger": "😡",
        "annoyance": "😠",
        "approval": "👍",
        "caring": "🤗",
        "confusion": "😕",
        "curiosity": "🤔",
        "desire": "😍",
        "disappointment": "😞",
        "disapproval": "👎",
        "disgust": "🤢",
        "embarrassment": "😳",
        "excitement": "🎉",
        "fear": "😨",
        "gratitude": "🙏",
        "grief": "😥",
        "joy": "😄",
        "love": "❤️",
        "nervousness": "😬",
        "neutral": "😐",
        "optimism": "😊",
        "pride": "🦁",
        "realization": "😮",
        "relief": "😌",
        "remorse": "😔",
        "sadness": "😢",
        "surprise": "😲",
    }
    return label_to_emoji.get(sentiment, "")
def display_sentiment_results(sentiment_results, option):
    """Format {label: score} results as newline-terminated display lines.

    "Sentiment Only" shows 'label emoji'; "Sentiment + Score" appends the score.
    Any other option yields an empty string.
    """
    lines = []
    for label, score in sentiment_results.items():
        emoji = get_sentiment_emoji(label)
        if option == "Sentiment Only":
            lines.append(f"{label} {emoji}\n")
        elif option == "Sentiment + Score":
            lines.append(f"{label} {emoji}: {score}\n")
    return "".join(lines)
def inference(username, audio, sentiment_option):
    """Transcribe *audio*, analyze its sentiment, and persist results if logged in.

    Returns (language code upper-cased, transcription text, sentiment text,
    score-or-update) for the bound Gradio outputs.
    """
    audio = whisper.load_audio(audio)
    audio = whisper.pad_or_trim(audio)
    mel = whisper.log_mel_spectrogram(audio).to(model.device)
    # Identify the spoken language from the mel spectrogram.
    _, probs = model.detect_language(mel)
    lang = max(probs, key=probs.get)
    options = whisper.DecodingOptions(fp16=False)
    result = whisper.decode(model, mel, options)
    sentiment_results = analyze_sentiment(result.text)
    sentiment_output = display_sentiment_results(sentiment_results, sentiment_option)
    # BUG FIX: new_score was unassigned for anonymous users, so the return below
    # raised UnboundLocalError. gr.update() leaves the score state untouched.
    new_score = gr.update()
    if username:
        # Save results in Firestore under Users/<name>/Transcripts/<timestamp>.
        ts = datetime.datetime.now()
        ts_formatted = ts.strftime("%d %b %Y, %H:%M")
        ref = db.document(f'Users/{username}')
        transcript_ref = db.document(f'Users/{username}/Transcripts/{ts_formatted}')
        transcript_ref.set({"date": ts_formatted, "transcription": result.text, "sentiment_output": sentiment_output})
        person_doc = ref.get()
        user_data = person_doc.to_dict()
        # Defensive .get: a record without a score previously raised KeyError.
        new_score = user_data.get('score', 0)
        if is_positive(sentiment_output):
            new_score = new_score + 1
        ref.update({"score": new_score})
        gr.Info("Transcription saved!")
    return lang.upper(), result.text, sentiment_output, new_score
# Page heading rendered at the top of the Blocks UI.
title = """<h1 align="center">☕ Lim Kopi Call Center Service 💬</h1>"""
# Banner image shown in the first row; the file must ship alongside this script.
image_path = "coffee_logo.jpg"
# HTML description of the MVP, rendered next to the banner image.
description = """
💻 This MVP shows how we can use Whisper to conduct audio sentiment analysis on voice recordings of customer service agents. Whisper is a general speech recognition model built by OpenAI. It is trained on a large dataset of diverse audio and supports multilingual speech recognition, speech translation, and language identification tasks.<br><br>
⚙️ MVP Components:<br>
<br>
- Real-time multilingual speech recognition<br>
- Language identification<br>
- Sentiment analysis of the transcriptions<br>
<br>
🎯 The sentiment analysis results are provided as a dictionary with different emotions and their corresponding scores, so customer service agents can receive feedback on the overall call quality and customer receptiveness.<br>
<br>
😃 The sentiment analysis results are displayed with emojis representing the corresponding sentiment.<br>
<br>
✅ The higher the score for a specific emotion, the stronger the presence of that emotion in the transcribed text.<br>
<br>
❓ Use the microphone for real-time speech recognition.<br>
<br>
⚡️ The model will transcribe the audio for record-keeping, and perform sentiment analysis on the transcribed text.<br>
"""
custom_css = """ | |
#banner-image { | |
display: app; | |
margin-left: auto; | |
margin-right: auto; | |
} | |
#chat-message { | |
font-size: 14px; | |
min-height: 300px; | |
} | |
#flex-row { | |
display: flex; | |
flex-direction: row; | |
justify-content: center; | |
align-items: flex-end; | |
} | |
""" | |
# ===== Application layout and wiring =====
app = gr.Blocks(title="Lim Kopi Call Center Service", css=custom_css)
with app:
    gr.HTML(title)
    # Per-session state: logged-in user, password, and running score.
    authed_username = gr.State(value=None)
    authed_password = gr.State(value=None)
    user_score = gr.State(value=0)
    # ===== UI =====
    with gr.Row():
        with gr.Column():
            with gr.Group():
                gr.Image(image_path, elem_id="banner-image", show_label=False)
        with gr.Column():
            gr.HTML(description)
    with gr.Row(elem_id="flex-row"):
        with gr.Column(scale=1):
            user_welcome = gr.HTML(visible=False)
            with gr.Group(visible=True) as auth_block:
                username_input = gr.Textbox(label="Username", placeholder="Your cool alter ego")
                password_input = gr.Textbox(label="Password", type='password', placeholder="A super secret code only you know")
                login_btn = gr.Button("Login / Sign Up")
                login_btn.click(
                    authenticate,
                    inputs=[username_input, password_input],
                    outputs=[authed_username, authed_password, user_score, user_welcome, auth_block]
                )
        with gr.Column(scale=1):
            with gr.Group():
                sentiment_option = gr.Radio(
                    choices=["Sentiment Only", "Sentiment + Score"],
                    label="Select an option",
                )
                audio = gr.Audio(
                    source="microphone",
                    type="filepath"
                )
                btn = gr.Button("Transcribe")
    with gr.Row():
        with gr.Column():
            gr.HTML("<br/>")
            gr.HTML("""<h1 align="center">🎉 Results</h1>""")
            with gr.Group():
                lang_str = gr.Textbox(label="Language")
                text = gr.Textbox(label="Transcription")
                sentiment_output = gr.Textbox(label="Sentiment Analysis Results")
    btn.click(inference, inputs=[authed_username, audio, sentiment_option], outputs=[lang_str, text, sentiment_output, user_score])
    with gr.Row(visible=True) as scoreboard:
        with gr.Column():
            gr.HTML("<br/>")
            gr.HTML("""<h1 align="center">💯 Your Score</h1>""")
            score_sheet = gr.HTML(visible=True, value='<p align="center">Log in to see your score and transcripts</p>')
    # Refresh score after login and after each saved transcription.
    user_welcome.change(get_user_score, inputs=[authed_username], outputs=[score_sheet])
    sentiment_output.change(get_user_score, inputs=[authed_username], outputs=[score_sheet])
    with gr.Row(visible=True) as transcription_records:
        with gr.Column():
            gr.HTML("<br/>")
            gr.HTML("""<h1 align="center"> 🪩 Your Transcription Records</h1>""")
            # BUG FIX: the original had a trailing comma after gr.Dataframe(...),
            # which made transcription_df a 1-tuple and forced [0] indexing
            # in the .change() wiring below.
            transcription_df = gr.Dataframe(
                headers=["Date", "Transcription", "Sentiment"],
                datatype=["str", "str", "str"],
                value=get_user_transcripts(authed_username.value),
                type='array',
            )
    # Refresh the transcript table after login and after each transcription.
    user_welcome.change(get_user_transcripts, inputs=[authed_username], outputs=[transcription_df])
    sentiment_output.change(get_user_transcripts, inputs=[authed_username], outputs=[transcription_df])
app.queue()
app.launch()