Spaces:
Runtime error
Runtime error
from typing import List | |
import openai | |
import gradio as gr | |
import pandas as pd | |
import tiktoken | |
import re | |
from openai.error import AuthenticationError | |
# Tokenizer for the chat model; Bot._count_tokens uses it to measure prompt size.
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")

# Debug printing of DataFrames: show every column and let pandas pick the width.
pd.options.display.max_columns = None
pd.options.display.width = None
class Bot():
    """Bilingual (English/German) car-sales chat assistant.

    The bot keeps a pandas DataFrame inventory loaded from a CSV file, asks the
    OpenAI chat model to answer user questions, and, when the model replies
    with a ``df.query(...)`` expression, runs that expression against the
    inventory and asks the model again with the matching rows as context.

    Chat history is a list of ``(user_message, bot_message)`` tuples, where
    either element may be ``None``.
    """

    # Inventory that is loaded automatically once an API key is known.
    DEFAULT_INVENTORY_CSV = "./AI Car-Test - complete data.csv"
    # Token budget for the history + system prompt sent with each request.
    MAX_CONTEXT_TOKENS = 1200

    def __init__(self):
        """Create a bot with no API key and no inventory, defaulting to English."""
        self.api_key = None
        self.lang = "English"
        self.df = None
        self.available_car_models = None
        # German (CSV) column names and their English counterparts, index-aligned.
        self.columns_in_response = ["marke", "modell", "kategorie", "kilometer", "preis", "grundfarbe"]
        self.columns_in_response_eng = ["brand", "model", "category", "kilometer", "price", "color"]
        self.eng_system_prompt = None
        self.ger_system_prompt = None
        self.system_prompt = ""
        # Build the prompts before selecting one, so system_prompt is never None.
        self.set_prompts()
        self.change_language(self.lang, [])

    def set_prompts(self):
        """Rebuild both system prompts from the current list of available brands."""
        self.eng_system_prompt = f"""You are a virtual car salesman salesman assistant for an online German car dealership. You only recommend cars that are currently in your inventory.
To access your inventory return dataframe query method like this:
```
df.query("brand.str.contains('ford')")
```
The dataframe has the following columns: ["brand", "model", "category", "kilometer", "price"], with all values in lowercase.
The available car categories are: limousine, offroad, estatecar, smallcar, othercar, and carbo.
The available car brands are: {self.available_car_models}.
To search for specific car information, use the contains() method instead of equals().
Do not say that any models in your inventory or that any models are available, show the query instead.
Do not ask the user if you need to check the inventory, show them how to query the dataframe.
Users should not know that you are using a dataframe. They should think that you are using a database.
If you need to check the inventory to answer the user's question, only show the query and nothing else."""
        # The "Benutzer sollten nicht wissen" sentence previously said
        # "Datenbank" twice and contradicted itself; it now mirrors the
        # English prompt (hide the DataFrame, present it as a database).
        self.ger_system_prompt = f"""Du bist ein virtueller Autoverkäufer für einen deutschen Online-Autohändler. Du empfiehlst nur Autos, die derzeit in deinem Inventar sind.
Um auf dein Inventar zuzugreifen, gibst du die Datenbankabfrage-Methode wie folgt ein:
```
df.query("marke.str.contains('ford')")
```
Die Datenbank hat die folgenden Spalten: ["marke", "modell", "kategorie", "kilometer", "preis"], mit allen Werten in Kleinbuchstaben.
Die verfügbaren Autokategorien sind: limousine, offroad, estatecar, smallcar, othercar und carbo.
Die verfügbaren Automarken sind: {self.available_car_models}.
Um nach bestimmten Fahrzeuginformationen zu suchen, verwende die contains() -Methode anstelle der equals() -Methode.
Sage nicht, dass irgendeine Modelle in deinem Inventar sind oder dass irgendeine Modelle verfügbar sind, zeige die Abfrage an.
Frag den Benutzer nicht, ob du das Inventar überprüfen musst, zeige ihm, wie du die Datenbankabfrage durchführst.
Benutzer sollten nicht wissen, dass du einen DataFrame verwendest. Sie sollten denken, dass du eine Datenbank verwendest.
Wenn du das Inventar überprüfen musst, um die Frage des Benutzers zu beantworten, zeige nur die Abfrage und nichts anderes an."""

    def _localized(self, english, german):
        """Return ``english`` or ``german`` depending on the active language."""
        return english if self.lang == "English" else german

    def _localize_inventory_columns(self):
        """Rename the inventory columns to match the active language.

        The prompts advertise language-specific column names, so the DataFrame
        has to follow whenever the language changes.
        """
        if self.df is None:
            return
        if self.lang == "English":
            mapping = dict(zip(self.columns_in_response, self.columns_in_response_eng))
        else:
            mapping = dict(zip(self.columns_in_response_eng, self.columns_in_response))
        renamed = self.df.rename(columns=mapping)
        self.df = renamed.loc[:, ~renamed.columns.duplicated()]

    def change_language(self, lang, history):
        """Switch the active language and return the chat history to display.

        Args:
            lang: "English" or "German"; any other value falls back to "English".
            history: Current chat history (unused; the chat is reset).

        Returns:
            A one-message history asking for the API key or a CSV file when one
            of them is missing, otherwise an empty history.
        """
        self.lang = lang if lang in ["English", "German"] else "English"
        self.system_prompt = self._localized(self.eng_system_prompt, self.ger_system_prompt)
        self._localize_inventory_columns()
        if not self.api_key:
            return [(None, self._localized("Please enter your OpenAI API KEY",
                                           "Bitte geben Sie Ihren OpenAI API-Schlüssel ein"))]
        if self.df is None:
            return [(None, self._localized("Please upload a csv file",
                                           "Bitte laden Sie eine csv-Datei hoch"))]
        return []

    def process_file(self, file, history):
        """Load and normalise an inventory CSV, then refresh the prompts.

        Args:
            file: Either a path string or an uploaded-file object exposing a
                ``name`` attribute holding the path.
            history: Current chat history.

        Returns:
            ``history`` with an error message appended when the upload is not a
            CSV file; an empty history when an API key is set; otherwise
            ``history`` unchanged.
        """
        filename = file
        if not isinstance(file, str):
            if file.name.split(".")[-1].lower() != "csv":
                history.append((None, self._localized("Please upload a csv file",
                                                      "Bitte laden Sie eine csv-Datei hoch")))
                return history
            filename = file.name

        df = pd.read_csv(filename)
        # Keep only the columns used for answers and for building the links.
        df = df[self.columns_in_response + ["interne_nummer", "Modellbasis"]]
        df = self._construct_urls(df)
        # Rows with any missing value (including a missing url) are unusable.
        df = df.dropna().copy()

        df["preis"] = df["preis"].astype(int)
        df["kilometer"] = df["kilometer"].astype(int)

        df["marke"] = df["marke"].str.lower().str.strip()
        df["modell"] = df["modell"].str.lower().str.strip()
        df["grundfarbe"] = df["grundfarbe"].str.lower().str.strip()
        # regex=False: strip the literal "car." prefix regardless of the pandas
        # version's default for the ``regex`` argument.
        df["kategorie"] = df["kategorie"].str.lower().str.replace("car.", "", regex=False).str.strip()

        df = df.drop_duplicates()
        self.available_car_models = str(df["marke"].unique()).replace('\n', "")
        print(self.available_car_models)

        self.df = df
        self.set_prompts()
        # Selects the prompt and renames the columns for the active language.
        self.change_language(self.lang, history)
        if self.api_key:
            history = []
        return history

    def _construct_urls(self, df):
        """Return a copy of ``df`` with a ``url`` column linking to each car.

        Rows containing any missing value get no link (``None``). The input
        frame is not modified.
        """
        df = df.copy()
        base_url = "https://www.hugopfohe.de/gebrauchtwagen/details/"
        complete = df.notna().all(axis=1)
        df["url"] = [
            f"{base_url}{marke}_{modellbasis}_{interne_nummer}/?&etcc_cmp=consultant" if ok else None
            for marke, modellbasis, interne_nummer, ok in zip(
                df["marke"], df["Modellbasis"], df["interne_nummer"], complete)
        ]
        return df

    def _load_default_inventory(self):
        """Load the bundled inventory; ask for an upload when it is missing."""
        try:
            return self.process_file(self.DEFAULT_INVENTORY_CSV, [])
        except FileNotFoundError:
            return [(None, self._localized(
                "Please upload a csv file. (This might take a few minutes)",
                "Bitte laden Sie eine csv-Datei hoch. (Dies kann einige Minuten dauern)"))]

    def check_requirements(self, query, history):
        """Check that an API key and an inventory are available.

        A message starting with ``sk-`` is treated as the API key itself.

        Returns:
            ``(history, ready)``. ``ready`` is True only when ``query`` should
            be sent to the model.
        """
        candidate_key = query.strip()
        if candidate_key.startswith("sk-"):
            self.api_key = candidate_key
            openai.api_key = candidate_key
            return self._load_default_inventory(), False
        if not self.api_key:
            history.append((None, self._localized("Please enter your OpenAI API KEY",
                                                  "Bitte geben Sie Ihren OpenAI API-Schlüssel ein")))
            return history, False
        if self.df is None:
            return self._load_default_inventory(), False
        return history, True

    def run_text(self, history, query):
        """Answer ``query`` and return the updated chat history.

        Args:
            history: List of ``(user_message, bot_message)`` tuples.
            query: The new user message (or an API key, see check_requirements).
        """
        history, passed = self.check_requirements(query, history)
        if not passed:
            return history

        messages = []
        for q, response in history:
            if q:
                messages.append({"role": "user", "content": q})
            if response:
                messages.append({"role": "assistant", "content": response})
        # The system prompt goes right before the new user message.
        messages.append({"role": "system", "content": self.system_prompt})

        # Drop old turns (keeping the very first message) until the context
        # fits the budget. The length guard keeps the system prompt in place
        # and prevents popping from a list that is already too short.
        total_tokens = self._count_tokens(messages)
        while total_tokens > self.MAX_CONTEXT_TOKENS and len(messages) > 2:
            messages.pop(1)
            total_tokens = self._count_tokens(messages)

        messages.append({"role": "user", "content": query})
        print(messages)

        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
            temperature=0.0,
        )['choices'][0]['message']['content']

        if "df.query" in response:
            response = self.query_db(response, messages)

        # Only the question and the final answer are kept; the injected
        # context messages are rebuilt on every turn.
        history.append((query, response))
        return history

    def _run_inventory_query(self, response):
        """Extract the ``df.query(...)`` expression from ``response`` and run it.

        Returns:
            The evaluation result, or ``None`` when no expression is found or
            evaluating it fails.
        """
        query_line = None
        for line in response.split("\n"):
            if "df.query" in line:
                # Ignore anything before the expression (e.g. "result = ") and
                # surrounding markdown backticks.
                query_line = line[line.index("df.query"):].strip().strip("`").strip()
                break
        if query_line is None:
            return None
        print(query_line)
        try:
            # NOTE(security): this evaluates text produced by the language
            # model, which in turn is steered by user input. It can execute
            # arbitrary Python and should be replaced by a restricted query
            # parser before exposing the app to untrusted users.
            return eval("self." + query_line)
        except Exception as exc:  # generated code can fail in arbitrary ways
            print(f"Inventory query failed: {exc!r}")
            return None

    @staticmethod
    def _is_empty(result):
        """Return True for ``None`` and zero-length results; scalars are non-empty."""
        if result is None:
            return True
        try:
            return len(result) == 0
        except TypeError:
            return False

    def _format_result(self, result):
        """Render a query result as plain text for inclusion in a prompt."""
        if isinstance(result, pd.DataFrame):
            if len(result) > 5:
                result = result.sample(n=5)
            wanted = self._localized(self.columns_in_response_eng, self.columns_in_response) + ['url']
            # The generated query may have projected columns away already.
            result = result[[column for column in wanted if column in result.columns]]
            return result.to_string(index=False).replace("<br>", "")
        if isinstance(result, pd.Series):
            return result.to_string(index=False).replace("<br>", "")
        if isinstance(result, list):
            return ", ".join(map(str, result))
        return str(result)

    def query_db(self, response, messages):
        """Run the inventory query contained in ``response`` and ask the model again.

        Args:
            response: Model reply containing a ``df.query(...)`` expression.
            messages: Messages of the current request, ending with the system
                prompt followed by the user message. Modified in place.

        Returns:
            The model's final answer with ``<br>`` removed and runs of blank
            lines collapsed.
        """
        # Replace the query-generating system prompt (second to last message).
        messages.pop(-2)
        result = self._run_inventory_query(response)

        if self._is_empty(result):
            messages.insert(-1, {"role": "system", "content": self._localized(
                "You are a virtual car salesman assistant in an online car dealership. There are no results this query. Help user adjust their preferences or recommend other brands.",
                "Sie sind ein virtueller Autohändler-Assistent in einem Online-Autohaus. Keine Ergebnisse für diese Abfrage. Benutzer bei der Anpassung ihrer Präferenzen oder Empfehlung anderer Marken unterstützen")})
        else:
            result = self._format_result(result)
            print(result)
            messages.insert(-1, {"role": "system", "content": self._localized(
                f"You are a virtual car salesman salesman assistant for an online car dealership. You only recommend cars that are currently in stock. Advice cars from this list that are currently in stock:\n\n{result}\n\n" + "Include links to cars in your recommendation from the url column.",
                f"Sie sind ein virtueller Autohändler-Assistent in einem Online-Autohaus. Sie empfehlen nur Autos, die derzeit auf Lager sind. Beraten Sie Autos aus dieser Liste, die derzeit auf Lager sind:\n\n{result}\n\n" + "Fügen Sie Links zu Autos in Ihrer Empfehlung aus der Spalte url hinzu.")})

        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
            temperature=0.1,
        )['choices'][0]['message']['content']
        print(response)
        response = response.replace("<br>", "")
        # Collapse three or more consecutive newlines into exactly two.
        response = re.sub(r'\n{3,}', '\n\n', response)
        return response

    def _count_tokens(self, messages):
        """Return the total number of tokens in the ``content`` of ``messages``."""
        return sum(len(encoding.encode(message['content'])) for message in messages)
if __name__ == '__main__':
    bot = Bot()

    # Chat window on top, the message box below it, and a bottom row with the
    # language selector next to the "Clear" and "Upload CSV" buttons.
    with gr.Blocks(css="#chatbot .overflow-y-auto{height:500px}") as demo:
        chatbot = gr.Chatbot(
            [(None, "Please enter your OpenAI API KEY")],
            elem_id="chatbot",
            label="HUGO PFOHE - AI Car Salesman",
        ).style(height=500)

        with gr.Row():
            with gr.Column(scale=1):
                txt = gr.Textbox(
                    show_label=False,
                    placeholder="What car would you like to buy?",
                ).style(container=False)

        with gr.Row():
            with gr.Column(scale=0.34, min_width=0):
                lang = gr.inputs.Dropdown(
                    choices=['English', 'German'],
                    label="Language",
                    default=bot.lang,
                )
            with gr.Column(scale=0.66, min_width=0):
                with gr.Row():
                    clear = gr.Button("Clear")
                with gr.Row():
                    btn = gr.UploadButton("Upload CSV", file_types=["csv"])

        # Submitting a message first runs the bot, then empties the text box.
        txt.submit(fn=bot.run_text, inputs=[chatbot, txt], outputs=chatbot)
        txt.submit(fn=lambda: "", inputs=None, outputs=txt)

        # Remaining controls: language switch, chat reset and inventory upload.
        lang.change(fn=bot.change_language, inputs=[lang, chatbot], outputs=chatbot)
        clear.click(fn=lambda: [], inputs=None, outputs=chatbot)
        btn.upload(fn=bot.process_file, inputs=[btn, chatbot], outputs=chatbot)

    demo.launch()