# scisearch / app.py
# ola13: remove lang info (98cc9d3)
import json
import os
import traceback
from typing import List, Tuple
import gradio as gr
import requests
from huggingface_hub import HfApi
# Client used for the Hugging Face Hub lookup below.
hf_api = HfApi()

# Maps a dataset's short name (its repo id with everything up to the last "/"
# removed) to the metadata object the Hub returns for it. Covers every dataset
# listed for the "bigscience-data" author; the listing is fetched when this
# module is executed.
roots_datasets = {
    dataset_info.id.rsplit("/", 1)[-1]: dataset_info
    for dataset_info in hf_api.list_datasets(
        author="bigscience-data",
        use_auth_token=os.getenv("bigscience_data_token"),
    )
}
# Link markup for a document that lives in a private dataset (red, with a lock).
_PRIVATE_DOCID_TEMPLATE = """
<a title="This dataset is private. See the introductory text for more information"
style="color:#AA4A44; font-weight: bold; text-decoration:none"
onmouseover="style='color:#AA4A44; font-weight: bold; text-decoration:underline'"
onmouseout="style='color:#AA4A44; font-weight: bold; text-decoration:none'"
href="https://huggingface.co/datasets/bigscience-data/{dataset}"
target="_blank">
🔒{dataset}
</a>
<span style="color:#7978FF; ">/{docid}</span>"""

# Link markup for a document that lives in a public dataset; the tooltip shows
# the licence label.
_PUBLIC_DOCID_TEMPLATE = """
<a title="This dataset is licensed {metadata}"
style="color:#7978FF; font-weight: bold; text-decoration:none"
onmouseover="style='color:#7978FF; font-weight: bold; text-decoration:underline'"
onmouseout="style='color:#7978FF; font-weight: bold; text-decoration:none'"
href="https://huggingface.co/datasets/bigscience-data/{dataset}"
target="_blank">
{dataset}
</a>
<span style="color:#7978FF; ">/{docid}</span>"""


def _license_label(metadata):
    """Return the licence label to show for a dataset's metadata object."""
    tags = getattr(metadata, "tags", None) or []
    # Prefer an explicit "license:<name>" tag wherever it appears in the list.
    # NOTE(review): assumes Hub licence tags use the "license:" prefix - confirm.
    for tag in tags:
        if tag.startswith("license:"):
            return tag.split(":")[-1]
    if tags:
        # Previous behaviour: treat the first tag as the licence tag.
        return tags[0].split(":")[-1]
    # A dataset without any tags used to raise IndexError here.
    return "unknown"


def get_docid_html(docid):
    """Build the HTML snippet that displays a document id.

    Args:
        docid: Identifier made of exactly three "/"-separated parts; the second
            part is the dataset name used to look up ``roots_datasets`` and the
            third is displayed after the dataset link.

    Returns:
        HTML with a link to the dataset page followed by the last id component.
        Private datasets get the red "locked" variant, public ones a tooltip
        with the licence label.

    Raises:
        ValueError: If ``docid`` does not have exactly three components.
        KeyError: If the dataset name is not present in ``roots_datasets``.
    """
    # Keep the parameter intact instead of re-binding ``docid`` to its last part.
    _, dataset, doc_number = docid.split("/")
    metadata = roots_datasets[dataset]
    if metadata.private:
        return _PRIVATE_DOCID_TEMPLATE.format(dataset=dataset, docid=doc_number)
    return _PUBLIC_DOCID_TEMPLATE.format(
        metadata=_license_label(metadata), dataset=dataset, docid=doc_number
    )
# Tag names that can follow PII_PREFIX in a document's text.
PII_TAGS = {"KEY", "EMAIL", "USER", "IP_ADDRESS", "ID", "IPv4", "IPv6"}
# Marker that precedes every tag name in the text.
PII_PREFIX = "PI:"


def process_pii(text):
    """Replace each ``PI:<TAG>`` placeholder in *text* with a highlighted badge.

    Args:
        text: String that may contain placeholders built from ``PII_PREFIX`` and
            one of ``PII_TAGS``.

    Returns:
        The text with every known placeholder swapped for a bold, marked
        "REDACTED <TAG>" HTML fragment; unknown placeholders are left alone.
    """
    badge = '<b><mark style="background: Fuchsia; color: Lime;">REDACTED {}</mark></b>'
    for pii_tag in PII_TAGS:
        placeholder = PII_PREFIX + pii_tag
        text = text.replace(placeholder, badge.format(pii_tag))
    return text
def flag(query, language, num_results, issue_description):
    """Send a flagged search, with the user's description, to the backend.

    The request is best-effort: any failure is logged and otherwise ignored.

    Args:
        query: The search query being flagged.
        language: Selected language; omitted from the request when it is
            "detect_language".
        num_results: Number of results that were requested (sent as "k").
        issue_description: Free-text context supplied by the user.

    Returns:
        Always the empty string.
    """
    post_data = {
        "query": query,
        "k": num_results,
        "flag": True,
        "description": issue_description,
    }
    if language != "detect_language":
        post_data["lang"] = language

    try:
        output = requests.post(
            os.environ.get("address"),
            headers={"Content-type": "application/json"},
            data=json.dumps(post_data),
            timeout=120,
        )
        # The reply is not used; parsing it only checks that the body is JSON.
        json.loads(output.text)
    except Exception:
        # Not a bare ``except:`` - KeyboardInterrupt/SystemExit must propagate.
        # Print the traceback so a failed flag can be diagnosed.
        print("Error flagging")
        traceback.print_exc()
    return ""
def format_result(result, highlight_terms, exact_search, datasets_filter=None):
    """Render one search hit as an HTML fragment.

    Args:
        result: ``(text, url, docid)`` tuple. ``url`` may be ``None``; the
            dataset name is the second "/"-separated component of ``docid``.
        highlight_terms: For exact search, the query string whose first
            occurrence in ``text`` is shown in bold. Otherwise a collection of
            tokens; every whitespace-separated token of ``text`` found in it is
            shown in bold.
        exact_search: Selects between the two highlighting modes.
        datasets_filter: Optional collection of dataset names to keep; hits
            from any other dataset are dropped.

    Returns:
        The HTML fragment for the hit, or "" when the filter drops it.
    """
    import html  # function-scope import: ``html`` is not imported at file level

    text, url, docid = result

    if datasets_filter is not None:
        dataset = docid.split("/")[1]
        if dataset not in datasets_filter:
            return ""

    # The text and URL come from the backend response, so escape them before
    # putting them into markup. Highlighting is decided on the raw text first.
    if exact_search:
        query_start = text.find(highlight_terms)
        if query_start == -1:
            # ``find`` returned -1: slicing with it would duplicate and garble
            # the text, so show it without any highlight instead.
            tokens_html = html.escape(text, quote=False)
        else:
            query_end = query_start + len(highlight_terms)
            tokens_html = "".join(
                (
                    html.escape(text[:query_start], quote=False),
                    "<b>{}</b>".format(
                        html.escape(text[query_start:query_end], quote=False)
                    ),
                    html.escape(text[query_end:], quote=False),
                )
            )
    else:
        tokens_html = " ".join(
            "<b>{}</b>".format(html.escape(token, quote=False))
            if token in highlight_terms
            else html.escape(token, quote=False)
            for token in text.split()
        )
    # The PII placeholders contain no HTML-special characters, so they survive
    # the escaping above unchanged.
    tokens_html = process_pii(tokens_html)

    if url is not None:
        # quote=True because the URL also lands inside a quoted attribute.
        safe_url = html.escape(str(url), quote=True)
        meta_html = (
            "<p class='underline-on-hover' style='font-size:12px; font-family: Arial; color:#585858; text-align: left;'>\n"
            "<a href='{}' target='_blank'>{}</a></p>"
        ).format(safe_url, safe_url)
    else:
        meta_html = ""

    docid_html = get_docid_html(docid)
    # Placeholder: only emitted inside an HTML comment in the template below.
    language = "FIXME"
    return (
        "{}\n"
        "<p style='font-size:14px; font-family: Arial; color:#7978FF; text-align: left;'>Document ID: {}</p>\n"
        "<!-- <p style='font-size:12px; font-family: Arial; color:MediumAquaMarine'>Language: {}</p> -->\n"
        "<p style='font-family: Arial;'>{}</p>\n"
        "<br>\n"
    ).format(meta_html, docid_html, language, tokens_html)
def format_result_page(
    language, results, highlight_terms, num_results, exact_search, datasets_filter=None
) -> gr.HTML:
    """Assemble the HTML for a whole page of search results.

    Args:
        language: Language selected for the search ("all" and "detect_language"
            change the layout).
        results: Mapping from language to a list of result tuples accepted by
            ``format_result``.
        highlight_terms: Passed through to ``format_result``.
        num_results: When not ``None``, a "Total number of matches" header is
            added, showing how many results produced non-empty HTML.
        exact_search: Whether the page belongs to an exact-match search.
        datasets_filter: Passed through to ``format_result``.

    Returns:
        The header markup followed by the markup of all results.
    """
    fuzzy_search = not exact_search
    header_parts = []
    body_parts = []
    shown_count = 0

    # FIX lang detection by normalizing format on the backend
    if fuzzy_search and language == "detect_language":
        header_parts.append(
            "<p style='font-family: Arial; color:MediumAquaMarine; text-align: center; line-height: 3em'>\n"
            "Detected language: <b> FIX MEEEE !!! </b><hr></p><br>"
        )

    for lang, hits in results.items():
        if not hits:
            if exact_search:
                body_parts.append(
                    "<p style='font-family: Arial; color:Silver; text-align: left; line-height: 3em'>\n"
                    "No results found.<hr></p>"
                )
            else:
                body_parts.append(
                    "<p style='font-family: Arial; color:Silver; text-align: left; line-height: 3em'>\n"
                    "No results for language: <b>{}</b><hr></p>".format(lang)
                )
            continue

        rendered_hits = [
            format_result(hit, highlight_terms, exact_search, datasets_filter)
            for hit in hits
        ]
        # Hits removed by the dataset filter render as "" and are not counted.
        shown_count += sum(1 for fragment in rendered_hits if fragment != "")
        section_html = "".join(rendered_hits)

        if fuzzy_search and language == "all":
            # One collapsible section per language when searching all of them.
            section_html = (
                "\n<details>\n"
                "<summary style='font-family: Arial; color:MediumAquaMarine; text-align: left; line-height: 3em'>\n"
                f"Results for language: <b>{lang}</b><hr>\n"
                "</summary>\n"
                f"{section_html}\n"
                "</details>"
            )
        body_parts.append(section_html)

    if num_results is not None:
        header_parts.append(
            "<p style='font-family: Arial; color:MediumAquaMarine; text-align: center; line-height: 3em'>\n"
            "Total number of matches: <b>{}</b><hr></p><br>".format(shown_count)
        )

    return "".join(header_parts) + "".join(body_parts)
def extract_results_from_payload(query, language, payload, exact_search):
    """Normalise a backend response into per-language result tuples.

    Args:
        query: The query string; used as the highlight term for exact search.
        language: Language the search was issued for. Unless it is "all" (and
            the search is not exact), ``payload["results"]`` is wrapped as
            ``{language: results}``; for "all" it is iterated as a mapping from
            language to results.
        payload: Decoded response. Reads "results", plus "num_results" for
            exact search or "highlight_terms" otherwise. Each result provides
            "text", "docid" and optionally "meta" with a "url" entry.
        exact_search: Whether the response comes from an exact-match search.

    Returns:
        ``(processed_results, highlight_terms, num_results, datasets)``:
        a dict mapping language to a list of ``(text, url, docid)`` tuples, the
        highlight terms, the match count (``None`` unless exact search), and
        the alphabetically sorted list of dataset names seen in the docids.

    Raises:
        ValueError: If a docid does not have exactly three "/"-separated parts.
        KeyError: If an expected key is missing from the payload.
    """
    results = payload["results"]
    if exact_search:
        highlight_terms = query
        num_results = payload["num_results"]
        results = {language: results}
    else:
        highlight_terms = payload["highlight_terms"]
        num_results = None
        # unify format - might be best fixed on server side
        if language != "all":
            results = {language: results}

    processed_results = {}
    datasets = set()
    for lang, results_for_lang in results.items():
        processed = []
        for result in results_for_lang:
            meta = result.get("meta")
            url = meta["url"] if meta is not None and "url" in meta else None
            docid = result["docid"]
            _, dataset, _ = docid.split("/")
            datasets.add(dataset)
            processed.append((result["text"], url, docid))
        processed_results[lang] = processed

    # Sort so the list has a stable order; ``list(set)`` order varies between runs.
    return processed_results, highlight_terms, num_results, sorted(datasets)
def process_error(error_type, payload=None):
    """Build the HTML message for an error reported by the backend.

    Args:
        error_type: Error identifier; only "unsupported_lang" produces a message.
        payload: Optional decoded response. When given, the detected language
            is read from ``payload["err"]["meta"]["detected_lang"]``. The
            function used to read an undefined ``payload`` name and always
            failed with NameError; it is now an optional argument.

    Returns:
        An HTML string for "unsupported_lang", otherwise ``None``.
    """
    if error_type != "unsupported_lang":
        return None

    try:
        detected_lang = payload["err"]["meta"]["detected_lang"]
    except (KeyError, TypeError):
        # No payload was supplied, or it lacks the detected-language entry.
        detected_lang = None

    if detected_lang is None:
        subject = "The detected language"
    else:
        subject = f"Detected language <b>{detected_lang}</b>"
    return f"""
<p style='font-size:18px; font-family: Arial; color:MediumVioletRed; text-align: center;'>
{subject} is not supported.<br>
Please choose a language from the dropdown or type another query.
</p><br><hr><br>"""
def extract_error_from_payload(payload):
    """Return the error type stored under ``payload["err"]["type"]``.

    Returns ``None`` when *payload* has no "err" entry.
    """
    return payload["err"]["type"] if "err" in payload else None
def request_payload(query, language, exact_search, num_results=10):
    """POST a search request to the backend and return the decoded JSON reply.

    Args:
        query: Query string to search for.
        language: Language to search in; left out of the request when it is
            "detect_language".
        exact_search: When true, the request goes to the hard-coded address
            below; otherwise to the URL in the "address" environment variable.
        num_results: Sent to the backend as "k".

    Returns:
        The response body parsed as JSON.
    """
    body = {"query": query, "k": num_results}
    if language != "detect_language":
        body["lang"] = language

    if exact_search:
        endpoint = "http://34.105.160.81:8080"
    else:
        endpoint = os.environ.get("address")

    response = requests.post(
        endpoint,
        data=json.dumps(body),
        headers={"Content-type": "application/json"},
        timeout=60,
    )
    return json.loads(response.text)
# Markdown introduction rendered at the top of the page. The emoji in the
# heading were stored as mis-decoded byte sequences; they are restored here.
description = """# <p style="text-align: center;"> 🌸 🔎 ROOTS search tool 🔍 🌸 </p>
The ROOTS corpus was developed during the [BigScience workshop](https://bigscience.huggingface.co/) for the purpose
of training the Multilingual Large Language Model [BLOOM](https://huggingface.co/bigscience/bloom). This tool allows
you to search through the ROOTS corpus. We serve a BM25 index for each language or group of languages included in
ROOTS. You can read more about the details of the tool design
[here](https://huggingface.co/spaces/bigscience-data/scisearch/blob/main/roots_search_tool_specs.pdf). For more
information and instructions on how to access the full corpus check [this form](https://forms.gle/qyYswbEL5kA23Wu99)."""
if __name__ == "__main__":
    # Build the Gradio UI, wire its event handlers and start the app.
    demo = gr.Blocks(
        css=".underline-on-hover:hover { text-decoration: underline; } .flagging { font-size:12px; color:Silver; }"
    )

    with demo:
        # State kept between events so the dataset filter can re-render the
        # last search without querying the backend again.
        processed_results_state = gr.State([])
        highlight_terms_state = gr.State([])
        num_results_state = gr.State(0)
        exact_search_state = gr.State(False)
        # Not wired to any event in this block.
        lang_state = gr.State("")

        with gr.Row():
            gr.Markdown(value=description)
        with gr.Row():
            query = gr.Textbox(
                lines=1,
                max_lines=1,
                placeholder="Put your query in double quotes for exact search.",
                label="Query",
            )
        with gr.Row():
            lang = gr.Dropdown(
                choices=[
                    "ar",
                    "ca",
                    "code",
                    "en",
                    "es",
                    "eu",
                    "fr",
                    "id",
                    "indic",
                    "nigercongo",
                    "pt",
                    "vi",
                    "zh",
                    "detect_language",
                    "all",
                ],
                value="en",
                label="Language",
            )
        with gr.Row():
            k = gr.Slider(1, 100, value=10, step=1, label="Max Results")
        with gr.Row():
            submit_btn = gr.Button("Submit")
        # Hidden until the first successful search.
        with gr.Row(visible=False) as datasets_filter:
            available_datasets = gr.Dropdown(
                type="value",
                choices=[],
                value=[],
                label="Datasets Filter",
                multiselect=True,
            )
        with gr.Row():
            results = gr.HTML(label="Results")
        # Hidden until the first successful search.
        with gr.Column(visible=False) as flagging_form:
            flag_txt = gr.Textbox(
                lines=1,
                placeholder="Type here...",
                label=(
                    "If you choose to flag your search, we will save the query, language and the number of results\n"
                    "you requested. Please consider adding relevant additional context below:"
                ),
            )
            flag_btn = gr.Button("Flag Results")
            flag_btn.click(flag, inputs=[query, lang, k, flag_txt], outputs=[flag_txt])

        def submit(query, lang, k, dropdown_input):
            """Run a search and return updates for the output components.

            A query wrapped in double quotes is an exact search. Every return
            value is a dict keyed by output component: the handler declares
            eight outputs, so the former ``None`` / bare-string returns did not
            match them.
            """
            print("submitting", query, lang, k)
            query = (query or "").strip()
            exact_search = False
            if query.startswith('"') and query.endswith('"') and len(query) >= 2:
                exact_search = True
                query = query[1:-1]
            else:
                # Collapse runs of whitespace for non-exact queries.
                query = " ".join(query.split())
            if query == "":
                # Nothing to search for: leave the page as it is.
                return {results: gr.update()}

            payload = request_payload(query, lang, exact_search, k)
            err = extract_error_from_payload(payload)
            if err is not None:
                error_html = process_error(err)
                return {results: error_html if error_html is not None else ""}

            (
                processed_results,
                highlight_terms,
                num_results,
                datasets,
            ) = extract_results_from_payload(query, lang, payload, exact_search)
            results_html = format_result_page(
                lang, processed_results, highlight_terms, num_results, exact_search
            )
            return {
                processed_results_state: processed_results,
                highlight_terms_state: highlight_terms,
                num_results_state: num_results,
                exact_search_state: exact_search,
                results: results_html,
                flagging_form: gr.update(visible=True),
                datasets_filter: gr.update(visible=True),
                available_datasets: gr.Dropdown.update(
                    choices=datasets, value=datasets
                ),
            }

        def filter_datasets(
            lang,
            processed_results,
            highlight_terms,
            num_results,
            exact_search,
            datasets_filter,
        ):
            """Re-render the stored results, keeping only the selected datasets."""
            results_html = format_result_page(
                lang,
                processed_results,
                highlight_terms,
                num_results,
                exact_search,
                datasets_filter,
            )
            return {results: results_html}

        # Pressing Enter in the query box and clicking Submit run the same
        # handler with the same components.
        search_inputs = [query, lang, k, available_datasets]
        search_outputs = [
            processed_results_state,
            highlight_terms_state,
            num_results_state,
            exact_search_state,
            results,
            flagging_form,
            datasets_filter,
            available_datasets,
        ]
        query.submit(fn=submit, inputs=search_inputs, outputs=search_outputs)
        submit_btn.click(submit, inputs=search_inputs, outputs=search_outputs)

        available_datasets.change(
            filter_datasets,
            inputs=[
                lang,
                processed_results_state,
                highlight_terms_state,
                num_results_state,
                exact_search_state,
                available_datasets,
            ],
            outputs=[results],
        )

    demo.launch(enable_queue=True, debug=True)