"""Streamlit customer-support chatbot for ASAP Tickets backed by an OpenAI Assistant.

The script is executed top to bottom by Streamlit on every user interaction
(each "rerun"), so anything that must survive a rerun is kept in
``st.session_state``.
"""

# Import necessary libraries
import csv
import time
from datetime import datetime

import openai
import streamlit as st

import feedback_saver
from util import check_password

# Set your OpenAI Assistant ID here
assistant_id = 'asst_t8r4OXz30mxH1vIUFVXAMoTH'

# Extra instructions sent with every run (passed as ``instructions=`` below).
instruction = """
## Strictly do not give any response based on an assumption, and make sure the response should be from the documents provided or the knowledge base specific to ASAP Tickets and Trevolution
## Strictly avoid referring words like document, file, knowledge.pdf in the response.
## Add links from the knowledge and instructions if necessary to assist the customer.
## If you do not have clear information in the document of the question asked, respond with
- **Unfortunately, I could not find a specific answer to your question at the moment as I am still learning. I apologize for any inconvenience. If you still need assistance, please contact our 24x7 customer service for personalized help:**
- **Email: customerservice@asaptickets.com**
- **USA & Canada: +1-800-750-2238**
- **Philippines: +632-8672-7896**
- **Spain: +34-911-881-607**
- **Italy: +39-06-94809637**
- **Israel: +972-3-7219252**
- **UK: +44-203-695-0964**
- **Let me know if there's anything else I can assist you with.**
"""

# Message shown when the assistant run does not complete in time.
FALLBACK_MESSAGE = """
Sorry, it's taking longer than expected to get a response. Please try again later or contact customer service for immediate assistance.

Unfortunately, I could not find a specific answer to your question at the moment as I am still learning. I apologize for any inconvenience.

If you still need assistance, please contact our 24x7 customer service for personalized help:
- Email: customerservice@asaptickets.com
- USA & Canada: +1-800-750-2238
- Philippines: +632-8672-7896
- Spain: +34-911-881-607
- Italy: +39-06-94809637
- Israel: +972-3-7219252
- UK: +44-203-695-0964

Let me know if there's anything else I can assist you with.
"""

# Run statuses after which the run can no longer reach 'completed'; polling
# stops as soon as one of these is seen instead of waiting for the timeout.
_TERMINAL_FAILURE_STATUSES = frozenset({"failed", "cancelled", "expired", "incomplete"})

openai.api_key = st.secrets["API_KEY"]
# NOTE(review): placeholder credentials in source; consider reading these from
# st.secrets like API_KEY above.
access_key = "*****"
secret_access_key = "****"
bucket_name = "feedbackchatbot"
object_key = "feedback_data.csv"

# Initialize the OpenAI client (the module-level client is used directly)
client = openai
# NOTE: this rebinds the name ``feedback_saver`` from the imported module to
# the saver instance; the handlers below call methods on the instance.
feedback_saver = feedback_saver.FeedbackSaverWithPandas(access_key, secret_access_key)


def apply_custom_css():
    """Inject the app's custom CSS via an HTML-enabled markdown element."""
    # NOTE(review): the markup is empty in this copy of the file; it may have
    # been stripped somewhere upstream.
    st.markdown(""" """, unsafe_allow_html=True)


apply_custom_css()

# Check the password and stop the app if it's incorrect
if not check_password():
    st.stop()

# Streamlit App Layout
st.title("Customer Support Bot - ASAP Tickets")

# Custom greeting or instructions
st.markdown("""
Welcome to our Customer Support chatbot interface for ASAP Tickets! 👋
- Type your question or message below.
- Use the thumbs up/down buttons to provide feedback on responses.
""")

# Initialize session state variables for file IDs and chat control
if "file_id_list" not in st.session_state:
    st.session_state.file_id_list = []
if "start_chat" not in st.session_state:
    st.session_state.start_chat = False
if "thread_id" not in st.session_state:
    st.session_state.thread_id = None

# Ensure feedback-related session state variables are defined
if "feedback_given" not in st.session_state:
    st.session_state.feedback_given = False
if "feedback_question" not in st.session_state:
    st.session_state.feedback_question = ""
if "feedback_response" not in st.session_state:
    st.session_state.feedback_response = ""
if "feedback_type" not in st.session_state:
    st.session_state.feedback_type = ""

st.session_state.start_chat = True

# Initialize a session state variable for feedback button visibility
if "show_feedback_buttons" not in st.session_state:
    st.session_state.show_feedback_buttons = False

# Create a thread once and store its ID in session state. The guard matters:
# Streamlit reruns this script on every interaction, and creating a new thread
# each time would discard the conversation history held by the API.
if st.session_state.thread_id is None:
    thread = client.beta.threads.create()
    st.session_state.thread_id = thread.id


def process_message_with_citations(message):
    """Extract content and annotations from the message and format citations as footnotes.

    Each annotation's text inside the message body is replaced by a numbered
    marker such as `` [1]``, and a matching reference line is collected. When
    at least one reference exists, the list is appended under a
    "REFERENCES FROM KNOWLEDGE BASE" heading.

    Args:
        message: An assistant message whose first content part exposes
            ``.text.value`` and, optionally, ``.text.annotations``.

    Returns:
        The message text with footnote markers and any references appended.
    """
    message_content = message.content[0].text
    # A missing or None annotations attribute is treated as "no annotations".
    annotations = getattr(message_content, 'annotations', None) or []
    citations = []

    for number, annotation in enumerate(annotations, start=1):
        # Replace the annotated text with a footnote marker
        message_content.value = message_content.value.replace(annotation.text, f' [{number}]')

        # Gather citations based on annotation attributes
        if (file_citation := getattr(annotation, 'file_citation', None)):
            cited_file = {'filename': 'knowledge.pdf'}  # Actual file retrieval logic needed
            citations.append(f'[{number}] {file_citation.quote} from {cited_file["filename"]}')
        elif getattr(annotation, 'file_path', None):
            cited_file = {'filename': 'knowledge.pdf'}  # Actual file retrieval logic needed
            # Actual download link needed
            citations.append(f'[{number}] Click [here](#) to download {cited_file["filename"]}')

    if not citations:
        return message_content.value
    # Add footnotes to the end of the message content
    return message_content.value + '\n\nREFERENCES FROM KNOWLEDGE BASE:\n\n' + '\n'.join(citations)


def _usage_field(usage, name):
    """Return the token count ``name`` from ``usage``.

    ``usage`` may be a mapping (``usage['prompt_tokens']``) or an object with
    attributes (``usage.prompt_tokens``). ``None`` yields 0.
    """
    if usage is None:
        return 0
    try:
        return usage[name]
    except TypeError:
        # Not subscriptable: fall back to attribute access.
        return getattr(usage, name)


def openai_api_calculate_cost(usage, model="gpt-4-0125-preview"):
    """Estimate the USD cost of a request from its token usage.

    Args:
        usage: Mapping or object providing ``prompt_tokens`` and
            ``completion_tokens``.
        model: Key into the pricing table below (USD per 1000 tokens).

    Returns:
        The total cost rounded to 6 decimals.

    Raises:
        ValueError: If ``model`` is not in the pricing table.
    """
    pricing = {
        'gpt-3.5-turbo-1106': {
            'prompt': 0.001,
            'completion': 0.002,
        },
        'gpt-4-0125-preview': {
            'prompt': 0.01,
            'completion': 0.03,
        },
        'gpt-4': {
            'prompt': 0.03,
            'completion': 0.06,
        },
    }

    try:
        model_pricing = pricing[model]
    except KeyError:
        raise ValueError("Invalid model specified") from None

    prompt_cost = _usage_field(usage, 'prompt_tokens') * model_pricing['prompt'] / 1000
    completion_cost = _usage_field(usage, 'completion_tokens') * model_pricing['completion'] / 1000

    # round to 6 decimals
    return round(prompt_cost + completion_cost, 6)


def _record_feedback(feedback_type, question=None, response=None):
    """Store the rating in session state and persist it to S3.

    When ``question`` / ``response`` are omitted, the module-level ``prompt``
    and ``full_responses`` of the run that rendered the buttons are used,
    which is what the zero-argument handlers relied on originally.
    """
    if question is None:
        question = globals().get("prompt")
    if response is None:
        response = globals().get("full_responses")

    st.session_state.feedback_question = question
    st.session_state.feedback_response = response
    st.session_state.feedback_type = feedback_type
    # The answer has been rated; no need to offer the buttons again for it.
    st.session_state.show_feedback_buttons = False
    feedback_saver.save_feedback_to_s3(question, response, feedback_type, bucket_name, object_key)


def handle_feedback_positive(question=None, response=None):
    """Button callback: record a thumbs-up for the given question/response."""
    _record_feedback("Positive", question, response)


def handle_feedback_negative(question=None, response=None):
    """Button callback: record a thumbs-down for the given question/response."""
    _record_feedback("Negative", question, response)


# Additional function to save feedback into a CSV file
def save_feedback(question, response, feedback, filename="feedback_data.csv"):
    """Append one feedback row to a ``~``-delimited CSV file.

    A header row is written first when the file is missing or empty. The
    ``Index`` column continues from the first field of the last existing row.

    Args:
        question: The user's question.
        response: The assistant's response.
        feedback: The feedback label to store.
        filename: Target CSV path.
    """
    # Read the existing rows once, before opening the file for append.
    try:
        with open(filename, "r", newline='') as csvfile:
            existing_rows = list(csv.reader(csvfile, delimiter='~'))
    except FileNotFoundError:
        existing_rows = []

    # A header is needed when the file is new/empty or its first row is blank.
    headers_needed = not (existing_rows and existing_rows[0])

    last_index = 0
    if not headers_needed and existing_rows[-1]:
        try:
            last_index = int(existing_rows[-1][0])
        except ValueError:
            # Last row is the header (or otherwise non-numeric): start from 1.
            last_index = 0

    with open(filename, "a", newline='') as csvfile:
        writer = csv.writer(csvfile, delimiter='~')
        if headers_needed:
            writer.writerow(["Index", "Date", "Question", "Response", "Feedback"])
        current_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        writer.writerow([last_index + 1, current_date, question, response, feedback])


# Only show the chat interface if the chat has been started
if st.session_state.start_chat:
    # Initialize the model and messages list if not already in session state
    if "openai_model" not in st.session_state:
        st.session_state.openai_model = "gpt-4-0125-preview"
    if "messages" not in st.session_state:
        st.session_state.messages = []

    # Display existing messages in the chat
    for message in st.session_state.messages:
        with st.chat_message(message["role"]):
            st.markdown(message["content"])

    # Chat input for the user
    if prompt := st.chat_input("Hi! I'm Olivia Trevolution's assistant. How can I help you today?"):
        # Add user message to the state and display it
        st.session_state.messages.append({"role": "user", "content": prompt})
        with st.chat_message("user"):
            st.session_state.show_feedback_buttons = True
            st.markdown(prompt)

        start_time = time.time()
        thinking_message = st.empty()
        thinking_message.text("Processing...")

        # Add the user's message to the existing thread
        client.beta.threads.messages.create(
            thread_id=st.session_state.thread_id,
            role="user",
            content=prompt,
        )

        # Create a run with additional instructions
        run = client.beta.threads.runs.create(
            thread_id=st.session_state.thread_id,
            assistant_id=assistant_id,
            instructions=instruction,
        )

        # Poll for the run to complete, giving up after the timeout
        timeout = 60  # seconds
        response_received = False
        while time.time() - start_time <= timeout:
            run = client.beta.threads.runs.retrieve(
                thread_id=st.session_state.thread_id,
                run_id=run.id,
            )
            if run.status == 'completed':
                response_received = True
                break
            if run.status in _TERMINAL_FAILURE_STATUSES:
                # The run can no longer complete; show the fallback right away.
                break
            time.sleep(1)

        if response_received:
            # Retrieve messages added by the assistant
            messages = client.beta.threads.messages.list(
                thread_id=st.session_state.thread_id
            )

            # Keep only the assistant messages produced by this run
            assistant_messages_for_run = [
                message for message in messages
                if message.run_id == run.id and message.role == "assistant"
            ]

            with st.chat_message("assistant"):
                message_placeholder = st.empty()
                full_responses = ''

                # Calculate the time taken
                time_taken = time.time() - start_time

                for message in assistant_messages_for_run:
                    full_response = process_message_with_citations(message)
                    full_responses += f"Response received in {time_taken:.2f} seconds. \n"

                    usage = client.beta.threads.runs.list(
                        thread_id=st.session_state.thread_id
                    ).data[0].usage
                    prompt_tokens = _usage_field(usage, 'prompt_tokens')
                    completion_tokens = _usage_field(usage, 'completion_tokens')
                    total_tokens = _usage_field(usage, 'total_tokens')
                    full_responses += "\t \t" + (
                        f"\nTokens used: {prompt_tokens:,} prompt + "
                        f"{completion_tokens:,} completion = {total_tokens:,} tokens\n"
                    )
                    full_responses += '\n \n'
                    full_responses += '\n \n'

                    # Reveal the answer character by character with a cursor
                    # NOTE(review): model output is rendered with
                    # unsafe_allow_html=True; confirm this is intended.
                    for response in full_response:
                        full_responses += response
                        message_placeholder.markdown(full_responses + "▌", unsafe_allow_html=True)
                        time.sleep(0.001)

                message_placeholder.empty()
                thinking_message.empty()
                message_placeholder.markdown(full_responses, unsafe_allow_html=True)
                st.session_state.show_feedback_buttons = True
                # A new answer has not been rated yet.
                st.session_state.feedback_type = ""

                # Display thumbs up and thumbs down buttons for feedback.
                # The question/answer pair is handed to the callback through
                # ``args`` and the callback is the only place feedback is saved.
                if st.session_state.show_feedback_buttons:
                    col1, col2 = st.columns(2)
                    with col1:
                        st.button(
                            "👍",
                            key="thumbs_up",
                            on_click=handle_feedback_positive,
                            args=(prompt, full_responses),
                        )
                    with col2:
                        st.button(
                            "👎",
                            key="thumbs_down",
                            on_click=handle_feedback_negative,
                            args=(prompt, full_responses),
                        )

            st.session_state.messages.append({"role": "assistant", "content": full_responses})
        else:
            # Fallback message if the run failed or took more than 60 seconds
            thinking_message.empty()  # Clear the "Processing..." message
            st.markdown(FALLBACK_MESSAGE, unsafe_allow_html=True)