Spaces:
Running
Running
import gradio as gr | |
import whisper | |
import asyncio | |
import httpx | |
import tempfile | |
import os | |
import requests | |
import time | |
import threading | |
from datetime import datetime, timedelta | |
session = requests.Session() | |
from interview_protocol import protocols as interview_protocols | |
model = whisper.load_model("base") | |
base_url = "https://llm4socialisolation-fd4082d0a518.herokuapp.com" | |
# base_url = "http://localhost:8080" | |
timeout = 60 | |
concurrency_count=10 | |
# mapping between display names and internal chatbot_type values | |
display_to_value = { | |
'Echo': 'enhanced', | |
'Breeze': 'baseline' | |
} | |
value_to_display = { | |
'enhanced': 'Echo', | |
'baseline': 'Breeze' | |
} | |
def get_method_index(chapter, method): | |
all_methods = [] | |
for chap in interview_protocols.values(): | |
all_methods.extend(chap) | |
index = all_methods.index(method) | |
return index | |
async def initialization(api_key, chapter_name, topic_name, username, prompts, chatbot_type): | |
url = f"{base_url}/api/initialization" | |
headers = {'Content-Type': 'application/json'} | |
data = { | |
'api_key': api_key, | |
'chapter_name': chapter_name, | |
'topic_name': topic_name, | |
'username': username, | |
'chatbot_type': chatbot_type, | |
**prompts | |
} | |
async with httpx.AsyncClient(timeout=timeout) as client: | |
try: | |
response = await client.post(url, json=data, headers=headers) | |
if response.status_code == 200: | |
return "Initialization successful." | |
else: | |
return f"Initialization failed: {response.text}" | |
except asyncio.TimeoutError: | |
print("The request timed out") | |
return "Request timed out during initialization." | |
except Exception as e: | |
return f"Error in initialization: {str(e)}" | |
def fetch_default_prompts(chatbot_type): | |
url = f"{base_url}?chatbot_type={chatbot_type}" | |
try: | |
response = httpx.get(url, timeout=timeout) | |
if response.status_code == 200: | |
prompts = response.json() | |
print(prompts) | |
return prompts | |
else: | |
print(f"Failed to fetch prompts: {response.status_code} - {response.text}") | |
return {} | |
except Exception as e: | |
print(f"Error fetching prompts: {str(e)}") | |
return {} | |
async def get_backend_response(api_key, patient_prompt, username, chatbot_type): | |
url = f"{base_url}/responses/doctor" | |
headers = {'Content-Type': 'application/json'} | |
data = { | |
'username': username, | |
'patient_prompt': patient_prompt, | |
'chatbot_type': chatbot_type | |
} | |
async with httpx.AsyncClient(timeout=timeout) as client: | |
try: | |
response = await client.post(url, json=data, headers=headers) | |
if response.status_code == 200: | |
response_data = response.json() | |
return response_data | |
else: | |
return f"Failed to fetch response from backend: {response.text}" | |
except Exception as e: | |
return f"Error contacting backend service: {str(e)}" | |
async def save_conversation_and_memory(username, chatbot_type): | |
url = f"{base_url}/save/end_and_save" | |
headers = {'Content-Type': 'application/json'} | |
data = { | |
'username': username, | |
'chatbot_type': chatbot_type | |
} | |
async with httpx.AsyncClient(timeout=timeout) as client: | |
try: | |
response = await client.post(url, json=data, headers=headers) | |
if response.status_code == 200: | |
response_data = response.json() | |
return response_data.get('message', 'Saving Error!') | |
else: | |
return f"Failed to save conversations and memory graph: {response.text}" | |
except Exception as e: | |
return f"Error contacting backend service: {str(e)}" | |
async def get_conversation_histories(username, chatbot_type): | |
url = f"{base_url}/save/download_conversations" | |
headers = {'Content-Type': 'application/json'} | |
data = { | |
'username': username, | |
'chatbot_type': chatbot_type | |
} | |
async with httpx.AsyncClient(timeout=timeout) as client: | |
try: | |
response = await client.post(url, json=data, headers=headers) | |
if response.status_code == 200: | |
conversation_data = response.json() | |
return conversation_data | |
else: | |
return [] | |
except Exception as e: | |
return [] | |
def download_conversations(username, chatbot_type): | |
conversation_histories = asyncio.run(get_conversation_histories(username, chatbot_type)) | |
files = [] | |
temp_dir = tempfile.mkdtemp() | |
for conversation_entry in conversation_histories: | |
file_name = conversation_entry.get('file_name', f"Conversation_{len(files)+1}.txt") | |
conversation = conversation_entry.get('conversation', []) | |
conversation_text = "" | |
for message_pair in conversation: | |
if isinstance(message_pair, list) and len(message_pair) == 2: | |
speaker, message = message_pair | |
conversation_text += f"{speaker.capitalize()}: {message}\n\n" | |
else: | |
conversation_text += f"Unknown format: {message_pair}\n\n" | |
temp_file_path = os.path.join(temp_dir, file_name) | |
with open(temp_file_path, 'w') as temp_file: | |
temp_file.write(conversation_text) | |
files.append(temp_file_path) | |
return files | |
async def get_biography(username, chatbot_type): | |
url = f"{base_url}/save/generate_autobiography" | |
headers = {'Content-Type': 'application/json'} | |
data = { | |
'username': username, | |
'chatbot_type': chatbot_type | |
} | |
async with httpx.AsyncClient(timeout=timeout) as client: | |
try: | |
response = await client.post(url, json=data, headers=headers) | |
if response.status_code == 200: | |
biography_data = response.json() | |
biography_text = biography_data.get('biography', '') | |
return biography_text | |
else: | |
return "Failed to generate biography." | |
except Exception as e: | |
return f"Error contacting backend service: {str(e)}" | |
def download_biography(username, chatbot_type): | |
biography_text = asyncio.run(get_biography(username, chatbot_type)) | |
if not biography_text or "Failed" in biography_text or "Error" in biography_text: | |
return gr.update(value=None, visible=False), gr.update(value=biography_text, visible=True) | |
temp_dir = tempfile.mkdtemp() | |
temp_file_path = os.path.join(temp_dir, "biography.txt") | |
with open(temp_file_path, 'w') as temp_file: | |
temp_file.write(biography_text) | |
return temp_file_path, gr.update(value=biography_text, visible=True) | |
def transcribe_audio(audio_file): | |
transcription = model.transcribe(audio_file)["text"] | |
return transcription | |
def submit_text_and_respond(edited_text, api_key, username, history, chatbot_type): | |
response = asyncio.run(get_backend_response(api_key, edited_text, username, chatbot_type)) | |
print('------') | |
print(response) | |
if isinstance(response, str): | |
history.append((edited_text, response)) | |
return history, "", [] | |
doctor_response = response['doctor_response']['response'] | |
memory_event = response.get('memory_events', []) | |
history.append((edited_text, doctor_response)) | |
memory_graph = update_memory_graph(memory_event) | |
return history, "", memory_graph # Return memory_graph as output | |
def set_initialize_button(api_key_input, chapter_name, topic_name, username_input, | |
system_prompt_text, conv_instruction_prompt_text, therapy_prompt_text, autobio_prompt_text, chatbot_display_name): | |
chatbot_type = display_to_value.get(chatbot_display_name, 'enhanced') | |
prompts = { | |
'system_prompt': system_prompt_text, | |
'conv_instruction_prompt': conv_instruction_prompt_text, | |
'therapy_prompt': therapy_prompt_text, | |
'autobio_prompt': autobio_prompt_text | |
} | |
message = asyncio.run(initialization(api_key_input, chapter_name, topic_name, username_input, prompts, chatbot_type)) | |
print(message) | |
return message, api_key_input, chatbot_type | |
def save_conversation(username, chatbot_type): | |
response = asyncio.run(save_conversation_and_memory(username, chatbot_type)) | |
return response | |
def start_recording(audio_file): | |
if not audio_file: | |
return "" | |
try: | |
transcription = transcribe_audio(audio_file) | |
return transcription | |
except Exception as e: | |
return f"Failed to transcribe: {str(e)}" | |
def update_methods(chapter): | |
return gr.update(choices=interview_protocols[chapter], value=interview_protocols[chapter][0]) | |
def update_memory_graph(memory_data): | |
table_data = [] | |
for node in memory_data: | |
table_data.append([ | |
node.get('date', ''), | |
node.get('topic', ''), | |
node.get('event_description', ''), | |
node.get('people_involved', '') | |
]) | |
return table_data | |
def update_prompts(chatbot_display_name): | |
chatbot_type = display_to_value.get(chatbot_display_name, 'enhanced') | |
prompts = fetch_default_prompts(chatbot_type) | |
return ( | |
gr.update(value=prompts.get('system_prompt', '')), | |
gr.update(value=prompts.get('conv_instruction_prompt', '')), | |
gr.update(value=prompts.get('therapy_prompt', '')), | |
gr.update(value=prompts.get('autobio_generation_prompt', '')), | |
) | |
def update_chatbot_type(chatbot_display_name): | |
chatbot_type = display_to_value.get(chatbot_display_name, 'enhanced') | |
return chatbot_type | |
# Function to start the periodic toggle | |
def start_timer(): | |
target_timestamp = datetime.now() + timedelta(seconds=8 * 60) | |
return True, target_timestamp | |
def reset_timer(): | |
is_running = False | |
return is_running, "Timer remaining: 8:00" | |
# Async function to manage periodic updates, running every second | |
def periodic_call(is_running, target_timestamp): | |
if is_running: | |
prefix = 'Time remaining:' | |
time_difference = target_timestamp - datetime.now() | |
second_left = int(round(time_difference.total_seconds())) | |
if second_left <= 0: | |
second_left = 0 | |
minutes, seconds = divmod(second_left, 60) | |
new_remain_min = f'{minutes:02}' | |
new_remain_second = f'{seconds:02}' | |
new_info = f'{prefix} {new_remain_min}:{new_remain_second}' | |
return new_info | |
else: | |
return 'Time remaining: 8:00' | |
# initialize prompts with empty strings | |
initial_prompts = {'system_prompt': '', 'conv_instruction_prompt': '', 'therapy_prompt': '', 'autobio_generation_prompt': ''} | |
# CSS to keep the buttons small | |
css = """ | |
#start_button, #reset_button { | |
padding: 4px 10px !important; | |
font-size: 12px !important; | |
width: auto !important; | |
} | |
""" | |
with gr.Blocks(css=css) as app: | |
chatbot_type_state = gr.State('enhanced') | |
api_key_state = gr.State() | |
prompt_visibility_state = gr.State(False) | |
is_running = gr.State() | |
target_timestamp = gr.State() | |
with gr.Row(): | |
with gr.Column(scale=1, min_width=250): | |
gr.Markdown("## Settings") | |
# chatbot Type Selection | |
with gr.Box(): | |
gr.Markdown("### Chatbot Selection") | |
chatbot_type_dropdown = gr.Dropdown( | |
label="Select Chatbot Type", | |
choices=['Echo', 'Breeze'], | |
value='Echo', | |
) | |
chatbot_type_dropdown.change( | |
fn=update_chatbot_type, | |
inputs=[chatbot_type_dropdown], | |
outputs=[chatbot_type_state] | |
) | |
# fetch initial prompts based on the default chatbot type | |
system_prompt_value, conv_instruction_prompt_value, therapy_prompt_value, autobio_prompt_value = update_prompts('Echo') | |
# interview protocol selection | |
with gr.Box(): | |
gr.Markdown("### Interview Protocol") | |
chapter_dropdown = gr.Dropdown( | |
label="Select Chapter", | |
choices=list(interview_protocols.keys()), | |
value=list(interview_protocols.keys())[1], | |
) | |
method_dropdown = gr.Dropdown( | |
label="Select Topic", | |
choices=interview_protocols[chapter_dropdown.value], | |
value=interview_protocols[chapter_dropdown.value][3], | |
) | |
chapter_dropdown.change( | |
fn=update_methods, | |
inputs=[chapter_dropdown], | |
outputs=[method_dropdown] | |
) | |
# Update states when selections change | |
def update_chapter(chapter): | |
return chapter | |
def update_method(method): | |
return method | |
chapter_state = gr.State() | |
method_state = gr.State() | |
chapter_dropdown.change( | |
fn=update_chapter, | |
inputs=[chapter_dropdown], | |
outputs=[chapter_state] | |
) | |
method_dropdown.change( | |
fn=update_method, | |
inputs=[method_dropdown], | |
outputs=[method_state] | |
) | |
# customize Prompts | |
with gr.Box(): | |
toggle_prompts_button = gr.Button("Show Prompts") | |
# wrap the prompts in a component with initial visibility set to False | |
with gr.Column(visible=False) as prompt_section: | |
gr.Markdown("### Customize Prompts") | |
system_prompt = gr.Textbox( | |
label="System Prompt", | |
placeholder="Enter the system prompt here.", | |
value=system_prompt_value['value'] | |
) | |
conv_instruction_prompt = gr.Textbox( | |
label="Conversation Instruction Prompt", | |
placeholder="Enter the instruction for each conversation here.", | |
value=conv_instruction_prompt_value['value'] | |
) | |
therapy_prompt = gr.Textbox( | |
label="Therapy Prompt", | |
placeholder="Enter the instruction for reminiscence therapy.", | |
value=therapy_prompt_value['value'] | |
) | |
autobio_prompt = gr.Textbox( | |
label="Autobiography Generation Prompt", | |
placeholder="Enter the instruction for autobiography generation.", | |
value=autobio_prompt_value['value'] | |
) | |
# update prompts when chatbot_type changes | |
chatbot_type_dropdown.change( | |
fn=update_prompts, | |
inputs=[chatbot_type_dropdown], | |
outputs=[system_prompt, conv_instruction_prompt, therapy_prompt, autobio_prompt] | |
) | |
with gr.Box(): | |
gr.Markdown("### User Information") | |
username_input = gr.Textbox( | |
label="Username", placeholder="Enter your username" | |
) | |
api_key_input = gr.Textbox( | |
label="OpenAI API Key", | |
placeholder="Enter your openai api key", | |
type="password" | |
) | |
initialize_button = gr.Button("Initialize", variant="primary", size="large") | |
initialization_status = gr.Textbox( | |
label="Status", interactive=False, placeholder="Initialization status will appear here." | |
) | |
initialize_button.click( | |
fn=set_initialize_button, | |
inputs=[api_key_input, chapter_dropdown, method_dropdown, username_input, | |
system_prompt, conv_instruction_prompt, therapy_prompt, autobio_prompt, chatbot_type_dropdown], | |
outputs=[initialization_status, api_key_state, chatbot_type_state], | |
) | |
# define the function to toggle prompts visibility | |
def toggle_prompts(visibility): | |
new_visibility = not visibility | |
button_text = "Hide Prompts" if new_visibility else "Show Prompts" | |
return gr.update(value=button_text), gr.update(visible=new_visibility), new_visibility | |
toggle_prompts_button.click( | |
fn=toggle_prompts, | |
inputs=[prompt_visibility_state], | |
outputs=[toggle_prompts_button, prompt_section, prompt_visibility_state] | |
) | |
with gr.Column(scale=3): | |
with gr.Row(): | |
timer_display = gr.Textbox(value="Time remaining: 08:00", label="") | |
start_button = gr.Button("Start Timer", elem_id="start_button") | |
start_button.click(start_timer, outputs=[is_running, target_timestamp]).then( | |
periodic_call, inputs=[is_running, target_timestamp], outputs=timer_display, every=1) | |
chatbot = gr.Chatbot(label="Chat here for autobiography generation", height=500) | |
with gr.Row(): | |
transcription_box = gr.Textbox( | |
label="Transcription (You can edit this)", lines=3 | |
) | |
audio_input = gr.Audio( | |
source="microphone", type="filepath", label="🎤 Record Audio" | |
) | |
with gr.Row(): | |
submit_button = gr.Button("Submit", variant="primary", size="large") | |
save_conversation_button = gr.Button("End and Save Conversation", variant="secondary") | |
download_button = gr.Button("Download Conversations", variant="secondary") | |
download_biography_button = gr.Button("Download Biography", variant="secondary") | |
memory_graph_table = gr.Dataframe( | |
headers=["Date", "Topic", "Description", "People Involved"], | |
datatype=["str", "str", "str", "str"], | |
interactive=False, | |
label="Memory Events", | |
max_rows=5 | |
) | |
biography_textbox = gr.Textbox(label="Autobiography", visible=False) | |
audio_input.change( | |
fn=start_recording, | |
inputs=[audio_input], | |
outputs=[transcription_box] | |
) | |
state = gr.State([]) | |
submit_button.click( | |
submit_text_and_respond, | |
inputs=[transcription_box, api_key_state, username_input, state, chatbot_type_state], | |
outputs=[chatbot, transcription_box, memory_graph_table] | |
) | |
download_button.click( | |
fn=download_conversations, | |
inputs=[username_input, chatbot_type_state], | |
outputs=gr.Files() | |
) | |
download_biography_button.click( | |
fn=download_biography, | |
inputs=[username_input, chatbot_type_state], | |
outputs=[gr.File(label="Biography.txt"), biography_textbox] | |
) | |
save_conversation_button.click( | |
fn=save_conversation, | |
inputs=[username_input, chatbot_type_state], | |
outputs=None | |
) | |
app.queue() | |
app.launch(share=True, max_threads=10) | |