# GPT Chat — Gradio app providing chat, voice transcription (Whisper), and TTS playback.
import os | |
from re import L | |
import tempfile | |
import gradio as gr | |
# import openai | |
from numpy._core.defchararray import isdecimal | |
from openai import OpenAI | |
from dotenv import load_dotenv | |
from pathlib import Path | |
from time import sleep | |
import audioread | |
import queue | |
import threading | |
from tempfile import NamedTemporaryFile | |
load_dotenv(override=True)

# Credentials and deployment site come from the environment (.env when local).
key = os.getenv('OPENAI_API_KEY')
users = os.getenv('LOGNAME')
unames = users.split(',')          # comma-separated list of allowed user names
pwds = os.getenv('PASSWORD')
pwdList = pwds.split(',')          # passwords, parallel to unames

site = os.getenv('SITE')
# Local runs keep usage logs under ./data; hosted runs use the /data volume.
if site == 'local':
    dp = Path('./data')
    dp.mkdir(exist_ok=True)
    dataDir = './data/'
else:
    dp = Path('/data')
    dp.mkdir(exist_ok=True)
    dataDir = '/data/'

client = OpenAI(api_key = key)

# Queues shared with the background TTS worker: files ready to play, and
# temp files to delete at session cleanup.
qspeech = queue.Queue()
qdelete = queue.Queue()

# Spoken expansions applied before text-to-speech.  NOTE: the original literal
# listed 'Mr. ' twice; duplicate dict keys are silently overwritten, so the
# deduplicated form below is behaviorally identical.
abbrevs = {'St. ' : 'Saint ', 'Mr. ': 'mister ', 'Mrs. ':'mussus ', 'Ms. ':'mizz '}
def genUsageStats(do_reset=False): | |
result = [] | |
ttotal4o_in = 0 | |
ttotal4o_out = 0 | |
ttotal4mini_in = 0 | |
ttotal4mini_out = 0 | |
totalAudio = 0 | |
totalSpeech = 0 | |
for user in unames: | |
tokens4o_in = 0 | |
tokens4o_out = 0 | |
tokens4mini_in = 0 | |
tokens4mini_out = 0 | |
fp = dataDir + user + '_log.txt' | |
if os.path.exists(fp): | |
accessOk = False | |
for i in range(3): | |
try: | |
with open(fp) as f: | |
dataList = f.readlines() | |
if do_reset: | |
os.remove(fp) | |
else: | |
for line in dataList: | |
(u, t) = line.split(':') | |
(t, m) = t.split('-') | |
(tin, tout) = t.split('/') | |
incount = int(tin) | |
outcount = int(tout) | |
if 'mini' in m: | |
tokens4mini_in += incount | |
tokens4mini_out += outcount | |
ttotal4mini_in += incount | |
ttotal4mini_out += outcount | |
else: | |
tokens4o_in += incount | |
tokens4o_out += outcount | |
ttotal4o_in += incount | |
ttotal4o_out += outcount | |
accessOk = True | |
break | |
except: | |
sleep(3) | |
if not accessOk: | |
return f'File access failed reading stats for user: {user}' | |
userAudio = 0 | |
fp = dataDir + user + '_audio.txt' | |
if os.path.exists(fp): | |
accessOk = False | |
for i in range(3): | |
try: | |
with open(fp) as f: | |
dataList = f.readlines() | |
if do_reset: | |
os.remove(fp) | |
else: | |
for line in dataList: | |
(dud, len) = line.split(':') | |
userAudio += int(len) | |
totalAudio += int(userAudio) | |
accessOk = True | |
break | |
except: | |
sleep(3) | |
if not accessOk: | |
return f'File access failed reading audio stats for user: {user}' | |
userSpeech = 0 | |
fp = dataDir + user + '_speech.txt' | |
if os.path.exists(fp): | |
accessOk = False | |
for i in range(3): | |
try: | |
with open(fp) as f: | |
dataList = f.readlines() | |
if do_reset: | |
os.remove(fp) | |
else: | |
for line in dataList: | |
(dud, len) = line.split(':') | |
userSpeech += int(len) | |
totalSpeech += int(len) | |
accessOk = True | |
break | |
except: | |
sleep(3) | |
if not accessOk: | |
return f'File access failed reading speech stats for user: {user}' | |
result.append([user, f'{tokens4mini_in}/{tokens4mini_out}', f'{tokens4o_in}/{tokens4o_out}', f'audio:{userAudio}',f'speech:{userSpeech}']) | |
result.append(['totals', f'{ttotal4mini_in}/{ttotal4mini_out}', f'{ttotal4o_in}/{ttotal4o_out}', f'audio:{totalAudio}',f'speech:{totalSpeech}']) | |
return result | |
def clear():
    """Restart the conversation: purge queued temp speech files and blank the UI.

    Returns reset values for the wired outputs [prompt_window, history,
    output_window].
    """
    # Same deletion sweep that runs at session unload; reuse it instead of
    # duplicating the loop here.
    clean_up()
    return [None, [], None]
def updatePassword(txt):
    """Normalize the typed password and mask the visible textbox.

    Returns [normalized_password, mask] for the (password state, textbox)
    outputs wired to the blur event.
    """
    normalized = txt.strip().lower()
    return [normalized, "*********"]
# def setModel(val): | |
# return val | |
def chat(prompt, user_window, pwd_window, past, response, gptModel):
    """Handle one chat turn: authenticate, call the model, log token usage.

    Args mirror the Gradio inputs: prompt text, user name, password (state),
    message history (state), accumulated dialog text, and current model name.
    Returns [history, dialog_text, prompt_reset, model] for the wired outputs.
    """
    user_window = user_window.lower().strip()
    isBoss = False
    # The first configured user is the admin: sees token counts and can use
    # the 'stats' / 'reset' maintenance commands or force the full model by
    # prefixing a prompt with 'gpt4 '.
    if user_window == unames[0] and pwd_window == pwdList[0]:
        isBoss = True
        if prompt == 'stats':
            response = genUsageStats()
            return [past, response, None, gptModel]
        if prompt == 'reset':
            response = genUsageStats(True)
            return [past, response, None, gptModel]
        if prompt.startswith('gpt4'):
            gptModel = 'gpt-4o'
            prompt = prompt[5:]
    if user_window in unames and pwd_window in pwdList:
        past.append({"role":"user", "content":prompt})
        completion = client.chat.completions.create(model=gptModel,
                messages=past)
        reply = completion.choices[0].message.content
        tokens_in = completion.usage.prompt_tokens
        tokens_out = completion.usage.completion_tokens
        tokens = completion.usage.total_tokens
        response += "\n\nYOU: " + prompt + "\nGPT: " + reply
        if isBoss:
            response += f"\n{gptModel}: tokens in/out = {tokens_in}/{tokens_out}"
        if tokens > 40000:
            response += "\n\nTHIS DIALOG IS GETTING TOO LONG. PLEASE RESTART CONVERSATION SOON."
        past.append({"role":"assistant", "content": reply})
        # Append this turn's token usage to the per-user log.  Retry briefly on
        # failure (the stats reader may hold the file).  dataFile is computed
        # before the loop so the failure message below can always name it.
        accessOk = False
        dataFile = new_func(user_window)
        for i in range(3):
            try:
                with open(dataFile, 'a') as f:
                    m = '4omini' if 'mini' in gptModel else '4o'
                    f.write(f'{user_window}:{tokens_in}/{tokens_out}-{m}\n')
                accessOk = True
                break
            except OSError:  # was `except Exception as e` with e unused
                sleep(3)
        if not accessOk:
            response += f"\nDATA LOG FAILED, path = {dataFile}"
        return [past, response, None, gptModel]
    else:
        return [[], "User name and/or password are incorrect", prompt, gptModel]
def new_func(user_window):
    """Return the path of the given user's token-usage log file."""
    return dataDir + user_window + '_log.txt'
def transcribe(user, pwd, fpath):
    """Transcribe a recorded voice prompt with Whisper.

    Credentials are normalized and checked against the configured lists;
    the clip's duration in whole seconds is appended to the user's audio
    usage log before transcription.  Returns the transcript text, or
    'Bad credentials' on auth failure.
    """
    user = user.lower().strip()
    pwd = pwd.lower().strip()
    if user not in unames or pwd not in pwdList:
        return 'Bad credentials'
    # Measure clip length for the per-user audio usage stats.
    with audioread.audio_open(fpath) as audio:
        duration = int(audio.duration)
        if duration > 0:
            with open(dataDir + user + '_audio.txt', 'a') as f:
                f.write(f'audio:{str(duration)}\n')
    with open(fpath, 'rb') as audio_file:
        transcript = client.audio.transcriptions.create(
            model='whisper-1', file=audio_file, response_format='text')
    return str(transcript)
def pause_message():
    """Status text shown in the prompt box while voice recording is paused."""
    return "Audio input is paused. Resume or Stop as desired"
# def gen_output_audio(txt): | |
# if len(txt) < 10: | |
# txt = "This dialog is too short to mess with!" | |
# response = client.audio.speech.create(model="tts-1", voice="fable", input=txt) | |
# with open(speak_file, 'wb') as fp: | |
# fp.write(response.content) | |
# return speak_file | |
def set_speak_button(txt):
    """Show the 'Speak Dialog' button only once the dialog has real content
    (more than 10 characters)."""
    return gr.Button(visible=len(txt) > 10)
def clean_up():
    """Delete every temp speech file queued for deletion (run at session unload)."""
    while True:
        if qdelete.empty():
            break
        path = qdelete.get()
        if os.path.exists(path):
            os.remove(path)
def speech_worker(chunks=()):
    """Background TTS: synthesize each text chunk to a temp wav file and queue
    it for playback.

    Each generated file is pushed onto qdelete (cleanup at session end) and
    qspeech (consumed by gen_output_audio when the previous clip finishes).
    The default was a mutable list ([]); an empty tuple is equivalent here
    since chunks is only iterated, and avoids the shared-default pitfall.
    """
    for chunk in chunks:
        response = client.audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format="wav")
        tempFile = NamedTemporaryFile(mode='wb', delete=False)
        tempFile.write(response.content)
        tempFile.close()
        qdelete.put(tempFile.name)
        qspeech.put(tempFile.name)
with gr.Blocks() as demo:
    def initial_audio_output(txt, user):
        """Convert the dialog text to speech and return the first wav file.

        The text is split into speakable chunks (abbreviations expanded,
        leading numbers and decimal points rewritten so TTS reads them
        naturally, long lines split on sentence boundaries).  The first
        chunk is synthesized here; the rest are handed to a background
        speech_worker thread that feeds qspeech for gapless playback.
        """
        # Drop any stale files from a previous speech run.
        while not qspeech.empty():
            qspeech.get()
        # Expand abbreviations the TTS engine mispronounces.
        for abbr, spoken in abbrevs.items():
            txt = txt.replace(abbr, spoken)
        words_in = txt.replace('**', '').splitlines(False)
        words_out = []
        for s in words_in:
            s = s.lstrip('- *@#$%^&_=+-')
            if len(s) > 0:
                # Mark lines that start with a number (e.g. '3.5 Results') so
                # the chunker keeps the number attached to its text.  find()
                # replaces index(), which raised ValueError on single-word lines.
                loc = s.find(' ')
                if loc > 1:
                    val = s[0:loc]
                    isnum = val.replace('.', '0').isdecimal()
                    if isnum:
                        if val.endswith('.'):
                            val = val[:-1].replace('.', ' point ') + '., '
                        else:
                            val = val.replace('.', ' point ') + ', '
                        s = 'num' + val + s[loc:]
                words_out.append(s)
        chunklist = []
        for chunk in words_out:
            if chunk.strip() == '':
                continue
            isnumbered = chunk.startswith('num')
            number = ''
            loc = 0
            if isnumbered:
                chunk = chunk[3:]
                loc = chunk.index(',')
                number = chunk[0:loc]
                chunk = chunk[loc:]
            # Read interior decimal points aloud ('3.14' -> '3 point 14').
            locs = []
            for i in range(1, len(chunk) - 1):
                (a, b, c) = chunk[i-1:i+2]
                if a.isdecimal() and b == '.' and c.isdecimal():
                    locs.append(i)
            for i in locs:
                chunk = chunk[:i] + ' point ' + chunk[i+1:]
            if len(chunk) > 50:
                # Long chunk: split on sentences, re-attaching any leading number
                # to the first piece only.
                finechunks = chunk.split('.')
                for fchunk in finechunks:
                    if isnumbered:
                        fchunk = number + fchunk
                        isnumbered = False
                    if len(fchunk) > 0:
                        if fchunk != '"':
                            chunklist.append(fchunk)
            else:
                line = number + chunk
                if line != '"':
                    chunklist.append(line)
        if not chunklist:
            # Nothing speakable (e.g. whitespace-only dialog); previously this
            # fell through to chunklist[0] and raised IndexError.
            return gr.Audio(sources=None)
        # Log synthesized characters for the per-user speech usage stats.
        total_speech = sum(len(c) for c in chunklist)
        with open(dataDir + user + '_speech.txt', 'a') as f:
            f.write(f'speech:{str(total_speech)}\n')
        chunk = chunklist[0]
        if chunk.strip() == '':
            return gr.Audio(sources=None)
        if len(chunklist) > 1:
            # Synthesize the remainder in the background so playback starts sooner.
            threading.Thread(target=speech_worker, daemon=True, args=(chunklist[1:],)).start()
        response = client.audio.speech.create(model="tts-1", voice="fable", input=chunk, speed=0.85, response_format="wav")
        tempFile = NamedTemporaryFile(mode='wb', delete=False)
        tempFile.write(response.content)
        tempFile.close()
        qdelete.put(tempFile.name)
        return tempFile.name
def gen_output_audio(): | |
try: | |
fname = qspeech.get(timeout=5) | |
except: | |
return gr.Audio(sources=None) | |
return fname | |
    # --- Per-session Gradio state ------------------------------------------
    history = gr.State([])            # chat message list passed to the API
    password = gr.State("")           # normalized password captured on blur
    model = gr.State("gpt-4o-mini")   # current OpenAI model name
    # --- Layout -------------------------------------------------------------
    gr.Markdown('# GPT Chat')
    gr.Markdown('Enter user name & password then enter prompt and click submit button. Restart conversation if topic changes')
    gr.Markdown('You can enter prompts by voice. Tap Record, speak, then tap Stop.' +
        ' Tap "Reset Voice Entry", to enter more voice. Note: first voice response takes a long time.')
    # heading = gr.Label(value="GPT Chat", scale=2, color="Crimson" )
    with gr.Row():
        user_window = gr.Textbox(label = "User Name")
        pwd_window = gr.Textbox(label = "Password")
        # Mask the password in the textbox and stash the normalized value in state.
        pwd_window.blur(updatePassword, pwd_window, [password, pwd_window])
    with gr.Row():
        audio_widget = gr.Audio(type='filepath', format='wav',waveform_options=gr.WaveformOptions(
            show_recording_waveform=True), sources=['microphone'], scale = 3, label="Prompt/Question Voice Entry", max_length=120)
        reset_button = gr.ClearButton(value="Reset Voice Entry", scale=1) #new_func1()
    with gr.Row():
        clear_button = gr.Button(value="Restart Conversation")
        # gpt_chooser=gr.Radio(choices=[("GPT-3.5","gpt-3.5-turbo"),("GPT-4o","gpt-4o-mini")],
        #                      value="gpt-3.5-turbo", label="GPT Model", interactive=True)
        submit_button = gr.Button(value="Submit Prompt/Question")
        speak_output = gr.Button(value="Speak Dialog", visible=False)
    prompt_window = gr.Textbox(label = "Prompt or Question")
    output_window = gr.Textbox(label = "Dialog")
    # --- Event wiring -------------------------------------------------------
    submit_button.click(chat, inputs=[prompt_window, user_window, password, history, output_window, model],
                         outputs=[history, output_window, prompt_window, model])
    clear_button.click(clear, inputs=[], outputs=[prompt_window, history, output_window])
    # Voice entry: transcription fills the prompt box when recording stops.
    audio_widget.stop_recording(fn=transcribe, inputs=[user_window, password, audio_widget],
                                outputs=[prompt_window])
    audio_widget.pause_recording(fn=pause_message, outputs=[prompt_window])
    reset_button.add(audio_widget)
    # Hidden autoplay player; when one clip ends, its stop event pulls the
    # next synthesized chunk from the queue.
    audio_out = gr.Audio(autoplay=True, visible=False)
    audio_out.stop(fn=gen_output_audio, inputs=None, outputs = audio_out)
    speak_output.click(fn=initial_audio_output, inputs=[output_window, user_window], outputs=audio_out)
    # The speak button is shown only when the dialog is long enough to read aloud.
    output_window.change(fn=set_speak_button, inputs=output_window,outputs=speak_output)
# Delete leftover temp speech files when a session ends, then serve the app.
demo.unload(clean_up)
demo.launch(share=True)