import gradio as gr
import fitz # PyMuPDF
import os
import re
import requests
from huggingface_hub import HfApi
import base64
from io import BytesIO
import urllib.parse
import tempfile
from sklearn.metrics.pairwise import cosine_similarity
from docx import Document
import asyncio
import docx
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer, util
from fuzzywuzzy import fuzz
import nltk
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from transformers import BertModel, BertTokenizer
from nltk.tokenize import word_tokenize
from nltk.stem.snowball import SnowballStemmer
import torch
# Zugriff auf das Secret als Umgebungsvariable
HF_READ = os.getenv("HF_READ")
HF_WRITE = os.getenv("HF_WRITE")
# Relativer Pfad zum Verzeichnis mit den Dokumenten
DOCS_DIR = "kkg_dokumente"
# Konstanten für Datei-Upload
REPO_ID = "alexkueck/kkg_suche"
REPO_TYPE = "space"
# HfApi-Instanz erstellen
api = HfApi()
# Falls noch nicht geschehen, müssen Sie die NLTK Ressourcen herunterladen
nltk.download('punkt')
nltk.download('stopwords')
german_stopwords = set(stopwords.words('german'))
########################################################
##########Ki Modell für Embeddings der Suchanfrage nutzen
# Laden des Sentence-Transformer-Modells
# Laden des vortrainierten Sentence-BERT-Modells
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
########################################################
######## Hilfsfunktionen für die Suche #################
# Funktion zum Extrahieren von Text aus PDF - und Word
def extract_text_from_pdf(pdf_path):
doc = fitz.open(pdf_path)
pages = []
for page in doc:
text = page.get_text()
# Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist
lines = text.split('\n')
header = lines[0] if lines else ''
content = '\n'.join(lines[1:]) if len(lines) > 1 else ''
pages.append({'header': header, 'content': content})
return pages
def extract_text_from_docx(docx_path):
doc = Document(docx_path)
pages = []
current_page = []
header = ''
for para in doc.paragraphs:
if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen
if current_page:
pages.append({'header': header, 'content': '\n'.join(current_page)})
current_page = []
header = para.text
else:
current_page.append(para.text)
if current_page: # Letzte Seite hinzufügen
pages.append({'header': header, 'content': '\n'.join(current_page)})
return pages
# Initialisierung der Dokumente - Dictionary um die Dokuemtneteninhalte, Switen und Überschriften zu halten
def initialize_documents():
documents = []
if os.path.exists(DOCS_DIR):
for file_name in os.listdir(DOCS_DIR):
if file_name.endswith(".pdf"):
pdf_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_pdf(pdf_path)
documents.append({"file": file_name, "pages": pages})
elif file_name.endswith(".docx"):
docx_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_docx(docx_path)
documents.append({"file": file_name, "pages": pages})
return documents
#######################################################
#nach relevanten suche -> download Link der passenden Dokuemtne erstellen
def download_link(doc_name):
# URL für das Herunterladen der Datei
file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}"
return f'{doc_name}'
# Zeitelumbrüche entfernen - bei einzelnen, mehrere hinterienander zu einem zusammenfassen
#zur Ziet nicht im Einsatz
def remove_line_breaks(text):
# Entfernt alle einzelnen Zeilenumbrüche
#text = re.sub(r'(?"
files_table += "
Dateiname | Größe (KB) |
"
for i, file in enumerate(files):
file_path = os.path.join(DOCS_DIR, file)
file_size = os.path.getsize(file_path) / 1024 # Größe in KB
row_color = "#4f4f4f" if i % 2 == 0 else "#3a3a3a" # Wechselnde Zeilenfarben
files_table += f""
files_table += f"{download_link(file)} | "
files_table += f"{file_size:.2f} |
"
files_table += ""
return files_table
# gefundene relevante Dokumente auflisten (links)
def list_pdfs():
if not os.path.exists(DOCS_DIR):
return []
return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')]
###########################################################
############# KI um Suchanfrage zu Embedden ###############
#um ähnliche Wörter anhand ihres Wortstammes zu erkennen
# Funktion zur Stemmatisierung des Textes
def preprocess_text(text):
if not text:
return ""
text = text.lower()
tokenizer = RegexpTokenizer(r'\w+')
word_tokens = tokenizer.tokenize(text)
filtered_words = [word for word in word_tokens if word not in german_stopwords]
stemmer = SnowballStemmer("german")
stemmed_words = [stemmer.stem(word) for word in filtered_words]
return " ".join(stemmed_words)
# Funktion zur Bereinigung des Textes aus den Pdfs und Word Dokuemtne, um den Tokenizer nicht zu überfordern
def clean_text(text):
# Entfernen nicht druckbarer Zeichen
text = re.sub(r'[^\x00-\x7F]+', ' ', text)
# Ersetzen ungewöhnlicher Leerzeichen durch normale Leerzeichen
text = re.sub(r'\s+', ' ', text)
return text.strip()
# Durchsuchen von Dokumenten
def search_documents(query):
documents = initialize_documents()
# Texte und Überschriften in die Embeddings aufnehmen
texts = [page['content'] for doc in documents for page in doc['pages']]
# Stemming von texten und query (also auf Wort-grundformen bringen) - und vorher unwichtige Wörter entfernen - um Suchergebnis zu verbessern
#es soll auf die contents der seiten und die Überschriften angewendet werden
# Texte und Überschriften in die Embeddings aufnehmen
all_texts = []
for doc in documents:
for page in doc['pages']:
combined_text = page['header'] + " " + page['content']
preprocessed_text = preprocess_text(combined_text)
if preprocessed_text: # Überprüfen, ob der präprozessierte Text nicht leer ist
all_texts.append(preprocessed_text)
#und nun entsprechend auch die Query überarbeiten
prepro_query = preprocess_text(query)
if not all_texts or not prepro_query:
return "", ""
else:
# Berechnung der Embeddings für alle Dokumente
document_embeddings = model.encode(all_texts, convert_to_tensor=True)
# Berechnung des Embeddings für die Suchanfrage
query_embedding = model.encode(prepro_query, convert_to_tensor=True)
# Berechnung der Ähnlichkeiten zwischen der Suchanfrage und den Dokumenten
similarities = util.pytorch_cos_sim(query_embedding, document_embeddings)[0]
# Berechnung der Ähnlichkeit
#similarities = cosine_similarity(query_tfidf, text_tfidf).flatten()
# Sortieren nach Relevanz
sorted_indices = similarities.argsort(descending=True)
results = []
relevant_text = ""
relevant_docs = {}
num_pages_per_doc = [len(doc['pages']) for doc in documents]
cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))]
for i in sorted_indices:
if similarities[i] > 0.3:
doc_index = None
for idx, cumulative in enumerate(cumulative_pages):
if i < cumulative:
doc_index = idx
break
if doc_index is None:
continue
page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1]
doc = documents[doc_index]
page = doc['pages'][page_index]
page_content = page['content']
header_content = page.get('header', '')
# Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist
index_in_content = page_content.lower().find(prepro_query.lower())
index_in_header = header_content.lower().find(prepro_query.lower())
# Berücksichtigung der Levenshtein-Distanz
# Berücksichtigung der Levenshtein-Distanz
words_in_query = prepro_query.split()
page_words = preprocess_text(page_content).split()
header_words = preprocess_text(header_content).split()
if (index_in_content != -1 or index_in_header != -1 or
any(fuzz.ratio(word, page_word) > 80 for word in words_in_query for page_word in page_words) or
any(fuzz.ratio(word, header_word) > 80 for word in words_in_query for header_word in header_words)):
# Erstellen Sie einen Snippet für die Suchergebnisse
start = max(0, index_in_content - 400) if index_in_content != -1 else 0
end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content)
snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
"
# Fügen Sie die Überschrift hinzu, falls vorhanden
if header_content:
snippet += f"Überschrift: {header_content}
"
snippet += f"{remove_line_breaks(page_content[start:end])}
"
relevant_text += snippet
if doc['file'] not in relevant_docs:
relevant_docs[doc['file']] = []
relevant_docs[doc['file']].append(snippet)
# Sortieren nach Relevanz
results = sorted(results, key=lambda x: x[1], reverse=True)
results = [res[0] for res in results]
results = list(relevant_docs.keys())
return results, relevant_text
###########################################################
############## Vorbereitung View in gradio ################
#######################################
#Suche starten und View aktialisieren
def search_and_update(query):
if not query.strip():
return "Bitte geben Sie einen Suchbegriff ein.
", "Bitte geben Sie einen Suchbegriff ein.
"
relevant_docs, relevant_text = search_documents(query)
if not relevant_docs:
doc_links = "Keine passenden Dokumente gefunden.
"
else:
doc_links = ""
for doc in relevant_docs:
doc_links += download_link(doc) + "
"
if not relevant_text:
relevant_text = "Kein relevanter Text gefunden.
"
return "", doc_links, relevant_text
#Fortschritt anzeigen beim Warten auf Suchergebnisse
def show_progress():
return gr.update(value="Suche läuft...", visible=True)
def hide_progress():
return gr.update(value="", visible=False)
######################################################################
############### Anwendung starten ####################################
with gr.Blocks(css="""
.results {
background-color: #f0f0f0;
padding: 10px;
border-radius: 5px;
overflow-y: auto;
/*max-height: 400px;*/
width: 100%; /* Volle Breite */
}
.no-results {
color: red;
}
.doc-name {
font-weight: bold;
color: #B05DF9; /* Dunkleres Lila für verlinkte Dokumente */
}
.page-number {
font-weight: bold;
color: #FF5733;
}
#doc_links, #relevant_text {
background-color: #333333; /* Sehr dunkles Grau */
padding: 10px; /* Innenabstand */
border-radius: 5px; /* Abgerundete Ecken */
overflow-y: auto; /* Vertikale Scrollbalken */
white-space: pre-wrap; /* Textumbruch innerhalb des Feldes */
height: auto; /* Automatische Höhe */
width: 100%; /* Volle Breite */
}
#doc_links a {
color: #BB70FC; /* Helles Lila für Links im doc_links Feld */
font-weight: bold;
width: 100%; /* Volle Breite */
}
""") as demo:
with gr.Tab("Suche"):
progress = gr.Markdown(value="")
query_input = gr.Textbox(label="Suchanfrage")
with gr.Row():
with gr.Column(scale=1):
doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False)
with gr.Column(scale=2):
relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False)
query_input.submit(show_progress, inputs=[], outputs=[progress], show_progress="false")
query_input.submit(search_and_update, inputs=[query_input], outputs=[progress, doc_links, relevant_text], show_progress="true").then(
hide_progress,
inputs=[],
outputs=[progress]
)
with gr.Tab("Datei hochladen"):
upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen")
output_text = gr.Textbox(label="Status")
#upload_button = gr.Button("Datei hochladen")
file_list = gr.HTML(elem_id="file_list", show_label=False)
#upload_button.click(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text)
# Automatisches Ausführen der Upload-Funktion, wenn eine Datei hochgeladen wird
upload_pdf_file.change(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text)
#gr.HTML(update=display_files, elem_id="file_list", show_label=False)
demo.load(display_files, outputs=file_list)
demo.queue(default_concurrency_limit=10).launch(debug=True)
"""
########################################################
##########Ki Modell für Embeddings der Suchanfrage nutzen
# Laden des Sentence-Transformer-Modells
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
########################################################
######## Hilfsfunktionen für die Suche #################
# Funktion zum Extrahieren von Text aus PDF - und Word
def extract_text_from_pdf(pdf_path):
doc = fitz.open(pdf_path)
pages = []
for page in doc:
text = page.get_text()
# Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist
lines = text.split('\n')
header = lines[0] if lines else ''
content = '\n'.join(lines[1:]) if len(lines) > 1 else ''
pages.append({'header': header, 'content': content})
return pages
def extract_text_from_docx(docx_path):
doc = Document(docx_path)
pages = []
current_page = []
header = ''
for para in doc.paragraphs:
if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen
if current_page:
pages.append({'header': header, 'content': '\n'.join(current_page)})
current_page = []
header = para.text
else:
current_page.append(para.text)
if current_page: # Letzte Seite hinzufügen
pages.append({'header': header, 'content': '\n'.join(current_page)})
return pages
# Initialisierung der Dokumente - Dictionary um die Dokuemtneteninhalte, Switen und Überschriften zu halten
def initialize_documents():
documents = []
if os.path.exists(DOCS_DIR):
for file_name in os.listdir(DOCS_DIR):
if file_name.endswith(".pdf"):
pdf_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_pdf(pdf_path)
documents.append({"file": file_name, "pages": pages})
elif file_name.endswith(".docx"):
docx_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_docx(docx_path)
documents.append({"file": file_name, "pages": pages})
return documents
#######################################################
#nach relevanten suche -> download Link der passenden Dokuemtne erstellen
def download_link(doc_name):
# URL für das Herunterladen der Datei
file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}"
return f'{doc_name}'
# Zeitelumbrüche entfernen - bei einzelnen, mehrere hinterienander zu einem zusammenfassen
#zur Ziet nicht im Einsatz
def remove_line_breaks(text):
# Entfernt alle einzelnen Zeilenumbrüche
#text = re.sub(r'(?"
files_table += "Dateiname | Größe (KB) |
"
for i, file in enumerate(files):
file_path = os.path.join(DOCS_DIR, file)
file_size = os.path.getsize(file_path) / 1024 # Größe in KB
row_color = "#4f4f4f" if i % 2 == 0 else "#3a3a3a" # Wechselnde Zeilenfarben
files_table += f""
files_table += f"{download_link(file)} | "
files_table += f"{file_size:.2f} |
"
files_table += ""
return files_table
# gefundene relevante Dokumente auflisten (links)
def list_pdfs():
if not os.path.exists(DOCS_DIR):
return []
return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')]
###########################################################
############# KI um Suchanfrage zu Embedden ###############
# Funktion zur Entfernung von Stopwörtern und Tokenisierung - um bei längeren suchanfragen auf relevante wörter zu konzentrieren
def preprocess_textback(text):
if not text:
return []
stop_words = set(stopwords.words('german'))
tokenizer = RegexpTokenizer(r'\w+')
word_tokens = tokenizer.tokenize(text)
filtered_words = [word for word in word_tokens if word.lower() not in stop_words]
return filtered_words
def preprocess_text(text):
if not text:
return ""
# Konvertiere den Text zu Kleinbuchstaben
text = text.lower()
# Tokenisierung
tokenizer = RegexpTokenizer(r'\w+')
word_tokens = tokenizer.tokenize(text)
# Entfernen von Stoppwörtern
filtered_words = [word for word in word_tokens if word not in stop_words]
# Stemming
stemmer = SnowballStemmer("german")
stemmed_words = [stemmer.stem(word) for word in filtered_words]
return " ".join(stemmed_words)
# Funktion zur Bereinigung des Textes aus den Pdfs und Word Dokuemtne, um den Tokenizer nicht zu überfordern
def clean_text(text):
# Entfernen nicht druckbarer Zeichen
text = re.sub(r'[^\x00-\x7F]+', ' ', text)
# Ersetzen ungewöhnlicher Leerzeichen durch normale Leerzeichen
text = re.sub(r'\s+', ' ', text)
return text.strip()
# Funktion zur Berechnung der Embeddings
def get_embeddings(texts):
return model.encode(texts, convert_to_tensor=True)
#um ähnliche Wörter anhand ihres Wortstammes zu erkennen
# Funktion zur Stemmatisierung des Textes
def stem_text(text):
if not text:
return ""
stemmer = SnowballStemmer("german")
tokenizer = RegexpTokenizer(r'\w+')
word_tokens = tokenizer.tokenize(text)
stemmed_words = [stemmer.stem(word) for word in word_tokens]
return " ".join(stemmed_words)
# Durchsuchen von Dokumenten
def search_documents(query):
documents = initialize_documents()
# Texte und Überschriften in die Embeddings aufnehmen
texts = [page['content'] for doc in documents for page in doc['pages']]
stemmed_texts = [stem_text(text) for text in texts]
text_embeddings = get_embeddings(stemmed_texts)
# Stemming des Queries - and vorher unwichtige Wörter entfernen - um suchergebnis zu verbessern
prepro_query = preprocess_text(query) #stem_text(" ".join(preprocess_text(query)))
# Embedding des Queries
query_embedding = get_embeddings(prepro_query)
# Sicherstellen, dass die Embeddings 2D-Arrays sind
if len(query_embedding.shape) == 1:
query_embedding = query_embedding.reshape(1, -1)
if len(text_embeddings.shape) == 1:
text_embeddings = text_embeddings.reshape(1, -1)
# Berechnung der Ähnlichkeit
similarities = cosine_similarity(query_embedding.cpu(), text_embeddings.cpu()).flatten()
# Sortieren nach Relevanz
related_docs_indices = similarities.argsort()[::-1]
results=[]
relevant_text = ""
relevant_docs = {}
num_pages_per_doc = [len(doc['pages']) for doc in documents]
cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))]
for i in related_docs_indices:
if similarities[i] > 0.3:
doc_index = None
for idx, cumulative in enumerate(cumulative_pages):
if i < cumulative:
doc_index = idx
break
if doc_index is None:
continue
page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1]
doc = documents[doc_index]
page = doc['pages'][page_index]
page_content = page['content']
header_content = page.get('header', '')
# Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist
index_in_content = page_content.lower().find(prepro_query.lower())
index_in_header = header_content.lower().find(prepro_query.lower())
# Berücksichtigung der Levenshtein-Distanz
words_in_query = prepro_query
page_words = stem_text(page_content).split()
if index_in_content != -1 or index_in_header != -1 or any(fuzz.ratio(word, page_word) > 80 for word in words_in_query for page_word in page_words): # <--- Integration von fuzz.ratio für jedes Wort
# Erstellen Sie einen Snippet für die Suchergebnisse
start = max(0, index_in_content - 400) if index_in_content != -1 else 0
end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content)
snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
"
# Fügen Sie die Überschrift hinzu, falls vorhanden
if header_content:
snippet += f"Überschrift: {header_content}
"
snippet += f"{remove_line_breaks(page_content[start:end])}
"
relevant_text += snippet
if doc['file'] not in relevant_docs:
relevant_docs[doc['file']] = []
relevant_docs[doc['file']].append(snippet)
# Sortieren nach Relevanz
results = sorted(results, key=lambda x: x[1], reverse=True)
results = [res[0] for res in results]
results = list(relevant_docs.keys())
return results, relevant_text
###########################################################
############## Vorbereitung View in gradio ################
#######################################
#Suche starten und View aktialisieren
def search_and_update(query):
if not query.strip():
return "Bitte geben Sie einen Suchbegriff ein.
", "Bitte geben Sie einen Suchbegriff ein.
"
relevant_docs, relevant_text = search_documents(query)
if not relevant_docs:
doc_links = "Keine passenden Dokumente gefunden.
"
else:
doc_links = ""
for doc in relevant_docs:
doc_links += download_link(doc) + "
"
if not relevant_text:
relevant_text = "Kein relevanter Text gefunden.
"
return "", doc_links, relevant_text
#Fortschritt anzeigen beim Warten auf Suchergebnisse
def show_progress():
return gr.update(value="Suche läuft...", visible=True)
def hide_progress():
return gr.update(value="", visible=False)
######################################################################
############### Anwendung starten ####################################
with gr.Blocks(css=
.results {
background-color: #f0f0f0;
padding: 10px;
border-radius: 5px;
overflow-y: auto;
/*max-height: 400px;*/
width: 100%; /* Volle Breite */
}
.no-results {
color: red;
}
.doc-name {
font-weight: bold;
color: #B05DF9; /* Dunkleres Lila für verlinkte Dokumente */
}
.page-number {
font-weight: bold;
color: #FF5733;
}
#doc_links, #relevant_text {
background-color: #333333; /* Sehr dunkles Grau */
padding: 10px; /* Innenabstand */
border-radius: 5px; /* Abgerundete Ecken */
overflow-y: auto; /* Vertikale Scrollbalken */
white-space: pre-wrap; /* Textumbruch innerhalb des Feldes */
height: auto; /* Automatische Höhe */
width: 100%; /* Volle Breite */
}
#doc_links a {
color: #BB70FC; /* Helles Lila für Links im doc_links Feld */
font-weight: bold;
width: 100%; /* Volle Breite */
}
) as demo:
with gr.Tab("Suche"):
progress = gr.Markdown(value="")
query_input = gr.Textbox(label="Suchanfrage")
with gr.Row():
with gr.Column(scale=1):
doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False)
with gr.Column(scale=2):
relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False)
query_input.submit(show_progress, inputs=[], outputs=[progress], show_progress="false")
query_input.submit(search_and_update, inputs=[query_input], outputs=[progress, doc_links, relevant_text], show_progress="true").then(
hide_progress,
inputs=[],
outputs=[progress]
)
with gr.Tab("Datei hochladen"):
upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen")
output_text = gr.Textbox(label="Status")
#upload_button = gr.Button("Datei hochladen")
file_list = gr.HTML(elem_id="file_list", show_label=False)
#upload_button.click(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text)
# Automatisches Ausführen der Upload-Funktion, wenn eine Datei hochgeladen wird
upload_pdf_file.change(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text)
#gr.HTML(update=display_files, elem_id="file_list", show_label=False)
demo.load(display_files, outputs=file_list)
demo.launch()
"""