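# NOTE: The lines below are residue from the Hugging Face file viewer this
# file was copied from; they are not part of the program.
#   Uploader: vrsen
#   Last commit: "Added url parameter" (448581a)
#   File size: 14.2 kB (virus scan: no virus)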
from typing import List
import openai
import gradio as gr
import pandas as pd
import tiktoken
import re
from openai.error import AuthenticationError
# Tokenizer for the chat model; Bot._count_tokens uses it to measure how many
# tokens the message list occupies.
encoding = tiktoken.encoding_for_model("gpt-3.5-turbo")
# pandas display options (process-wide settings).
pd.set_option('display.max_columns', None) # Display all columns
pd.set_option('display.width', None) # Automatically adjust the width to fit the content
class Bot():
    """Car-sales chat assistant backed by a pandas inventory and the OpenAI chat API.

    The model is prompted to answer inventory questions with a ``df.query(...)``
    expression. That expression is evaluated against ``self.df`` and the rows
    found are handed back to the model in a second request, which produces the
    final recommendation.

    State kept on the instance:
        api_key: OpenAI key, set once the user submits a text starting with "sk-".
        lang: "English" or "German"; selects prompts, messages and column names.
        df: the cleaned inventory, or None until a CSV has been loaded.
        system_prompt: the prompt for the current language.
    """

    # German (CSV) column names and their English counterparts.
    _GER_TO_ENG = {"marke": "brand", "modell": "model", "kategorie": "category",
                   "kilometer": "kilometer", "preis": "price", "grundfarbe": "color"}
    # Token budget for chat history plus system prompt before older turns are dropped.
    MAX_HISTORY_TOKENS = 1200
    # Maximum number of inventory rows shown to the model per query.
    MAX_RESULTS = 5
    # Inventory file loaded automatically once an API key is available.
    DEFAULT_INVENTORY_CSV = "./AI Car-Test - complete data.csv"

    def __init__(self):
        """Create a bot with no API key and no inventory, speaking English."""
        self.ger_system_prompt = None
        self.eng_system_prompt = None
        self.api_key = None
        self.lang = "English"
        self.system_prompt = ""
        self.df = None
        self.available_car_models = None
        self.columns_in_response = ["marke", "modell", "kategorie", "kilometer", "preis", "grundfarbe"]
        self.columns_in_response_eng = ["brand", "model", "category", "kilometer", "price", "color"]
        # Build the prompts first so change_language() picks a real prompt
        # instead of None.
        self.set_prompts()
        self.change_language(self.lang, [])

    def _text(self, english, german):
        """Return the message variant matching the current language."""
        return english if self.lang == "English" else german

    def set_prompts(self):
        """Rebuild both system prompts from ``self.available_car_models``."""
        self.eng_system_prompt = f"""You are a virtual car salesman salesman assistant for an online German car dealership. You only recommend cars that are currently in your inventory.
To access your inventory return dataframe query method like this:
```
df.query("brand.str.contains('ford')")
```
The dataframe has the following columns: ["brand", "model", "category", "kilometer", "price"], with all values in lowercase.
The available car categories are: limousine, offroad, estatecar, smallcar, othercar, and carbo.
The available car brands are: {self.available_car_models}.
To search for specific car information, use the contains() method instead of equals().
Do not say that any models in your inventory or that any models are available, show the query instead.
Do not ask the user if you need to check the inventory, show them how to query the dataframe.
Users should not know that you are using a dataframe. They should think that you are using a database.
If you need to check the inventory to answer the user's question, only show the query and nothing else."""
        # The German text originally told the model that users must not know
        # about the "Datenbank" and must think it is a "Datenbank"; the first
        # mention now says DataFrame, mirroring the English prompt.
        self.ger_system_prompt = f"""Du bist ein virtueller Autoverkäufer für einen deutschen Online-Autohändler. Du empfiehlst nur Autos, die derzeit in deinem Inventar sind.
Um auf dein Inventar zuzugreifen, gibst du die Datenbankabfrage-Methode wie folgt ein:
```
df.query("marke.str.contains('ford')")
```
Die Datenbank hat die folgenden Spalten: ["marke", "modell", "kategorie", "kilometer", "preis"], mit allen Werten in Kleinbuchstaben.
Die verfügbaren Autokategorien sind: limousine, offroad, estatecar, smallcar, othercar und carbo.
Die verfügbaren Automarken sind: {self.available_car_models}.
Um nach bestimmten Fahrzeuginformationen zu suchen, verwende die contains() -Methode anstelle der equals() -Methode.
Sage nicht, dass irgendeine Modelle in deinem Inventar sind oder dass irgendeine Modelle verfügbar sind, zeige die Abfrage an.
Frag den Benutzer nicht, ob du das Inventar überprüfen musst, zeige ihm, wie du die Datenbankabfrage durchführst.
Benutzer sollten nicht wissen, dass du einen DataFrame verwendest. Sie sollten denken, dass du eine Datenbank verwendest.
Wenn du das Inventar überprüfen musst, um die Frage des Benutzers zu beantworten, zeige nur die Abfrage und nichts anderes an."""

    def _localize_columns(self, df):
        """Return ``df`` with its columns named in the current language."""
        if self.lang == "English":
            mapping = self._GER_TO_ENG
        else:
            mapping = {eng: ger for ger, eng in self._GER_TO_ENG.items()}
        # rename() ignores keys that are not present, so this is a no-op when
        # the frame already uses the right names.
        return df.rename(columns=mapping)

    def change_language(self, lang, history):
        """Switch the conversation language and return the chat history to show.

        Args:
            lang: "English" or "German"; anything else falls back to "English".
            history: current chat history (not used; the result replaces it).

        Returns:
            A one-entry history asking for the API key or for a CSV file when
            either is missing, otherwise an empty history.
        """
        self.lang = lang if lang in ["English", "German"] else "English"
        self.system_prompt = self._text(self.eng_system_prompt, self.ger_system_prompt)
        # Keep the inventory's column names in step with the prompt; otherwise
        # queries written for one language fail against the other's columns.
        if self.df is not None:
            self.df = self._localize_columns(self.df)
        if not self.api_key:
            return [(None, self._text("Please enter your OpenAI API KEY",
                                      "Bitte geben Sie Ihren OpenAI API-Schlüssel ein"))]
        if self.df is None:
            return [(None, self._text("Please upload a csv file",
                                      "Bitte laden Sie eine csv-Datei hoch"))]
        return []

    def process_file(self, file, history):
        """Load and clean an inventory CSV, then refresh the prompts.

        Args:
            file: a path string, or an uploaded-file object exposing ``.name``.
            history: current chat history.

        Returns:
            ``history`` with a "please upload a csv" entry appended when the
            upload is not a CSV; an empty history when loading succeeded and an
            API key is set; otherwise ``history`` unchanged.
        """
        filename = file
        if not isinstance(file, str):
            # Reject uploads whose extension is not csv.
            if file.name.split(".")[-1].lower() != "csv":
                history.append((None, self._text("Please upload a csv file",
                                                 "Bitte laden Sie eine csv-Datei hoch")))
                return history
            filename = file.name
        df = pd.read_csv(filename)
        # Keep the response columns plus the two columns needed to build URLs.
        df = df[self.columns_in_response + ["interne_nummer", "Modellbasis"]]
        df = self._construct_urls(df)
        df = df.rename(columns={"modell2": "modell"})
        # Drop every row with at least one missing value.
        df = df.dropna()
        # Convert price and kilometer to int.
        df["preis"] = df["preis"].astype(int)
        df["kilometer"] = df["kilometer"].astype(int)
        # Normalise the text columns to stripped lowercase.
        df["marke"] = df["marke"].str.lower().str.strip()
        df["modell"] = df["modell"].str.lower().str.strip()
        df["grundfarbe"] = df["grundfarbe"].str.lower().str.strip()
        # regex=False: "car." is a literal prefix; as a regex the dot would
        # match any character, and the default differs between pandas versions.
        df["kategorie"] = df["kategorie"].str.lower().str.replace("car.", "", regex=False).str.strip()
        df = df.drop_duplicates()
        self.available_car_models = str(df["marke"].unique()).replace('\n', "")
        print(self.available_car_models)
        df = self._localize_columns(df)
        df = df.loc[:, ~df.columns.duplicated()]
        self.df = df
        self.set_prompts()
        # Called for its side effect: re-select the freshly built system prompt.
        self.change_language(self.lang, history)
        if self.api_key:
            history = []
        return history

    def _construct_urls(self, df):
        """Return a copy of ``df`` with a ``url`` column for every complete row.

        Rows containing any missing value get a missing ``url``. The column is
        always created, even when no row is complete.
        """
        df = df.copy()
        complete = df.loc[df.notna().all(axis=1)]
        base_url = "https://www.hugopfohe.de/gebrauchtwagen/details/"
        # Assignment aligns on the index, so incomplete rows receive NaN.
        df["url"] = (
            base_url
            + complete["marke"].astype(str) + "_"
            + complete["Modellbasis"].astype(str) + "_"
            + complete["interne_nummer"].astype(str)
            + "/?&etcc_cmp=consultant"
        )
        return df

    def check_requirements(self, query, history):
        """Check that an API key and an inventory are available.

        A query starting with "sk-" is stored as the API key and triggers a
        load of the default inventory file.

        Returns:
            ``(history, passed)``; ``passed`` is True only when the query should
            be forwarded to the model.
        """
        if query.startswith("sk-"):
            self.api_key = query.strip("\n").strip()
            openai.api_key = self.api_key
            return self.process_file(self.DEFAULT_INVENTORY_CSV, []), False
        if not self.api_key:
            history.append((None, self._text("Please enter your OpenAI API KEY",
                                             "Bitte geben Sie Ihren OpenAI API-Schlüssel ein")))
            return history, False
        if self.df is None:
            return self.process_file(self.DEFAULT_INVENTORY_CSV, []), False
        return history, True

    def _trim_messages(self, messages):
        """Drop older turns in place until the list fits the token budget.

        The first message and the last one (the system prompt) are always
        kept, so the loop can neither discard the prompt nor run off the list.
        """
        while len(messages) > 2 and self._count_tokens(messages) > self.MAX_HISTORY_TOKENS:
            messages.pop(1)

    def run_text(self, history, query):
        """Answer ``query`` and return the updated chat history."""
        history, passed = self.check_requirements(query, history)
        if not passed:
            return history
        messages = []
        # Replay the visible conversation.
        for q, response in history:
            if q:
                messages.append({"role": "user", "content": q})
            if response:
                messages.append({"role": "assistant", "content": response})
        # The system prompt sits directly before the new user message.
        messages.append({"role": "system", "content": self.system_prompt})
        self._trim_messages(messages)
        messages.append({"role": "user", "content": query})
        print(messages)
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",  # change to gpt-3.5-turbo if don't have access
            messages=messages,
            temperature=0.0,
        )['choices'][0]['message']['content']
        if "df.query" in response:
            response = self.query_db(response, messages)
        # Only the query and the final answer are stored; the context is not needed.
        history.append((query, response))
        return history

    @staticmethod
    def _extract_query_line(response):
        """Return the first ``df.query...`` expression in ``response``, or None.

        Text before ``df.query`` on the line and surrounding Markdown backticks
        are removed.
        """
        for line in response.split("\n"):
            start = line.find("df.query")
            if start != -1:
                return line[start:].strip().strip("`").strip()
        return None

    @staticmethod
    def _is_empty(result):
        """Return True when ``result`` is a sized object with no elements."""
        try:
            return len(result) == 0
        except TypeError:
            # Scalars (e.g. a min/max/count) have no length and are not empty.
            return False

    def _format_result(self, result):
        """Render a query result as text for the follow-up prompt."""
        if isinstance(result, pd.DataFrame):
            if len(result) > self.MAX_RESULTS:
                result = result.sample(n=self.MAX_RESULTS)
            wanted = self._text(self.columns_in_response_eng, self.columns_in_response) + ['url']
            # The query may have selected a subset of columns; show what exists.
            present = [column for column in wanted if column in result.columns]
            if present:
                result = result[present]
            return result.to_string(index=False).replace("<br>", "")
        if isinstance(result, pd.Series):
            return result.to_string(index=False).replace("<br>", "")
        if isinstance(result, list):
            return ", ".join(map(str, result))
        return str(result)

    def query_db(self, response, messages):
        """Run the query proposed by the model and ask it for the final answer.

        Args:
            response: model output containing a ``df.query(...)`` line.
            messages: the message list of the first request; it is modified in
                place (system prompt swapped for one carrying the results).

        Returns:
            The model's final answer, with "<br>" removed and runs of three or
            more newlines collapsed to two.

        Raises:
            ValueError: if ``response`` contains no ``df.query`` expression.
        """
        # Remove the previous system prompt (second to last message).
        messages.pop(-2)
        query_line = self._extract_query_line(response)
        if query_line is None:
            raise ValueError("response does not contain a df.query expression")
        # SECURITY NOTE(review): this evaluates text produced by the model,
        # which is steered by user input. eval() can run arbitrary code here;
        # replace it with a restricted query interface before exposing the app
        # to untrusted users.
        result = eval("self." + query_line)
        print(query_line)
        if self._is_empty(result):
            content = self._text(
                "You are a virtual car salesman assistant in an online car dealership. There are no results this query. Help user adjust their preferences or recommend other brands.",
                "Sie sind ein virtueller Autohändler-Assistent in einem Online-Autohaus. Keine Ergebnisse für diese Abfrage. Benutzer bei der Anpassung ihrer Präferenzen oder Empfehlung anderer Marken unterstützen")
        else:
            result = self._format_result(result)
            print(result)
            content = self._text(
                f"You are a virtual car salesman salesman assistant for an online car dealership. You only recommend cars that are currently in stock. Advice cars from this list that are currently in stock:\n\n{result}\n\n" + "Include links to cars in your recommendation from the url column.",
                f"Sie sind ein virtueller Autohändler-Assistent in einem Online-Autohaus. Sie empfehlen nur Autos, die derzeit auf Lager sind. Beraten Sie Autos aus dieser Liste, die derzeit auf Lager sind:\n\n{result}\n\n" + "Fügen Sie Links zu Autos in Ihrer Empfehlung aus der Spalte url hinzu.")
        # Insert the new system message directly before the user's question.
        messages.insert(-1, {"role": "system", "content": content})
        response = openai.ChatCompletion.create(
            model="gpt-3.5-turbo",
            messages=messages,
            temperature=0.1,
        )['choices'][0]['message']['content']
        print(response)
        response = response.replace("<br>", "")
        # Collapse three or more consecutive newlines into two.
        response = re.sub(r'\n{3,}', '\n\n', response)
        return response

    def _count_tokens(self, messages):
        """Return the total token count of all message contents."""
        return sum(len(encoding.encode(message['content'])) for message in messages)
# Script entry point: build the Gradio chat UI around a Bot and start it.
# NOTE(review): the original indentation of this block was lost; the nesting
# below (which widgets sit in which Row/Column, and launch() placed after the
# Blocks context) is reconstructed — confirm against the deployed layout.
if __name__ == '__main__':
    # One Bot instance provides every event handler registered below.
    # NOTE(review): its api_key/df/lang state is therefore presumably shared by
    # all browser sessions — confirm this is intended.
    bot = Bot()
    # ui
    with gr.Blocks(css="#chatbot .overflow-y-auto{height:500px}") as demo:
        # The chat window starts with the prompt for the API key.
        chatbot = gr.Chatbot([(None, "Please enter your OpenAI API KEY")], elem_id="chatbot",
                             label="HUGO PFOHE - AI Car Salesman").style(height=500)
        with gr.Row():
            with gr.Column(scale=1):
                # Single text input; also used to submit the API key.
                txt = gr.Textbox(show_label=False, placeholder="What car would you like to buy?").style(
                    container=False)
            # with gr.Column(scale=0.15):
            # submit = gr.Button("Submit")
        with gr.Row():
            with gr.Column(scale=0.34, min_width=0):
                # Language selector, initialised from the bot's current language.
                lang = gr.inputs.Dropdown(choices=['English', 'German'], label="Language", default=bot.lang)
            # with gr.Column(scale=0.33, min_width=0):
            # clear = gr.Button("Clear")
            # with gr.Column(scale=0.33, min_width=0):
            # btn = gr.UploadButton("Upload CSV", file_types=["csv"])
            with gr.Column(scale=0.66, min_width=0):
                with gr.Row():
                    clear = gr.Button("Clear")
                with gr.Row():
                    btn = gr.UploadButton("Upload CSV", file_types=["csv"])
        # Submitting text runs the bot on (history, text) and shows the new history...
        txt.submit(bot.run_text, [chatbot, txt], chatbot)
        # ...and a second handler empties the textbox.
        txt.submit(lambda: "", None, txt)
        # Changing the language replaces the chat history with change_language's result.
        lang.change(bot.change_language, [lang, chatbot], chatbot)
        # submit.click(txt.submit, [chatbot, txt], chatbot)
        # submit.click(lambda: "", None, txt)
        # "Clear" empties the chat window; the bot's own state is not touched.
        clear.click(lambda: [], None, chatbot)
        # Uploading a CSV loads it as the new inventory.
        btn.upload(bot.process_file, [btn, chatbot], chatbot)
    demo.launch()