import gradio as gr
import fitz # PyMuPDF
import os
import requests
from huggingface_hub import HfApi
import base64
from io import BytesIO
import urllib.parse
import tempfile
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from docx import Document
import asyncio
import docx
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer
from fuzzywuzzy import fuzz
from nltk.stem import PorterStemmer
import torch
# Zugriff auf das Secret als Umgebungsvariable
HF_WRITE = os.getenv("HF_WRITE")
HF_READ = os.getenv("HF_READ")
# CONSTANTS
REPO_ID = "alexkueck/kkg_suche"
REPO_TYPE = "space"
SAVE_DIR = "kkg_dokumente"
# HfApi-Instanz erstellen
api = HfApi()
# Relativer Pfad zum Verzeichnis mit den Dokumenten
DOCS_DIR = "kkg_dokumente"
########################################################
##########Ki Modell für Embeddings der Suchanfrage nutzen
# Laden des Sentence-Transformer-Modells
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')
# Funktion zum Extrahieren von Text aus PDF
def extract_text_from_pdf(pdf_path):
doc = fitz.open(pdf_path)
pages = []
for page in doc:
text = page.get_text()
# Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist
lines = text.split('\n')
header = lines[0] if lines else ''
content = '\n'.join(lines[1:]) if len(lines) > 1 else ''
pages.append({'header': header, 'content': content})
return pages
def extract_text_from_docx(docx_path):
doc = Document(docx_path)
pages = []
current_page = []
header = ''
for para in doc.paragraphs:
if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen
if current_page:
pages.append({'header': header, 'content': '\n'.join(current_page)})
current_page = []
header = para.text
else:
current_page.append(para.text)
if current_page: # Letzte Seite hinzufügen
pages.append({'header': header, 'content': '\n'.join(current_page)})
return pages
# Initialisierung der Dokumente
def initialize_documents():
documents = []
if os.path.exists(DOCS_DIR):
for file_name in os.listdir(DOCS_DIR):
if file_name.endswith(".pdf"):
pdf_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_pdf(pdf_path)
documents.append({"file": file_name, "pages": pages})
elif file_name.endswith(".docx"):
docx_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_docx(docx_path)
documents.append({"file": file_name, "pages": pages})
return documents
# Hochladen von Dateien
def upload_pdf(file):
if file is None:
return None, "Keine Datei hochgeladen."
# Extrahieren des Dateinamens aus dem vollen Pfad
filename = os.path.basename(file.name)
# Datei zum Hugging Face Space hochladen
upload_path = f"kkg_dokumente/{filename}"
api.upload_file(
path_or_fileobj=file.name,
path_in_repo=upload_path,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
token=HF_WRITE
)
return f"PDF '{filename}' erfolgreich hochgeladen."
def list_pdfs():
if not os.path.exists(SAVE_DIR):
return []
return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')]
######################################
#nach relevanten suche -> download Link der passenden Dokuemtne erstellen
def download_link(doc_name):
# URL für das Herunterladen der Datei
file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}"
return f'{doc_name}'
###########################################################
############# KI um Suchanfrage zu Embedden ###############
# Funktion zur Berechnung der Embeddings
def get_embeddings(texts):
return model.encode(texts, convert_to_tensor=True)
#um ähnliche Wörter anhand ihres Wortstammes zu erkennen
# Stemming-Funktion
def stem_text(text):
ps = PorterStemmer()
return ' '.join([ps.stem(word) for word in text.split()])
# Durchsuchen von Dokumenten
def search_documents(query):
documents = initialize_documents()
# Texte und Überschriften in die Embeddings aufnehmen
texts = [page['content'] for doc in documents for page in doc['pages']]
stemmed_texts = [stem_text(text) for text in texts]
text_embeddings = get_embeddings(stemmed_texts)
# Stemming des Queries
stemmed_query = stem_text(query)
# Embedding des Queries
query_embedding = get_embeddings([stemmed_query])
# Berechnung der Ähnlichkeit
similarities = cosine_similarity(query_embedding.cpu(), text_embeddings.cpu()).flatten()
# Sortieren nach Relevanz
related_docs_indices = similarities.argsort()[::-1]
results=[]
relevant_text = ""
relevant_docs = {}
num_pages_per_doc = [len(doc['pages']) for doc in documents]
cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))]
for i in related_docs_indices:
if similarities[i] > 0:
doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative)
page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1]
doc = documents[doc_index]
page = doc['pages'][page_index]
page_content = page['content']
header_content = page.get('header', '')
# Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist
index_in_content = page_content.lower().find(query.lower())
index_in_header = header_content.lower().find(query.lower())
# Berücksichtigung der Levenshtein-Distanz
words_in_query = stemmed_query.split()
page_words = stem_text(page_content).split()
if index_in_content != -1 or index_in_header != -1 or any(fuzz.ratio(word, page_word) > 80 for word in words_in_query for page_word in page_words): # <--- Integration von fuzz.ratio für jedes Wort
# Erstellen Sie einen Snippet für die Suchergebnisse
start = max(0, index_in_content - 400) if index_in_content != -1 else 0
end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content)
snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
"
# Fügen Sie die Überschrift hinzu, falls vorhanden
if header_content:
snippet += f"Überschrift: {header_content}
"
snippet += f"...{page_content[start:end]}...
"
relevant_text += snippet
if doc['file'] not in relevant_docs:
relevant_docs[doc['file']] = []
relevant_docs[doc['file']].append(snippet)
# Sortieren nach Relevanz
results = sorted(results, key=lambda x: x[1], reverse=True)
results = [res[0] for res in results]
results = list(relevant_docs.keys())
return results, relevant_text
#######################################
#Suche starten und View aktialisieren
def search_and_update(query):
if not query.strip():
return "
Bitte geben Sie einen Suchbegriff ein.
", "Bitte geben Sie einen Suchbegriff ein.
"
relevant_docs, relevant_text = search_documents(query)
if not relevant_docs:
doc_links = "Keine passenden Dokumente gefunden.
"
else:
doc_links = ""
for doc in relevant_docs:
doc_links += download_link(doc) + "
"
if not relevant_text:
relevant_text = "Kein relevanter Text gefunden.
"
return doc_links, relevant_text
######################################################################
############### Anwendung starten ####################################
with gr.Blocks(css="""
.results {
background-color: #f0f0f0;
padding: 10px;
border-radius: 5px;
overflow-y: auto;
max-height: 400px;
}
.no-results {
color: red;
}
.doc-name {
font-weight: bold;
color: #DDA0DD;
}
.page-number {
font-weight: bold;
color: #FF5733;
}
#doc_links, #relevant_text {
color: #DDA0DD;
background-color: #333333; /* Sehr dunkles Grau */
padding: 10px; /* Innenabstand */
border-radius: 5px; /* Abgerundete Ecken */
max-height: 400px; /* Max Höhe für bessere Lesbarkeit */
overflow-y: auto; /* Vertikale Scrollbalken */
}
""") as demo:
with gr.Tab("Suche"):
query_input = gr.Textbox(label="Suchbegriff")
with gr.Row():
with gr.Column(scale=1):
doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False)
with gr.Column(scale=2):
relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False)
query_input.submit(search_and_update, inputs=[query_input], outputs=[doc_links, relevant_text], show_progress="full")
with gr.Tab("Datei hochladen"):
upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen")
output_text = gr.Textbox(label="Status")
upload_button = gr.Button("Datei hochladen")
upload_button.click(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text)
demo.launch()
"""
###############################funktioniert - suche nicht gut genug#####################################################################
############################################################
####### Hilfsfunktionen
#alle documente den text extrahieren und die einzelen texte als dictionay zusammenstellen - mit überschrift, seitenzahl und text
def initialize_documents():
documents = []
if os.path.exists(DOCS_DIR):
for file_name in os.listdir(DOCS_DIR):
if file_name.endswith(".pdf"):
pdf_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_pdf(pdf_path)
documents.append({"file": file_name, "pages": pages})
elif file_name.endswith(".docx"):
docx_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_docx(docx_path)
documents.append({"file": file_name, "pages": pages})
return documents
# Funktion zum Extrahieren des Textes aus einer PDF-Datei
def extract_text_from_pdf(pdf_path):
doc = fitz.open(pdf_path)
pages = []
for page in doc:
text = page.get_text()
# Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist
lines = text.split('\n')
header = lines[0] if lines else ''
content = '\n'.join(lines[1:]) if len(lines) > 1 else ''
pages.append({'header': header, 'content': content})
return pages
def extract_text_from_docx(docx_path):
doc = Document(docx_path)
pages = []
current_page = []
header = ''
for para in doc.paragraphs:
if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen
if current_page:
pages.append({'header': header, 'content': '\n'.join(current_page)})
current_page = []
header = para.text
else:
current_page.append(para.text)
if current_page: # Letzte Seite hinzufügen
pages.append({'header': header, 'content': '\n'.join(current_page)})
return pages
###########################################################
#################### Suche init ###########################
# Dynamische Erstellung der Dokumentenliste und Extraktion der Texte
print("Martix init ........................")
#documents = initialize_documents()
# Initialisierung des Vektorizersvectorizer = TfidfVectorizer()
# Extrahieren der Texte aus den Dokumentseiten
#texts = [page['content'] for doc in documents for page in doc['pages']]
# Erstellen der TF-IDF-Matrix
#tfidf_matrix = vectorizer.fit_transform(texts)
##########################################################
############# weitere Hilsffunktionen ####################
def search_documents(query):
if not query:
return [doc['file'] for doc in documents], "", []
#Dokumente durchgehen und Vektorisieren für die anschließende Suche
documents = initialize_documents()
vectorizer = TfidfVectorizer()
texts = [page['content'] for doc in documents for page in doc['pages']]
tfidf_matrix = vectorizer.fit_transform(texts)
#Sucha nach relvanten dokumenten
query_vector = vectorizer.transform([query])
cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten()
related_docs_indices = cosine_similarities.argsort()[::-1]
results = []
relevant_text = ""
relevant_docs = {}
num_pages_per_doc = [len(doc['pages']) for doc in documents]
cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))]
for i in related_docs_indices:
if cosine_similarities[i] > 0:
doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative)
page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1]
doc = documents[doc_index]
page = doc['pages'][page_index]
page_content = page['content']
header_content = page.get('header', '')
# Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist
index_in_content = page_content.lower().find(query.lower())
index_in_header = header_content.lower().find(query.lower())
if index_in_content != -1 or index_in_header != -1:
# Erstellen Sie einen Snippet für die Suchergebnisse
start = max(0, index_in_content - 400) if index_in_content != -1 else 0
end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content)
snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
"
# Fügen Sie die Überschrift hinzu, falls vorhanden
if header_content:
snippet += f"Überschrift: {header_content}
"
snippet += f"...{page_content[start:end]}...
"
relevant_text += snippet
if doc['file'] not in relevant_docs:
relevant_docs[doc['file']] = []
relevant_docs[doc['file']].append(snippet)
results = list(relevant_docs.keys())
return results, relevant_text
def update_display(selected_pdf):
return display_document(selected_pdf)
def update_dropdown():
return gr.Dropdown.update(choices=list_pdfs())
def search_and_updateBack(query):
results, rel_text, relevant_pdfs = search_documents(query)
pdf_html = ""
images = []
temp_dir = tempfile.mkdtemp()
for pdf, page in relevant_pdfs:
pdf_path = os.path.join(SAVE_DIR, pdf)
document = fitz.open(pdf_path)
# Seite als Integer umwandeln
page_num = int(page)
page = document.load_page(page_num)
pix = page.get_pixmap()
img_path = os.path.join(temp_dir, f"{pdf}_page_{page.number}.png")
pix.save(img_path)
images.append(img_path)
return images, rel_text
def search_and_updateback(query):
relevant_docs, relevant_text, _ = search_documents(query)
doc_links = ""
for doc in relevant_docs:
doc_links += download_link(doc) + "
"
return doc_links, relevant_text
def upload_pdf(file):
if file is None:
return None, "Keine Datei hochgeladen."
# Extrahieren des Dateinamens aus dem vollen Pfad
filename = os.path.basename(file.name)
# Datei zum Hugging Face Space hochladen
upload_path = f"kkg_dokumente/{filename}"
api.upload_file(
path_or_fileobj=file.name,
path_in_repo=upload_path,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
token=HF_WRITE
)
return f"PDF '{filename}' erfolgreich hochgeladen."
def list_pdfs():
if not os.path.exists(SAVE_DIR):
return []
return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')]
######################################
#nach relevanten suche -> download Link der passenden Dokuemtne erstellen
def download_link(doc_name):
# URL für das Herunterladen der Datei
file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}"
return f'{doc_name}'
#######################################
#Suche starten und View aktialisieren
def search_and_update(query):
if not query.strip():
return "Bitte geben Sie einen Suchbegriff ein.
", "Bitte geben Sie einen Suchbegriff ein.
"
relevant_docs, relevant_text = search_documents(query)
if not relevant_docs:
doc_links = "Keine passenden Dokumente gefunden.
"
else:
doc_links = ""
for doc in relevant_docs:
doc_links += download_link(doc) + "
"
if not relevant_text:
relevant_text = "Kein relevanter Text gefunden.
"
return doc_links, relevant_text
######################################################################
############### Anwendung starten ####################################
with gr.Blocks(css=
.results {
background-color: #f0f0f0;
padding: 10px;
border-radius: 5px;
overflow-y: auto;
max-height: 400px;
}
.no-results {
color: red;
}
.doc-name {
font-weight: bold;
color: #007BFF;
}
.page-number {
font-weight: bold;
color: #FF5733;
}
) as demo:
with gr.Tab("Suche"):
query_input = gr.Textbox(label="Suchbegriff")
with gr.Row():
with gr.Column(scale=2):
doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False)
with gr.Column(scale=1):
relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False)
query_input.submit(search_and_update, inputs=[query_input], outputs=[doc_links, relevant_text])
with gr.Tab("Datei hochladen"):
upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen")
output_text = gr.Textbox(label="Status")
upload_button = gr.Button("Datei hochladen")
upload_button.click(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text)
demo.launch()
"""
"""
def initialize_documents():
documents = []
if os.path.exists(DOCS_DIR):
for file_name in os.listdir(DOCS_DIR):
if file_name.endswith(".pdf"):
pdf_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_pdf(pdf_path)
documents.append({"file": file_name, "pages": pages})
elif file_name.endswith(".docx"):
docx_path = os.path.join(DOCS_DIR, file_name)
pages = extract_text_from_docx(docx_path)
documents.append({"file": file_name, "pages": pages})
return documents
# Funktion zum Extrahieren des Textes aus einer PDF-Datei oder Word-Datei
def extract_text_from_pdf(pdf_path):
doc = fitz.open(pdf_path)
pages = []
for page in doc:
text = page.get_text()
# Hier eine einfache Annahme, dass die erste Zeile der Seite die Überschrift ist
lines = text.split('\n')
header = lines[0] if lines else ''
content = '\n'.join(lines[1:]) if len(lines) > 1 else ''
pages.append({'header': header, 'content': content})
return pages
def extract_text_from_docx(docx_path):
doc = Document(docx_path)
pages = []
current_page = []
header = ''
for para in doc.paragraphs:
if para.style.name.startswith('Heading'): # Annahme, dass alle Überschriften Stile haben, die mit 'Heading' beginnen
if current_page:
pages.append({'header': header, 'content': '\n'.join(current_page)})
current_page = []
header = para.text
else:
current_page.append(para.text)
if current_page: # Letzte Seite hinzufügen
pages.append({'header': header, 'content': '\n'.join(current_page)})
return pages
###########################################################
#################### Suche init ###########################
# Dynamische Erstellung der Dokumentenliste und Extraktion der Texte
print("Martix init ........................")
documents = initialize_documents()
# Initialisierung des Vektorizers
vectorizer = TfidfVectorizer()
# Extrahieren der Texte aus den Dokumentseiten
texts = [page['content'] for doc in documents for page in doc['pages']]
# Erstellen der TF-IDF-Matrix
tfidf_matrix = vectorizer.fit_transform(texts)
##########################################################
############# weitere Hilsffunktionen ####################
def search_documents(query):
if not query:
return [doc['file'] for doc in documents], "", []
query_vector = vectorizer.transform([query])
cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten()
related_docs_indices = cosine_similarities.argsort()[::-1]
results = []
relevant_text = ""
relevant_docs = {}
num_pages_per_doc = [len(doc['pages']) for doc in documents]
cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))]
for i in related_docs_indices:
if cosine_similarities[i] > 0:
doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative)
page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1]
doc = documents[doc_index]
page = doc['pages'][page_index]
page_content = page['content']
header_content = page.get('header', '')
# Überprüfen, ob der Suchtext in der Überschrift oder im Seiteninhalt enthalten ist
index_in_content = page_content.lower().find(query.lower())
index_in_header = header_content.lower().find(query.lower())
if index_in_content != -1 or index_in_header != -1:
# Erstellen Sie einen Snippet für die Suchergebnisse
start = max(0, index_in_content - 400) if index_in_content != -1 else 0
end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content)
snippet = f"Aus {doc['file']} (Seite {page_index + 1}):
"
# Fügen Sie die Überschrift hinzu, falls vorhanden
if header_content:
snippet += f"Überschrift: {header_content}
"
snippet += f"...{page_content[start:end]}...
"
relevant_text += snippet
if doc['file'] not in relevant_docs:
relevant_docs[doc['file']] = []
relevant_docs[doc['file']].append(snippet)
results = list(relevant_docs.keys())
return results, relevant_text
######################################
#nach relevanten suche -> download Link der passenden Dokuemtne erstellen
def download_link(doc_name):
# URL für das Herunterladen der Datei
file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}"
return f'{doc_name}'
#######################################
#Suche starten und View aktialisieren
def search_and_update(query):
if not query.strip():
return "Bitte geben Sie einen Suchbegriff ein.
", "Bitte geben Sie einen Suchbegriff ein.
"
relevant_docs, relevant_text = search_documents(query)
if not relevant_docs:
doc_links = "Keine passenden Dokumente gefunden.
"
else:
doc_links = ""
for doc in relevant_docs:
doc_links += download_link(doc) + "
"
if not relevant_text:
relevant_text = "Kein relevanter Text gefunden.
"
return doc_links, relevant_text
######################################
# File hochladen über sep Tab
#muss async sein, da der hauptthread weitergeht, während das dokument zu ende hochlädt
async def wait_for_file(file_path, timeout=10):
for _ in range(timeout * 2): # Timeout in 0.5-Sekunden-Schritten
if os.path.exists(file_path):
return True
await asyncio.sleep(0.5)
return False
async def upload_pdf_word(file):
if file is None:
return None, "Keine Datei hochgeladen."
# Extrahieren des Dateinamens aus dem vollen Pfad
filename = os.path.basename(file.name)
# Datei zum Hugging Face Space hochladen (überschreiben, falls vorhanden)
upload_path = f"kkg_dokumente/{filename}"
try:
await asyncio.to_thread(api.upload_file,
path_or_fileobj=file.name,
path_in_repo=upload_path,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
token=HF_WRITE)
except Exception as e:
return None, f"Fehler beim Hochladen der Datei: {str(e)}"
# Warten, bis die Datei vorhanden ist
local_path = os.path.join(DOCS_DIR, filename)
file_exists = await wait_for_file(local_path)
if not file_exists:
return None, f"Fehler: Datei '{local_path}' nicht gefunden."
# Text extrahieren basierend auf Dateityp
try:
if filename.endswith('.pdf'):
pages = await asyncio.to_thread(extract_text_from_pdf, local_path)
pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages])
headings = [page['header'] for page in pages]
elif filename.endswith('.docx'):
pages = await asyncio.to_thread(extract_text_from_docx, local_path)
pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages])
headings = [page['header'] for page in pages]
else:
return None, "Nicht unterstütztes Dateiformat (nur PDF und Word hochladen)"
except Exception as e:
return None, f"Fehler bei der Verarbeitung der Datei: {str(e)}"
try:
if filename.endswith('.pdf'):
pages = extract_text_from_pdf(upload_path)
pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages])
headings = [page['header'] for page in pages]
elif filename.endswith('.docx'):
pages = extract_text_from_docx(upload_path)
pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages])
headings = [page['header'] for page in pages]
else:
return None, "Nicht unterstütztes Dateiformat (nur PDF und Word hochladen)"
return f"Datei '{filename}' erfolgreich hochgeladen und verarbeitet."
except Exception as e:
return None, f"Fehler bei der Verarbeitung der Datei: {str(e)}"
# Text extrahieren basierend auf Dateityp -> um ihn in die documents zu packen für spätere suche nach relevatnen teten bei der suche
if filename.endswith('.pdf'):
pages = extract_text_from_pdf(upload_path)
pages_text = '\n\n'.join([f"Header: {page['header']}\nContent: {page['content']}" for page in pages])
headings = [page['header'] for page in pages]
elif filename.endswith('.docx'):
docx_data = extract_text_from_docx(upload_path)
pages_text = docx_data["content"]
headings = docx_data["headings"]
else:
return None, "Nicht unterstütztes Dateiformat (nur PDF und Word hochladen)"
# Aktualisieren Sie die Dokumentenliste und die TF-IDF-Matrix
#documents.append({"file": filename, "pages": pages_text, "headings": headings})
def upload_filebackup(file):
file_name = file.name
file_path = os.path.join(DOCS_DIR, file_name)
# Debugging-Ausgabe: Überprüfen Sie, ob das Verzeichnis existiert
if not os.path.exists(DOCS_DIR):
print(f"Verzeichnis {DOCS_DIR} existiert nicht. Erstelle Verzeichnis.")
os.makedirs(DOCS_DIR)
# Debugging-Ausgabe: Dateiname und Pfad
print(f"Speichere Datei nach {file_path}")
with open(file_path, "wb") as f:
f.write(file)
# Überprüfen, ob die Datei korrekt gespeichert wurde
if os.path.exists(file_path):
print(f"Datei erfolgreich gespeichert: {file_path}")
else:
print(f"Fehler beim Speichern der Datei: {file_path}")
# Aktualisieren Sie die Dokumentenliste und die TF-IDF-Matrix
pages_text = extract_text_from_pdf(file_path)
documents.append({"file": file_name, "pages": pages_text})
global tfidf_matrix
tfidf_matrix = vectorizer.fit_transform([page for doc in documents for page in doc['pages']])
return search_and_update("")
def upload_pdf_wordback(file):
if file is None:
return None, "Keine Datei hochgeladen."
# Extrahieren des Dateinamens aus dem vollen Pfad
filename = os.path.basename(file.name)
# Datei zum Hugging Face Space hochladen
upload_path = f"kkg_dokumente/{filename}"
api.upload_file(
path_or_fileobj=file.name,
path_in_repo=upload_path,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
token=HF_WRITE
)
print("filepath ........................"+str(upload_path))
print("filenaem ........................"+str(filename))
# Text extrahieren basierend auf Dateityp
if filename.endswith('.pdf'):
pages_text, headings = extract_text_from_pdf(upload_path)
elif filename.endswith('.docx'):
pages_text, headings = extract_text_from_docx(upload_path)
else:
return None, "Unsupported file format"
# Aktualisieren Sie die Dokumentenliste und die TF-IDF-Matrix
#documents.append({"file": filename, "pages": pages_text, "headings": headings})
return f"Datei '{filename}' erfolgreich hochgeladen und verarbeitet."
def upload_pdf_wordback(file):
if file is None:
return None, "Keine Datei hochgeladen."
# Extrahieren des Dateinamens aus dem vollen Pfad
filename = os.path.basename(file.name)
# Datei zum Hugging Face Space hochladen
upload_path = f"kkg_dokumente/{filename}"
api.upload_file(
path_or_fileobj=file.name,
path_in_repo=upload_path,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
token=HF_WRITE
)
# Aktualisieren Sie die Dokumentenliste und die TF-IDF-Matrix
#pages_text = extract_text_from_pdf(upload_path)
#documents.append({"file": filename, "pages": pages_text})
return f"PDF '{filename}' erfolgreich hochgeladen."
def run_async_upload(file):
# Wrapper, um die asynchrone Funktion auszuführen
return asyncio.run(upload_pdf_word(file))
######################################################################
############### Anwendung starten ####################################
with gr.Blocks(css=
.results {
background-color: #f0f0f0;
padding: 10px;
border-radius: 5px;
overflow-y: auto;
max-height: 400px;
}
.no-results {
color: red;
}
.doc-name {
font-weight: bold;
color: #007BFF;
}
.page-number {
font-weight: bold;
color: #FF5733;
}
) as demo:
with gr.Tab("Suche"):
query_input = gr.Textbox(label="Suchbegriff")
with gr.Row():
with gr.Column(scale=2):
doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False)
with gr.Column(scale=1):
relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False)
query_input.submit(search_and_update, inputs=[query_input], outputs=[doc_links, relevant_text])
with gr.Tab("Datei hochladen"):
upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen")
output_text = gr.Textbox(label="Status")
upload_button = gr.Button("Datei hochladen")
upload_button.click(fn=run_async_upload, inputs=upload_pdf_file, outputs=output_text)
demo.launch()
# Initialisieren der Gradio-Oberfläche
with gr.Blocks(css=
.results {
background-color: #f0f0f0;
padding: 10px;
border-radius: 5px;
overflow-y: auto;
max-height: 400px;
}
.no-results {
color: red;
}
.doc-name {
font-weight: bold;
color: #007BFF;
}
.page-number {
font-weight: bold;
color: #FF5733;
}
) as demo:
with gr.Tab("Suche"):
query_input = gr.Textbox(label="Suchbegriff")
with gr.Row():
with gr.Column(scale=2):
doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False)
with gr.Column(scale=1):
relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False)
query_input.submit(search_and_update, inputs=[query_input], outputs=[doc_links, relevant_text])
with gr.Tab("Datei Upload"):
upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen")
upload_status = gr.Textbox(label="Status")
upload_button = gr.Button("Upload")
upload_button.click(upload_pdf_word, inputs=upload_pdf_file, outputs=upload_status)
with gr.Tab("PDF Auswahl und Anzeige"):
pdf_dropdown = gr.Dropdown(label="Wählen Sie eine PDF-Datei", choices=list_pdfs())
query = gr.Textbox(label="Suchanfrage", type="text")
display_status = gr.Textbox(label="Status")
display_button = gr.Button("Anzeigen")
with gr.Row():
pdf_image = gr.Image(label="PDF-Seite als Bild", type="filepath")
relevant_text = gr.Textbox(label="Relevanter Text", lines=10)
display_button.click(display_pdf, inputs=[pdf_dropdown], outputs=[pdf_image, display_status])
# Automatische Aktualisierung der Dropdown-Liste nach dem Hochladen einer PDF-Datei
#upload_button.click(update_dropdown, inputs=None, outputs=pdf_dropdown)
#upload_button.click(lambda: pdf_dropdown.update(choices=list_pdfs()), outputs=pdf_dropdown)
demo.launch(share=True)
"""
"""
# Funktion zum Extrahieren des Textes aus einer PDF-Datei
def extract_text_from_pdf(pdf_path):
doc = fitz.open(pdf_path)
text = []
for page in doc:
text.append(page.get_text())
return text
# Dynamische Erstellung der Dokumentenliste und Extraktion der Texte
documents = []
for file_name in os.listdir(SAVE_DIR):
if file_name.endswith(".pdf"):
pdf_path = os.path.join(SAVE_DIR, file_name)
pages_text = extract_text_from_pdf(pdf_path)
documents.append({"file": file_name, "pages": pages_text})
# TF-IDF Vectorizer vorbereiten
vectorizer = TfidfVectorizer()
tfidf_matrix = vectorizer.fit_transform([page for doc in documents for page in doc['pages']])
####################################################
def search_documents(query):
if not query:
return [doc['file'] for doc in documents], "", []
query_vector = vectorizer.transform([query])
cosine_similarities = cosine_similarity(query_vector, tfidf_matrix).flatten()
related_docs_indices = cosine_similarities.argsort()[::-1]
results = []
relevant_text = ""
relevant_pdfs = []
num_pages_per_doc = [len(doc['pages']) for doc in documents]
cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))]
for i in related_docs_indices:
if cosine_similarities[i] > 0:
doc_index = next(idx for idx, cumulative in enumerate(cumulative_pages) if i < cumulative)
page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1]
doc = documents[doc_index]
results.append(doc['file'])
page_content = doc['pages'][page_index]
index = page_content.lower().find(query.lower())
if index != -1:
start = max(0, index - 400)
end = min(len(page_content), index + 400)
relevant_text += f"Aus {doc['file']} (Seite {page_index + 1}):\n...{page_content[start:end]}...\n\n"
relevant_pdfs.append((doc['file'], page_index))
return results, relevant_text, relevant_pdfs
def download_link(doc_name):
file_path = os.path.join(SAVE_DIR, doc_name)
file_url = f"/kkg_dokumente/{doc_name}"
return f'{doc_name}'
def update_display(selected_pdf):
return display_document(selected_pdf)
def update_dropdown():
return gr.Dropdown.update(choices=list_pdfs())
def search_and_updateBack(query):
results, rel_text, relevant_pdfs = search_documents(query)
pdf_html = ""
images = []
temp_dir = tempfile.mkdtemp()
for pdf, page in relevant_pdfs:
pdf_path = os.path.join(SAVE_DIR, pdf)
document = fitz.open(pdf_path)
# Seite als Integer umwandeln
page_num = int(page)
page = document.load_page(page_num)
pix = page.get_pixmap()
img_path = os.path.join(temp_dir, f"{pdf}_page_{page.number}.png")
pix.save(img_path)
images.append(img_path)
return images, rel_text
def search_and_update(query):
relevant_docs, relevant_text, _ = search_documents(query)
doc_links = ""
for doc in relevant_docs:
doc_links += download_link(doc) + "
"
return doc_links, relevant_text
def upload_pdf(file):
if file is None:
return None, "Keine Datei hochgeladen."
# Extrahieren des Dateinamens aus dem vollen Pfad
filename = os.path.basename(file.name)
# Datei zum Hugging Face Space hochladen
upload_path = f"kkg_dokumente/{filename}"
api.upload_file(
path_or_fileobj=file.name,
path_in_repo=upload_path,
repo_id=REPO_ID,
repo_type=REPO_TYPE,
token=HF_WRITE
)
return f"PDF '{filename}' erfolgreich hochgeladen."
def list_pdfs():
if not os.path.exists(SAVE_DIR):
return []
return [f for f in os.listdir(SAVE_DIR) if f.endswith('.pdf')]
def display_pdf(selected_pdf):
pdf_path = os.path.join(SAVE_DIR, selected_pdf)
# PDF-URL im Hugging Face Space
encoded_pdf_name = urllib.parse.quote(selected_pdf)
pdf_url = f"https://huggingface.co/spaces/{REPO_ID}/resolve/main/kkg_dokumente/{encoded_pdf_name}"
# PDF von der URL herunterladen
headers = {"Authorization": f"Bearer {HF_READ}"}
response = requests.get(pdf_url, headers=headers)
if response.status_code == 200:
with open(pdf_path, 'wb') as f:
f.write(response.content)
else:
return None, f"Fehler beim Herunterladen der PDF-Datei von {pdf_url}"
# PDF in Bilder umwandeln
document = fitz.open(pdf_path)
temp_dir = tempfile.mkdtemp()
# Nur die erste Seite als Bild speichern
page = document.load_page(0)
pix = page.get_pixmap()
img_path = os.path.join(temp_dir, f"page_0.png")
pix.save(img_path)
status = f"PDF '{selected_pdf}' erfolgreich geladen und verarbeitet."
return img_path, status
##############################################################
with gr.Blocks() as demo:
with gr.Tab("Upload PDF"):
upload_pdf_file = gr.File(label="PDF-Datei hochladen")
upload_status = gr.Textbox(label="Status")
upload_button = gr.Button("Upload")
upload_button.click(upload_pdf, inputs=upload_pdf_file, outputs=upload_status)
with gr.Tab("PDF Auswahl und Anzeige"):
pdf_dropdown = gr.Dropdown(label="Wählen Sie eine PDF-Datei", choices=list_pdfs())
query = gr.Textbox(label="Suchanfrage", type="text")
display_status = gr.Textbox(label="Status")
display_button = gr.Button("Anzeigen")
with gr.Row():
pdf_image = gr.Image(label="PDF-Seite als Bild", type="filepath")
relevant_text = gr.Textbox(label="Relevanter Text", lines=10)
display_button.click(display_pdf, inputs=[pdf_dropdown], outputs=[pdf_image, display_status])
with gr.Tab("Suche"):
search_query = gr.Textbox(label="Suchanfrage")
search_button = gr.Button("Suchen")
with gr.Row():
search_results = gr.Gallery(label="Relevante PDFs", type="filepath")
search_text = gr.Textbox(label="Relevanter Text", lines=10)
search_button.click(search_and_update, inputs=search_query, outputs=[search_results, search_text])
# Automatische Aktualisierung der Dropdown-Liste nach dem Hochladen einer PDF-Datei
#upload_button.click(update_dropdown, inputs=None, outputs=pdf_dropdown)
#upload_button.click(lambda: pdf_dropdown.update(choices=list_pdfs()), outputs=pdf_dropdown)
demo.launch(share=True)
"""