import gradio as gr
import fitz
import os
import re
import requests
from huggingface_hub import HfApi
import base64
from io import BytesIO
import urllib.parse
import tempfile
from sklearn.metrics.pairwise import cosine_similarity
from docx import Document
import asyncio
import docx
from transformers import AutoTokenizer, AutoModel
from sentence_transformers import SentenceTransformer, util
from fuzzywuzzy import fuzz
import nltk
from nltk.stem import PorterStemmer
from nltk.corpus import stopwords
from nltk.tokenize import RegexpTokenizer
from transformers import BertModel, BertTokenizer
from nltk.tokenize import word_tokenize
from nltk.stem.snowball import SnowballStemmer
import torch


# Hugging Face tokens: read access for the download links, write access for uploads
HF_READ = os.getenv("HF_READ")
HF_WRITE = os.getenv("HF_WRITE")

# Directory inside the Space that holds the searchable documents
DOCS_DIR = "kkg_dokumente"

# Target repository on the Hugging Face Hub
REPO_ID = "alexkueck/kkg_suche"
REPO_TYPE = "space"

api = HfApi()

# NLTK resources for tokenization and German stopword removal
nltk.download('punkt')
nltk.download('stopwords')
german_stopwords = set(stopwords.words('german'))


########################################################
########## Use an AI model to embed the search query ###
# Load the Sentence-Transformer model
model = SentenceTransformer('paraphrase-MiniLM-L6-v2')

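
########################################################
######## Helper functions for the search ###############
# Pages and query are embedded with the model above and compared via cosine
# similarity. Illustrative sketch (hypothetical strings, not executed here):
#   emb = model.encode(["erster text", "zweiter text"], convert_to_tensor=True)
#   q = model.encode("suchbegriff", convert_to_tensor=True)
#   util.pytorch_cos_sim(q, emb)[0]  # one similarity score per text

# Extract text from PDF and Word documents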
def extract_text_from_pdf(pdf_path):
    doc = fitz.open(pdf_path)
    pages = []
    for page in doc:
        text = page.get_text()
        # Simple assumption: the first line of the page is its heading
        lines = text.split('\n')
        header = lines[0] if lines else ''
        content = '\n'.join(lines[1:]) if len(lines) > 1 else ''
        pages.append({'header': header, 'content': content})
    return pages

def extract_text_from_docx(docx_path):
    doc = Document(docx_path)
    pages = []
    current_page = []
    header = ''
    for para in doc.paragraphs:
        # Assumption: all headings use styles whose names start with 'Heading'
        if para.style.name.startswith('Heading'):
            if current_page:
                pages.append({'header': header, 'content': '\n'.join(current_page)})
                current_page = []
            header = para.text
        else:
            current_page.append(para.text)
    # Append the last section
    if current_page:
        pages.append({'header': header, 'content': '\n'.join(current_page)})
    return pages

# Initialize the documents: one dictionary per file holding its pages, contents and headings
def initialize_documents():
    documents = []
    if os.path.exists(DOCS_DIR):
        for file_name in os.listdir(DOCS_DIR):
            if file_name.endswith(".pdf"):
                pdf_path = os.path.join(DOCS_DIR, file_name)
                pages = extract_text_from_pdf(pdf_path)
                documents.append({"file": file_name, "pages": pages})
            elif file_name.endswith(".docx"):
                docx_path = os.path.join(DOCS_DIR, file_name)
                pages = extract_text_from_docx(docx_path)
                documents.append({"file": file_name, "pages": pages})
    return documents

# After the search: build a download link for a matching document
def download_link(doc_name):
    # URL for downloading the file from the Space repository
    file_url = f"https://huggingface.co/spaces/alexkueck/kkg_suche/resolve/main/kkg_dokumente/{doc_name}?token={HF_READ}"
    return f'<b><a href="{file_url}" target="_blank" style="color: #BB70FC; font-weight: bold;">{doc_name}</a></b>'

# Collapse runs of consecutive line breaks into a single one
def remove_line_breaks(text):
    text = re.sub(r'\n{2,}', '\n', text)
    return text

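
########################################################
######## Helper functions for file upload ##############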
# Upload a file to the Hugging Face Space
def upload_pdf(file):
    if file is None:
        # Only one output (the status textbox) is wired to this function
        return "Keine Datei hochgeladen."

    # Extract the file name from the full path
    filename = os.path.basename(file.name)

    # Upload the file to the Hugging Face Space repository
    upload_path = f"kkg_dokumente/{filename}"
    api.upload_file(
        path_or_fileobj=file.name,
        path_in_repo=upload_path,
        repo_id=REPO_ID,
        repo_type=REPO_TYPE,
        token=HF_WRITE
    )
    return f"Datei '{filename}' erfolgreich hochgeladen."

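
# Example (illustrative file name): uploading "schulordnung.pdf" stores it as
# "kkg_dokumente/schulordnung.pdf" in the Space repository alexkueck/kkg_suche.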
# List the stored files as an HTML table with download links
def display_files():
    files = os.listdir(DOCS_DIR)
    files_table = "<table style='width:100%; border-collapse: collapse;'>"
    files_table += "<tr style='background-color: #930BBA; color: white; font-weight: bold; font-size: larger;'><th>Dateiname</th><th>Größe (KB)</th></tr>"
    for i, file in enumerate(files):
        file_path = os.path.join(DOCS_DIR, file)
        file_size = os.path.getsize(file_path) / 1024  # size in KB
        row_color = "#4f4f4f" if i % 2 == 0 else "#3a3a3a"  # alternating row colors
        files_table += f"<tr style='background-color: {row_color}; border-bottom: 1px solid #ddd;'>"
        files_table += f"<td><b>{download_link(file)}</b></td>"
        files_table += f"<td>{file_size:.2f}</td></tr>"
    files_table += "</table>"
    return files_table

# List the found relevant documents (links)
def list_pdfs():
    if not os.path.exists(DOCS_DIR):
        return []
    return [f for f in os.listdir(DOCS_DIR) if f.endswith('.pdf')]

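
###########################################################
############# AI embedding of the search query #############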
# Remove stopwords, tokenize and stem, so that longer queries
# concentrate on the relevant words
def preprocess_text(text):
    if not text:
        return ""

    # Lowercase the text
    text = text.lower()
    # Tokenize
    tokenizer = RegexpTokenizer(r'\w+')
    word_tokens = tokenizer.tokenize(text)
    # Remove German stopwords
    filtered_words = [word for word in word_tokens if word not in german_stopwords]
    # Stemming
    stemmer = SnowballStemmer("german")
    stemmed_words = [stemmer.stem(word) for word in filtered_words]
    return " ".join(stemmed_words)

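
# Example (illustrative): preprocess_text("Wie beantrage ich eine Schulbescheinigung?")
# should yield roughly "beantrag schulbescheinig" after lowercasing,
# stopword removal and stemming.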
# Clean the text extracted from PDF and Word documents so the tokenizer is not overwhelmed
def clean_text(text):
    # Remove non-printable characters
    text = re.sub(r'[^\x00-\x7F]+', ' ', text)
    # Replace unusual whitespace with a single space
    text = re.sub(r'\s+', ' ', text)
    return text.strip()

# Search the documents for the query
def search_documents(query):
    documents = initialize_documents()

    # Take the heading and the content of every page into the embeddings.
    # Keep exactly one entry per page so that the flat index below maps back
    # onto (document, page) via cumulative_pages.
    all_texts = []
    for doc in documents:
        for page in doc['pages']:
            combined_text = page['header'] + " " + page['content']
            all_texts.append(preprocess_text(combined_text))

    # Preprocess the query in the same way to improve the matches
    prepro_query = preprocess_text(query)

    if not any(all_texts) or not prepro_query:
        return [], ""

    # Embeddings of the pages and of the query
    document_embeddings = model.encode(all_texts, convert_to_tensor=True)
    query_embedding = model.encode(prepro_query, convert_to_tensor=True)

    # Cosine similarity between the query and every page
    similarities = util.pytorch_cos_sim(query_embedding, document_embeddings)[0]

    # Sort the pages by relevance
    sorted_indices = similarities.argsort(descending=True)

    relevant_text = ""
    relevant_docs = {}
    num_pages_per_doc = [len(doc['pages']) for doc in documents]
    cumulative_pages = [sum(num_pages_per_doc[:i+1]) for i in range(len(num_pages_per_doc))]

    for i in sorted_indices.tolist():
        if similarities[i] > 0.3:
            # Map the flat page index back onto its document and page
            doc_index = None
            for idx, cumulative in enumerate(cumulative_pages):
                if i < cumulative:
                    doc_index = idx
                    break
            if doc_index is None:
                continue

            page_index = i if doc_index == 0 else i - cumulative_pages[doc_index-1]
            doc = documents[doc_index]
            page = doc['pages'][page_index]
            page_content = page['content']
            header_content = page.get('header', '')

            # Check whether the query occurs in the heading or in the page content
            index_in_content = page_content.lower().find(prepro_query.lower())
            index_in_header = header_content.lower().find(prepro_query.lower())

            # Also allow fuzzy matches (Levenshtein ratio) for every query word
            words_in_query = prepro_query.split()
            page_words = preprocess_text(page_content).split()
            header_words = preprocess_text(header_content).split()

            if (index_in_content != -1 or index_in_header != -1 or
                    any(fuzz.ratio(word, page_word) > 80 for word in words_in_query for page_word in page_words) or
                    any(fuzz.ratio(word, header_word) > 80 for word in words_in_query for header_word in header_words)):

                # Build a snippet around the hit for the results view
                start = max(0, index_in_content - 400) if index_in_content != -1 else 0
                end = min(len(page_content), index_in_content + 400) if index_in_content != -1 else len(page_content)
                snippet = f"Aus <span class='doc-name'>{doc['file']}</span> (Seite <span class='page-number'>{page_index + 1}</span>):<br>"

                # Add the heading, if present
                if header_content:
                    snippet += f"<span style='color: #0EDC0E; font-weight: bold;'>Überschrift: {header_content}</span> <br>"
                snippet += f"{remove_line_breaks(page_content[start:end])}<br><hr>"

                relevant_text += snippet

                if doc['file'] not in relevant_docs:
                    relevant_docs[doc['file']] = []
                relevant_docs[doc['file']].append(snippet)

    # The result list contains every document with at least one matching snippet
    results = list(relevant_docs.keys())
    return results, relevant_text

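
# Quick local sanity check (hypothetical query, not part of the Gradio app):
#   docs, snippets_html = search_documents("Schulordnung")
#   docs          -> list of matching file names
#   snippets_html -> HTML shown in the "Relevanter Text" pane

###########################################################
############## Preparing the view in Gradio ################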
# Start the search and update the view
def search_and_update(query):
    if not query.strip():
        no_query = "<div class='no-results'>Bitte geben Sie einen Suchbegriff ein.</div>"
        # Three outputs are wired up: progress, document links and relevant text
        return "", no_query, no_query

    relevant_docs, relevant_text = search_documents(query)

    if not relevant_docs:
        doc_links = "<div class='no-results'>Keine passenden Dokumente gefunden.</div>"
    else:
        doc_links = ""
        for doc in relevant_docs:
            doc_links += download_link(doc) + "<br>"

    if not relevant_text:
        relevant_text = "<div class='no-results'>Kein relevanter Text gefunden.</div>"

    return "", doc_links, relevant_text

# Show progress while waiting for the search results
def show_progress():
    return gr.update(value="Suche läuft...", visible=True)


def hide_progress():
    return gr.update(value="", visible=False)

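
######################################################################
############### Launch the application ###############################
# Two tabs: "Suche" (search) and "Datei hochladen" (file upload)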
with gr.Blocks(css="""
    .results {
        background-color: #f0f0f0;
        padding: 10px;
        border-radius: 5px;
        overflow-y: auto;
        /*max-height: 400px;*/
        width: 100%; /* full width */
    }
    .no-results {
        color: red;
    }
    .doc-name {
        font-weight: bold;
        color: #B05DF9; /* darker purple for linked documents */
    }
    .page-number {
        font-weight: bold;
        color: #FF5733;
    }
    #doc_links, #relevant_text {
        background-color: #333333; /* very dark gray */
        padding: 10px; /* inner padding */
        border-radius: 5px; /* rounded corners */
        overflow-y: auto; /* vertical scrollbar */
        white-space: pre-wrap; /* wrap text inside the field */
        height: auto; /* automatic height */
        width: 100%; /* full width */
    }
    #doc_links a {
        color: #BB70FC; /* light purple for links in the doc_links field */
        font-weight: bold;
        width: 100%; /* full width */
    }
""") as demo:

with gr.Tab("Suche"): |
|
progress = gr.Markdown(value="") |
|
query_input = gr.Textbox(label="Suchanfrage") |
|
with gr.Row(): |
|
with gr.Column(scale=1): |
|
doc_links = gr.HTML(label="Relevante Dokumente", elem_id="doc_links", show_label=False) |
|
with gr.Column(scale=2): |
|
relevant_text = gr.HTML(label="Relevanter Text", elem_id="relevant_text", show_label=False) |
|
|
|
query_input.submit(show_progress, inputs=[], outputs=[progress], show_progress="false") |
|
query_input.submit(search_and_update, inputs=[query_input], outputs=[progress, doc_links, relevant_text], show_progress="true").then( |
|
hide_progress, |
|
inputs=[], |
|
outputs=[progress] |
|
) |
|
|
|
|
|
with gr.Tab("Datei hochladen"): |
|
upload_pdf_file = gr.File(label="PDF- oder Word-Datei hochladen") |
|
output_text = gr.Textbox(label="Status") |
|
|
|
file_list = gr.HTML(elem_id="file_list", show_label=False) |
|
|
|
|
|
|
|
upload_pdf_file.change(fn=upload_pdf, inputs=upload_pdf_file, outputs=output_text) |
|
|
|
|
|
demo.load(display_files, outputs=file_list) |
|
|
|
demo.queue(default_concurrency_limit=10).launch(debug=True) |