|
import os |
|
import streamlit as st |
|
import time |
|
import json |
|
from datetime import datetime |
|
from openai import OpenAI |
|
|
|
|
|
st.set_page_config(page_title="Ctrack AI Assistant", layout="wide") |
|
|
|
|
|
AUTHORIZED_USERS = { |
|
"andrew@lortechnologies.com": "Pass.123", |
|
"rian@lortechnologies.com": "Pass.123", |
|
"test@test.com": "Pass.123" |
|
} |
|
|
|
if "authenticated" not in st.session_state: |
|
st.session_state["authenticated"] = False |
|
|
|
def login(): |
|
email = st.session_state.get("email", "") |
|
password = st.session_state.get("password", "") |
|
if email in AUTHORIZED_USERS and AUTHORIZED_USERS[email] == password: |
|
st.session_state["authenticated"] = True |
|
else: |
|
st.error("Invalid email or password. Please try again.") |
|
|
|
if not st.session_state["authenticated"]: |
|
st.title("Sign In") |
|
st.text_input("Email", key="email") |
|
st.text_input("Password", type="password", key="password") |
|
st.button("Login", on_click=login) |
|
st.stop() |
|
|
|
|
|
st.title("🚚 Ctrack Virtual Assistant") |
|
st.caption("Official AI assistant for Ctrack’s fleet management solutions") |
|
|
|
|
|
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY") |
|
CTRACK_ASSISTANT_ID = "asst_ItfaIt2UF2nN1UpOS5xa6l5p" |
|
|
|
if not OPENAI_API_KEY: |
|
st.error("Missing OPENAI_API_KEY environment variable.") |
|
st.stop() |
|
|
|
|
|
main_tab, flagged_tab = st.tabs(["Ctrack Assistant", "Flagged Responses"]) |
|
|
|
|
|
FLAGGED_RESPONSES_DIR = "flagged_responses" |
|
os.makedirs(FLAGGED_RESPONSES_DIR, exist_ok=True) |
|
|
|
def save_flagged_response(user_query, ai_response): |
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") |
|
filename = f"{FLAGGED_RESPONSES_DIR}/flagged_{timestamp}.json" |
|
flagged_data = { |
|
"query": user_query, |
|
"response": ai_response, |
|
"timestamp": datetime.now().isoformat() |
|
} |
|
with open(filename, "w") as file: |
|
json.dump(flagged_data, file, indent=4) |
|
st.success(f"Response flagged and saved as {filename}.") |
|
|
|
def ctrack_chat(tab): |
|
with tab: |
|
st.subheader("🤖 Chat with Ctrack AI") |
|
client = OpenAI(api_key=OPENAI_API_KEY) |
|
session_key = "ctrack_chat" |
|
|
|
if session_key not in st.session_state: |
|
st.session_state[session_key] = [] |
|
|
|
if st.button("Clear Chat", key="clear_chat"): |
|
st.session_state[session_key] = [] |
|
st.rerun() |
|
|
|
for idx, msg in enumerate(st.session_state[session_key]): |
|
role, content = msg["role"], msg["content"] |
|
st.chat_message(role).write(content) |
|
if role == "assistant" and st.button("🚩 Flag", key=f"flag_{idx}"): |
|
user_query = st.session_state[session_key][idx-1]["content"] if idx > 0 else "Unknown" |
|
save_flagged_response(user_query, content) |
|
|
|
if prompt := st.chat_input("Ask about Ctrack's services, fleet solutions, or integrations"): |
|
st.session_state[session_key].append({"role": "user", "content": prompt}) |
|
st.chat_message("user").write(prompt) |
|
|
|
try: |
|
thread = client.beta.threads.create() |
|
thread_id = thread.id |
|
|
|
client.beta.threads.messages.create( |
|
thread_id=thread_id, |
|
role="user", |
|
content=prompt |
|
) |
|
|
|
run = client.beta.threads.runs.create( |
|
thread_id=thread_id, |
|
assistant_id=CTRACK_ASSISTANT_ID |
|
) |
|
|
|
while True: |
|
status = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run.id) |
|
if status.status == "completed": |
|
break |
|
time.sleep(1) |
|
|
|
messages = client.beta.threads.messages.list(thread_id=thread_id) |
|
response = messages.data[0].content[0].text.value |
|
st.session_state[session_key].append({"role": "assistant", "content": response}) |
|
st.chat_message("assistant").write(response) |
|
if st.button("🚩 Flag", key=f"flag_response_{len(st.session_state[session_key])}"): |
|
save_flagged_response(prompt, response) |
|
|
|
except Exception as e: |
|
st.error(f"OpenAI Error: {str(e)}") |
|
|
|
ctrack_chat(main_tab) |
|
|
|
|
|
with flagged_tab: |
|
st.subheader("🚩 Flagged Responses") |
|
files = [f for f in os.listdir(FLAGGED_RESPONSES_DIR) if f.endswith(".json")] |
|
|
|
if files: |
|
selected = st.selectbox("Select a flagged response to download:", files) |
|
if selected: |
|
with open(os.path.join(FLAGGED_RESPONSES_DIR, selected), "r") as file: |
|
flagged = file.read() |
|
st.download_button("📥 Download", data=flagged, file_name=selected, mime="application/json") |
|
else: |
|
st.info("No flagged responses yet.") |
|
|