"""Gradio chat UI for an LLM-backed used-car sales assistant.

The assistant asks the chat model for a ``df.query(...)`` expression, runs it
against an inventory DataFrame loaded from a CSV file, and then asks the model
to phrase a recommendation from the matching rows.
"""
import re
from typing import List

import gradio as gr
import openai
import pandas as pd
import tiktoken
from openai.error import AuthenticationError

encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")

pd.set_option('display.max_columns', None)  # Display all columns
pd.set_option('display.width', None)  # Automatically adjust the width to fit the content

# NOTE(review): the reviewed source embedded a raw line-separator character
# inside several string literals (and stripped the same character from model
# output). It is assumed to be U+2028 LINE SEPARATOR -- confirm against the
# original file before relying on this.
LINE_SEPARATOR = "\u2028"


class Bot:
    """Conversation state and inventory access for the car-sales chat bot.

    Attributes:
        api_key: OpenAI API key entered by the user, or ``None``.
        lang: Active UI/prompt language, ``"English"`` or ``"German"``.
        system_prompt: System prompt for the active language.
        df: Inventory DataFrame with column names in the active language,
            or ``None`` until a CSV has been loaded.
        available_car_models: String listing the distinct brands in the
            inventory; interpolated into the system prompts.
    """

    DEFAULT_CSV_PATH = "./AI Car-Test - complete data.csv"
    MODEL_NAME = "gpt-3.5-turbo"
    # Token budget for history + system prompt before the new user message.
    MAX_CONTEXT_TOKENS = 1200
    # Maximum number of inventory rows handed to the model per answer.
    MAX_RESULT_ROWS = 5

    _SUPPORTED_LANGUAGES = ("English", "German")
    _GER_TO_ENG_COLUMNS = {
        "marke": "brand",
        "modell": "model",
        "kategorie": "category",
        "kilometer": "kilometer",
        "preis": "price",
        "grundfarbe": "color",
    }
    _UI_MESSAGES = {
        "English": {
            "need_key": "Please enter your OpenAI API KEY",
            "need_csv": "Please upload a csv file",
        },
        "German": {
            "need_key": "Bitte geben Sie Ihren OpenAI API-Schlüssel ein",
            "need_csv": "Bitte laden Sie eine csv-Datei hoch",
        },
    }

    def __init__(self):
        self.api_key = None
        self.lang = "English"
        self.system_prompt = ""
        self.df = None
        # Inventory with the CSV's German column names; `df` is derived from
        # it so the language can be switched after loading.
        self._base_df = None
        self.available_car_models = None
        self.columns_in_response = ["marke", "modell", "kategorie", "kilometer", "preis", "grundfarbe"]
        self.columns_in_response_eng = ["brand", "model", "category", "kilometer", "price", "color"]
        self.ger_system_prompt = None
        self.eng_system_prompt = None
        # Build the prompts before selecting the language so that
        # `system_prompt` is never left as None.
        self.set_prompts()
        self.change_language(self.lang, [])

    def set_prompts(self):
        """Rebuild both system prompts from the current brand list."""
        # NOTE(review): the prompt text below is reproduced with exactly the
        # whitespace visible in the reviewed source. If the original literal
        # contained line breaks (e.g. around the ``` fence), restore them.
        self.eng_system_prompt = (
            "You are a virtual car salesman salesman assistant for an online German car dealership. "
            "You only recommend cars that are currently in your inventory. "
            "To access your inventory return dataframe query method like this: "
            "``` df.query(\"brand.str.contains('ford')\") ``` "
            'The dataframe has the following columns: ["brand", "model", "category", "kilometer", "price"], '
            "with all values in lowercase. "
            "The available car categories are: limousine, offroad, estatecar, smallcar, othercar, and carbo. "
            f"The available car brands are: {self.available_car_models}. "
            "To search for specific car information, use the contains() method instead of equals(). "
            "Do not say that any models in your inventory or that any models are available, "
            "show the query instead. "
            "Do not ask the user if you need to check the inventory, show them how to query the dataframe. "
            "Users should not know that you are using a dataframe. "
            "They should think that you are using a database. "
            f"{LINE_SEPARATOR}"
            "If you need to check the inventory to answer the user's question, "
            "only show the query and nothing else."
        )
        # NOTE(review): the German text says both "users should not know you
        # use a Datenbank" and "they should think you use a Datenbank"; the
        # first was presumably meant to say DataFrame. Left unchanged.
        self.ger_system_prompt = (
            "Du bist ein virtueller Autoverkäufer für einen deutschen Online-Autohändler. "
            "Du empfiehlst nur Autos, die derzeit in deinem Inventar sind. "
            "Um auf dein Inventar zuzugreifen, gibst du die Datenbankabfrage-Methode wie folgt ein: "
            "``` df.query(\"marke.str.contains('ford')\") ``` "
            'Die Datenbank hat die folgenden Spalten: ["marke", "modell", "kategorie", "kilometer", "preis"], '
            "mit allen Werten in Kleinbuchstaben. "
            "Die verfügbaren Autokategorien sind: limousine, offroad, estatecar, smallcar, othercar und carbo. "
            f"Die verfügbaren Automarken sind: {self.available_car_models}. "
            "Um nach bestimmten Fahrzeuginformationen zu suchen, verwende die contains() -Methode "
            "anstelle der equals() -Methode. "
            "Sage nicht, dass irgendeine Modelle in deinem Inventar sind oder dass irgendeine Modelle "
            "verfügbar sind, zeige die Abfrage an. "
            "Frag den Benutzer nicht, ob du das Inventar überprüfen musst, zeige ihm, "
            "wie du die Datenbankabfrage durchführst. "
            "Benutzer sollten nicht wissen, dass du eine Datenbank verwendest. "
            "Sie sollten denken, dass du eine Datenbank verwendest. "
            f"{LINE_SEPARATOR}"
            "Wenn du das Inventar überprüfen musst, um die Frage des Benutzers zu beantworten, "
            "zeige nur die Abfrage und nichts anderes an."
        )

    def _ui_message(self, key: str) -> str:
        """Return the fixed UI message `key` in the active language."""
        return self._UI_MESSAGES[self.lang][key]

    def _localized_df(self):
        """Return the inventory with column names in the active language."""
        if self._base_df is None:
            return None
        if self.lang == "English":
            return self._base_df.rename(columns=self._GER_TO_ENG_COLUMNS)
        return self._base_df

    def change_language(self, lang, history):
        """Switch language and return the chat history to display.

        Unknown languages fall back to English. The incoming `history` is
        discarded: the result is either a single prompt for the missing
        requirement (API key first, then CSV) or an empty history.
        """
        self.lang = lang if lang in self._SUPPORTED_LANGUAGES else "English"
        self.system_prompt = (
            self.eng_system_prompt if self.lang == "English" else self.ger_system_prompt
        )
        # Keep the DataFrame's column names in step with the prompt; the
        # model writes queries using the column names the prompt advertises.
        self.df = self._localized_df()

        if not self.api_key:
            return [(None, self._ui_message("need_key"))]
        if self.df is None:
            return [(None, self._ui_message("need_csv"))]
        return []

    def process_file(self, file, history):
        """Load and normalise the inventory CSV.

        Args:
            file: Either a path string or an uploaded-file object exposing
                a ``name`` attribute with the path.
            history: Current chat history; a notice is appended to it when
                the upload is not a CSV file.

        Returns:
            The chat history to display: empty once a key is set and the
            file is loaded, otherwise the (possibly extended) input history.
        """
        filename = file
        if not isinstance(file, str):
            if file.name.rsplit(".", 1)[-1].lower() != "csv":
                # The original compared the bare name `lang` here, which is
                # not this bot's language setting.
                history.append((None, self._ui_message("need_csv")))
                return history
            filename = file.name

        df = pd.read_csv(filename)
        # Work on an explicit copy so adding the url column does not write
        # into a slice of the frame returned by read_csv.
        df = df[self.columns_in_response + ["interne_nummer", "Modellbasis"]].copy()
        df = self._construct_urls(df)
        df = df.dropna()

        df["preis"] = df["preis"].astype(int)
        df["kilometer"] = df["kilometer"].astype(int)
        for column in ("marke", "modell", "grundfarbe"):
            df[column] = df[column].str.lower().str.strip()
        # Strip the literal "car." prefix; regex=False pins the behaviour,
        # since the default for `regex` differs between pandas versions.
        df["kategorie"] = (
            df["kategorie"].str.lower().str.replace("car.", "", regex=False).str.strip()
        )
        df = df.drop_duplicates()

        self.available_car_models = str(df["marke"].unique()).replace('\n', "")
        print(self.available_car_models)

        df = df.loc[:, ~df.columns.duplicated()]
        self._base_df = df
        self.set_prompts()
        # Refreshes `system_prompt` and derives `self.df` for the language.
        self.change_language(self.lang, history)
        if self.api_key:
            history = []
        return history

    def _construct_urls(self, df):
        """Add a ``url`` column for every row that has no missing values.

        Rows containing any missing value keep an empty ``url``. The frame
        is modified in place and also returned.
        """
        base_url = "https://www.hugopfohe.de/gebrauchtwagen/details/"
        # Create the column up front so it exists even if no row qualifies.
        df["url"] = None
        source_columns = [column for column in df.columns if column != "url"]
        for index, row in df[source_columns].iterrows():
            if row.isna().any():
                continue
            marke = row['marke']
            modellbasis = row['Modellbasis']
            interne_nummer = row['interne_nummer']
            df.at[index, 'url'] = f"{base_url}{marke}_{modellbasis}_{interne_nummer}/?&etcc_cmp=consultant"
        return df

    def check_requirements(self, query, history):
        """Check that an API key and inventory are available.

        A message starting with ``sk-`` is taken as the API key and triggers
        loading of the bundled CSV.

        Returns:
            ``(history, passed)``; `passed` is True only when `query` should
            be forwarded to the model.
        """
        if query.startswith("sk-"):
            self.api_key = query.strip()
            openai.api_key = self.api_key
            return self.process_file(self.DEFAULT_CSV_PATH, []), False

        if not self.api_key:
            history.append((None, self._ui_message("need_key")))
            return history, False

        if self.df is None:
            return self.process_file(self.DEFAULT_CSV_PATH, []), False

        return history, True

    def _chat(self, messages: List[dict], temperature: float) -> str:
        """Send `messages` to the chat model and return the reply text."""
        completion = openai.ChatCompletion.create(
            model=self.MODEL_NAME,
            messages=messages,
            temperature=temperature,
        )
        return completion['choices'][0]['message']['content']

    def run_text(self, history, query):
        """Handle one user message and return the updated chat history."""
        history, passed = self.check_requirements(query, history)
        if not passed:
            return history

        messages = []
        for user_text, bot_text in history:
            if user_text:
                messages.append({"role": "user", "content": user_text})
            if bot_text:
                messages.append({"role": "assistant", "content": bot_text})
        # The system prompt goes directly before the new user message.
        messages.append({"role": "system", "content": self.system_prompt})

        # Drop older turns (keeping the first one) until the context fits
        # the budget. The length guard keeps the system prompt in place and
        # prevents popping from a list that has nothing left to drop.
        while len(messages) > 2 and self._count_tokens(messages) > self.MAX_CONTEXT_TOKENS:
            messages.pop(1)

        messages.append({"role": "user", "content": query})
        print(messages)

        try:
            response = self._chat(messages, temperature=0.0)
            if "df.query" in response:
                response = self.query_db(response, messages)
        except AuthenticationError:
            # The stored key was rejected: forget it and ask again.
            self.api_key = None
            openai.api_key = None
            history.append((None, self._ui_message("need_key")))
            return history

        # Only the question and the final answer are kept in the history.
        history.append((query, response))
        return history

    def query_db(self, response, messages):
        """Run the query proposed in `response` and get a final answer.

        Args:
            response: Model reply containing a ``df.query`` line.
            messages: Conversation as sent to the model, ending with the
                system prompt followed by the user message. Modified in
                place: the system prompt is replaced by a result-specific
                system message.

        Returns:
            The model's answer with line-separator characters removed and
            runs of three or more newlines collapsed to two.
        """
        # Remove the generic system prompt (second to last message).
        messages.pop(-2)

        # First line mentioning df.query, without surrounding code fences.
        query_line = next(
            (line.strip().strip("`").strip()
             for line in response.split("\n") if "df.query" in line),
            None,
        )
        print(query_line)
        result = self._run_inventory_query(query_line)

        if self._has_no_rows(result):
            if self.lang == "English":
                note = ("You are a virtual car salesman assistant in an online car dealership. "
                        "There are no results this query. "
                        "Help user adjust their preferences or recommend other brands.")
            else:
                note = ("Sie sind ein virtueller Autohändler-Assistent in einem Online-Autohaus. "
                        "Keine Ergebnisse für diese Abfrage. "
                        + LINE_SEPARATOR +
                        "Benutzer bei der Anpassung ihrer Präferenzen oder Empfehlung anderer Marken "
                        "unterstützen")
        else:
            result = self._format_result(result)
            print(result)
            if self.lang == "English":
                note = (f"You are a virtual car salesman salesman assistant for an online car dealership. "
                        f"You only recommend cars that are currently in stock. "
                        f"Advice cars from this list that are currently in stock:\n\n{result}\n\n"
                        + "Include links to cars in your recommendation from the url column.")
            else:
                note = (f"Sie sind ein virtueller Autohändler-Assistent in einem Online-Autohaus. "
                        f"Sie empfehlen nur Autos, die derzeit auf Lager sind. "
                        f"Beraten Sie Autos aus dieser Liste, die derzeit auf Lager sind:\n\n{result}\n\n"
                        + "Fügen Sie Links zu Autos in Ihrer Empfehlung aus der Spalte url hinzu.")
        # Insert directly before the user's message.
        messages.insert(-1, {"role": "system", "content": note})

        response = self._chat(messages, temperature=0.1)
        print(response)
        response = response.replace(LINE_SEPARATOR, "")
        return re.sub(r'\n{3,}', '\n\n', response)

    def _run_inventory_query(self, query_line):
        """Evaluate the model-written query; return None if it cannot run."""
        if not query_line:
            return None
        # SECURITY NOTE(review): this evaluates text produced by the language
        # model, which is steered by untrusted user input. The namespace is
        # narrowed to `df` without builtins, but that is NOT a sandbox;
        # replace eval with a parsed/validated query before exposing this
        # service publicly.
        try:
            return eval(query_line, {"__builtins__": {}}, {"df": self.df})
        except Exception as exc:  # model-written code can fail in any way
            print(f"Inventory query failed: {exc!r}")
            return None

    @staticmethod
    def _has_no_rows(result) -> bool:
        """Return True for a missing result or an empty sized result."""
        if result is None:
            return True
        try:
            return len(result) == 0
        except TypeError:
            # Scalars (counts, averages, ...) have no length but are results.
            return False

    def _format_result(self, result) -> str:
        """Render a query result as text for the follow-up system message."""
        if isinstance(result, pd.DataFrame):
            if len(result) > self.MAX_RESULT_ROWS:
                result = result.sample(n=self.MAX_RESULT_ROWS)
            if self.lang == "English":
                wanted = self.columns_in_response_eng + ['url']
            else:
                wanted = self.columns_in_response + ['url']
            # The query may already have projected a subset of columns.
            present = [column for column in wanted if column in result.columns]
            if present:
                result = result[present]
            return result.to_string(index=False).replace(LINE_SEPARATOR, "")
        if isinstance(result, pd.Series):
            return result.to_string(index=False).replace(LINE_SEPARATOR, "")
        if isinstance(result, list):
            return ", ".join(map(str, result))
        return str(result)

    def _count_tokens(self, messages: List[dict]) -> int:
        """Return the total token count of all message contents."""
        return sum(len(encoding.encode(message['content'])) for message in messages)


if __name__ == '__main__':
    bot = Bot()

    # NOTE(review): the nesting of the layout containers below was
    # reconstructed from a source without indentation -- confirm visually.
    with gr.Blocks(css="#chatbot .overflow-y-auto{height:500px}") as demo:
        chatbot = gr.Chatbot([(None, "Please enter your OpenAI API KEY")], elem_id="chatbot",
                             label="HUGO PFOHE - AI Car Salesman").style(height=500)
        with gr.Row():
            with gr.Column(scale=1):
                txt = gr.Textbox(show_label=False, placeholder="What car would you like to buy?").style(
                    container=False)
        with gr.Row():
            with gr.Column(scale=0.34, min_width=0):
                lang = gr.inputs.Dropdown(choices=['English', 'German'], label="Language", default=bot.lang)
            with gr.Column(scale=0.66, min_width=0):
                with gr.Row():
                    clear = gr.Button("Clear")
                with gr.Row():
                    btn = gr.UploadButton("Upload CSV", file_types=["csv"])

        txt.submit(bot.run_text, [chatbot, txt], chatbot)
        txt.submit(lambda: "", None, txt)
        lang.change(bot.change_language, [lang, chatbot], chatbot)
        clear.click(lambda: [], None, chatbot)
        btn.upload(bot.process_file, [btn, chatbot], chatbot)

    demo.launch()