import gradio as gr import fitz # PyMuPDF import os import re import requests from huggingface_hub import HfApi import base64 from io import BytesIO import urllib.parse import tempfile from sklearn.metrics.pairwise import cosine_similarity from docx import Document import asyncio import docx from transformers import AutoTokenizer, AutoModel from sentence_transformers import SentenceTransformer, util from fuzzywuzzy import fuzz import nltk from nltk.stem import PorterStemmer from nltk.corpus import stopwords from nltk.tokenize import RegexpTokenizer from transformers import BertModel, BertTokenizer from nltk.tokenize import word_tokenize from nltk.stem.snowball import SnowballStemmer import torch # Zugriff auf das Secret als Umgebungsvariable HF_READ = os.getenv("HF_READ") HF_WRITE = os.getenv("HF_WRITE") # Relativer Pfad zum Verzeichnis mit den Dokumenten DOCS_DIR = "kkg_dokumente" # Konstanten für Datei-Upload REPO_ID = "alexkueck/kkg_suche" REPO_TYPE = "space" # HfApi-Instanz erstellen api = HfApi() # Falls noch nicht geschehen, müssen Sie die NLTK Ressourcen herunterladen nltk.download('punkt') nltk.download('stopwords') german_stopwords = set(stopwords.words('german')) ######################################################## ##########Ki Modell für Embeddings der Suchanfrage nutzen # Laden des Sentence-Transformer-Modells # Laden des vortrainierten Sentence-BERT-Modells model = SentenceTransformer('paraphrase-MiniLM-L6-v2') ######################################################## ######## Hilfsfunktionen für die Suche ################# # Funktion zum Extrahieren von Text aus PDF - und Word def extract_text_from_pdf(pdf_path): doc = fitz.open(pdf_path) pages = [] for page in doc: text = page.get_text() # Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist lines = text.split('\n') header = lines[0] if lines else '' content = '\n'.join(lines[1:]) if len(lines) > 1 else '' pages.append({'header': header, 'content': content}) return pages def extract_text_from_docx(docx_path): doc = Document(docx_path) pages = [] current_page = [] header = '' for para in doc.paragraphs: if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen if current_page: pages.append({'header': header, 'content': '\n'.join(current_page)}) current_page = [] header = para.text else: current_page.append(para.text) if current_page: # Letzte Seite hinzufügen pages.append({'header': header, 'content': '\n'.join(current_page)}) return pages # Initialisierung der Dokumente - Dictionary um die Dokuemtneteninhalte, Switen und Überschriften zu halten def initialize_documents(): documents = [] if os.path.exists(DOCS_DIR): for file_name in os.listdir(DOCS_DIR): if file_name.endswith(".pdf"): pdf_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_pdf(pdf_path) documents.append({"file": file_name, "pages": pages}) elif file_name.endswith(".docx"): docx_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_docx(docx_path) documents.append({"file": file_name, "pages": pages}) return documents ####################################################### #nach relevanten suche -> download Link der passenden Dokuemtne erstellen def download_link(doc_name): # URL für das Herunterladen der Datei file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}" return f'{doc_name}' # Zeitelumbrüche entfernen - bei einzelnen, mehrere hinterienander zu einem zusammenfassen #zur Ziet nicht im Einsatz def remove_line_breaks(text): # Entfernt alle einzelnen Zeilenumbrüche #text = re.sub(r'(?" files_table += "DateinameGröße (KB)" for i, file in enumerate(files): file_path = os.path.join(DOCS_DIR, file) file_size = os.path.getsize(file_path) / 1024 # Größe in KB row_color = "#4f4f4f" if i % 2 == 0 else "#3a3a3a" # Wechselnde Zeilenfarben files_table += f"" files_table += f"{download_link(file)}" files_table += f"{file_size:.2f}" files_table += "" return files_table # gefundene relevante Dokumente auflisten (links) def list_pdfs(): if not os.path.exists(DOCS_DIR): return [] return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')] ########################################################### ############# KI um Suchanfrage zu Embedden ############### #um ähnliche Wörter anhand ihres Wortstammes zu erkennen # Funktion zur Stemmatisierung des Textes def preprocess_text(text): if not text: return "" text = text.lower() tokenizer = RegexpTokenizer(r'\w+') word_tokens = tokenizer.tokenize(text) filtered_words = [word for word in word_tokens if word not in german_stopwords] stemmer = SnowballStemmer("german") stemmed_words = [stemmer.stem(word) for word in filtered_words] return " ".join(stemmed_words) # Funktion zur Bereinigung des Textes aus den Pdfs und Word Dokuemtne, um den Tokenizer nicht zu überfordern def clean_text(text): # Entfernen nicht druckbarer Zeichen text = re.sub(r'[^\x00-\x7F]+', ' ', text) # Ersetzen ungewöhnlicher Leerzeichen durch normale Leerzeichen text = re.sub(r'\s+', ' ', text) return text.strip() # Durchsuchen von Dokumenten def search_documents(query): documents = initialize_documents() # Texte und Überschriften in die Embeddings aufnehmen texts = [page['content'] for doc in documents for page in doc['pages']] # Stemming von texten und query (also auf Wort-grundformen bringen) - und vorher unwichtige Wörter entfernen - um Suchergebnis zu verbessern #es soll auf die contents der seiten und die Überschriften angewendet werden # Texte und Überschriften in die Embeddings aufnehmen all_texts = [] for doc in documents: for page in doc['pages']: combined_text = page['header'] + " " + page['content'] preprocessed_text = preprocess_text(combined_text) if preprocessed_text: # Überprüfen, ob der präprozessierte Text nicht leer ist all_texts.append(preprocessed_text) #und nun entsprechend auch die Query überarbeiten prepro_query = preprocess_text(query) if not all_texts or not prepro_query: return "", "" else: # Berechnung der Embeddings für alle Dokumente document_embeddings = model.encode(all_texts, convert_to_tensor=True) # Berechnung des Embeddings für die Suchanfrage query_embedding = model.encode(prepro_query, convert_to_tensor=True) # Berechnung der Ähnlichkeiten zwischen der Suchanfrage und den Dokumenten similarities = util.pytorch_cos_sim(query_embedding, document_embeddings)[0] # Berechnung der Ähnlichkeit #similarities = cosine_similarity(query_tfidf, text_tfidf).flatten() # Sortieren nach Relevanz sorted_indices = similarities.argsort(descending=True) results = [] relevant_text = "" relevant_docs = {} num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in sorted_indices: if similarities[i] > 0.3: doc_index = None for idx, cumulative in enumerate(cumulative_pages): if i < cumulative: doc_index = idx break if doc_index is None: continue page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] page = doc['pages'][page_index] page_content = page['content'] header_content = page.get('header', '') # Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist index_in_content = page_content.lower().find(prepro_query.lower()) index_in_header = header_content.lower().find(prepro_query.lower()) # Berücksichtigung der Levenshtein-Distanz # Berücksichtigung der Levenshtein-Distanz words_in_query = prepro_query.split() page_words = preprocess_text(page_content).split() header_words = preprocess_text(header_content).split() if (index_in_content != -1 or index_in_header != -1 or any(fuzz.ratio(word, page_word) > 80 for word in words_in_query for page_word in page_words) or any(fuzz.ratio(word, header_word) > 80 for word in words_in_query for header_word in header_words)): # Erstellen Sie einen Snippet für die Suchergebnisse start = max(0, index_in_content - 400) if index_in_content != -1 else 0 end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content) snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
" # Fügen Sie die Überschrift hinzu, falls vorhanden if header_content: snippet += f"Überschrift: {header_content}
" snippet += f"{remove_line_breaks(page_content[start:end])}

" relevant_text += snippet if doc['file'] not in relevant_docs: relevant_docs[doc['file']] = [] relevant_docs[doc['file']].append(snippet) # Sortieren nach Relevanz results = sorted(results, key=lambda x: x[1], reverse=True) results = [res[0] for res in results] results = list(relevant_docs.keys()) return results, relevant_text ########################################################### ############## Vorbereitung View in gradio ################ ####################################### #Suche starten und View aktialisieren def search_and_update(query): if not query.strip(): return "
Bitte geben Sie einen Suchbegriff ein.
", "
Bitte geben Sie einen Suchbegriff ein.
" relevant_docs, relevant_text = search_documents(query) if not relevant_docs: doc_links = "
Keine passenden Dokumente gefunden.
" else: doc_links = "" for doc in relevant_docs: doc_links += download_link(doc) + "
" if not relevant_text: relevant_text = "
Kein relevanter Text gefunden.
" return "", doc_links, relevant_text #Fortschritt anzeigen beim Warten auf Suchergebnisse def show_progress(): return gr.update(value="Suche läuft...", visible=True) def hide_progress(): return gr.update(value="", visible=False) ###################################################################### ############### Anwendung starten #################################### with gr.Blocks(css=""" .results { background-color: #f0f0f0; padding: 10px; border-radius: 5px; overflow-y: auto; /*max-height: 400px;*/ width: 100%; /* Volle Breite */ } .no-results { color: red; } .doc-name { font-weight: bold; color: #B05DF9; /* Dunkleres Lila für verlinkte Dokumente */ } .page-number { font-weight: bold; color: #FF5733; } #doc_links, #relevant_text { background-color: #333333; /* Sehr dunkles Grau */ padding: 10px; /* Innenabstand */ border-radius: 5px; /* Abgerundete Ecken */ overflow-y: auto; /* Vertikale Scrollbalken */ white-space: pre-wrap; /* Textumbruch innerhalb des Feldes */ height: auto; /* Automatische Höhe */ width: 100%; /* Volle Breite */ } #doc_links a { color: #BB70FC; /* Helles Lila für Links im doc_links Feld */ font-weight: bold; width: 100%; /* Volle Breite */ } """) as demo: with gr.Tab("Suche"): progress = gr.Markdown(value="") query_input = gr.Textbox(label="Suchanfrage") with gr.Row(): with gr.Column(scale=1): doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False) with gr.Column(scale=2): relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False) query_input.submit(show_progress, inputs=[], outputs=[progress], show_progress="false") query_input.submit(search_and_update, inputs=[query_input], outputs=[progress, doc_links, relevant_text], show_progress="true").then( hide_progress, inputs=[], outputs=[progress] ) with gr.Tab("Datei hochladen"): upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen") output_text = gr.Textbox(label="Status") #upload_button = gr.Button("Datei hochladen") file_list = gr.HTML(elem_id="file_list", show_label=False) #upload_button.click(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text) # Automatisches Ausführen der Upload-Funktion, wenn eine Datei hochgeladen wird upload_pdf_file.change(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text) #gr.HTML(update=display_files, elem_id="file_list", show_label=False) demo.load(display_files, outputs=file_list) demo.queue(default_concurrency_limit=10).launch(debug=True) """ ######################################################## ##########Ki Modell für Embeddings der Suchanfrage nutzen # Laden des Sentence-Transformer-Modells model = SentenceTransformer('paraphrase-MiniLM-L6-v2') ######################################################## ######## Hilfsfunktionen für die Suche ################# # Funktion zum Extrahieren von Text aus PDF - und Word def extract_text_from_pdf(pdf_path): doc = fitz.open(pdf_path) pages = [] for page in doc: text = page.get_text() # Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist lines = text.split('\n') header = lines[0] if lines else '' content = '\n'.join(lines[1:]) if len(lines) > 1 else '' pages.append({'header': header, 'content': content}) return pages def extract_text_from_docx(docx_path): doc = Document(docx_path) pages = [] current_page = [] header = '' for para in doc.paragraphs: if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen if current_page: pages.append({'header': header, 'content': '\n'.join(current_page)}) current_page = [] header = para.text else: current_page.append(para.text) if current_page: # Letzte Seite hinzufügen pages.append({'header': header, 'content': '\n'.join(current_page)}) return pages # Initialisierung der Dokumente - Dictionary um die Dokuemtneteninhalte, Switen und Überschriften zu halten def initialize_documents(): documents = [] if os.path.exists(DOCS_DIR): for file_name in os.listdir(DOCS_DIR): if file_name.endswith(".pdf"): pdf_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_pdf(pdf_path) documents.append({"file": file_name, "pages": pages}) elif file_name.endswith(".docx"): docx_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_docx(docx_path) documents.append({"file": file_name, "pages": pages}) return documents ####################################################### #nach relevanten suche -> download Link der passenden Dokuemtne erstellen def download_link(doc_name): # URL für das Herunterladen der Datei file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}" return f'{doc_name}' # Zeitelumbrüche entfernen - bei einzelnen, mehrere hinterienander zu einem zusammenfassen #zur Ziet nicht im Einsatz def remove_line_breaks(text): # Entfernt alle einzelnen Zeilenumbrüche #text = re.sub(r'(?" files_table += "DateinameGröße (KB)" for i, file in enumerate(files): file_path = os.path.join(DOCS_DIR, file) file_size = os.path.getsize(file_path) / 1024 # Größe in KB row_color = "#4f4f4f" if i % 2 == 0 else "#3a3a3a" # Wechselnde Zeilenfarben files_table += f"" files_table += f"{download_link(file)}" files_table += f"{file_size:.2f}" files_table += "" return files_table # gefundene relevante Dokumente auflisten (links) def list_pdfs(): if not os.path.exists(DOCS_DIR): return [] return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')] ########################################################### ############# KI um Suchanfrage zu Embedden ############### # Funktion zur Entfernung von Stopwörtern und Tokenisierung - um bei längeren suchanfragen auf relevante wörter zu konzentrieren def preprocess_textback(text): if not text: return [] stop_words = set(stopwords.words('german')) tokenizer = RegexpTokenizer(r'\w+') word_tokens = tokenizer.tokenize(text) filtered_words = [word for word in word_tokens if word.lower() not in stop_words] return filtered_words def preprocess_text(text): if not text: return "" # Konvertiere den Text zu Kleinbuchstaben text = text.lower() # Tokenisierung tokenizer = RegexpTokenizer(r'\w+') word_tokens = tokenizer.tokenize(text) # Entfernen von Stoppwörtern filtered_words = [word for word in word_tokens if word not in stop_words] # Stemming stemmer = SnowballStemmer("german") stemmed_words = [stemmer.stem(word) for word in filtered_words] return " ".join(stemmed_words) # Funktion zur Bereinigung des Textes aus den Pdfs und Word Dokuemtne, um den Tokenizer nicht zu überfordern def clean_text(text): # Entfernen nicht druckbarer Zeichen text = re.sub(r'[^\x00-\x7F]+', ' ', text) # Ersetzen ungewöhnlicher Leerzeichen durch normale Leerzeichen text = re.sub(r'\s+', ' ', text) return text.strip() # Funktion zur Berechnung der Embeddings def get_embeddings(texts): return model.encode(texts, convert_to_tensor=True) #um ähnliche Wörter anhand ihres Wortstammes zu erkennen # Funktion zur Stemmatisierung des Textes def stem_text(text): if not text: return "" stemmer = SnowballStemmer("german") tokenizer = RegexpTokenizer(r'\w+') word_tokens = tokenizer.tokenize(text) stemmed_words = [stemmer.stem(word) for word in word_tokens] return " ".join(stemmed_words) # Durchsuchen von Dokumenten def search_documents(query): documents = initialize_documents() # Texte und Überschriften in die Embeddings aufnehmen texts = [page['content'] for doc in documents for page in doc['pages']] stemmed_texts = [stem_text(text) for text in texts] text_embeddings = get_embeddings(stemmed_texts) # Stemming des Queries - and vorher unwichtige Wörter entfernen - um suchergebnis zu verbessern prepro_query = preprocess_text(query) #stem_text(" ".join(preprocess_text(query))) # Embedding des Queries query_embedding = get_embeddings(prepro_query) # Sicherstellen, dass die Embeddings 2D-Arrays sind if len(query_embedding.shape) == 1: query_embedding = query_embedding.reshape(1, -1) if len(text_embeddings.shape) == 1: text_embeddings = text_embeddings.reshape(1, -1) # Berechnung der Ähnlichkeit similarities = cosine_similarity(query_embedding.cpu(), text_embeddings.cpu()).flatten() # Sortieren nach Relevanz related_docs_indices = similarities.argsort()[::-1] results=[] relevant_text = "" relevant_docs = {} num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in related_docs_indices: if similarities[i] > 0.3: doc_index = None for idx, cumulative in enumerate(cumulative_pages): if i < cumulative: doc_index = idx break if doc_index is None: continue page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] page = doc['pages'][page_index] page_content = page['content'] header_content = page.get('header', '') # Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist index_in_content = page_content.lower().find(prepro_query.lower()) index_in_header = header_content.lower().find(prepro_query.lower()) # Berücksichtigung der Levenshtein-Distanz words_in_query = prepro_query page_words = stem_text(page_content).split() if index_in_content != -1 or index_in_header != -1 or any(fuzz.ratio(word, page_word) > 80 for word in words_in_query for page_word in page_words): # <--- Integration von fuzz.ratio für jedes Wort # Erstellen Sie einen Snippet für die Suchergebnisse start = max(0, index_in_content - 400) if index_in_content != -1 else 0 end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content) snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
" # Fügen Sie die Überschrift hinzu, falls vorhanden if header_content: snippet += f"Überschrift: {header_content}
" snippet += f"{remove_line_breaks(page_content[start:end])}

" relevant_text += snippet if doc['file'] not in relevant_docs: relevant_docs[doc['file']] = [] relevant_docs[doc['file']].append(snippet) # Sortieren nach Relevanz results = sorted(results, key=lambda x: x[1], reverse=True) results = [res[0] for res in results] results = list(relevant_docs.keys()) return results, relevant_text ########################################################### ############## Vorbereitung View in gradio ################ ####################################### #Suche starten und View aktialisieren def search_and_update(query): if not query.strip(): return "
Bitte geben Sie einen Suchbegriff ein.
", "
Bitte geben Sie einen Suchbegriff ein.
" relevant_docs, relevant_text = search_documents(query) if not relevant_docs: doc_links = "
Keine passenden Dokumente gefunden.
" else: doc_links = "" for doc in relevant_docs: doc_links += download_link(doc) + "
" if not relevant_text: relevant_text = "
Kein relevanter Text gefunden.
" return "", doc_links, relevant_text #Fortschritt anzeigen beim Warten auf Suchergebnisse def show_progress(): return gr.update(value="Suche läuft...", visible=True) def hide_progress(): return gr.update(value="", visible=False) ###################################################################### ############### Anwendung starten #################################### with gr.Blocks(css= .results { background-color: #f0f0f0; padding: 10px; border-radius: 5px; overflow-y: auto; /*max-height: 400px;*/ width: 100%; /* Volle Breite */ } .no-results { color: red; } .doc-name { font-weight: bold; color: #B05DF9; /* Dunkleres Lila für verlinkte Dokumente */ } .page-number { font-weight: bold; color: #FF5733; } #doc_links, #relevant_text { background-color: #333333; /* Sehr dunkles Grau */ padding: 10px; /* Innenabstand */ border-radius: 5px; /* Abgerundete Ecken */ overflow-y: auto; /* Vertikale Scrollbalken */ white-space: pre-wrap; /* Textumbruch innerhalb des Feldes */ height: auto; /* Automatische Höhe */ width: 100%; /* Volle Breite */ } #doc_links a { color: #BB70FC; /* Helles Lila für Links im doc_links Feld */ font-weight: bold; width: 100%; /* Volle Breite */ } ) as demo: with gr.Tab("Suche"): progress = gr.Markdown(value="") query_input = gr.Textbox(label="Suchanfrage") with gr.Row(): with gr.Column(scale=1): doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False) with gr.Column(scale=2): relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False) query_input.submit(show_progress, inputs=[], outputs=[progress], show_progress="false") query_input.submit(search_and_update, inputs=[query_input], outputs=[progress, doc_links, relevant_text], show_progress="true").then( hide_progress, inputs=[], outputs=[progress] ) with gr.Tab("Datei hochladen"): upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen") output_text = gr.Textbox(label="Status") #upload_button = gr.Button("Datei hochladen") file_list = gr.HTML(elem_id="file_list", show_label=False) #upload_button.click(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text) # Automatisches Ausführen der Upload-Funktion, wenn eine Datei hochgeladen wird upload_pdf_file.change(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text) #gr.HTML(update=display_files, elem_id="file_list", show_label=False) demo.load(display_files, outputs=file_list) demo.launch() """