import gradio as gr
import hnswlib
from cryptography.fernet import Fernet
from sentence_transformers import SentenceTransformer, CrossEncoder
from dotenv import load_dotenv
import os
import gzip
import io
import pandas as pd
from bs4 import BeautifulSoup

load_dotenv()
fernet = Fernet(os.environ.get("KEY").encode("utf-8"))
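#NOTE: the metadata below is stored as Fernet-encrypted, gzip-compressed parquet files,
#so a KEY environment variable holding the Fernet key is required
#(for a Hugging Face Space this would typically be configured as a secret)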
#read encrypted title metadata (decrypt and load parquet)
with gzip.open("title_metadata.gz", 'rb') as f:
    bytes_enc = f.read()
pq_bytes = fernet.decrypt(bytes_enc)
pq_file = io.BytesIO(pq_bytes)
meta_title = pd.read_parquet(pq_file)

#read encrypted corpus metadata (decrypt and load parquet)
with gzip.open("corpus_metadata.gz", 'rb') as f:
    bytes_enc = f.read()
pq_bytes = fernet.decrypt(bytes_enc)
pq_file = io.BytesIO(pq_bytes)
meta_corpus = pd.read_parquet(pq_file)

#load models
model = SentenceTransformer("KennethTM/MiniLM-L6-danish-encoder", device="cpu")
embedding_size = model.get_sentence_embedding_dimension()
crossencoder = CrossEncoder("KennethTM/MiniLM-L6-danish-reranker", device="cpu")

#set up indexes
title_index_path = "title.index"
title_index = hnswlib.Index(space='cosine', dim=embedding_size)
title_index.load_index(title_index_path)
title_index.set_ef(40)

corpus_index_path = "corpus.index"
corpus_index = hnswlib.Index(space='cosine', dim=embedding_size)
corpus_index.load_index(corpus_index_path)
corpus_index.set_ef(40)
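#NOTE: ef is the size of hnswlib's dynamic candidate list at query time;
#higher values give better recall at the cost of query speed, and ef should
#be at least as large as the number of neighbors k requested per query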

#create dict with metadata and index
data = {"title": {"index": title_index, "meta": meta_title, "column": "title"},
        "corpus": {"index": corpus_index, "meta": meta_corpus, "column": "text_chunks"}}

#init state dict
state = {"query": None}
#function to find the most similar candidates for a query
def get_hits(query, index_name, top_k, top_k_multiplier=2):
    #get nearest neighbor ids from the hnswlib index
    query_embedding = model.encode(query)
    ids, _ = data[index_name]["index"].knn_query(query_embedding, k=int(top_k*top_k_multiplier))
    ids = ids[0]
    #rerank candidates with the cross-encoder
    results = data[index_name]["meta"].iloc[ids].copy()
    column_name = data[index_name]["column"]
    rerank_list = [(query, i) for i in results[column_name]]
    results["scores"] = crossencoder.predict(rerank_list)
    results = results.sort_values("scores", ascending=False)
    results = results[:int(top_k)]
    return results

#function for formatting hits as markdown
def format_hits(hits, index_name):
    if index_name == "title":
        formatted = "### {id}. " + hits['title'] + " ([Link til " + hits['type_label'] + "](" + hits['url'] + "))" + "\nSag ID: " + hits['id']
    elif index_name == "corpus":
        column_name = data[index_name]["column"]
        formatted = "### {id}. " + hits['title'] + " ([Link til " + hits['type_label'] + "](" + hits['url'] + "))" + "\nSag ID: " + hits['id'] + "\n\n" + hits[column_name]
    merged = "\n\n".join([text.format(id=i+1) for i, text in enumerate(formatted)])
    merged = f"## Resultater\n{merged}"
    return merged

#main entry function for search
def search(query, index_name, top_k):
    hits = get_hits(query, index_name, top_k)
    hits_formatted = format_hits(hits, index_name)
    state["query"] = query
    return hits_formatted

def update_description():
    return f"Nuværende søgning: {state['query']}"
def analyse_doc(doc_id):
    meta_doc = meta_title.query(f"id == '{doc_id}'")
    if meta_doc.empty or state["query"] is None:
        return "<b>Ingen sager fundet...</b>"
    #build a standalone html document from the stored title and body
    html_body = meta_doc["text_html"].iloc[0]
    html_title = meta_doc["title"].iloc[0]
    html = f"<html>\n<body>\n<h1>{html_title}</h1>\n{html_body}\n</body>\n</html>"
    soup = BeautifulSoup(html, 'lxml')
    #score paragraphs longer than min_characters against the stored query
    min_characters = 100
    p_list = [i for i in soup.body.find_all('p', recursive=False) if len(i.get_text(strip=True)) > min_characters]
    rerank_list = [(state["query"], i.get_text(strip=True)) for i in p_list]
    rerank_scores = crossencoder.predict(rerank_list)
    #normalize scores to [0, 1] and use them as background color intensity
    rerank_scores_norm = (rerank_scores - rerank_scores.min()) / (rerank_scores.max() - rerank_scores.min())
    for element, score in zip(p_list, rerank_scores_norm):
        element['style'] = f'background-color:rgba(173, 216, 230, {score});'
    #serve the highlighted document inside an iframe via srcdoc
    html_doc = f"""<iframe style="width: 100%; height: 480px" srcdoc='{str(soup)}'></iframe>"""
    return html_doc

#define interface
with gr.Blocks() as demo:
    gr.Markdown("# DEMO Semantisk søgning i Miljø- og Fødevareklagenævnets afgørelser")
    with gr.Tab("Søgning"):
        gr.Markdown("## Søgning i afgørelser")
        gr.Markdown("Anvend søgefeltet til at søge i titel eller tekst for afgørelser.")
        gr.Markdown("Brug spørgsmål, korte beskrivelser eller stikord til søgningen.")
        with gr.Row():
            textbox = gr.Textbox(placeholder="Skriv her...", lines=1, label="Søgning", scale=6)
            name = gr.Radio([("Titel", "title"), ("Tekst", "corpus")], value="corpus", label="Søgning i titel eller tekst?", scale=2)
            num = gr.Number(5, label="Antal hits", scale=1)
            btn = gr.Button("Søg!", size="sm", scale=1)
        with gr.Row():
            output = gr.Markdown()
        #both the button and pressing enter in the textbox trigger the search
        btn.click(search, [textbox, name, num], output)
        textbox.submit(search, [textbox, name, num], output)
    with gr.Tab("Analyse"):
        gr.Markdown("## Analyse af hele dokumenter")
        #description = gr.Markdown("Ingen søgning foretaget endnu - søg efter afgørelser ved at angive et sags ID.")
        gr.Markdown("Relevans for tekst angives ved farveintensitet - mere blå er mere relevant.")
        with gr.Row():
            id_textbox = gr.Textbox(placeholder="Indsæt sags ID her...", lines=1, label="Søgning", scale=8)
            id_btn = gr.Button("Søg!", size="sm", scale=2)
        with gr.Row():
            html_output = gr.HTML()
        #output.change(update_description, [], description)
        id_btn.click(analyse_doc, id_textbox, html_output)
        id_textbox.submit(analyse_doc, id_textbox, html_output)

demo.launch()
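
#NOTE: running this script requires the data files referenced above (title_metadata.gz,
#corpus_metadata.gz, title.index, corpus.index) next to the script; demo.launch() then
#serves the Gradio app on its default local port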