import gradio as gr import fitz # PyMuPDF import os import requests from huggingface_hub import HfApi import base64 from io import BytesIO import urllib.parse import tempfile from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity from docx import Document import asyncio import docx from transformers import AutoTokenizer, AutoModel from sentence_transformers import SentenceTransformer from fuzzywuzzy import fuzz from nltk.stem import PorterStemmer import torch # Zugriff auf das Secret als Umgebungsvariable HF_WRITE = os.getenv("HF_WRITE") HF_READ = os.getenv("HF_READ") # CONSTANTS REPO_ID = "alexkueck/kkg_suche" REPO_TYPE = "space" SAVE_DIR = "kkg_dokumente" # HfApi-Instanz erstellen api = HfApi() # Relativer Pfad zum Verzeichnis mit den Dokumenten DOCS_DIR = "kkg_dokumente" ######################################################## ##########Ki Modell für Embeddings der Suchanfrage nutzen # Laden des Sentence-Transformer-Modells model = SentenceTransformer('paraphrase-MiniLM-L6-v2') # Funktion zum Extrahieren von Text aus PDF def extract_text_from_pdf(pdf_path): doc = fitz.open(pdf_path) pages = [] for page in doc: text = page.get_text() # Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist lines = text.split('\n') header = lines[0] if lines else '' content = '\n'.join(lines[1:]) if len(lines) > 1 else '' pages.append({'header': header, 'content': content}) return pages def extract_text_from_docx(docx_path): doc = Document(docx_path) pages = [] current_page = [] header = '' for para in doc.paragraphs: if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen if current_page: pages.append({'header': header, 'content': '\n'.join(current_page)}) current_page = [] header = para.text else: current_page.append(para.text) if current_page: # Letzte Seite hinzufügen pages.append({'header': header, 'content': '\n'.join(current_page)}) return pages # Initialisierung der Dokumente def initialize_documents(): documents = [] if os.path.exists(DOCS_DIR): for file_name in os.listdir(DOCS_DIR): if file_name.endswith(".pdf"): pdf_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_pdf(pdf_path) documents.append({"file": file_name, "pages": pages}) elif file_name.endswith(".docx"): docx_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_docx(docx_path) documents.append({"file": file_name, "pages": pages}) return documents # Hochladen von Dateien def upload_pdf(file): if file is None: return None, "Keine Datei hochgeladen." # Extrahieren des Dateinamens aus dem vollen Pfad filename = os.path.basename(file.name) # Datei zum Hugging Face Space hochladen upload_path = f"kkg_dokumente/{filename}" api.upload_file( path_or_fileobj=file.name, path_in_repo=upload_path, repo_id=REPO_ID, repo_type=REPO_TYPE, token=HF_WRITE ) return f"PDF '{filename}' erfolgreich hochgeladen." def list_pdfs(): if not os.path.exists(SAVE_DIR): return [] return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')] ###################################### #nach relevanten suche -> download Link der passenden Dokuemtne erstellen def download_link(doc_name): # URL für das Herunterladen der Datei file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}" return f'{doc_name}' ########################################################### ############# KI um Suchanfrage zu Embedden ############### # Funktion zur Berechnung der Embeddings def get_embeddings(texts): return model.encode(texts, convert_to_tensor=True) #um ähnliche Wörter anhand ihres Wortstammes zu erkennen # Stemming-Funktion def stem_text(text): ps = PorterStemmer() return ' '.join([ps.stem(word) for word in text.split()]) # Durchsuchen von Dokumenten def search_documents(query): documents = initialize_documents() # Texte und Überschriften in die Embeddings aufnehmen texts = [page['content'] for doc in documents for page in doc['pages']] stemmed_texts = [stem_text(text) for text in texts] text_embeddings = get_embeddings(stemmed_texts) # Stemming des Queries stemmed_query = stem_text(query) # Embedding des Queries query_embedding = get_embeddings([stemmed_query]) # Berechnung der Ähnlichkeit similarities = cosine_similarity(query_embedding.cpu(), text_embeddings.cpu()).flatten() # Sortieren nach Relevanz related_docs_indices = similarities.argsort()[::-1] results=[] relevant_text = "" relevant_docs = {} num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in related_docs_indices: if similarities[i] > 0: doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative) page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] page = doc['pages'][page_index] page_content = page['content'] header_content = page.get('header', '') # Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist index_in_content = page_content.lower().find(query.lower()) index_in_header = header_content.lower().find(query.lower()) # Berücksichtigung der Levenshtein-Distanz words_in_query = stemmed_query.split() page_words = stem_text(page_content).split() if index_in_content != -1 or index_in_header != -1 or any(fuzz.ratio(word, page_word) > 80 for word in words_in_query for page_word in page_words): # <--- Integration von fuzz.ratio für jedes Wort # Erstellen Sie einen Snippet für die Suchergebnisse start = max(0, index_in_content - 400) if index_in_content != -1 else 0 end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content) snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
" # Fügen Sie die Überschrift hinzu, falls vorhanden if header_content: snippet += f"Überschrift: {header_content}
" snippet += f"...{page_content[start:end]}...

" relevant_text += snippet if doc['file'] not in relevant_docs: relevant_docs[doc['file']] = [] relevant_docs[doc['file']].append(snippet) # Sortieren nach Relevanz results = sorted(results, key=lambda x: x[1], reverse=True) results = [res[0] for res in results] results = list(relevant_docs.keys()) return results, relevant_text ####################################### #Suche starten und View aktialisieren def search_and_update(query): if not query.strip(): return "
Bitte geben Sie einen Suchbegriff ein.
", "
Bitte geben Sie einen Suchbegriff ein.
" relevant_docs, relevant_text = search_documents(query) if not relevant_docs: doc_links = "
Keine passenden Dokumente gefunden.
" else: doc_links = "" for doc in relevant_docs: doc_links += download_link(doc) + "
" if not relevant_text: relevant_text = "
Kein relevanter Text gefunden.
" return doc_links, relevant_text ###################################################################### ############### Anwendung starten #################################### with gr.Blocks(css=""" .results { background-color: #f0f0f0; padding: 10px; border-radius: 5px; overflow-y: auto; max-height: 400px; } .no-results { color: red; } .doc-name { font-weight: bold; color: #DDA0DD; } .page-number { font-weight: bold; color: #FF5733; } #doc_links, #relevant_text { color: #DDA0DD; background-color: #333333; /* Sehr dunkles Grau */ padding: 10px; /* Innenabstand */ border-radius: 5px; /* Abgerundete Ecken */ max-height: 400px; /* Max Höhe für bessere Lesbarkeit */ overflow-y: auto; /* Vertikale Scrollbalken */ } """) as demo: with gr.Tab("Suche"): query_input = gr.Textbox(label="Suchbegriff") with gr.Row(): with gr.Column(scale=1): doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False) with gr.Column(scale=2): relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False) query_input.submit(search_and_update, inputs=[query_input], outputs=[doc_links, relevant_text], show_progress="full") with gr.Tab("Datei hochladen"): upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen") output_text = gr.Textbox(label="Status") upload_button = gr.Button("Datei hochladen") upload_button.click(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text) demo.launch() """ ###############################funktioniert - suche nicht gut genug##################################################################### ############################################################ ####### Hilfsfunktionen #alle documente den text extrahieren und die einzelen texte als dictionay zusammenstellen - mit überschrift, seitenzahl und text def initialize_documents(): documents = [] if os.path.exists(DOCS_DIR): for file_name in os.listdir(DOCS_DIR): if file_name.endswith(".pdf"): pdf_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_pdf(pdf_path) documents.append({"file": file_name, "pages": pages}) elif file_name.endswith(".docx"): docx_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_docx(docx_path) documents.append({"file": file_name, "pages": pages}) return documents # Funktion zum Extrahieren des Textes aus einer PDF-Datei def extract_text_from_pdf(pdf_path): doc = fitz.open(pdf_path) pages = [] for page in doc: text = page.get_text() # Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist lines = text.split('\n') header = lines[0] if lines else '' content = '\n'.join(lines[1:]) if len(lines) > 1 else '' pages.append({'header': header, 'content': content}) return pages def extract_text_from_docx(docx_path): doc = Document(docx_path) pages = [] current_page = [] header = '' for para in doc.paragraphs: if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen if current_page: pages.append({'header': header, 'content': '\n'.join(current_page)}) current_page = [] header = para.text else: current_page.append(para.text) if current_page: # Letzte Seite hinzufügen pages.append({'header': header, 'content': '\n'.join(current_page)}) return pages ########################################################### #################### Suche init ########################### # Dynamische Erstellung der Dokumentenliste und Extraktion der Texte print("Martix init ........................") #documents = initialize_documents() # Initialisierung des Vektorizersvectorizer = TfidfVectorizer() # Extrahieren der Texte aus den Dokumentseiten #texts = [page['content'] for doc in documents for page in doc['pages']] # Erstellen der TF-IDF-Matrix #tfidf_matrix = vectorizer.fit_transform(texts) ########################################################## ############# weitere Hilsffunktionen #################### def search_documents(query): if not query: return [doc['file'] for doc in documents], "", [] #Dokumente durchgehen und Vektorisieren für die anschließende Suche documents = initialize_documents() vectorizer = TfidfVectorizer() texts = [page['content'] for doc in documents for page in doc['pages']] tfidf_matrix = vectorizer.fit_transform(texts) #Sucha nach relvanten dokumenten query_vector = vectorizer.transform([query]) cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten() related_docs_indices = cosine_similarities.argsort()[::-1] results = [] relevant_text = "" relevant_docs = {} num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in related_docs_indices: if cosine_similarities[i] > 0: doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative) page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] page = doc['pages'][page_index] page_content = page['content'] header_content = page.get('header', '') # Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist index_in_content = page_content.lower().find(query.lower()) index_in_header = header_content.lower().find(query.lower()) if index_in_content != -1 or index_in_header != -1: # Erstellen Sie einen Snippet für die Suchergebnisse start = max(0, index_in_content - 400) if index_in_content != -1 else 0 end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content) snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
" # Fügen Sie die Überschrift hinzu, falls vorhanden if header_content: snippet += f"Überschrift: {header_content}
" snippet += f"...{page_content[start:end]}...

" relevant_text += snippet if doc['file'] not in relevant_docs: relevant_docs[doc['file']] = [] relevant_docs[doc['file']].append(snippet) results = list(relevant_docs.keys()) return results, relevant_text def update_display(selected_pdf): return display_document(selected_pdf) def update_dropdown(): return gr.Dropdown.update(choices=list_pdfs()) def search_and_updateBack(query): results, rel_text, relevant_pdfs = search_documents(query) pdf_html = "" images = [] temp_dir = tempfile.mkdtemp() for pdf, page in relevant_pdfs: pdf_path = os.path.join(SAVE_DIR, pdf) document = fitz.open(pdf_path) # Seite als Integer umwandeln page_num = int(page) page = document.load_page(page_num) pix = page.get_pixmap() img_path = os.path.join(temp_dir, f"{pdf}_page_{page.number}.png") pix.save(img_path) images.append(img_path) return images, rel_text def search_and_updateback(query): relevant_docs, relevant_text, _ = search_documents(query) doc_links = "" for doc in relevant_docs: doc_links += download_link(doc) + "
" return doc_links, relevant_text def upload_pdf(file): if file is None: return None, "Keine Datei hochgeladen." # Extrahieren des Dateinamens aus dem vollen Pfad filename = os.path.basename(file.name) # Datei zum Hugging Face Space hochladen upload_path = f"kkg_dokumente/{filename}" api.upload_file( path_or_fileobj=file.name, path_in_repo=upload_path, repo_id=REPO_ID, repo_type=REPO_TYPE, token=HF_WRITE ) return f"PDF '{filename}' erfolgreich hochgeladen." def list_pdfs(): if not os.path.exists(SAVE_DIR): return [] return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')] ###################################### #nach relevanten suche -> download Link der passenden Dokuemtne erstellen def download_link(doc_name): # URL für das Herunterladen der Datei file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}" return f'{doc_name}' ####################################### #Suche starten und View aktialisieren def search_and_update(query): if not query.strip(): return "
Bitte geben Sie einen Suchbegriff ein.
", "
Bitte geben Sie einen Suchbegriff ein.
" relevant_docs, relevant_text = search_documents(query) if not relevant_docs: doc_links = "
Keine passenden Dokumente gefunden.
" else: doc_links = "" for doc in relevant_docs: doc_links += download_link(doc) + "
" if not relevant_text: relevant_text = "
Kein relevanter Text gefunden.
" return doc_links, relevant_text ###################################################################### ############### Anwendung starten #################################### with gr.Blocks(css= .results { background-color: #f0f0f0; padding: 10px; border-radius: 5px; overflow-y: auto; max-height: 400px; } .no-results { color: red; } .doc-name { font-weight: bold; color: #007BFF; } .page-number { font-weight: bold; color: #FF5733; } ) as demo: with gr.Tab("Suche"): query_input = gr.Textbox(label="Suchbegriff") with gr.Row(): with gr.Column(scale=2): doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False) with gr.Column(scale=1): relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False) query_input.submit(search_and_update, inputs=[query_input], outputs=[doc_links, relevant_text]) with gr.Tab("Datei hochladen"): upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen") output_text = gr.Textbox(label="Status") upload_button = gr.Button("Datei hochladen") upload_button.click(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text) demo.launch() """ """ def initialize_documents(): documents = [] if os.path.exists(DOCS_DIR): for file_name in os.listdir(DOCS_DIR): if file_name.endswith(".pdf"): pdf_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_pdf(pdf_path) documents.append({"file": file_name, "pages": pages}) elif file_name.endswith(".docx"): docx_path = os.path.join(DOCS_DIR, file_name) pages = extract_text_from_docx(docx_path) documents.append({"file": file_name, "pages": pages}) return documents # Funktion zum Extrahieren des Textes aus einer PDF-Datei oder Word-Datei def extract_text_from_pdf(pdf_path): doc = fitz.open(pdf_path) pages = [] for page in doc: text = page.get_text() # Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist lines = text.split('\n') header = lines[0] if lines else '' content = '\n'.join(lines[1:]) if len(lines) > 1 else '' pages.append({'header': header, 'content': content}) return pages def extract_text_from_docx(docx_path): doc = Document(docx_path) pages = [] current_page = [] header = '' for para in doc.paragraphs: if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen if current_page: pages.append({'header': header, 'content': '\n'.join(current_page)}) current_page = [] header = para.text else: current_page.append(para.text) if current_page: # Letzte Seite hinzufügen pages.append({'header': header, 'content': '\n'.join(current_page)}) return pages ########################################################### #################### Suche init ########################### # Dynamische Erstellung der Dokumentenliste und Extraktion der Texte print("Martix init ........................") documents = initialize_documents() # Initialisierung des Vektorizers vectorizer = TfidfVectorizer() # Extrahieren der Texte aus den Dokumentseiten texts = [page['content'] for doc in documents for page in doc['pages']] # Erstellen der TF-IDF-Matrix tfidf_matrix = vectorizer.fit_transform(texts) ########################################################## ############# weitere Hilsffunktionen #################### def search_documents(query): if not query: return [doc['file'] for doc in documents], "", [] query_vector = vectorizer.transform([query]) cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten() related_docs_indices = cosine_similarities.argsort()[::-1] results = [] relevant_text = "" relevant_docs = {} num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in related_docs_indices: if cosine_similarities[i] > 0: doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative) page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] page = doc['pages'][page_index] page_content = page['content'] header_content = page.get('header', '') # Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist index_in_content = page_content.lower().find(query.lower()) index_in_header = header_content.lower().find(query.lower()) if index_in_content != -1 or index_in_header != -1: # Erstellen Sie einen Snippet für die Suchergebnisse start = max(0, index_in_content - 400) if index_in_content != -1 else 0 end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content) snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
" # Fügen Sie die Überschrift hinzu, falls vorhanden if header_content: snippet += f"Überschrift: {header_content}
" snippet += f"...{page_content[start:end]}...

" relevant_text += snippet if doc['file'] not in relevant_docs: relevant_docs[doc['file']] = [] relevant_docs[doc['file']].append(snippet) results = list(relevant_docs.keys()) return results, relevant_text ###################################### #nach relevanten suche -> download Link der passenden Dokuemtne erstellen def download_link(doc_name): # URL für das Herunterladen der Datei file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}" return f'{doc_name}' ####################################### #Suche starten und View aktialisieren def search_and_update(query): if not query.strip(): return "
Bitte geben Sie einen Suchbegriff ein.
", "
Bitte geben Sie einen Suchbegriff ein.
" relevant_docs, relevant_text = search_documents(query) if not relevant_docs: doc_links = "
Keine passenden Dokumente gefunden.
" else: doc_links = "" for doc in relevant_docs: doc_links += download_link(doc) + "
" if not relevant_text: relevant_text = "
Kein relevanter Text gefunden.
" return doc_links, relevant_text ###################################### # File hochladen über sep Tab #muss async sein, da der hauptthread weitergeht, während das dokument zu ende hochlädt async def wait_for_file(file_path, timeout=10): for _ in range(timeout * 2): # Timeout in 0.5-Sekunden-Schritten if os.path.exists(file_path): return True await asyncio.sleep(0.5) return False async def upload_pdf_word(file): if file is None: return None, "Keine Datei hochgeladen." # Extrahieren des Dateinamens aus dem vollen Pfad filename = os.path.basename(file.name) # Datei zum Hugging Face Space hochladen (überschreiben, falls vorhanden) upload_path = f"kkg_dokumente/{filename}" try: await asyncio.to_thread(api.upload_file, path_or_fileobj=file.name, path_in_repo=upload_path, repo_id=REPO_ID, repo_type=REPO_TYPE, token=HF_WRITE) except Exception as e: return None, f"Fehler beim Hochladen der Datei: {str(e)}" # Warten, bis die Datei vorhanden ist local_path = os.path.join(DOCS_DIR, filename) file_exists = await wait_for_file(local_path) if not file_exists: return None, f"Fehler: Datei '{local_path}' nicht gefunden." # Text extrahieren basierend auf Dateityp try: if filename.endswith('.pdf'): pages = await asyncio.to_thread(extract_text_from_pdf, local_path) pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages]) headings = [page['header'] for page in pages] elif filename.endswith('.docx'): pages = await asyncio.to_thread(extract_text_from_docx, local_path) pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages]) headings = [page['header'] for page in pages] else: return None, "Nicht unterstütztes Dateiformat (nur PDF und Word hochladen)" except Exception as e: return None, f"Fehler bei der Verarbeitung der Datei: {str(e)}" try: if filename.endswith('.pdf'): pages = extract_text_from_pdf(upload_path) pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages]) headings = [page['header'] for page in pages] elif filename.endswith('.docx'): pages = extract_text_from_docx(upload_path) pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages]) headings = [page['header'] for page in pages] else: return None, "Nicht unterstütztes Dateiformat (nur PDF und Word hochladen)" return f"Datei '{filename}' erfolgreich hochgeladen und verarbeitet." except Exception as e: return None, f"Fehler bei der Verarbeitung der Datei: {str(e)}" # Text extrahieren basierend auf Dateityp -> um ihn in die documents zu packen für spätere suche nach relevatnen teten bei der suche if filename.endswith('.pdf'): pages = extract_text_from_pdf(upload_path) pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages]) headings = [page['header'] for page in pages] elif filename.endswith('.docx'): docx_data = extract_text_from_docx(upload_path) pages_text = docx_data["content"] headings = docx_data["headings"] else: return None, "Nicht unterstütztes Dateiformat (nur PDF und Word hochladen)" # Aktualisieren Sie die Dokumentenliste und die TF-IDF-Matrix #documents.append({"file": filename, "pages": pages_text, "headings": headings}) def upload_filebackup(file): file_name = file.name file_path = os.path.join(DOCS_DIR, file_name) # Debugging-Ausgabe: Überprüfen Sie, ob das Verzeichnis existiert if not os.path.exists(DOCS_DIR): print(f"Verzeichnis {DOCS_DIR} existiert nicht. Erstelle Verzeichnis.") os.makedirs(DOCS_DIR) # Debugging-Ausgabe: Dateiname und Pfad print(f"Speichere Datei nach {file_path}") with open(file_path, "wb") as f: f.write(file) # Überprüfen, ob die Datei korrekt gespeichert wurde if os.path.exists(file_path): print(f"Datei erfolgreich gespeichert: {file_path}") else: print(f"Fehler beim Speichern der Datei: {file_path}") # Aktualisieren Sie die Dokumentenliste und die TF-IDF-Matrix pages_text = extract_text_from_pdf(file_path) documents.append({"file": file_name, "pages": pages_text}) global tfidf_matrix tfidf_matrix = vectorizer.fit_transform([page for doc in documents for page in doc['pages']]) return search_and_update("") def upload_pdf_wordback(file): if file is None: return None, "Keine Datei hochgeladen." # Extrahieren des Dateinamens aus dem vollen Pfad filename = os.path.basename(file.name) # Datei zum Hugging Face Space hochladen upload_path = f"kkg_dokumente/{filename}" api.upload_file( path_or_fileobj=file.name, path_in_repo=upload_path, repo_id=REPO_ID, repo_type=REPO_TYPE, token=HF_WRITE ) print("filepath ........................"+str(upload_path)) print("filenaem ........................"+str(filename)) # Text extrahieren basierend auf Dateityp if filename.endswith('.pdf'): pages_text, headings = extract_text_from_pdf(upload_path) elif filename.endswith('.docx'): pages_text, headings = extract_text_from_docx(upload_path) else: return None, "Unsupported file format" # Aktualisieren Sie die Dokumentenliste und die TF-IDF-Matrix #documents.append({"file": filename, "pages": pages_text, "headings": headings}) return f"Datei '{filename}' erfolgreich hochgeladen und verarbeitet." def upload_pdf_wordback(file): if file is None: return None, "Keine Datei hochgeladen." # Extrahieren des Dateinamens aus dem vollen Pfad filename = os.path.basename(file.name) # Datei zum Hugging Face Space hochladen upload_path = f"kkg_dokumente/{filename}" api.upload_file( path_or_fileobj=file.name, path_in_repo=upload_path, repo_id=REPO_ID, repo_type=REPO_TYPE, token=HF_WRITE ) # Aktualisieren Sie die Dokumentenliste und die TF-IDF-Matrix #pages_text = extract_text_from_pdf(upload_path) #documents.append({"file": filename, "pages": pages_text}) return f"PDF '{filename}' erfolgreich hochgeladen." def run_async_upload(file): # Wrapper, um die asynchrone Funktion auszuführen return asyncio.run(upload_pdf_word(file)) ###################################################################### ############### Anwendung starten #################################### with gr.Blocks(css= .results { background-color: #f0f0f0; padding: 10px; border-radius: 5px; overflow-y: auto; max-height: 400px; } .no-results { color: red; } .doc-name { font-weight: bold; color: #007BFF; } .page-number { font-weight: bold; color: #FF5733; } ) as demo: with gr.Tab("Suche"): query_input = gr.Textbox(label="Suchbegriff") with gr.Row(): with gr.Column(scale=2): doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False) with gr.Column(scale=1): relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False) query_input.submit(search_and_update, inputs=[query_input], outputs=[doc_links, relevant_text]) with gr.Tab("Datei hochladen"): upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen") output_text = gr.Textbox(label="Status") upload_button = gr.Button("Datei hochladen") upload_button.click(fn=run_async_upload, inputs=upload_pdf_file, outputs=output_text) demo.launch() # Initialisieren der Gradio-Oberfläche with gr.Blocks(css= .results { background-color: #f0f0f0; padding: 10px; border-radius: 5px; overflow-y: auto; max-height: 400px; } .no-results { color: red; } .doc-name { font-weight: bold; color: #007BFF; } .page-number { font-weight: bold; color: #FF5733; } ) as demo: with gr.Tab("Suche"): query_input = gr.Textbox(label="Suchbegriff") with gr.Row(): with gr.Column(scale=2): doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False) with gr.Column(scale=1): relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False) query_input.submit(search_and_update, inputs=[query_input], outputs=[doc_links, relevant_text]) with gr.Tab("Datei Upload"): upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen") upload_status = gr.Textbox(label="Status") upload_button = gr.Button("Upload") upload_button.click(upload_pdf_word, inputs=upload_pdf_file, outputs=upload_status) with gr.Tab("PDF Auswahl und Anzeige"): pdf_dropdown = gr.Dropdown(label="Wählen Sie eine PDF-Datei", choices=list_pdfs()) query = gr.Textbox(label="Suchanfrage", type="text") display_status = gr.Textbox(label="Status") display_button = gr.Button("Anzeigen") with gr.Row(): pdf_image = gr.Image(label="PDF-Seite als Bild", type="filepath") relevant_text = gr.Textbox(label="Relevanter Text", lines=10) display_button.click(display_pdf, inputs=[pdf_dropdown], outputs=[pdf_image, display_status]) # Automatische Aktualisierung der Dropdown-Liste nach dem Hochladen einer PDF-Datei #upload_button.click(update_dropdown, inputs=None, outputs=pdf_dropdown) #upload_button.click(lambda: pdf_dropdown.update(choices=list_pdfs()), outputs=pdf_dropdown) demo.launch(share=True) """ """ # Funktion zum Extrahieren des Textes aus einer PDF-Datei def extract_text_from_pdf(pdf_path): doc = fitz.open(pdf_path) text = [] for page in doc: text.append(page.get_text()) return text # Dynamische Erstellung der Dokumentenliste und Extraktion der Texte documents = [] for file_name in os.listdir(SAVE_DIR): if file_name.endswith(".pdf"): pdf_path = os.path.join(SAVE_DIR, file_name) pages_text = extract_text_from_pdf(pdf_path) documents.append({"file": file_name, "pages": pages_text}) # TF-IDF Vectorizer vorbereiten vectorizer = TfidfVectorizer() tfidf_matrix = vectorizer.fit_transform([page for doc in documents for page in doc['pages']]) #################################################### def search_documents(query): if not query: return [doc['file'] for doc in documents], "", [] query_vector = vectorizer.transform([query]) cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten() related_docs_indices = cosine_similarities.argsort()[::-1] results = [] relevant_text = "" relevant_pdfs = [] num_pages_per_doc = [len(doc['pages']) for doc in documents] cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))] for i in related_docs_indices: if cosine_similarities[i] > 0: doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative) page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1] doc = documents[doc_index] results.append(doc['file']) page_content = doc['pages'][page_index] index = page_content.lower().find(query.lower()) if index != -1: start = max(0, index - 400) end = min(len(page_content), index + 400) relevant_text += f"Aus {doc['file']} (Seite {page_index + 1}):\n...{page_content[start:end]}...\n\n" relevant_pdfs.append((doc['file'], page_index)) return results, relevant_text, relevant_pdfs def download_link(doc_name): file_path = os.path.join(SAVE_DIR, doc_name) file_url = f"/kkg_dokumente/{doc_name}" return f'{doc_name}' def update_display(selected_pdf): return display_document(selected_pdf) def update_dropdown(): return gr.Dropdown.update(choices=list_pdfs()) def search_and_updateBack(query): results, rel_text, relevant_pdfs = search_documents(query) pdf_html = "" images = [] temp_dir = tempfile.mkdtemp() for pdf, page in relevant_pdfs: pdf_path = os.path.join(SAVE_DIR, pdf) document = fitz.open(pdf_path) # Seite als Integer umwandeln page_num = int(page) page = document.load_page(page_num) pix = page.get_pixmap() img_path = os.path.join(temp_dir, f"{pdf}_page_{page.number}.png") pix.save(img_path) images.append(img_path) return images, rel_text def search_and_update(query): relevant_docs, relevant_text, _ = search_documents(query) doc_links = "" for doc in relevant_docs: doc_links += download_link(doc) + "
" return doc_links, relevant_text def upload_pdf(file): if file is None: return None, "Keine Datei hochgeladen." # Extrahieren des Dateinamens aus dem vollen Pfad filename = os.path.basename(file.name) # Datei zum Hugging Face Space hochladen upload_path = f"kkg_dokumente/{filename}" api.upload_file( path_or_fileobj=file.name, path_in_repo=upload_path, repo_id=REPO_ID, repo_type=REPO_TYPE, token=HF_WRITE ) return f"PDF '{filename}' erfolgreich hochgeladen." def list_pdfs(): if not os.path.exists(SAVE_DIR): return [] return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')] def display_pdf(selected_pdf): pdf_path = os.path.join(SAVE_DIR, selected_pdf) # PDF-URL im Hugging Face Space encoded_pdf_name = urllib.parse.quote(selected_pdf) pdf_url = f"https://huggingface.co/spaces/{REPO_ID}/resolve/main/kkg_dokumente/{encoded_pdf_name}" # PDF von der URL herunterladen headers = {"Authorization": f"Bearer {HF_READ}"} response = requests.get(pdf_url, headers=headers) if response.status_code == 200: with open(pdf_path, 'wb') as f: f.write(response.content) else: return None, f"Fehler beim Herunterladen der PDF-Datei von {pdf_url}" # PDF in Bilder umwandeln document = fitz.open(pdf_path) temp_dir = tempfile.mkdtemp() # Nur die erste Seite als Bild speichern page = document.load_page(0) pix = page.get_pixmap() img_path = os.path.join(temp_dir, f"page_0.png") pix.save(img_path) status = f"PDF '{selected_pdf}' erfolgreich geladen und verarbeitet." return img_path, status ############################################################## with gr.Blocks() as demo: with gr.Tab("Upload PDF"): upload_pdf_file = gr.File(label="PDF-Datei hochladen") upload_status = gr.Textbox(label="Status") upload_button = gr.Button("Upload") upload_button.click(upload_pdf, inputs=upload_pdf_file, outputs=upload_status) with gr.Tab("PDF Auswahl und Anzeige"): pdf_dropdown = gr.Dropdown(label="Wählen Sie eine PDF-Datei", choices=list_pdfs()) query = gr.Textbox(label="Suchanfrage", type="text") display_status = gr.Textbox(label="Status") display_button = gr.Button("Anzeigen") with gr.Row(): pdf_image = gr.Image(label="PDF-Seite als Bild", type="filepath") relevant_text = gr.Textbox(label="Relevanter Text", lines=10) display_button.click(display_pdf, inputs=[pdf_dropdown], outputs=[pdf_image, display_status]) with gr.Tab("Suche"): search_query = gr.Textbox(label="Suchanfrage") search_button = gr.Button("Suchen") with gr.Row(): search_results = gr.Gallery(label="Relevante PDFs", type="filepath") search_text = gr.Textbox(label="Relevanter Text", lines=10) search_button.click(search_and_update, inputs=search_query, outputs=[search_results, search_text]) # Automatische Aktualisierung der Dropdown-Liste nach dem Hochladen einer PDF-Datei #upload_button.click(update_dropdown, inputs=None, outputs=pdf_dropdown) #upload_button.click(lambda: pdf_dropdown.update(choices=list_pdfs()), outputs=pdf_dropdown) demo.launch(share=True) """