import chromadb
import os
import gradio as gr
import json
from huggingface_hub import InferenceClient
path='/Users/thiloid/Desktop/LSKI/ole_nest/Chatbot/LLM/chroma'
if(os.path.exists(path)==False): path="/home/user/app/chroma"
print(path)
client = chromadb.PersistentClient(path=path)
print(client.heartbeat())
print(client.get_version())
print(client.list_collections())
from chromadb.utils import embedding_functions
default_ef = embedding_functions.DefaultEmbeddingFunction()
sentence_transformer_ef = embedding_functions.SentenceTransformerEmbeddingFunction(model_name="T-Systems-onsite/cross-en-de-roberta-sentence-transformer")#"VAGOsolutions/SauerkrautLM-Mixtral-8x7B-Instruct")
collection = client.get_collection(name="chromatsc", embedding_function=sentence_transformer_ef)
client = InferenceClient("mistralai/Mixtral-8x7B-Instruct-v0.1")
from huggingface_hub import InferenceClient
from nltk.tokenize import word_tokenize
import pandas as pd
import string
import re
osa= pd.read_excel("/home/user/app/OSA.xlsx", nrows=136)
osa.loc[osa["Hochschule"] == "Übergreifend", "Hochschule"] = " "
osa.loc[osa["OSA: Fach (original)"] == "Allgemein", "OSA: Fach (original)"] = " "
# text that will be added to chatbot answer
osa.loc[osa["Link_Studium_Allgemein"].notna(), "Link_Studium_Allgemein"] = "Basierend auf deiner Frage empfehele ich dir diesen Interessenstest: "+ osa["Link_Studium_Allgemein"]
osa.loc[osa["Link_Uni_Allgemein"].notna(), "Link_Uni_Allgemein"] = "Basierend auf deinem Interessa an der "+osa["Hochschule"]+" empfehele ich dir diesen Interessenstest: "+ osa["Link_Uni_Allgemein"]
osa.loc[osa["Link_Fach"].notna(), "Link_Fach"] = "Basierend auf deinem Interessa an der " +osa["Hochschule"]+ " "+osa["OSA: Fach (original)"]+" zu studieren, empfehele ich dir diesen Interessenstest: "+ osa["Link_Fach"]
osa["chattext"]= osa["Link_Studium_Allgemein"].fillna('')+ osa["Link_Uni_Allgemein"].fillna('')+osa["Link_Fach"].fillna('')
# Text to compare with user prompt
osa["combi"]= osa["Hochschule"]+ " "+ osa["OSA: Fach (original)"]
osalist= osa["combi"].tolist()
osalist
def simosa(prompt, osalist, osa):
lcos = []
prompt = prompt.lower()
p_list = word_tokenize(prompt)
# Form a set containing keywords of the prompt
sw = [",", "?"]
p_set = {w for w in p_list if not w in sw}
for val in osalist:
val = val.lower()
v_list = word_tokenize(val)
# Form a set containing keywords of the current value
v_set = {w for w in v_list if not w in sw}
# Union of both sets
rvector = p_set.union(v_set)
# Create vectors
l1 = [1 if w in p_set else 0 for w in rvector]
l2 = [1 if w in v_set else 0 for w in rvector]
# Compute cosine similarity
dot_product = sum(l1[i] * l2[i] for i in range(len(rvector)))
magnitude1 = sum(l1)
magnitude2 = sum(l2)
if magnitude1 == 0 or magnitude2 == 0:
cosine = 0.0
else:
cosine = dot_product / float((magnitude1 * magnitude2) ** 0.5)
lcos.append(cosine)
osa["testsim"]=lcos
match=osa.loc[osa['testsim'].idxmax()]["testsim"]
#print(match)
if match >0.29:
answer = str(osa.loc[osa['testsim'].idxmax()]["chattext"])
else:
answer= "Wenn du dir unsicher bist, was du studieren könntest oder ob deine Fähigkeiten ausreichen, dann mach doch diesen Test (https://www.was-studiere-ich.de/) oder schau dir mal diese Seminare an (https://www.bw-best.de)."
return answer
def parse_for_nc(text):
'''
Parses text for words relating to NC and Abiturnote
:param text: a string
:return: an automatic response in form of a string
'''
nc_words = [" nc ", "abischnitt", "abiturschnitt", "abinote", "abiturnote", "ncschnitt", "abschlussnote", "abschlussdurchschnitt", "abschlussnote", "zulassungsbeschränkung", "numerus clausus", "noten"]
response = "Wenn du dir unsicher bist, ob du die Zulassungsvoraussetzungen zu einem Studiengang erfüllst, schau am besten einmal auf der Website der Universität nach, was gefordert ist.\n Häufig entscheidet nicht allein die Abiturnote die Zulassung, sondern auch Faktoren wie praktische Erfahrung oder ein FSJ.\n Lass dich außerdem nicht von den NCs vergangener Jahre verunsichern.\n Der NC gibt nur an, was im vergangenen Jahr die schlechteste Note des regulären Prozesses war, mit der man noch zugelassen wurde.\n Der NC kann sich also von Jahr zu Jahr verändern und oft werden auch Leute zugelassen, die einen schlechteren Schnitt ab (bspw. durch Wartesemester).\n Wenn du dir hingegen unsicher bist, ob deine Fähigkeiten mit denen des Fachs übereinstimmen,\n dann mach doch vielleicht mal einen Test. Außerdem gibt es Aufbau- und Vorbereitungskurse mittels derer du Wissen und Fähigkeiten aufbauen kannst."
# if the string is not empty
if text:
text = text.strip() # strip
text = text.lower() # lower all letters
text = text.translate(str.maketrans('', '', string.punctuation)) # remove punctuation
text = " "+text+" " # add whitespaces so that nc can be found correctly
for nc_word in nc_words:
if nc_word in text:
return response
return "No"
def format_prompt(message):
prompt = "" #""
#for user_prompt, bot_response in history:
# prompt += f"[INST] {user_prompt} [/INST]"
# prompt += f" {bot_response} "
prompt += f"[INST] {message} [/INST]"
return prompt
def responsecritical(
prompt, temperature=0.9, max_new_tokens=500, top_p=0.95, repetition_penalty=1.0,
):
temperature = float(temperature)
if temperature < 1e-2: temperature = 1e-2
top_p = float(top_p)
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=42,
)
system="Bitte evaluiere ob die Frage soziokulturell oder allgemein problematisch oder auch sensibel oder politisch ist. Antworte ausschließlich mit Ja wenn sie soziokulturell oder allgemein problematisch ist, ansonsten nur mit Nein. Erkläre deine Entscheidung nicht.\n\nUser-Anliegen:"
formatted_prompt = format_prompt(system+"\n"+prompt)
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
for response in stream:
output += response.token.text
sentence_lower = output.lower()
# Check if the word 'nein' is in the sentence
if 'nein' in sentence_lower:
return True
else:
return False
def response(
prompt, temperature=0.9, max_new_tokens=500, top_p=0.95, repetition_penalty=1.0,
):
temperature = float(temperature)
if temperature < 1e-2: temperature = 1e-2
top_p = float(top_p)
generate_kwargs = dict(
temperature=temperature,
max_new_tokens=max_new_tokens,
top_p=top_p,
repetition_penalty=repetition_penalty,
do_sample=True,
seed=42,
)
if responsecritical(prompt)==False:
return "Es scheint so, als sei dies keine Frage, die sich auf die Studienorientierung bezieht"
else:
answernc=parse_for_nc(prompt)
if answernc!="No":
return answernc
else:
prompt = re.sub(r'\buni\b', 'Universität', prompt, flags=re.IGNORECASE)
addon=""
results=collection.query(
query_texts=[prompt],
n_results=60
)
dists=["
(relevance: "+str(round((1-d)*100)/100)+";" for d in results['distances'][0]]
results=results['documents'][0]
combination = zip(results,dists)
combination = [' '.join(triplets) for triplets in combination]
if(len(results)>1):
addon=" Bitte berücksichtige bei deiner Antwort ausschießlich folgende Auszüge aus unserer Datenbank, sofern sie für die Antwort erforderlich sind. Beantworte die Frage knapp und präzise. Ignoriere unpassende Datenbank-Auszüge OHNE sie zu kommentieren, zu erwähnen oder aufzulisten:\n"+"\n".join(results)
system="Du bist ein deutschsprachiges KI-basiertes Studienberater Assistenzsystem, das zu jedem Anliegen möglichst geeignete Studieninformationen empfiehlt."+addon+"\n\nUser-Anliegen:"
formatted_prompt = format_prompt(system+"\n"+prompt)
stream = client.text_generation(formatted_prompt, **generate_kwargs, stream=True, details=True, return_full_text=False)
output = ""
for response in stream:
output += response.token.text
osaanswer=simosa(prompt, osalist, osa)
output=output[:-4]+"\n"+osaanswer
yield output
gr.ChatInterface(response, chatbot=gr.Chatbot(value=[[None,"Herzlich willkommen! Ich bin Chätti ein KI-basiertes Studienassistenzsystem, das für jede Anfrage die am besten Studieninformationen empfiehlt.
Erzähle mir, was du gerne tust!"]],render_markdown=True),title="German Studyhelper Chätti").queue().launch(share=True) #False, server_name="0.0.0.0", server_port=7864)
print("Interface up and running!")