GradioTest / app.py
dlflannery's picture
Update app.py
a4fdbe4 verified
raw
history blame
14.2 kB
import os
from re import L
import tempfile
import gradio as gr
# import openai
from numpy._core.defchararray import isdecimal
from openai import OpenAI
from dotenv import load_dotenv
from pathlib import Path
from time import sleep
import audioread
import queue
import threading
from tempfile import NamedTemporaryFile
load_dotenv(override=True)


def _require_env(name):
    """Return environment variable *name*, failing fast with a clear message.

    Raises:
        RuntimeError: if the variable is not set (previously this surfaced
            later as an opaque AttributeError on ``None.split``).
    """
    value = os.getenv(name)
    if value is None:
        raise RuntimeError(f'Required environment variable {name} is not set')
    return value


key = os.getenv('OPENAI_API_KEY')
# LOGNAME and PASSWORD hold comma-separated lists; chat() treats the first
# entry of each list as the administrator ("boss") login.
users = _require_env('LOGNAME')
unames = users.split(',')
pwds = _require_env('PASSWORD')
pwdList = pwds.split(',')
site = os.getenv('SITE')
# Per-user usage logs live under ./data for a local run and /data otherwise.
dataDir = './data/' if site == 'local' else '/data/'
dp = Path(dataDir)
dp.mkdir(exist_ok=True)
#speak_file = dataDir + "speek.wav"
client = OpenAI(api_key = key)
# qspeech: names of synthesized audio files waiting to be played.
# qdelete: names of temporary audio files to remove on clear/unload.
qspeech = queue.Queue()
qdelete = queue.Queue()
#digits = ['zero: ','one: ','two: ','three: ','four: ','five: ','six: ','seven: ','eight: ','nine: ']
# Abbreviations replaced in text before it is sent to text-to-speech.
abbrevs = {'St. ': 'Saint ', 'Mr. ': 'mister ', 'Mrs. ': 'mussus ', 'Ms. ': 'mizz '}
def _read_usage_lines(path, do_reset, attempts=3, delay=3):
    """Read every line of a usage log, retrying on I/O errors.

    A missing file is treated as an empty log. When *do_reset* is true the
    file is deleted after a successful read and an empty list is returned.

    Returns:
        The list of lines, or None when all *attempts* failed.
    """
    for _ in range(attempts):
        try:
            with open(path) as f:
                lines = f.readlines()
            if do_reset:
                os.remove(path)
                return []
            return lines
        except FileNotFoundError:
            return []
        except OSError:
            sleep(delay)
    return None


def _sum_token_log(lines):
    """Sum 'user:in/out-model' lines into (mini_in, mini_out, 4o_in, 4o_out).

    Raises:
        ValueError: if a non-blank line does not match the expected format.
    """
    totals = [0, 0, 0, 0]
    for line in lines:
        if not line.strip():
            continue
        _user, counts = line.split(':')
        counts, model = counts.split('-')
        tin, tout = counts.split('/')
        offset = 0 if 'mini' in model else 2
        totals[offset] += int(tin)
        totals[offset + 1] += int(tout)
    return tuple(totals)


def _sum_counter_log(lines):
    """Sum the integer after ':' on each 'label:value' line.

    Raises:
        ValueError: if a non-blank line does not match the expected format.
    """
    total = 0
    for line in lines:
        if not line.strip():
            continue
        _label, value = line.split(':')
        total += int(value)
    return total


def genUsageStats(do_reset=False):
    """Collect per-user token, audio and speech usage from the log files.

    For every name in ``unames`` the files ``<user>_log.txt``,
    ``<user>_audio.txt`` and ``<user>_speech.txt`` under ``dataDir`` are read.

    Args:
        do_reset: when true, the log files are deleted instead of summed, so
            every reported count is zero.

    Returns:
        A list of rows ``[user, 'mini_in/mini_out', '4o_in/4o_out',
        'audio:N', 'speech:N']`` followed by a ``'totals'`` row, or a
        failure message string if a file could not be read or parsed.
    """
    sources = (
        ('_log.txt', _sum_token_log, 'stats'),
        ('_audio.txt', _sum_counter_log, 'audio stats'),
        ('_speech.txt', _sum_counter_log, 'speech stats'),
    )
    result = []
    total_tokens = [0, 0, 0, 0]
    totalAudio = 0
    totalSpeech = 0
    for user in unames:
        parsed = []
        for suffix, parse, label in sources:
            failure = f'File access failed reading {label} for user: {user}'
            lines = _read_usage_lines(dataDir + user + suffix, do_reset)
            if lines is None:
                return failure
            # Parse only after a successful read, into fresh counters, so a
            # malformed line neither triggers pointless retries nor
            # double-counts lines that were already summed.
            try:
                parsed.append(parse(lines))
            except ValueError:
                return failure
        tokens, userAudio, userSpeech = parsed
        total_tokens = [a + b for a, b in zip(total_tokens, tokens)]
        totalAudio += userAudio
        totalSpeech += userSpeech
        mini_in, mini_out, o_in, o_out = tokens
        result.append([user, f'{mini_in}/{mini_out}', f'{o_in}/{o_out}',
                       f'audio:{userAudio}', f'speech:{userSpeech}'])
    ttotal4mini_in, ttotal4mini_out, ttotal4o_in, ttotal4o_out = total_tokens
    result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}',
                   f'{ttotal4o_in}/{ttotal4o_out}',
                   f'audio:{totalAudio}', f'speech:{totalSpeech}'])
    return result
def clear():
    """Delete all queued temporary audio files and reset the conversation.

    Returns:
        ``[None, [], None]`` - the cleared values for the prompt textbox,
        the chat history state and the dialog textbox.
    """
    while True:
        # get_nowait avoids blocking forever if another thread drains the
        # queue between an empty() check and the get().
        try:
            fname = qdelete.get_nowait()
        except queue.Empty:
            break
        try:
            os.remove(fname)
        except FileNotFoundError:
            pass  # already gone; nothing to clean up
    return [None, [], None]
def updatePassword(txt):
    """Normalise a typed password and produce a mask for the textbox.

    Returns:
        A two-item list: the lower-cased, whitespace-stripped password and
        the fixed mask string that replaces the visible text.
    """
    lowered = txt.lower()
    mask = "*********"
    return [lowered.strip(), mask]
# def setModel(val):
# return val
def _login_ok(user, pwd):
    """Return True when *user* and *pwd* form an accepted login.

    When the user and password lists are parallel (same length) the password
    must belong to that user. Otherwise the legacy rule is kept: any listed
    user name with any listed password.
    """
    if len(unames) == len(pwdList):
        return any(u == user and p == pwd for u, p in zip(unames, pwdList))
    return user in unames and pwd in pwdList


def chat(prompt, user_window, pwd_window, past, response, gptModel):
    """Send a prompt to the chat model and extend the visible dialog.

    Args:
        prompt: text typed by the user. For the boss login, ``'stats'`` and
            ``'reset'`` report or reset usage statistics instead. A prompt
            starting with ``'gpt4'`` switches the model to ``'gpt-4o'`` and
            drops the first five characters.
        user_window: user name as typed (lower-cased and stripped here).
        pwd_window: password to check against the configured list.
        past: list of prior chat messages; extended in place on success.
        response: dialog text shown so far.
        gptModel: name of the model to use.

    Returns:
        ``[history, dialog text, prompt box value, model]``. On bad
        credentials the history is emptied and the prompt is echoed back.
    """
    user_window = user_window.lower().strip()
    isBoss = user_window == unames[0] and pwd_window == pwdList[0]
    if isBoss and prompt in ('stats', 'reset'):
        return [past, genUsageStats(prompt == 'reset'), None, gptModel]
    if prompt.startswith('gpt4'):
        gptModel = 'gpt-4o'
        prompt = prompt[5:]
    if not _login_ok(user_window, pwd_window):
        return [[], "User name and/or password are incorrect", prompt, gptModel]
    if response is None:
        # The dialog box may have been cleared to None; keep += below valid.
        response = ''
    user_message = {"role": "user", "content": prompt}
    # Only record the turn in history once the API call has succeeded, so a
    # failed request does not leave a dangling user message behind.
    completion = client.chat.completions.create(model=gptModel,
                                                messages=past + [user_message])
    past.append(user_message)
    reply = completion.choices[0].message.content
    tokens_in = completion.usage.prompt_tokens
    tokens_out = completion.usage.completion_tokens
    tokens = completion.usage.total_tokens
    response += "\n\nYOU: " + prompt + "\nGPT: " + reply
    if isBoss:
        response += f"\n{gptModel}: tokens in/out = {tokens_in}/{tokens_out}"
    if tokens > 40000:
        response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON."
    past.append({"role": "assistant", "content": reply})
    # Append this exchange's token usage to the user's log (best effort).
    m = '4omini' if 'mini' in gptModel else '4o'
    dataFile = new_func(user_window)
    accessOk = False
    for _ in range(3):
        try:
            with open(dataFile, 'a') as f:
                f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n')
            accessOk = True
            break
        except OSError:
            sleep(3)
    if not accessOk:
        response += f"\nDATA LOG FAILED, path = {dataFile}"
    return [past, response, None, gptModel]
def new_func(user_window):
    """Return the path of the token-usage log file for *user_window*."""
    return ''.join((dataDir, user_window, '_log.txt'))
def transcribe(user, pwd, fpath):
    """Transcribe a recorded audio file to text for an authorised user.

    Args:
        user: user name as typed (lower-cased and stripped here).
        pwd: password as typed (lower-cased and stripped here).
        fpath: path of the recorded audio file.

    Returns:
        The transcript as a string, or ``'Bad credentials'`` when the login
        is not accepted.
    """
    user = user.lower().strip()
    pwd = pwd.lower().strip()
    # With parallel user/password lists the password must belong to this
    # user; otherwise fall back to the legacy "any listed pair" rule.
    if len(unames) == len(pwdList):
        authorized = any(u == user and p == pwd
                         for u, p in zip(unames, pwdList))
    else:
        authorized = user in unames and pwd in pwdList
    if not authorized:
        return 'Bad credentials'
    with audioread.audio_open(fpath) as audio:
        duration = int(audio.duration)
    if duration > 0:
        # Record the clip's duration in the user's audio usage log.
        with open(dataDir + user + '_audio.txt', 'a') as f:
            f.write(f'audio:{duration}\n')
    with open(fpath, 'rb') as audio_file:
        transcript = client.audio.transcriptions.create(
            model='whisper-1', file=audio_file, response_format='text')
    return str(transcript)
def pause_message():
    """Return the notice displayed while microphone recording is paused."""
    notice = "Audio input is paused. Resume or Stop as desired"
    return notice
# def gen_output_audio(txt):
# if len(txt) < 10:
# txt = "This dialog is too short to mess with!"
# response = client.audio.speech.create(model="tts-1", voice="fable", input=txt)
# with open(speak_file, 'wb') as fp:
# fp.write(response.content)
# return speak_file
def set_speak_button(txt):
    """Return a button update that is visible only when *txt* exceeds 10 characters."""
    return gr.Button(visible=len(txt) > 10)
def clean_up():
    """Delete every temporary audio file queued in ``qdelete``."""
    while True:
        # get_nowait avoids blocking forever if another thread drains the
        # queue between an empty() check and the get().
        try:
            fname = qdelete.get_nowait()
        except queue.Empty:
            return
        try:
            os.remove(fname)
        except FileNotFoundError:
            pass  # already gone; nothing to clean up
def speech_worker(chunks=()):
    """Synthesize each text chunk to a temporary WAV file, in order.

    Used as a background-thread target. Each file name is queued on
    ``qdelete`` (for later removal) and on ``qspeech`` (for playback).

    Args:
        chunks: iterable of text strings to convert to speech.
    """
    for chunk in chunks:
        response = client.audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format="wav")
        # The context manager closes the handle even if the write fails;
        # delete=False keeps the file on disk for playback.
        with NamedTemporaryFile(mode='wb', delete=False) as tempFile:
            tempFile.write(response.content)
        qdelete.put(tempFile.name)
        qspeech.put(tempFile.name)
with gr.Blocks() as demo:
    def _speech_chunks(txt):
        """Split dialog text into short phrases suitable for text-to-speech.

        Abbreviations are expanded, markdown bold markers and leading bullet
        characters are removed, a leading list number is kept intact as a
        spoken prefix, decimal points between digits become ' point ', and
        lines longer than 50 characters are split at periods.
        """
        for abbrev, spoken in abbrevs.items():
            txt = txt.replace(abbrev, spoken)
        chunklist = []
        for raw in txt.replace('**', '').splitlines(False):
            s = raw.lstrip('- *@#$%^&_=+-')
            if s.strip() == '':
                continue
            # Detect a leading number such as "1." or "3.14". find() is used
            # because a one-word line has no space to locate.
            number = ''
            loc = s.find(' ')
            if loc > 1:
                val = s[:loc]
                if val.replace('.', '0').isdecimal():
                    if val.endswith('.'):
                        number = val[:-1].replace('.', ' point ') + '.'
                    else:
                        number = val.replace('.', ' point ')
                    s = ', ' + s[loc:]
            # Speak "1.5" as "1 point 5". Replace right-to-left so earlier
            # indices stay valid after each insertion.
            locs = [i for i in range(1, len(s) - 1)
                    if s[i] == '.' and s[i - 1].isdecimal() and s[i + 1].isdecimal()]
            for i in reversed(locs):
                s = s[:i] + ' point ' + s[i + 1:]
            if len(s) > 50:
                pieces = s.split('.')
                pieces[0] = number + pieces[0]
            else:
                pieces = [number + s]
            chunklist.extend(p for p in pieces if p.strip() and p != '"')
        return chunklist

    def initial_audio_output(txt, user):
        """Start speaking the dialog text and return the first audio file.

        The first chunk is synthesized here; remaining chunks are handed to
        a background thread running ``speech_worker``. The total character
        count is appended to the user's speech usage log.

        Args:
            txt: dialog text to speak.
            user: user name as typed (lower-cased and stripped here).

        Returns:
            The path of the first WAV file, or an empty ``gr.Audio`` update
            when the user is unknown or there is nothing to speak.
        """
        # Normalise like chat()/transcribe() so usage lands in the file that
        # genUsageStats reads, and refuse names that are not configured
        # users because the name becomes part of a file path.
        user = user.lower().strip()
        if user not in unames:
            return gr.Audio(sources=None)
        # Discard audio still queued from a previous request.
        while True:
            try:
                qspeech.get_nowait()
            except queue.Empty:
                break
        chunklist = _speech_chunks(txt)
        total_speech = sum(len(chunk) for chunk in chunklist)
        with open(dataDir + user + '_speech.txt', 'a') as f:
            f.write(f'speech:{total_speech}\n')
        if not chunklist:
            return gr.Audio(sources=None)
        if len(chunklist) > 1:
            threading.Thread(target=speech_worker, daemon=True, args=(chunklist[1:],)).start()
        response = client.audio.speech.create(model="tts-1", voice="fable", input=chunklist[0], speed=0.85, response_format="wav")
        with NamedTemporaryFile(mode='wb', delete=False) as tempFile:
            tempFile.write(response.content)
        qdelete.put(tempFile.name)
        return tempFile.name

    def gen_output_audio():
        """Return the next queued audio file, or an empty update after 5 s."""
        try:
            return qspeech.get(timeout=5)
        except queue.Empty:
            return gr.Audio(sources=None)

    history = gr.State([])
    password = gr.State("")
    model = gr.State("gpt-4o-mini")
    gr.Markdown('# GPT Chat')
    gr.Markdown('Enter user name & password then enter prompt and click submit button. Restart conversation if topic changes')
    gr.Markdown('You can enter prompts by voice. Tap Record, speak, then tap Stop.' +
                ' Tap "Reset Voice Entry", to enter more voice. Note: first voice response takes a long time.')
    with gr.Row():
        user_window = gr.Textbox(label = "User Name")
        pwd_window = gr.Textbox(label = "Password")
        # On blur, store the normalised password in state and mask the box.
        pwd_window.blur(updatePassword, pwd_window, [password, pwd_window])
    with gr.Row():
        audio_widget = gr.Audio(type='filepath', format='wav', waveform_options=gr.WaveformOptions(
            show_recording_waveform=True), sources=['microphone'], scale = 3, label="Prompt/Question Voice Entry", max_length=120)
        reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1)
    with gr.Row():
        clear_button = gr.Button(value="Restart Conversation")
        submit_button = gr.Button(value="Submit Prompt/Question")
        speak_output = gr.Button(value="Speak Dialog", visible=False)
    prompt_window = gr.Textbox(label = "Prompt or Question")
    output_window = gr.Textbox(label = "Dialog")
    submit_button.click(chat, inputs=[prompt_window, user_window, password, history, output_window, model],
                        outputs=[history, output_window, prompt_window, model])
    clear_button.click(clear, inputs=[], outputs=[prompt_window, history, output_window])
    audio_widget.stop_recording(fn=transcribe, inputs=[user_window, password, audio_widget],
                                outputs=[prompt_window])
    audio_widget.pause_recording(fn=pause_message, outputs=[prompt_window])
    reset_button.add(audio_widget)
    audio_out = gr.Audio(autoplay=True, visible=False)
    # Each stop event on the hidden player fetches the next queued clip.
    audio_out.stop(fn=gen_output_audio, inputs=None, outputs = audio_out)
    speak_output.click(fn=initial_audio_output, inputs=[output_window, user_window], outputs=audio_out)
    output_window.change(fn=set_speak_button, inputs=output_window, outputs=speak_output)
    demo.unload(clean_up)
demo.launch(share=True)