Spaces:
Runtime error
Runtime error
import requests | |
import os, sys, json | |
import gradio as gr | |
import openai | |
from openai import OpenAI | |
import time | |
import re | |
import io | |
from PIL import Image, ImageDraw, ImageOps, ImageFont | |
import base64 | |
import tempfile | |
from reportlab.pdfgen import canvas | |
from reportlab.lib.pagesizes import letter | |
from reportlab.pdfbase import pdfmetrics | |
from reportlab.lib.styles import getSampleStyleSheet | |
from reportlab.platypus import SimpleDocTemplate, Paragraph | |
from PyPDF2 import PdfReader, PdfWriter | |
from hugchat import hugchat | |
from hugchat.login import Login | |
from tavily import TavilyClient | |
from langchain.chains import LLMChain, RetrievalQA | |
from langchain.chat_models import ChatOpenAI | |
from langchain.document_loaders import PyPDFLoader, WebBaseLoader, UnstructuredWordDocumentLoader, DirectoryLoader | |
from langchain.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader | |
from langchain.document_loaders.generic import GenericLoader | |
from langchain.document_loaders.parsers import OpenAIWhisperParser | |
from langchain.schema import AIMessage, HumanMessage | |
from langchain.llms import HuggingFaceHub | |
from langchain.llms import HuggingFaceTextGenInference | |
from langchain.embeddings import HuggingFaceInstructEmbeddings, HuggingFaceEmbeddings, HuggingFaceBgeEmbeddings, HuggingFaceInferenceAPIEmbeddings | |
from langchain.retrievers.tavily_search_api import TavilySearchAPIRetriever | |
from langchain.embeddings.openai import OpenAIEmbeddings | |
from langchain.prompts import PromptTemplate | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.vectorstores import Chroma | |
from chromadb.errors import InvalidDimensionException | |
from utils import * | |
from beschreibungen import * | |
#from langchain.vectorstores import MongoDBAtlasVectorSearch | |
#from pymongo import MongoClient | |
from dotenv import load_dotenv, find_dotenv | |
_ = load_dotenv(find_dotenv()) | |
############################################### | |
#globale Variablen | |
############################################## | |
#nur bei ersten Anfrage splitten der Dokumente - um die Vektordatenbank entsprechend zu füllen | |
splittet = False | |
#DB für Vektorstore | |
db = None | |
############################################# | |
# Allgemeine Konstanten | |
#Filepath zu temp Folder (temp) mit File von ausgewähltem chatverlauf | |
file_path_download = "" | |
################################################## | |
#Für MongoDB statt Chroma als Vektorstore | |
#MONGODB_URI = os.environ["MONGODB_ATLAS_CLUSTER_URI"] | |
#client = MongoClient(MONGODB_URI) | |
#MONGODB_DB_NAME = "langchain_db" | |
#MONGODB_COLLECTION_NAME = "gpt-4" | |
#MONGODB_COLLECTION = client[MONGODB_DB_NAME][MONGODB_COLLECTION_NAME] | |
#MONGODB_INDEX_NAME = "default" | |
#Plattform Keys aus den Secrets holen zu diesem Space | |
HUGGINGFACEHUB_API_TOKEN = os.getenv("HF_ACCESS_READ") | |
OAI_API_KEY=os.getenv("OPENAI_API_KEY") | |
HEADERS = {"Authorization": f"Bearer {HUGGINGFACEHUB_API_TOKEN}"} | |
TAVILY_KEY = os.getenv("TAVILY_KEY") | |
os.environ["TAVILY_API_KEY"] = TAVILY_KEY | |
################################################ | |
#LLM Model mit dem gearbeitet wird | |
#openai------------------------------------- | |
#MODEL_NAME = "gpt-3.5-turbo-16k" | |
#MODEL_NAME = "gpt-3.5-turbo-1106" | |
MODEL_NAME= "gpt-4-1106-preview" | |
MODEL_NAME_IMAGE = "gpt-4-vision-preview" | |
MODEL_NAME_CODE = "code-davinci-002" | |
#verfügbare Modelle anzeigen lassen | |
#HuggingFace Reop ID-------------------------------- | |
#repo_id = "meta-llama/Llama-2-13b-chat-hf" | |
repo_id = "HuggingFaceH4/zephyr-7b-alpha" #das Modell ist echt gut!!! Vom MIT | |
#repo_id = "TheBloke/Yi-34B-Chat-GGUF" | |
#repo_id = "meta-llama/Llama-2-70b-chat-hf" | |
#repo_id = "tiiuae/falcon-40b" | |
#repo_id = "Vicuna-33b" | |
#repo_id = "alexkueck/ChatBotLI2Klein" | |
#repo_id = "mistralai/Mistral-7B-v0.1" | |
#repo_id = "internlm/internlm-chat-7b" | |
#repo_id = "Qwen/Qwen-7B" | |
#repo_id = "Salesforce/xgen-7b-8k-base" | |
#repo_id = "Writer/camel-5b-hf" | |
#repo_id = "databricks/dolly-v2-3b" | |
#repo_id = "google/flan-t5-xxl" | |
#repo_id = "mistralai/Mixtral-8x7B-Instruct-v0.1" | |
#repo_id = "abacusai/Smaug-72B-v0.1" | |
#HuggingFace Model name-------------------------------- | |
MODEL_NAME_HF = "mistralai/Mixtral-8x7B-Instruct-v0.1" | |
#MODLE_NAME_HF = "abacusai/Smaug-72B-v0.1" | |
MODEL_NAME_OAI_ZEICHNEN = "dall-e-3" | |
#Alternativ zeichnen: Stabe Diffusion from HF: | |
#API Inference allgemien: https://api-inference.huggingface.co/models/{model} | |
#Zeichnen | |
API_URL = "https://api-inference.huggingface.co/models/stabilityai/stable-diffusion-2-1" | |
#Textgenerierung | |
API_URL_TEXT = "https://api-inference.huggingface.co/models/argilla/notux-8x7b-v1" | |
############################################### | |
# Formatierung im PDF - Konstanten setzen | |
# Breite und Höhe für Spalten | |
COLUMN_WIDTH = 150 | |
ROW_HEIGHT = 20 | |
# Bereiche für Spalten | |
TIMESTAMP_X = 50 | |
USER_X = TIMESTAMP_X + COLUMN_WIDTH | |
ASSISTANT_X = USER_X + COLUMN_WIDTH | |
# Rand und Abstand zwischen Zeilen | |
MARGIN = 50 | |
LINE_SPACING = 10 | |
################################################ | |
#HF Hub Zugriff ermöglichen | |
############################################### | |
os.environ["HUGGINGFACEHUB_API_TOKEN"] = HUGGINGFACEHUB_API_TOKEN | |
############################################### | |
#Alternativ: HuggingChat API nutzen | |
pw=os.getenv("HFPW") | |
email= os.getenv("HFEMail") | |
#sign = Login(email, pw) | |
#cookies = sign.login() | |
# Save cookies to the local directory | |
#cookie_path_dir = "cookies_hf" | |
#sign.saveCookiesToDir(cookie_path_dir) | |
################################################ | |
#OpenAI Zugang, client und Assistant einmal erzeugen. | |
################################################ | |
#zentral einmal erzeugen!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! | |
client = OpenAI() | |
general_assistant_file = client.beta.assistants.create(name="File Analysator",instructions=template, model="gpt-4-1106-preview",) | |
thread_file = client.beta.threads.create() | |
general_assistant_suche= openai_assistant_suche(client) | |
################################################# | |
################################################# | |
################################################# | |
#Funktionen zur Verarbeitung | |
################################################ | |
############################################## | |
#wenn löschen Button geklickt | |
def clear_all(history, uploaded_file_paths, chats): | |
dic_history = {schluessel: wert for schluessel, wert in history} | |
print("start history:") | |
print(history) | |
print("end history:") | |
#später wird die summary auf 50 tokens verkürzt, um die Anfrage nicht so teuer werden zu lassen | |
#summary wird gebraucht für die Anfrage beim NN, um eine Überschrift des Eintrages zu generieren | |
summary = "\n\n".join(f'{schluessel}: \n {wert}' for schluessel, wert in dic_history.items()) | |
# um den Chatverlauf für das PDF vorzubereiten: | |
chat_history={} | |
for key, value in dic_history.items(): | |
if key == 'timestamp': | |
chat_history[key] = value | |
elif key == 'user': | |
chat_history[key] = value | |
elif key == 'assistant': | |
chat_history[key] = value | |
#falls file mit summay für download existiert hat: das zunächst löschen | |
#cleanup(file_path_download) | |
#noch nicht im Einsatz, aber hier werden alle Chats einer Sitzung gespeichert | |
#den aktuellen Chatverlauf zum Download bereitstellen: | |
if chats != {} : | |
id_neu = len(chats)+1 | |
chats[id_neu]= summary | |
else: | |
chats[0]= summary | |
#Eine Überschrift zu dem jeweiligen Chatverlauf finden - abhängig vom Inhalt | |
#file_path_download = save_and_download(summary) | |
headers, payload = process_chatverlauf(summary, MODEL_NAME, OAI_API_KEY) | |
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) | |
#als json ausgeben | |
data = response.json() | |
# Den "content" auswählen, da dort die Antwort der Ki enthalten ist | |
result = data['choices'][0]['message']['content'] | |
worte = result.split() | |
if len(worte) > 2: | |
file_path_download = "data/" + str(len(chats)) + "_Chatverlauf.pdf" | |
else: | |
file_path_download = "data/" + str(len(chats)) + "_" + result + ".pdf" | |
#chat_history in ein File laden | |
# Erzeuge einen Bytestream | |
memoryFile = io.BytesIO() | |
# Erstelle eine Canvas-Instanz im Bytestream | |
c = canvas.Canvas(memoryFile, pagesize=letter) #oder A4 | |
# Zeitstempel | |
c.setFillColorRGB(0, 0, 0) | |
c.setFont("Times-Bold", 14) | |
c.drawCentredString(TIMESTAMP_X + COLUMN_WIDTH / 2, MARGIN, chat_history["timestamp"]) | |
# User und Assistant | |
c.setFont("Courier", 12) | |
c.drawString(USER_X, MARGIN + LINE_SPACING, chat_history["user"] + ": ") | |
c.setFont("Monaco", 14) | |
c.drawString(USER_X + COLUMN_WIDTH, MARGIN + LINE_SPACING, chat_history["assistant"]) | |
# Linie trennen | |
c.line(TIMESTAMP_X, MARGIN + LINE_SPACING + ROW_HEIGHT, ASSISTANT_X + COLUMN_WIDTH, MARGIN + LINE_SPACING + ROW_HEIGHT) | |
# Setze die Schriftart und -grösse | |
c.setFont("Helvetica", 12) | |
# Fülle die Canvas mit dem Inhalt der Textdatei | |
c.drawString(100, 750, summary) | |
# Schließe das Canvas, um das PDF zu rendern | |
c.save() | |
# Verschiebe den Lesekopf an den Beginn des Bytes IO Streams | |
memoryFile.seek(0) | |
# Erstelle ein PdfFileWriter-Objekt aus dem BytesIO-Stream | |
pdf_writer = PdfWriter() | |
pdf_reader = PdfReader(memoryFile) | |
# Gib das PDF-Objekt an PdfFileWriter weiter | |
for page in range(len(pdf_reader.pages)): | |
pdf_writer.add_page(pdf_reader.pages[page]) | |
# Speichere das PDF-Objekt in einer neuen Datei | |
with open(file_path_download, "wb") as fp: | |
pdf_writer.write(fp) | |
#die session variable in gradio erweitern und alle fliepath neu in das gr.File hochladen | |
uploaded_file_paths= uploaded_file_paths + [file_path_download] | |
return None, gr.Image(visible=False), uploaded_file_paths, [], gr.File(uploaded_file_paths, label="Download-Chatverläufe", visible=True, file_count="multiple", interactive = False), chats | |
#wenn löschen Button geklickt | |
def clear_all3(history): | |
#die session variable in gradio erweitern und alle fliepath neu in das gr.File hochladen | |
uploaded_file_paths= "" | |
return None, gr.Image(visible=False), [], | |
############################################## | |
#History - die Frage oder das File eintragen... | |
#in history_file ist ein file gespeichert, falls voher im Verlauf schon ein File hochgeladen wurde. | |
#wird ein neuer File hochgeladen, so wird history_fiel dadurch ersetzt | |
def add_text(chatbot, history, prompt, file, file_history): | |
if (file == None): | |
chatbot = chatbot +[(prompt, None)] | |
else: | |
file_history = file | |
if (prompt == ""): | |
chatbot=chatbot + [((file.name,), "Prompt fehlt!")] | |
else: | |
ext = analyze_file(file) | |
if (ext == "png" or ext == "PNG" or ext == "jpg" or ext == "jpeg" or ext == "JPG" or ext == "JPEG"): | |
chatbot = chatbot +[((file.name,), None), (prompt, None)] | |
else: | |
chatbot = chatbot +[("Hochgeladenes Dokument: "+ get_filename(file) +"\n" + prompt, None)] | |
return chatbot, history, prompt, file, file_history, gr.Image(visible = False), "" #gr.Image( label=None, size=(30,30), visible=False, scale=1) #gr.Textbox(value="", interactive=False) | |
def add_text2(chatbot, prompt): | |
if (prompt == ""): | |
chatbot = chatbot + [("", "Prompt fehlt!")] | |
else: | |
chatbot = chatbot + [(prompt, None)] | |
print("chatbot nach add_text............") | |
print(chatbot) | |
return chatbot, prompt, "" | |
############################################ | |
#nach dem Upload soll das zusätzliche Fenster mit dem image drinnen angezeigt werden | |
def file_anzeigen(file): | |
ext = analyze_file(file) | |
if (ext == "png" or ext == "PNG" or ext == "jpg" or ext == "jpeg" or ext == "JPG" or ext == "JPEG"): | |
return gr.Image(width=47, visible=True, interactive = False, height=47, min_width=47, show_label=False, show_share_button=False, show_download_button=False, scale = 0.5), file, file | |
else: | |
return gr.Image(width=47, visible=True, interactive = False, height=47, min_width=47, show_label=False, show_share_button=False, show_download_button=False, scale = 0.5), "data/file.png", file | |
def file_loeschen(): | |
return None, gr.Image(visible = False) | |
############################################ | |
#wenn 'Stop' Button geklickt, dann Message dazu und das Eingabe-Fenster leeren | |
def cancel_outputing(): | |
reset_textbox() | |
return "Stop Done" | |
def reset_textbox(): | |
return gr.update(value=""),"" | |
########################################## | |
#Hilfsfunktion, um ein von Stable Diffusion erzeugtes Bild für die Ausgabe in der History vorzubereiten | |
def umwandeln_fuer_anzeige(image): | |
buffer = io.BytesIO() | |
image.save(buffer, format='PNG') | |
return buffer.getvalue() | |
################################################## | |
#openassistant um uploaded Files zu analysieren | |
def create_assistant_file(prompt, file): | |
global client, general_assistant_file | |
#neues File dem Assistant hinzufügen | |
file_neu = client.files.create(file=open(file,"rb",),purpose="assistants",) | |
# Update Assistant | |
#wenn type: code_interpreter, wird das file mit angehängt an den Prpmt, aber vorher nicht bearbeitet | |
#wenn type: retrieval, wird das Dokument vorher embedded in einem vektorstore und nur entsprechende chunks mitgegeben. | |
#pro Assistant 20 cent pro Tag als Nutzung - egal wie viele Fragen dazu. | |
updated_assistant = client.beta.assistants.update(general_assistant_file.id,tools=[{"type": "code_interpreter"}, {"type": "retrieval"}],file_ids=[file_neu.id],) | |
thread_file, run = create_thread_and_run(prompt, client, updated_assistant.id) | |
run = wait_on_run(run, thread_file, client) | |
response = get_response(thread_file, client, updated_assistant.id) | |
result = response.data[1].content[0].text.value | |
return result | |
################################################## | |
#openassistant um im Netz zu suchen | |
def create_assistant_suche(prompt): | |
#global client, general_assistant_suche | |
retriever = TavilySearchAPIRetriever(k=4) | |
result = retriever.invoke(template + prompt) | |
erg = "Aus dem Internet: " + result[0].page_content + ".\n Quelle: " | |
src = result[0].metadata['source'] | |
""" | |
#neues Thread mit akt. prompt dem Assistant hinzufügen | |
thread_suche, run = create_thread_and_run(prompt, client, general_assistant_suche.id) | |
run = wait_on_run(run, thread_suche, client) | |
response = get_response(thread_suche, client, general_assistant_suche.id) | |
result = response.data[1].content[0].text.value | |
""" | |
return erg + src | |
#huggingchat um im Netz zu suchen | |
def create_assistant_suche_hf(chatbot, prompt): | |
erg, src = hugchat_search(chatbot, prompt) | |
return erg + src | |
################################################### | |
#Funktion von Gradio aus, die den dort eingegebenen Prompt annimmt und weiterverarbeitet | |
################################################### | |
######################################################### | |
#Funktion wird direkt aufgerufen aus der GUI - von hier muss auch die Rückmeldung kommen.... | |
#man kann einen Text-Prompt eingeben (mit oder ohne RAG), dazu ein Image hochladen, ein Bild zu einem reinen textprompt erzeugen lassen | |
def generate_auswahl(prompt_in, file, file_history, chatbot, history, rag_option, model_option, openai_api_key, k=3, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3,top_k=35, websuche="Aus"): | |
global splittet, db | |
#wenn RAG angeschaltet - Vektorstore initialisieren | |
#aber nur, wenn es noch nicht geshehen ist (splittet = False) | |
#falls schon ein File hochgeladen wurde, ist es in history_file gespeichert - falls ein neues File hochgeladen wurde, wird es anschließend neu gesetzt | |
neu_file = file_history | |
#prompt normalisieren bevor er an die KIs geht | |
prompt = normalise_prompt(prompt_in) | |
if (rag_option == "An"): | |
#muss nur einmal ausgeführt werden... | |
if not splittet: | |
splits = document_loading_splitting() | |
document_storage_chroma(splits) | |
db = document_retrieval_chroma2() | |
splittet = True | |
#else: #unnötig, da wenn Vektorstor einmal für alle user eingerichtet, wer weiter besthen bleiben kann - die unterschiedlichen Propmt werden dann später je nach rag_option erzeugt | |
#db=None | |
#splittet = False #sonst würde es für alle User wieder ausgeschaltet - Alternative: gr.State(False) dazu anlegen | |
#kein Bild hochgeladen -> auf Text antworten... | |
status = "Antwort der KI ..." | |
if (file == None and file_history == None): | |
result, status = generate_text(prompt, chatbot, history, rag_option, model_option, openai_api_key, db, websuche, k=3, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3, top_k=3) | |
history = history + [[prompt, result]] | |
else: | |
#Es wurde ein File neu angehängt -> wenn prompt dazu, das Bild analysieren | |
#das history_fiel muss neu gesetzt werden | |
if (file != None): | |
# file_history wird neu gesetzt in der Rückgabe dieser Funktion... | |
neu_file = file | |
#herausfinden, ob Bild oder Dokument... | |
ext = analyze_file(neu_file) | |
if (ext == "png" or ext == "PNG" or ext == "jpg" or ext == "jpeg" or ext == "JPG" or ext == "JPEG"): | |
result= generate_text_zu_bild(neu_file, prompt, k, rag_option, chatbot, history, db) | |
else: | |
result = generate_text_zu_doc(neu_file, prompt, k, rag_option, chatbot, history, db) | |
#die history erweitern - abhängig davon, ob gerade ein file hochgeladen wurde oder nicht | |
if (file != None): | |
history = history + [[(file,), None],[prompt, result]] | |
else: | |
history = history + [[prompt, result]] | |
chatbot[-1][1] = "" | |
for character in result: | |
chatbot[-1][1] += character | |
time.sleep(0.03) | |
yield chatbot, history, None, neu_file, status | |
if shared_state.interrupted: | |
shared_state.recover() | |
try: | |
yield chatbot, history, None, neu_file, "Stop: Success" | |
except: | |
pass | |
################################################## | |
#zu einem Text-Prompt ein Bild via Stable Diffusion generieren | |
def generate_bild(prompt, chatbot, model_option_zeichnen='HuggingFace', temperature=0.5, max_new_tokens=4048,top_p=0.6, repetition_penalty=1.3): | |
global client | |
if (model_option_zeichnen == "Stable Diffusion"): | |
print("Bild Erzeugung HF..............................") | |
#Bild nach Anweisung zeichnen und in History darstellen... | |
data = {"inputs": prompt} | |
response = requests.post(API_URL, headers=HEADERS, json=data) | |
print("fertig Bild") | |
result = response.content | |
#Bild ausgeben | |
image = Image.open(io.BytesIO(result)) | |
image_64 = umwandeln_fuer_anzeige(image) | |
chatbot[-1][1]= "<img src='data:image/png;base64,{0}'/>".format(base64.b64encode(image_64).decode('utf-8')) | |
else: | |
print("Bild Erzeugung DallE..............................") | |
#als Format ginge auch 'url', n - Anz. der erzeugten Bilder | |
response = client.images.generate(model="dall-e-3",prompt=prompt,size="1024x1024",quality="standard",n=1, response_format='b64_json') | |
#chatbot[-1][1]= "<img src='data:image/png;base64,{0}'/>".format(base64.b64encode(image_64).decode('utf-8')) | |
chatbot[-1][1] = "<img src='data:image/png;base64,{0}'/>".format(response.data[0].b64_json) | |
return chatbot, "Antwort KI: Success" | |
################################################## | |
#zu einem Bild und Text-Prompt eine Analyse generieren | |
def generate_text_zu_bild(file, prompt, k, rag_option, chatbot, history, db): | |
global splittet | |
print("Text mit Bild ..............................") | |
prompt_neu = generate_prompt_with_history(prompt, history) | |
if (rag_option == "An"): | |
print("Bild mit RAG..............................") | |
neu_text_mit_chunks = rag_chain2(prompt, db, k) | |
#für Chat LLM: | |
#prompt = generate_prompt_with_history_openai(neu_text_mit_chunks, history) | |
#als reiner prompt: | |
prompt_neu = generate_prompt_with_history(neu_text_mit_chunks, history) | |
headers, payload = process_image(file, prompt_neu, MODEL_NAME_IMAGE, OAI_API_KEY) | |
response = requests.post("https://api.openai.com/v1/chat/completions", headers=headers, json=payload) | |
#als json ausgeben | |
data = response.json() | |
# Den "content" auswählen, da dort die Antwort der Ki enthalten ist | |
result = data['choices'][0]['message']['content'] | |
return result | |
################################################## | |
#zu einem Bild und Text-Prompt eine Analyse generieren | |
def generate_text_zu_doc(file, prompt, k, rag_option, chatbot, history, db): | |
global splittet | |
print("text mit doc ..............................") | |
prompt_neu = generate_prompt_with_history(prompt, history) | |
if (rag_option == "An"): | |
print("Doc mit RAG..............................") | |
neu_text_mit_chunks = rag_chain2(prompt, db, k) | |
#für Chat LLM: | |
#prompt_neu = generate_prompt_with_history_openai(neu_text_mit_chunks, history) | |
#als reiner prompt: | |
prompt_neu = generate_prompt_with_history(neu_text_mit_chunks, history) | |
result = create_assistant_file(prompt_neu, file) | |
return result | |
#################################################### | |
#aus einem Text-Prompt die Antwort von KI bekommen | |
#mit oder ohne RAG möglich | |
def generate_text (prompt, chatbot, history, rag_option, model_option, openai_api_key, db, websuche, k=3, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3, top_k=35): | |
global splittet | |
#hugchat=False | |
suche_im_Netz="Antwort der KI ..." | |
print("Text pur..............................") | |
if (openai_api_key == "" or openai_api_key == "sk-"): | |
#raise gr.Error("OpenAI API Key is required.") | |
#eigenen OpenAI key nutzen | |
openai_api_key= OAI_API_KEY | |
if (rag_option is None): | |
raise gr.Error("Retrieval Augmented Generation ist erforderlich.") | |
if (prompt == ""): | |
raise gr.Error("Prompt ist erforderlich.") | |
#history für HuggingFace Models formatieren | |
#history_text_und_prompt = generate_prompt_with_history_hf(prompt, history) | |
#history für openAi formatieren | |
#history_text_und_prompt = generate_prompt_with_history_openai(prompt, history) | |
#history für Langchain formatieren | |
#history_text_und_prompt = generate_prompt_with_history_langchain(prompt, history) | |
try: | |
if (websuche=="Aus"): | |
########################### | |
#LLM auswählen (OpenAI oder HF) | |
########################### | |
if (model_option == "OpenAI"): | |
#Anfrage an OpenAI ---------------------------- | |
print("OpenAI Anfrage.......................") | |
llm = ChatOpenAI(model_name = MODEL_NAME, openai_api_key = openai_api_key, temperature=temperature)#, top_p = top_p) | |
#Prompt an history anhängen und einen Text daraus machen | |
if (rag_option == "An"): | |
history_text_und_prompt = generate_prompt_with_history(prompt, history) | |
else: | |
history_text_und_prompt = generate_prompt_with_history_openai(prompt, history) | |
else: | |
#oder an Hugging Face -------------------------- | |
print("HF Anfrage.......................") | |
model_kwargs={"temperature": 0.5, "max_length": 512, "num_return_sequences": 1, "top_k": top_k, "top_p": top_p, "repetition_penalty": repetition_penalty} | |
llm = HuggingFaceHub(repo_id=repo_id, model_kwargs=model_kwargs) | |
#llm = HuggingFaceChain(model=MODEL_NAME_HF, model_kwargs={"temperature": 0.5, "max_length": 128}) | |
#llm = HuggingFaceHub(url_??? = "https://wdgsjd6zf201mufn.us-east-1.aws.endpoints.huggingface.cloud", model_kwargs={"temperature": 0.5, "max_length": 64}) | |
#llm = HuggingFaceTextGenInference( inference_server_url="http://localhost:8010/", max_new_tokens=max_new_tokens,top_k=10,top_p=top_p,typical_p=0.95,temperature=temperature,repetition_penalty=repetition_penalty,) | |
#llm via HuggingChat | |
#llm = hugchat.ChatBot(cookies=cookies.get_dict()) | |
#hugchat=True #da dieses Model in llm_chain bzw reag_chain anderes verarbeitet wird | |
print("HF") | |
#Prompt an history anhängen und einen Text daraus machen | |
history_text_und_prompt = generate_prompt_with_history(prompt, history) | |
#zusätzliche Dokumenten Splits aus DB zum Prompt hinzufügen (aus VektorDB - Chroma oder Mongo DB) | |
if (rag_option == "An"): | |
print("LLM aufrufen mit RAG: ...........") | |
result = rag_chain(llm, history_text_und_prompt, db) #für hugchat noch kein rag möglich... | |
else: | |
#splittet = False | |
print("LLM aufrufen ohne RAG: ...........") | |
resulti = llm_chain(llm, history_text_und_prompt) | |
result = resulti.strip() | |
""" | |
#Alternativ mit API_URL - aber das model braucht 93 B Space!!! | |
data = {"inputs": prompt, "options": {"max_new_tokens": max_new_tokens},} | |
response = requests.post(API_URL_TEXT, headers=HEADERS, json=data) | |
result = response.json() | |
print("responseresult.............................") | |
print(result) | |
chatbot_response = result[0]['generated_text'] | |
print("anzahl tokens gesamt antwort:------------------") | |
print (len(chatbot_response.split())) | |
chatbot_message = chatbot_response[len(prompt):].strip() | |
print("history/chatbot_rsponse:--------------------------------") | |
print(history) | |
print(chatbot_message) | |
result = chatbot_message | |
""" | |
else: #Websuche ist An | |
print("Suche im Netz: ...........") | |
suche_im_Netz="Antwort aus dem Internet ..." | |
#Prompt an history anhängen und einen Text daraus machen | |
history_text_und_prompt = generate_prompt_with_history(prompt, history) | |
#if (hugchat): | |
#mit hugchat | |
#result = create_assistant_suche_hf(llm, history_text_und_prompt) | |
#else: | |
#mit tavily: | |
result = create_assistant_suche(history_text_und_prompt) | |
""" | |
#Wenn keine Antwort möglich "Ich weiß es nicht" etc., dann versuchen mit Suche im Internet. | |
if (result == None or is_response_similar(result)): | |
print("Suche im Netz: ...........") | |
suche_im_Netz="Antwort aus dem Internet ..." | |
result = create_assistant_suche(prompt) | |
""" | |
except Exception as e: | |
raise gr.Error(e) | |
return result, suche_im_Netz | |
#Funktion wird direkt aufgerufen aus der GUI - von hier muss auch die Rückmeldung kommen.... | |
#man kann einen Text-Prompt eingeben , dazu ein Image hochladen, und dann dazu code erzeugen lassen | |
def generate_code(prompt_in, file, file_history, chatbot, history, model_option, openai_api_key, k=3, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3,top_k=35): | |
#prompt normalisieren bevor er an die KIs geht | |
prompt = normalise_prompt(prompt_in) | |
#falls schon ein File hochgeladen wurde, ist es in history_file gespeichert - falls ein neues File hochgeladen wurde, wird es anschließend neu gesetzt | |
neu_file = file_history | |
#kein Bild hochgeladen -> auf Text antworten... | |
status = "Antwort der KI ..." | |
if (file == None and file_history == None): | |
result, status = generate_code_antwort(prompt, chatbot, history, model_option, openai_api_key, k=3, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3, top_k=35) | |
history = history + [[prompt, result]] | |
else: | |
#Es wurde ein File neu angehängt -> wenn prompt dazu, das Bild analysieren | |
#das history_fiel muss neu gesetzt werden | |
if (file != None): | |
# file_history wird neu gesetzt in der Rückgabe dieser Funktion... | |
neu_file = file | |
#herausfinden, ob Bild oder Dokument... | |
ext = analyze_file(neu_file) | |
if (ext == "png" or ext == "PNG" or ext == "jpg" or ext == "jpeg" or ext == "JPG" or ext == "JPEG"): | |
result= generate_text_zu_bild(neu_file, prompt, k, rag_option, chatbot, history, db) | |
else: | |
result = generate_text_zu_doc(neu_file, prompt, k, rag_option, chatbot, history, db) | |
#die history erweitern - abhängig davon, ob gerade ein file hochgeladen wurde oder nicht | |
if (file != None): | |
history = history + [[(file,), None],[prompt, result]] | |
else: | |
history = history + [[prompt, result]] | |
chatbot[-1][1] = "" | |
for character in result: | |
chatbot[-1][1] += character | |
time.sleep(0.03) | |
yield chatbot, history, None, neu_file, status | |
if shared_state.interrupted: | |
shared_state.recover() | |
try: | |
yield chatbot, history, None, neu_file, "Stop: Success" | |
except: | |
pass | |
#################################################### | |
#aus einem Text-Prompt die Antwort von KI bekommen | |
#mit oder ohne RAG möglich | |
def generate_code_antwort (prompt, chatbot, history, model_option, openai_api_key, k=3, top_p=0.6, temperature=0.5, max_new_tokens=4048, max_context_length_tokens=2048, repetition_penalty=1.3, top_k=35): | |
suche_im_Netz="Antwort der KI ..." | |
print("Text pur..............................") | |
if (openai_api_key == "" or openai_api_key == "sk-"): | |
#raise gr.Error("OpenAI API Key is required.") | |
#eigenen OpenAI key nutzen | |
openai_api_key= OAI_API_KEY | |
if (prompt == ""): | |
raise gr.Error("Prompt ist erforderlich.") | |
try: | |
########################### | |
#LLM auswählen (OpenAI oder HF) | |
########################### | |
if (model_option == "Davinci"): | |
#Anfrage an OpenAI ---------------------------- | |
print("OpenAI Anfrage.......................") | |
llm = ChatOpenAI(model_name = MODEL_NAME_CODE, openai_api_key = openai_api_key, temperature=temperature)#, top_p = top_p) | |
#Prompt an history anhängen und einen Text daraus machen | |
history_text_und_prompt = generate_prompt_with_history_openai(prompt, history) | |
else: | |
llm = ChatOpenAI(model_name = MODEL_NAME_IMAGE, openai_api_key = openai_api_key, temperature=temperature)#, top_p = top_p) | |
#Prompt an history anhängen und einen Text daraus machen | |
history_text_und_prompt = generate_prompt_with_history_openai(prompt, history) | |
print("LLM aufrufen ohne RAG: ...........") | |
resulti = llm_chain(llm, history_text_und_prompt) | |
result = resulti.strip() | |
except Exception as e: | |
raise gr.Error(e) | |
return result, suche_im_Netz | |
################################################ | |
#GUI | |
############################################### | |
#Beschreibung oben in GUI | |
################################################ | |
#css = """.toast-wrap { display: none !important } """ | |
#examples=[['Was ist ChtGPT-4?'],['schreibe ein Python Programm, dass die GPT-4 API aufruft.']] | |
def vote(data: gr.LikeData): | |
if data.liked: print("You upvoted this response: " + data.value) | |
else: print("You downvoted this response: " + data.value) | |
def custom_css(): | |
return """ | |
body, html { | |
background-color: #303030; /* Dunkler Hintergrund */ | |
color:#353535; | |
} | |
""" | |
print ("Start GUIneu") | |
with open("custom.css", "r", encoding="utf-8") as f: | |
customCSS = f.read() | |
#Add Inputs für Tab 2 | |
additional_inputs = [ | |
gr.Slider(label="Temperature", value=0.65, minimum=0.0, maximum=1.0, step=0.05, interactive=True, info="Höhere Werte erzeugen diversere Antworten", visible=True), | |
gr.Slider(label="Max new tokens", value=1024, minimum=0, maximum=4096, step=64, interactive=True, info="Maximale Anzahl neuer Tokens", visible=True), | |
gr.Slider(label="Top-p (nucleus sampling)", value=0.6, minimum=0.0, maximum=1, step=0.05, interactive=True, info="Höhere Werte verwenden auch Tokens mit niedrigerer Wahrscheinlichkeit.", visible=True), | |
gr.Slider(label="Repetition penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Strafe für wiederholte Tokens", visible=True) | |
] | |
with gr.Blocks(css=custom_css(), theme=themeAlex) as demo: | |
#Session Variablen, um Weete zu speichern, auch wenn die Felder in der GUI bereits wieder leer sind | |
# history parallel zu chatbot speichern - da in chatbot bei Bildern zum Anzeigen in der GUI die Bilder speziell formatiert werden, | |
# für die Übergabe an die ki aber der Pfad zum Bild behalten werden muss - was in der history der Fall ist! | |
history = gr.State([]) | |
uploaded_file_paths= gr.State([]) | |
history3 = gr.State([]) | |
uploaded_file_paths3= gr.State([]) | |
#alle chats einer Session sammeln | |
chats = gr.State({}) | |
#damit der Prompt auch nach dem upload in die History noch für predicts_args verfügbar ist | |
user_question = gr.State("") | |
#für die anderen Tabs auch... | |
#damit der Prompt auch nach dem upload in die History noch für predicts_args verfügbar ist | |
user_question2 = gr.State("") | |
user_question3 = gr.State("") | |
attached_file = gr.State(None) | |
attached_file_history = gr.State(None) | |
attached_file3 = gr.State(None) | |
attached_file_history3 = gr.State(None) | |
status_display = gr.State("") | |
status_display2 = gr.State("") | |
status_display3 = gr.State("") | |
################################################ | |
# Tab zum Chatbot mit Text oder Bildeingabe | |
################################################ | |
gr.Markdown(description_top) | |
with gr.Tab("LI Chatbot"): | |
with gr.Row(): | |
#gr.HTML("LI Chatot") | |
status_display = gr.Markdown("Antwort der KI ...", visible = True) #, elem_id="status_display") | |
with gr.Row(): | |
with gr.Column(scale=5): | |
with gr.Row(): | |
chatbot = gr.Chatbot(elem_id="li-chat",show_copy_button=True) | |
with gr.Row(): | |
with gr.Column(scale=12): | |
user_input = gr.Textbox( | |
show_label=False, placeholder="Gib hier deinen Prompt ein...", | |
container=False | |
) | |
with gr.Column(min_width=70, scale=1): | |
submitBtn = gr.Button("Senden") | |
with gr.Column(min_width=70, scale=1): | |
cancelBtn = gr.Button("Stop") | |
with gr.Row(): | |
image_display = gr.Image( visible=False) | |
upload = gr.UploadButton("📁", file_types=["image", "pdf", "docx", "pptx", "xlsx"], scale = 10) | |
emptyBtn = gr.ClearButton([user_input, chatbot, history, attached_file, attached_file_history, image_display], value="🧹 Neue Session", scale=10) | |
with gr.Column(): | |
with gr.Column(min_width=50, scale=1): | |
with gr.Tab(label="Chats ..."): | |
#Geht nicht, da für alle gleichzeitig sichtbar | |
#chat_selector = gr.CheckboxGroup(label="", choices=update_chat_options()) | |
#download_button = gr.Button("Download ausgewählte Chats") | |
file_download = gr.File(label="Noch keine Chatsverläufe", visible=True, interactive = False, file_count="multiple",) | |
with gr.Tab(label="Parameter"): | |
#gr.Markdown("# Parameters") | |
rag_option = gr.Radio(["Aus", "An"], label="LI Erweiterungen (RAG)", value = "Aus") | |
model_option = gr.Radio(["OpenAI", "HuggingFace"], label="Modellauswahl", value = "HuggingFace") | |
websuche = gr.Radio(["Aus", "An"], label="Web-Suche", value = "Aus") | |
top_p = gr.Slider( | |
minimum=-0, | |
maximum=1.0, | |
value=0.95, | |
step=0.05, | |
interactive=True, | |
label="Top-p", | |
visible=False, | |
) | |
top_k = gr.Slider( | |
minimum=1, | |
maximum=100, | |
value=35, | |
step=1, | |
interactive=True, | |
label="Top-k", | |
visible=False, | |
) | |
temperature = gr.Slider( | |
minimum=0.1, | |
maximum=2.0, | |
value=0.5, | |
step=0.1, | |
interactive=True, | |
label="Temperature", | |
visible=False | |
) | |
max_length_tokens = gr.Slider( | |
minimum=0, | |
maximum=512, | |
value=512, | |
step=8, | |
interactive=True, | |
label="Max Generation Tokens", | |
visible=False, | |
) | |
max_context_length_tokens = gr.Slider( | |
minimum=0, | |
maximum=4096, | |
value=2048, | |
step=128, | |
interactive=True, | |
label="Max History Tokens", | |
visible=False, | |
) | |
repetition_penalty=gr.Slider(label="Repetition penalty", value=1.2, minimum=1.0, maximum=2.0, step=0.05, interactive=True, info="Strafe für wiederholte Tokens", visible=False) | |
anzahl_docs = gr.Slider(label="Anzahl Dokumente", value=3, minimum=1, maximum=10, step=1, interactive=True, info="wie viele Dokumententeile aus dem Vektorstore an den prompt gehängt werden", visible=False) | |
openai_key = gr.Textbox(label = "OpenAI API Key", value = "sk-", lines = 1, visible = False) | |
################################################ | |
# Tab zum Zeichnen mit Stable Diffusion | |
################################################ | |
with gr.Tab("LI Zeichnen"): | |
with gr.Row(): | |
gr.HTML("Lass den KI-Bot deine Ideen zeichnen...") | |
status_display2 = gr.Markdown("Success", visible = False, elem_id="status_display") | |
#gr.Markdown(description2) | |
with gr.Row(): | |
with gr.Column(scale=5): | |
with gr.Row(): | |
chatbot_bild = gr.Chatbot(elem_id="li-zeichnen",show_copy_button=True, show_share_button=True) | |
with gr.Row(): | |
with gr.Column(scale=12): | |
user_input2 = gr.Textbox( | |
show_label=False, placeholder="Gib hier deinen Prompt ein...", | |
container=False | |
) | |
with gr.Column(min_width=70, scale=1): | |
submitBtn2 = gr.Button("Senden") | |
#with gr.Column(min_width=70, scale=1): | |
#cancelBtn2 = gr.Button("Stop") | |
with gr.Row(): | |
emptyBtn2 = gr.ClearButton([user_input, chatbot_bild], value="🧹 Neue Session", scale=10) | |
#additional_inputs_accordion = gr.Accordion(label="Weitere Eingaben...", open=False) | |
with gr.Column(): | |
with gr.Column(min_width=50, scale=1): | |
with gr.Tab(label="Parameter Einstellung"): | |
#gr.Markdown("# Parameters") | |
model_option_zeichnen = gr.Radio(["Stable Diffusion","DallE"], label="Modellauswahl", value = "Stable Diffusion") | |
with gr.Tab("LI Codebot"): | |
with gr.Row(): | |
gr.HTML("Gib als textuelle Beschreibung ein, was in Programmcode übersetzt werden soll und in welcher Sprache...") | |
status_display3 = gr.Markdown("Success", visible = False, elem_id="status_display") | |
#gr.Markdown(description2) | |
with gr.Row(): | |
with gr.Column(scale=5): | |
with gr.Row(): | |
chatbot_code = gr.Chatbot(elem_id="li-zeichnen",show_copy_button=True, show_share_button=True) | |
with gr.Row(): | |
with gr.Column(scale=12): | |
user_input3 = gr.Textbox( | |
show_label=False, placeholder="Gib hier deinen Prompt ein...", | |
container=False | |
) | |
with gr.Column(min_width=70, scale=1): | |
submitBtn3 = gr.Button("Senden") | |
with gr.Column(min_width=70, scale=1): | |
cancelBtn3 = gr.Button("Stop") | |
with gr.Row(): | |
#file_display = gr.File(visible=False) | |
image_display3 = gr.Image( visible=False) | |
upload3 = gr.UploadButton("📁", file_types=["image", "pdf", "docx", "pptx", "xlsx"], scale = 10) | |
emptyBtn3 = gr.ClearButton([user_input3, chatbot_code, history3, attached_file3, image_display3], value="🧹 Neue Session", scale=10) | |
with gr.Column(): | |
with gr.Column(min_width=50, scale=1): | |
with gr.Tab(label="Parameter Einstellung"): | |
#gr.Markdown("# Parameters") | |
model_option_code3 = gr.Radio(["Davinci","kommt noch"], label="Modellauswahl", value = "Davinci") | |
gr.Markdown(description) | |
###################################### | |
# Events und Übergabe Werte an Funktionen | |
####################################### | |
###################################### | |
# Für Tab 1: Chatbot | |
#Argumente für generate Funktion als Input | |
predict_args = dict( | |
fn=generate_auswahl, | |
inputs=[ | |
user_question, | |
attached_file, | |
attached_file_history, | |
chatbot, | |
history, | |
rag_option, | |
model_option, | |
openai_key, | |
anzahl_docs, | |
top_p, | |
temperature, | |
max_length_tokens, | |
max_context_length_tokens, | |
repetition_penalty, | |
top_k, | |
websuche | |
], | |
outputs=[chatbot, history, attached_file, attached_file_history, status_display], | |
show_progress=True, | |
) | |
reset_args = dict( | |
fn=reset_textbox, inputs=[], outputs=[user_input, status_display] | |
) | |
# Chatbot | |
transfer_input_args = dict( | |
fn=add_text, inputs=[chatbot, history, user_input, attached_file, attached_file_history], outputs=[chatbot, history, user_question, attached_file, attached_file_history, image_display , user_input], show_progress=True | |
) | |
predict_event1 = user_input.submit(**transfer_input_args, queue=False,).then(**predict_args) | |
predict_event2 = submitBtn.click(**transfer_input_args, queue=False,).then(**predict_args) | |
predict_event3 = upload.upload(file_anzeigen, [upload], [image_display, image_display, attached_file] ) #.then(**predict_args) | |
emptyBtn.click(clear_all, [history, uploaded_file_paths, chats], [attached_file, image_display, uploaded_file_paths, history, file_download, chats]) | |
#Bild Anzeige neben dem Button wieder entfernen oder austauschen.. | |
image_display.select(file_loeschen, [], [attached_file, image_display]) | |
#download_button.click(fn=download_chats, inputs=chat_selector, outputs=[file_download]) | |
#Berechnung oder Ausgabe anhalten (kann danach fortgesetzt werden) | |
cancelBtn.click(cancel_outputing, [], [status_display], cancels=[predict_event1,predict_event2, predict_event3]) | |
###################################### | |
# Für Tab 2: Zeichnen | |
predict_args2 = dict( | |
fn=generate_bild, | |
inputs=[ | |
user_question2, | |
chatbot_bild, | |
model_option_zeichnen, | |
#additional_inputs, | |
], | |
outputs=[chatbot_bild, status_display2], #[chatbot, history, status_display] | |
show_progress=True, | |
) | |
transfer_input_args2 = dict( | |
fn=add_text2, inputs=[chatbot_bild, user_input2], outputs=[chatbot_bild, user_question2, user_input2], show_progress=True | |
) | |
predict_event2_1 = user_input2.submit(**transfer_input_args2, queue=False,).then(**predict_args2) | |
predict_event2_2 = submitBtn2.click(**transfer_input_args2, queue=False,).then(**predict_args2) | |
#emptyBtn2.click(clear_all, [], [file_display, image_display]) | |
#cancelBtn2.click( | |
#cancels=[predict_event2_1,predict_event2_2 ] | |
#) | |
###################################### | |
# Für Tab 3: Codebot | |
#Argumente für generate Funktion als Input | |
predict_args3 = dict( | |
fn=generate_code, | |
inputs=[ | |
user_question3, | |
attached_file3, | |
attached_file_history3, | |
chatbot_code, | |
history3, | |
model_option, | |
openai_key, | |
top_p, | |
temperature, | |
max_length_tokens, | |
max_context_length_tokens, | |
repetition_penalty, | |
top_k | |
], | |
outputs=[chatbot_code, history3, attached_file3, status_display3], | |
show_progress=True, | |
) | |
reset_args3 = dict( | |
fn=reset_textbox, inputs=[], outputs=[user_input3, status_display3] | |
) | |
# Chatbot | |
transfer_input_args3 = dict( | |
fn=add_text, inputs=[chatbot_code, history3, user_input3, attached_file3, attached_file_history3], outputs=[chatbot_code, history3, user_question3, attached_file3, attached_file_history3, image_display3, user_input3], show_progress=True | |
) | |
predict_event3_1 = user_input3.submit(**transfer_input_args3, queue=False,).then(**predict_args3) | |
predict_event3_2 = submitBtn3.click(**transfer_input_args3, queue=False,).then(**predict_args3) | |
predict_event3_3 = upload3.upload(file_anzeigen, [upload3], [image_display3, image_display3, attached_file3] ) #.then(**predict_args) | |
emptyBtn3.click(clear_all3, [history3], [attached_file3, image_display3, history3]) | |
#Bild Anzeige neben dem Button wieder entfernen oder austauschen.. | |
image_display3.select(file_loeschen, [], [attached_file3, image_display3]) | |
#download_button.click(fn=download_chats, inputs=chat_selector, outputs=[file_download]) | |
demo.title = "LI-ChatBot" | |
demo.queue(default_concurrency_limit=15).launch(debug=True) | |