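# Page-scrape residue from the hosting site's file view (not part of the program):
# david-oplatka's picture | commit "Reorganize Files" (757fbed) | raw | history blame | 25.8 kB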
import asyncio
import threading
from markdown_it import MarkdownIt
from mdit_py_plugins import front_matter
from reactpy import component, html, hooks, use_effect, svg, run # DELETE run FOR DEPLOYMENT
from reactpy.backend.starlette import configure
from starlette.applications import Starlette
from vectara_agentic.agent import AgentStatusType
from agent import initialize_agent, get_agent_config
# Placeholder text posted as a bot message while the agent's reply is pending;
# Logs() also compares the last message against it to decide whether to show logs.
wait_message = "Please wait. Assistant at work..."
# Root ReactPy component of the chat demo. The state hooks below, the event
# handlers and the nested sub-components (Header, ChatUI, ChatBox, Footer, ...)
# are all defined inside this function.
@component
def App():
# Whether the agent log panel is expanded (True) or reduced to a "Show Logs" button
show_logs, set_show_logs = hooks.use_state(False)
# Whether the log controls may be shown at all; set to True once a reply has arrived
show_logs_button, set_show_logs_button = hooks.use_state(False)
# True until the first message is sent; reset to True by the "Start Over?" button
first_turn, set_first_turn = hooks.use_state(True)
# Tracks whether the header is collapsed (False) or expanded via "Learn More" (True)
more_info, set_more_info = hooks.use_state(False)
# Tracks whether the footer is collapsed (True) or expanded (False)
collapsed, set_collapse = hooks.use_state(False)
# Record of chat messages: a list of {"user": ..., "message": ...} dicts
messages, set_messages = hooks.use_state([])
# Current text of the chat input box
message, set_message = hooks.use_state("")
# input_value, set_input_value = hooks.use_state("")
def use_agent_logger():
    """Custom hook that keeps the list of agent log lines in component state.

    Returns:
        A 3-tuple ``(entries, add, reset)``: the current list of entries, a
        callback that appends one entry, and a callback that empties the list.
    """
    entries, store_entries = hooks.use_state([])

    def append_entry(new_log_entry):
        # Updater-function form: the new list is derived from the value the
        # state setter hands in rather than from the captured ``entries``.
        def with_new_entry(previous_entries):
            return previous_entries + [new_log_entry]

        store_entries(with_new_entry)

    def clear_entries():
        store_entries([])

    return entries, append_entry, clear_entries


log_entries, add_log_entry, reset_log_entries = use_agent_logger()
def update_func(status_type: AgentStatusType, msg: str):
    """Status callback handed to the agent: record every status except AGENT_UPDATE.

    Args:
        status_type: kind of status being reported.
        msg: text accompanying the status.
    """
    if status_type == AgentStatusType.AGENT_UPDATE:
        return
    add_log_entry(f"{status_type.value} - {msg}")
# Pass factories instead of already-computed values. The initial value of
# use_state is only used when the state is first created, but an eagerly
# evaluated argument is still computed on every render of App -- which meant
# re-reading the config and building a brand-new agent on each re-render only
# to throw it away. A callable initial value is invoked once, at creation.
cfg, _ = hooks.use_state(get_agent_config)
agent, _ = hooks.use_state(lambda: initialize_agent(cfg, update_func))
def toggle_header(event=None):
    """Flip the header between its short form and the expanded "Learn More" form."""
    expanded = not more_info
    set_more_info(expanded)
def toggle_footer(event=None):
    """Flip the footer between its collapsed and expanded form."""
    now_collapsed = not collapsed
    set_collapse(now_collapsed)
def start_over(event=None):
    """Reset the chat interface: go back to the first-turn view and drop all messages."""
    set_first_turn(True)
    set_messages([])
def display_message(new_messages):
    """Append ``new_messages`` to the chat history and leave first-turn mode.

    Args:
        new_messages: iterable of ``{"user": ..., "message": ...}`` dicts.
    """
    if first_turn:
        set_first_turn(False)
    additions = list(new_messages)
    # Updater-function form: the closed-over ``messages`` is the snapshot from
    # the render that created this callback, so ``messages + additions`` made
    # two back-to-back calls (human message, then placeholder) overwrite each
    # other and the human message was missing while the agent was working.
    set_messages(lambda history: history + additions)


async def chat_response(user_message):
    """Run the blocking ``agent.chat`` call in a worker thread and return its result."""
    return await asyncio.to_thread(agent.chat, user_message)


async def send_message_async(sent_message):
    """Wait for the agent's reply to ``sent_message`` and swap it in for the placeholder.

    Args:
        sent_message: the text that was sent to the agent.
    """
    response = await chat_response(sent_message)
    reply = {"user": "bot", "message": response}

    def swap_placeholder(history):
        # Work on the history as it is *now*: this coroutine outlives the
        # render that started it, so the captured ``messages`` is out of date.
        for position, entry in enumerate(history):
            if entry["user"] == "bot" and entry["message"] == wait_message:
                return history[:position] + [reply] + history[position + 1:]
        # No placeholder left (e.g. "Start Over?" was clicked while waiting):
        # there is nothing to replace, so keep the history untouched.
        return history

    set_messages(swap_placeholder)
    set_show_logs_button(True)
def send_message(event=None):
    """Submit the current input text to the agent; blank input is ignored."""
    print(f"DEBUG: SEND MESSAGE CALLED FOR MESSAGE {message}")
    if not message.strip():
        return
    sent_message = message
    # Clear the input box and hide the log controls of the previous turn.
    set_message("")
    # set_input_value("")
    set_show_logs_button(False)
    set_show_logs(False)
    reset_log_entries()
    # Post the human message followed by the "please wait" bot placeholder.
    for speaker, text in (("human", sent_message), ("bot", wait_message)):
        display_message([{"user": speaker, "message": text}])
    # Fire-and-forget: the reply is filled in by send_message_async.
    asyncio.create_task(send_message_async(sent_message))
def send_example(ex_prompt):
    """Submit one of the example prompts to the agent; blank prompts are ignored.

    Args:
        ex_prompt: text of the example question that was clicked.
    """
    if not ex_prompt.strip():
        return
    set_message("")
    # Same reset as send_message performs. Without it, log controls left
    # visible/expanded by an earlier conversation carried over into the one
    # started from an example prompt.
    set_show_logs_button(False)
    set_show_logs(False)
    reset_log_entries()
    display_message([{"user": "human", "message": ex_prompt}])
    display_message([{"user": "bot", "message": wait_message}])
    # Fire-and-forget: the reply is filled in by send_message_async.
    asyncio.create_task(send_message_async(ex_prompt))
# # New Code Suggestion from Claude
# def handle_input(event):
# set_message(event['target']['value'])
# def handle_key_down(event):
# if event['key'] == 'Enter':
# send_message()
# def handle_input(event):
# new_value = event['target']['value']
# set_input_value(new_value)
# def handle_key_down(event):
# if event['key'] == 'Enter':
# send_message()
# use_effect(lambda: set_message(input_value), [input_value])
async def handle_key_down(event):
    """Send the current message when Enter is pressed in the input box.

    Args:
        event: key event dict; only its ``'key'`` entry is read.
    """
    if event['key'] != 'Enter' or not message.strip():
        return
    # send_message is a plain (non-async) function that returns None, so the
    # former ``await send_message()`` raised TypeError right after the message
    # had been dispatched and the line below was never reached.
    send_message()
    set_message("")
def handle_input_change(event):
    """Mirror the input box's current text into the ``message`` state."""
    typed_text = event['target']['value']
    set_message(typed_text)
@component
def Header(demo_name: str, short_description: str, extra_info: str):
    """Page header: logo and demo name, welcome text, and the reset button.

    Args:
        demo_name: name shown next to the logo and in the welcome heading.
        short_description: text that is always shown under the heading.
        extra_info: additional text shown only while ``more_info`` is set.
    """
    # --- left column: logo + demo name -----------------------------------
    logo = html.img(
        {
            "src": "https://avatars.githubusercontent.com/u/108304503?s=200&v=4",
            "style": {
                "height": "30px",
                "marginRight": "15px",
                "marginLeft": "5px",
                "transform": "translateY(-2px)",
            },
        }
    )
    demo_title = html.p(
        {
            "style": {
                "fontSize": "25px",
                "fontFamily": "Georgia, 'Times New Roman', Times, serif",
            }
        },
        f"{demo_name}",
    )
    brand_column = html.div(
        {
            "style": {
                "display": "flex",
                "alignItems": "center",
                "flex": 2,
                "textAlign": "left",
                "padding": "10px",
            }
        },
        logo,
        demo_title,
    )

    # --- middle column: welcome text with expandable extra info ----------
    learn_more_button = html.button(
        {
            "style": {
                # Only offered while the extra info is hidden.
                "display": "none" if more_info else "inline",
                "backgroundColor": "#FFFFFF",
                "color": "#757575",
                "fontSize": "13px",
                "cursor": "pointer",
                "border": "none",
                "padding": "0px 0px 0px 5px",
            },
            "type": "button",
            "onClick": toggle_header,
        },
        html.u(
            {
                "style": {
                    "flex": 2,
                    "textAlign": "right",
                    "padding": "10px",
                }
            },
            "Learn More",
        ),
    )
    extra_text = f" {extra_info}" if more_info else ""
    description = html.p(short_description, learn_more_button, extra_text)

    up_arrow = svg.svg(
        {
            "width": "20",
            "height": "20",
            "viewBox": "0 0 20 20",
            "fill": "none",
            "stroke": "black",
            "strokeWidth": "2",
            "strokeLinecap": "round",
            "strokeLinejoin": "round",
        },
        svg.path(
            {
                "d": "M12 19V5M5 12l7-7 7 7",
                "stroke": "currentColor",
            }
        ),
    )
    collapse_button = html.button(
        {
            "style": {
                # Only shown while the extra info is visible.
                "display": "block" if more_info else "none",
                "background": "none",
                "border": "none",
                "color": "#757575",
                "cursor": "pointer",
                "position": "absolute",
                "left": "50%",
                "transform": "translateX(-50%)",
                "bottom": "1px",
            },
            "type": "button",
            "on_click": toggle_header,
        },
        up_arrow,
    )
    welcome_column = html.div(
        {
            "style": {
                "flex": 5,
                "textAlign": "center",
                "padding": "10px 0px 15px 0px",
                "fontFamily": "Lato",
                "position": "relative",
            }
        },
        html.h3(f"Welcome to the {demo_name} Demo"),
        description,
        collapse_button,
    )

    # --- right column: reset button --------------------------------------
    reset_button = html.button(
        {
            "style": {
                "backgroundColor": "#FFFFFF",
                "color": "#757575",
                "fontSize": "14px",
                "cursor": "pointer",
                "border": "1px solid #e2dfdf",
                "borderRadius": "5px",
                "padding": "6px 20px",
                "marginRight": "15px",
            },
            "type": "button",
            "onClick": start_over,
        },
        "Start Over?",
    )
    actions_column = html.div(
        {
            "style": {
                "flex": 2,
                "textAlign": "right",
                "padding": "10px",
            }
        },
        reset_button,
    )

    return html.header(
        {
            "style": {
                "backgroundColor": "#FFFFFF",
                "display": "flex",
                "justifyContent": "space-between",
                "alignItems": "center",
            }
        },
        brand_column,
        welcome_column,
        actions_column,
    )
def markdown_to_html(markdown_text):
    """Render ``markdown_text`` to an HTML string.

    Uses the "commonmark" preset with the ``breaks`` and ``html`` options
    switched on, plus the front-matter plugin.
    """
    # NOTE(review): with "html": True raw HTML in the input is presumably
    # passed through to the output, and MarkdownRenderer injects the result
    # as inner HTML -- confirm this is acceptable for user-typed messages.
    parser_options = {"breaks": True, "html": True}
    parser = MarkdownIt("commonmark", parser_options)
    parser = parser.use(front_matter.front_matter_plugin)
    return parser.render(markdown_text)
@component
def MarkdownRenderer(content):
    """Render Markdown ``content`` inside a styled ``div``.

    The Markdown is converted with ``markdown_to_html`` and handed to the
    ``div`` through its ``dangerouslySetInnerHTML`` attribute.
    """
    text_style = {
        "fontFamily": "Arial",
        "color": "#49454F",
        "fontSize": "14px",
    }
    inner_html = {"__html": markdown_to_html(content)}
    return html.div({"style": text_style, "dangerouslySetInnerHTML": inner_html})
@component
def ExamplePrompts():
    """Show the configured example questions as clickable buttons.

    Renders only on the first turn; afterwards the component returns ``None``.
    The questions come from the ``examples`` config entry, split on ";".
    """
    if cfg.examples:
        example_questions = [example.strip() for example in cfg['examples'].split(";")]
    else:
        example_questions = []

    def create_prompt_button(question):
        # One pill-shaped button per question; clicking it sends the question.
        pill_style = {
            "backgroundColor": "#FFFFFF",
            "borderWidth": "1px",
            "borderColor": "#65558F",
            "borderRadius": "20px",
            "padding": "5px 7px",
            "margin": "5px",
            "cursor": "pointer",
            "color": "#21005D",
            "fontSize": "13px",
        }
        return html.button(
            {
                "style": pill_style,
                "type": "button",
                "onClick": lambda _: send_example(question),
            },
            question,
        )

    if not first_turn:
        return None

    caption = html.p(
        {
            "style": {
                "fontSize": "16px",
                "fontFamily": "Arial",
                "color": "#49454F",
                "marginBottom": "5px",
                "transform": "translateX(3%)",
                "textAlign": "left",
            }
        },
        "Queries to try:",
    )
    prompt_buttons = [create_prompt_button(q) for q in example_questions]
    button_row = html.div(
        {
            "style": {
                "display": "flex",
                "flexWrap": "wrap",
                "justifyContent": "center",
            }
        },
        *prompt_buttons,
    )
    return html.div(
        {
            "style": {
                "display": "flex",
                "transform": "translate(4%, 100%)",
                "flexDirection": "column",
                "justifyContent": "center",
                "width": "90%",
            }
        },
        caption,
        button_row,
    )
@component
def ChatBox():
    """Fixed-position message bar: a text input plus a send button."""
    # The bar sits lower when the footer is collapsed.
    bar_bottom = "70px" if collapsed else "140px"

    text_input = html.input(
        {
            "type": "text",
            "value": message,
            "placeholder": "Your Message",
            "onChange": handle_input_change,
            "onKeyDown": handle_key_down,
            "style": {
                "width": "100%",
                "padding": "10px 50px 10px 20px",
                "border": "none",
                "borderRadius": "50px",
                "color": "#65558F",
            },
        }
    )

    send_icon = svg.svg(
        {
            "xmlns": "http://www.w3.org/2000/svg",
            "width": "20",
            "height": "20",
            "fill": "none",
            "stroke": "currentColor",
            "stroke-linecap": "round",
            "stroke-linejoin": "round",
            "stroke-width": "2",
            "viewBox": "0 0 24 24",
            "class": "feather feather-send",
        },
        svg.path({"d": "M22 2 11 13"}),
        svg.path({"d": "M22 2 15 22 11 13 2 9z"}),
    )
    send_button = html.button(
        {
            "type": "button",
            "onClick": send_message,
            "style": {
                "position": "absolute",
                "right": "8px",
                "top": "50%",
                "transform": "translateY(-50%)",
                "background": "none",
                "border": "none",
                "cursor": "pointer",
                "padding": "8px",
                "display": "flex",
                "alignItems": "center",
                "justifyContent": "center",
            },
        },
        send_icon,
    )

    return html.div(
        {
            "style": {
                "position": "fixed",
                "bottom": bar_bottom,
                "left": "50%",
                "transform": "translateX(-50%)",
                "width": "80%",
                "display": "flex",
                "alignItems": "center",
                "backgroundColor": "#FFFFFF",
                "borderRadius": "50px",
                "zIndex": "1000",
            }
        },
        text_input,
        send_button,
    )
@component
def ChatMessage(user, message):
    """Render one chat bubble.

    Args:
        user: ``"human"`` aligns the bubble to the right with the human
            colour; any other value is laid out as a bot message on the left.
        message: Markdown text of the message.
    """
    from_human = user == "human"
    row_style = {
        "display": "flex",
        "width": "75%",
        "transform": "translateX(20%)",
        "justifyContent": "flex-end" if from_human else "flex-start",
        "margin": "20px 0px 10px 0px"
    }
    bubble_style = {
        "maxWidth": "50%",
        "padding": "0px 15px 0px 10px",
        "borderRadius": "15px",
        "backgroundColor": "#E8DEF8" if from_human else "#ECE6F0",
        "color": "#000000",
    }
    bubble = html.div({"style": bubble_style}, MarkdownRenderer(message))
    return html.div({"style": row_style}, bubble)
@component
def Logs():
    """Show the agent log for the latest reply, or a button to reveal it.

    Renders nothing unless there is at least one message and one log entry,
    the last message is not the "please wait" placeholder, and the log
    controls have been enabled.
    """
    if not messages or not log_entries:
        return None
    if messages[-1]["message"] == wait_message or not show_logs_button:
        return None

    # NOTE(review): "flex" is not a valid value for the CSS `position`
    # property, so browsers will presumably ignore that entry in the two
    # button styles below -- confirm what was intended.
    if show_logs:
        entry_paragraphs = [
            html.p(
                {
                    "style": {
                        "fontSize": "14px",
                        "marginLeft": "10px",
                    }
                },
                entry
            ) for entry in log_entries
        ]
        hide_button = html.button(
            {
                "style": {
                    "position": "flex",
                    "background": "none",
                    "color": "#757575",
                    "border": "none",
                    "cursor": "pointer",
                    "fontFamily": "Arial",
                    "fontSize": "14px",
                    "marginBottom": "10px",
                    "marginLeft": "5px",
                },
                "type": "button",
                "onClick": lambda _: set_show_logs(False)
            },
            "Hide Logs"
        )
        return html.div(
            {
                "style": {
                    "display": "flex",
                    "transform": "translateX(20%)",
                    "border": "2px solid #e2dfdf",
                    "borderRadius": "10px",
                    "width": "75%",
                }
            },
            html.div(entry_paragraphs, hide_button)
        )

    show_button = html.button(
        {
            "style": {
                "position": "flex",
                "background": "none",
                "color": "#757575",
                "border": "none",
                "cursor": "pointer",
                "fontFamily": "Arial",
                "fontSize": "14px",
            },
            "type": "button",
            "onClick": lambda _: set_show_logs(True)
        },
        "Show Logs"
    )
    return html.div(
        {
            "style": {
                "display": "flex",
                "transform": "translateX(75%)",
                "width": "20%",
                "margin": "0px",
            }
        },
        show_button
    )
@component
def ChatUI():
    """Scrollable main area: example prompts on the first turn, otherwise the
    chat messages, followed by the log controls."""
    # The area is taller when the footer is collapsed.
    reserved_px = 265 if collapsed else 335
    area_style = {
        "display": "flex",
        "flexDirection": "column",
        "padding": "10px",
        "backgroundColor": "#F4F1F4",
        "overflowY": "auto",
        "height": f"calc(100vh - {reserved_px}px)",
        "marginBottom": "15px",
        "paddingBottom": "15px",
    }

    if first_turn:
        chat_content = ExamplePrompts()
    else:
        chat_content = [ChatMessage(msg["user"], msg["message"]) for msg in messages]

    return html.div({"style": area_style}, chat_content, Logs())
@component
def Footer():
    """Fixed footer: a slim bar with an expand arrow when collapsed, otherwise
    the "How this works?" panel with a close button."""
    # Both toggle icons share the same <svg> attributes; only the paths differ.
    icon_attrs = {
        "xmlns": "http://www.w3.org/2000/svg",
        "width": "24",
        "height": "24",
        "fill": "none",
        "stroke": "currentColor",
        "stroke-linecap": "round",
        "stroke-linejoin": "round",
        "stroke-width": "3",
        "viewBox": "0 0 24 24",
    }

    if not collapsed:
        close_button = html.button(
            {
                "style": {
                    "position": "absolute",
                    "right": "10px",
                    "background": "none",
                    "border": "none",
                    "color": "#757575",
                    "cursor": "pointer",
                },
                "type": "button",
                "on_click": toggle_footer,
            },
            svg.svg(
                icon_attrs,
                svg.path({"d": "M18 6L6 18"}),
                svg.path({"d": "M6 6l12 12"}),
            ),
        )
        heading = html.p(
            {
                "style": {
                    "fontSize": "20px",
                    "color": "#4b4851"
                }
            },
            "How this works?",
        )
        explanation = html.p(
            {
                "style": {
                    "color": "#757575",
                }
            },
            "This app was built with ",
            html.a(
                {
                    "href": "https://vectara.com/",
                    "target": "_blank",
                },
                "Vectara",
            ),
            html.br(),
            "It demonstrates the use of Agentic-RAG functionality with Vectara",
        )
        panel = html.div(
            {
                "style": {
                    "backgroundColor": "#FFFFFF",
                    "padding": "0px 20px 0px 50px",
                    "position": "relative",
                    "display": "block",
                }
            },
            close_button,
            heading,
            explanation,
        )
        return html.footer(
            {
                "style": {
                    "backgroundColor": "#FFFFFF",
                    "position": "fixed",
                    "bottom": 0,
                    "width": "100%",
                }
            },
            panel,
        )

    expand_button = html.button(
        {
            "style": {
                "background": "none",
                "border": "none",
                "color": "#757575",
                "cursor": "pointer",
                "position": "absolute",
                "bottom": "5px",
                "right": "10px",
            },
            "type": "button",
            "on_click": toggle_footer,
        },
        svg.svg(icon_attrs, svg.path({"d": "M19 14l-7-7-7 7"})),
    )
    return html.footer(
        {
            "style": {
                "backgroundColor": "#FFFFFF",
                "position": "fixed",
                "bottom": 0,
                "width": "100%",
                "height": "50px",
            }
        },
        expand_button,
    )
return html.div(
{
"style": {
"backgroundColor": "#F4F1F4",
"margin": "0",
"padding": "0",
"minHeight": "100vh",
"display": "flex",
"boxSizing": "border-box",
"flexDirection": "column",
"overflowX": "hidden",
"position": "relative",
}
},
html.head(
html.style("""
body {
margin: 0;
padding: 0;
}
""")
),
Header(
demo_name=cfg['demo_name'],
short_description=cfg['short_description'],
extra_info=cfg['extra_info']
),
ChatUI(),
ChatBox(),
Footer(),
)
# app = Starlette()
# configure(app, App)
# Should only need for local testing
# Local entry point: when the file is executed directly, serve App with
# reactpy's `run` helper (the import line marks `run` for removal on deployment).
if __name__ == "__main__":
run(App)