# App / app.py
# Jai12345's picture
# Update app.py
# 2dcebd3
from sentence_transformers import SentenceTransformer, CrossEncoder, util
import re
import pandas as pd
from newspaper import Article
import docx2txt
from io import StringIO
from PyPDF2 import PdfFileReader
import validators
import nltk
import streamlit as st
import pickle
nltk.download('punkt')
from nltk import sent_tokenize
def extract_text_from_url(url: str):
    """Download and parse the article at ``url``.

    Returns:
        A ``(title, text)`` tuple taken from the parsed ``Article``.
    """
    page = Article(url)
    # The article has to be fetched before it can be parsed.
    page.download()
    page.parse()
    return page.title, page.text
def extract_text_from_file(file):
    """Extract the text (and, for PDFs, the document title) from an upload.

    Args:
        file: Uploaded file object. Its ``type`` attribute must hold the
            MIME type. Plain-text uploads are read through ``getvalue()``;
            PDF and DOCX uploads are passed to ``PdfFileReader`` and
            ``docx2txt.process`` respectively.

    Returns:
        A ``(text, title)`` tuple. ``title`` is the PDF metadata title for
        PDF uploads and ``None`` for every other type (or when the PDF
        carries no metadata).

    Raises:
        ValueError: If the MIME type is not plain text, PDF or DOCX.
        UnicodeDecodeError: If a plain-text upload is not valid UTF-8.
    """
    mime_type = file.type

    if mime_type == "text/plain":
        return file.getvalue().decode("utf-8"), None

    if mime_type == "application/pdf":
        reader = PdfFileReader(file)
        page_count = reader.numPages
        # The metadata object may be missing entirely, so guard before
        # reading its title.
        info = reader.getDocumentInfo()
        pdf_title = info.title if info is not None else None

        page_texts = []
        for page_number in range(page_count):
            try:
                page_texts.append(reader.getPage(page_number).extractText())
            except Exception:
                # Best effort: a page whose text cannot be extracted is
                # skipped so the remaining pages are still searchable.
                continue
        return "".join(page_texts), pdf_title

    if (
        mime_type
        == "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    ):
        return docx2txt.process(file), None

    # Fail with a clear message instead of implicitly returning None, which
    # callers that unpack the result would report as an opaque TypeError.
    raise ValueError(f"Unsupported file type: {mime_type!r}")
def preprocess_plain_text(text, window_size=3):
    """Clean raw text and group its sentences into fixed-size passages.

    Cleaning drops non-ASCII characters, URLs, ``@mentions`` and
    ``#hashtags``, then replaces every run of characters outside
    ``.,!?%$A-Za-z0-9`` (including all whitespace and newlines) with a
    single space. Because newlines do not survive this step, the whole
    text is tokenized as one paragraph.

    Args:
        text: Raw document text.
        window_size: Number of consecutive sentences per passage. The last
            passage may be shorter.

    Returns:
        A list of passages, each a string of up to ``window_size``
        sentences joined by single spaces. Empty when nothing is left
        after cleaning.

    Raises:
        ValueError: If ``window_size`` is smaller than 1.
    """
    if window_size < 1:
        raise ValueError("window_size must be a positive integer")

    text = text.encode("ascii", "ignore").decode()  # drop non-ASCII
    text = re.sub(r"https*\S+", " ", text)  # urls
    text = re.sub(r"@\S+", " ", text)  # mentions
    text = re.sub(r"#\S+", " ", text)  # hashtags
    text = re.sub(r"\s{2,}", " ", text)  # repeated whitespace
    # Anything that is not a letter, digit or one of .,!?%$ becomes a space.
    text = re.sub(r"[^.,!?%$A-Za-z0-9]+", " ", text)

    # Only single spaces remain at this point; normalise the ends.
    text = " ".join(text.split())
    if not text:
        return []

    sentences = sent_tokenize(text)
    # Honour the caller's window_size (it used to be overwritten with 3).
    return [
        " ".join(sentences[start:start + window_size])
        for start in range(0, len(sentences), window_size)
    ]
def bi_encode(bi_enc, passages):
    """Load the bi-encoder ``bi_enc`` and embed every passage with it.

    The loaded model is also stored in the module-level ``bi_encoder``.

    Args:
        bi_enc: Model name handed to ``SentenceTransformer``.
        passages: Passages to embed for semantic search.

    Returns:
        A ``(bi_encoder, corpus_embeddings)`` tuple; the embeddings are
        requested as a tensor.
    """
    global bi_encoder
    bi_encoder = SentenceTransformer(bi_enc)

    # Show a spinner while the passages are being embedded.
    with st.spinner('Encoding passages into a vector space...'):
        embeddings = bi_encoder.encode(
            passages,
            convert_to_tensor=True,
            show_progress_bar=True,
        )

    st.success("Embeddings computed.")
    return bi_encoder, embeddings
def cross_encode():
    """Load the cross-encoder used to re-rank retrieved passages.

    The model is stored in the module-level ``cross_encoder`` and also
    returned to the caller.
    """
    global cross_encoder
    model_name = 'cross-encoder/ms-marco-MiniLM-L-12-v2'
    cross_encoder = CrossEncoder(model_name)
    return cross_encoder
def display_as_table(model, score='score'):
    """Build a score/text table from the first two hits.

    Args:
        model: Sequence of hit dicts, each holding a ``'corpus_id'`` index
            into the module-level ``passages`` and a score under ``score``.
        score: Key of the score to display.

    Returns:
        A DataFrame with columns ``Score`` (rounded to two decimals) and
        ``Text``.
    """
    top_hits = model[0:2]
    rows = [(hit[score], passages[hit['corpus_id']]) for hit in top_hits]
    table = pd.DataFrame(rows, columns=['Score', 'Text'])
    table['Score'] = table['Score'].round(2)
    return table
# Page heading; rendered first because Streamlit draws elements in call order.
st.title("Search Your Query Here")
# Sentences per passage.
# NOTE(review): the visible calls to preprocess_plain_text pass a literal 3
# rather than this constant; confirm whether it is still needed.
window_size = 3
# Model name handed to bi_encode() when a search is run.
bi_encoder_type = "multi-qa-mpnet-base-dot-v1"
# This will search articles for passages to answer the query
def search_func(query):
    """Find and display the passages that best answer ``query``.

    Reads module-level state prepared by the script: ``url_text``,
    ``title``, ``pdf_title``, ``passages``, ``corpus_embeddings`` and the
    ``bi_encoder`` / ``cross_encoder`` models. The top two bi-encoder hits
    are re-scored with the cross-encoder and shown as an HTML table.

    Args:
        query: The user's search query.

    Returns:
        None. All output is rendered through Streamlit.
    """
    st.subheader(f"Search Query: {query}")

    # ``title`` is only bound when the URL passed validation (the script
    # extracts from the URL under that same check), so test validity rather
    # than mere non-emptiness to avoid a NameError when a file was uploaded
    # alongside an invalid URL.
    if validators.url(url_text):
        st.write(f"Document Header: {title}")
    elif pdf_title:
        st.write(f"Document Header: {pdf_title}")

    # Encode the query with the bi-encoder and retrieve candidate passages.
    question_embedding = bi_encoder.encode(query, convert_to_tensor=True)
    question_embedding = question_embedding.cpu()
    hits = util.semantic_search(
        question_embedding,
        corpus_embeddings,
        top_k=2,
        score_function=util.dot_score,
    )
    hits = hits[0]  # hits for the first (and only) query

    if not hits:
        st.warning("No matching passages were found.")
        return

    # Re-score every retrieved passage with the cross-encoder.
    cross_inp = [[query, passages[hit['corpus_id']]] for hit in hits]
    cross_scores = cross_encoder.predict(cross_inp)
    for hit, cross_score in zip(hits, cross_scores):
        hit['cross-score'] = cross_score

    # Show the hits ordered by cross-encoder score, best first.
    st.markdown("\n-------------------------\n")
    st.subheader("Top 2 Results")
    hits = sorted(hits, key=lambda x: x['cross-score'], reverse=True)
    rerank_df = display_as_table(hits, 'cross-score')
    st.write(rerank_df.to_html(index=False), unsafe_allow_html=True)
def clear_text():
    """Blank out both the URL box and the search-query box."""
    for widget_key in ("text_url", "text_input"):
        st.session_state[widget_key] = ""
def clear_search_text():
    """Blank out the search-query box (used as the URL box's on_change)."""
    query_key = "text_input"
    st.session_state[query_key] = ""
# --- Inputs: a URL or an uploaded document, plus the search query ---------
url_text = st.text_input(
    "Please Enter a url here",
    value="https://en.wikipedia.org/wiki/Virat_Kohli",
    key='text_url',
    on_change=clear_search_text,
)
st.markdown(
    "<h3 style='text-align: center; color: red;'>OR</h3>",
    unsafe_allow_html=True,
)
upload_doc = st.file_uploader("Upload a .txt, .pdf, .docx file", key="upload")
search_query = st.text_input(
    "Please Enter your search query here",
    value="Who is Virat Kohli?",
    key="text_input",
)

# Bind every name the search path reads, so a rerun with neither a valid URL
# nor an upload cannot hit a NameError.
title = None
pdf_title = None
passages = None

if validators.url(url_text):
    # A valid URL takes precedence over an uploaded document.
    title, text = extract_text_from_url(url_text)
    passages = preprocess_plain_text(text, window_size=3)
elif upload_doc:
    text, pdf_title = extract_text_from_file(upload_doc)
    passages = preprocess_plain_text(text, window_size=3)

# --- Actions --------------------------------------------------------------
col1, col2 = st.columns(2)
with col1:
    search = st.button("Search", key='search_but', help='Click to Search!!')
with col2:
    clear = st.button(
        "Clear Text Input",
        on_click=clear_text,
        key='clear',
        help='Click to clear the URL and query',
    )

if search:
    if not passages:
        # Nothing to search: no valid URL/upload, or no text was extracted.
        st.warning(
            "Please enter a valid URL or upload a document that contains "
            "text before searching."
        )
    elif bi_encoder_type:
        with st.spinner(
            text="Loading..........................."
        ):
            bi_encoder, corpus_embeddings = bi_encode(bi_encoder_type, passages)
            cross_encoder = cross_encode()
        with st.spinner(
            text="Embedding completed, searching for relevant text for given query and hits..."):
            search_func(search_query)

st.markdown("""
""")