Spaces:
Sleeping
Sleeping
# -*- coding: utf-8 -*-
"""Copy of notebook "Copy of notebook "Untitled8.ipynb""

Automatically generated by Colab.

Original file is located at
    https://colab.research.google.com/drive/1MdqEwbEry_Z-wnu7pkXI_bvJD0Qz26VU
"""
from datasets import load_dataset | |
from sentence_transformers import SentenceTransformer, util | |
import torch | |
from llama_index.core.node_parser import SentenceSplitter | |
from llama_index.core import Document | |
from rank_bm25 import BM25Okapi | |
from nltk.tokenize import word_tokenize | |
import gradio as gr | |
from llama_index.core.schema import NodeRelationship | |
from groq import Groq | |
import nltk | |
# Load the first 5000 rows of the train split of the English Wikipedia dataset.
ds = load_dataset("lucadiliello/english_wikipedia", split="train[:5000]")
dataset = ds

# Wrap every article in a Document whose id is the article URL, so that the
# source link can later be recovered from a chunk's SOURCE relationship.
arr = [
    Document(id_=article["url"], text=article["maintext"])
    for article in dataset
]

# Split the articles into overlapping chunks (nodes); these chunks are the
# retrieval units used by every search function below.
splitter = SentenceSplitter(chunk_size=524, chunk_overlap=20)
nodes = splitter.get_nodes_from_documents(arr)
"""### BM25 & Minilm""" | |
# Fetch the 'punkt_tab' resource (presumably required by word_tokenize below).
nltk.download("punkt_tab")

# One lower-cased, tokenized entry per chunk; position i matches nodes[i].
corpus = [chunk.text for chunk in nodes]
tokenized_corpus = [word_tokenize(text.lower()) for text in corpus]

# Keyword (BM25) index over all chunks.
bm25 = BM25Okapi(tokenized_corpus)
# Define BM25 search function | |
def search_bm25(query, top_k=5):
    """Rank all chunks against *query* with BM25.

    Args:
        query (str): The search query; it is lower-cased and tokenized the
            same way as the indexed corpus.
        top_k (int or None): Number of best hits to return. ``None`` returns
            every chunk (the slice is then unbounded).

    Returns:
        list: ``(node, score)`` tuples ordered from highest to lowest score.
    """
    query_tokens = word_tokenize(query.lower())
    # Pair every corpus position with its score, then order by score only;
    # the sort is stable, so ties keep corpus order.
    scored_positions = sorted(
        enumerate(bm25.get_scores(query_tokens)),
        key=lambda pair: pair[1],
        reverse=True,
    )
    return [(nodes[position], score) for position, score in scored_positions[:top_k]]
# Sentence encoder used to embed incoming queries.
model = SentenceTransformer('all-MiniLM-L6-v2')

# Pre-computed chunk embeddings (presumably produced with the same encoder,
# row i corresponding to nodes[i] -- TODO confirm against the notebook that
# wrote the file).
# map_location puts the tensor on the device the encoder runs on: without it,
# embeddings saved on a GPU fail to load on a CPU-only host, or end up on a
# different device than the query embedding they are compared with.
# NOTE(security): torch.load unpickles the file; only load files you trust.
corpus_embeddings = torch.load("./my_model (1)", map_location=model.device)
# Define semantic search function | |
def search_semantic(query, top_k=None):
    """Rank chunks by cosine similarity between the query and chunk embeddings.

    Args:
        query (str): The search query.
        top_k (int or None): Number of best hits to return. ``None``, or a
            value larger than the number of embeddings, returns all of them.

    Returns:
        list: ``(node, score)`` tuples ordered from most to least similar,
        where ``score`` is a plain Python float.
    """
    query_vector = model.encode(query, convert_to_tensor=True)
    # cos_sim returns a (1, n) matrix for a single query; take its only row.
    similarities = util.cos_sim(query_vector, corpus_embeddings)[0]

    total = len(similarities)
    limit = total if top_k is None or top_k > total else top_k

    best = torch.topk(similarities, k=limit)
    return [
        (nodes[position], score)
        for position, score in zip(best.indices.tolist(), best.values.tolist())
    ]
def _min_max_normalize(scores):
    """Rescale a ``{node_id: score}`` mapping linearly onto the range [0, 1].

    When the mapping is empty or every score is identical there is no ranking
    signal, so all values become 0.0 instead of dividing by zero.
    """
    if not scores:
        return {}
    lowest = min(scores.values())
    span = max(scores.values()) - lowest
    if span == 0:
        return dict.fromkeys(scores, 0.0)
    return {
        node_id: float(score - lowest) / float(span)
        for node_id, score in scores.items()
    }


def combined_search(query, bm25_weight=0.5, semantic_weight=0.5, top_k=5):
    """Hybrid retrieval: weighted sum of normalized BM25 and semantic scores.

    Both retrievers score every chunk. Each score set is min-max normalized
    before weighting because raw BM25 scores are unbounded while cosine
    similarities lie in [-1, 1]; mixing them unnormalized lets BM25 dominate
    regardless of the weights.

    Args:
        query (str): The search query.
        bm25_weight (float): Weight of the normalized BM25 score.
        semantic_weight (float): Weight of the normalized semantic score.
        top_k (int or None): Number of best hits to return; ``None`` returns
            all chunks.

    Returns:
        list: ``(node, combined_score)`` tuples, best first.
    """
    # Chunks are keyed by ``id_``; ``doc_id`` is a Document-level property and
    # is not what the rest of this file uses for nodes.
    bm25_scores = _min_max_normalize(
        {node.id_: score for node, score in search_bm25(query, top_k=None)}
    )
    semantic_scores = _min_max_normalize(
        {node.id_: score for node, score in search_semantic(query, top_k=None)}
    )

    # One id -> node map replaces a linear scan over all nodes per result.
    nodes_by_id = {node.id_: node for node in nodes}

    combined_scores = {
        node_id: bm25_weight * bm25_scores.get(node_id, 0.0)
        + semantic_weight * semantic_scores.get(node_id, 0.0)
        for node_id in nodes_by_id
    }
    ranked = sorted(combined_scores.items(), key=lambda item: item[1], reverse=True)
    return [(nodes_by_id[node_id], score) for node_id, score in ranked[:top_k]]


def search(query, method="bm25", top_k=5, bm25_weight=0.5, semantic_weight=0.5):
    """Search chunks using BM25, semantic search, or the combined method.

    Args:
        query (str): The search query.
        method (str): The retrieval method: "bm25", "semantic", or "combined".
        top_k (int): Number of top results to return.
        bm25_weight (float): Weight for BM25 in combined search (default: 0.5).
        semantic_weight (float): Weight for semantic search in combined
            search (default: 0.5).

    Returns:
        list: ``(node, score)`` tuples, best first.

    Raises:
        ValueError: If *method* is not one of the supported names.
    """
    if method == "bm25":
        return search_bm25(query, top_k=top_k)
    if method == "semantic":
        return search_semantic(query, top_k=top_k)
    if method == "combined":
        # Single implementation of the fusion logic lives in combined_search.
        return combined_search(
            query,
            bm25_weight=bm25_weight,
            semantic_weight=semantic_weight,
            top_k=top_k,
        )
    raise ValueError("Invalid search method. Choose 'bm25', 'semantic', or 'combined'.")
def call_llm(documents, question, api_key, model_name="llama3-70b-8192"):
    """Ask the Groq-hosted LLM to answer *question* from the given chunks.

    The prompt lists every chunk together with the id of its source document
    (documents are created with the article URL as id earlier in this file,
    so this is presumably the article link) and asks the model to cite the
    chunks as [1], [2], ... and to list the used links after a separator line.

    Args:
        documents (list): Retrieved nodes; each must expose ``text`` and a
            ``NodeRelationship.SOURCE`` relationship.
        question (str): The user question inserted into the prompt.
        api_key (str): Groq API key.
        model_name (str): Chat model to query; defaults to the model this
            function originally hard-coded.

    Returns:
        tuple: ``(main_answer, retrieved_docs, links_section)`` -- the answer
        text, a numbered human-readable dump of the chunks, and the text that
        followed the separator (empty if the model did not emit it).
    """
    separator = "--- Links Below ---"

    # Build the prompt with the question and the documents inserted.
    prompt_parts = [
        "\n"
        f'Give me a concise answer to the following question: "{question}".\n'
        "Use only the following documents:\n"
    ]
    shown_docs = []
    # Add every document together with its link. The number shown to the
    # model matches the number in the retriever panel, so [n] citations are
    # unambiguous.
    for number, doc in enumerate(documents, start=1):
        link = doc.relationships[NodeRelationship.SOURCE].node_id
        shown_docs.append(f"{number}.{doc.text}" + "\n" * 3 + "_" * 100 + "\n" * 3)
        prompt_parts.append(f"[{number}] ```{doc.text}``` Link:```{link}```\n")

    # Close the prompt with the answer-format instructions. The "{link}" in
    # the plain (non-f) strings below is literal text shown to the model.
    prompt_parts.append(
        "\n"
        "In your response, specify which part of the answer was obtained from which document, marking them as [1], [2], etc. After your answer, provide a list of the links with captions, but only include the links that were actually used in your answer.\n"
        f"Use this for separating Links and Text:`{separator}`\n"
        "In link section, write links like: [number of link] link. Do not write anything else in this section.\n"
        "Explanation for LLM (Language Learning Model):\n"
        '- The text in quotes, for example "```doc```" and "Link:```{link}```", is used to represent the names of documents and their corresponding URLs or links.\n'
        '- In this case, "Link:```{link}```" refers to the title or name of a document, and "Link:```{link}```" refers to the URL or web address where the document can be found.\n'
        '- Your task is to refer to the specific document and the corresponding link when generating the answer. For instance, if part of the answer is from "Document 1", you should indicate that in your response as [1].\n'
        "- After providing the answer, include only the links that you actually used in your response, and list them with captions. If you did not use a particular document, do not include its link.\n"
    )
    prompt = "".join(prompt_parts)
    print(prompt)

    client = Groq(api_key=api_key)
    chat_completion = client.chat.completions.create(
        messages=[{"role": "user", "content": prompt}],
        model=model_name,
    )

    # content can be None; treat that as an empty answer instead of crashing.
    response = chat_completion.choices[0].message.content or ""
    # partition keeps everything after the first separator, and yields an
    # empty links section when the model did not emit the separator at all.
    main_answer, _, links_section = response.partition(separator)
    links_section = links_section.strip()
    print(links_section)
    return main_answer.strip(), "".join(shown_docs), links_section
# Function to process user input | |
def process_input(api_key, query, search_type, keyword_percentage, num_docs):
    """Run retrieval plus LLM answering for one UI request.

    Every return value is a 3-tuple of strings, matching the three output
    textboxes wired to this callback: (response, retrieved documents,
    sources). Validation problems are reported in the first element; runtime
    failures are reported in the third.

    Args:
        api_key (str): Key passed through to ``call_llm``.
        query (str): The user question.
        search_type (str): "Keyword Search", "Semantic Search" or
            "Full Search".
        keyword_percentage (float): Share (0-100) given to BM25 in
            "Full Search"; the remainder goes to semantic search.
        num_docs (int): Number of chunks to retrieve.

    Returns:
        tuple: ``(response, retrieved_docs, sources_text)``.
    """
    if not query or not query.strip():
        return "Please enter a query.", "", ""
    if search_type == "Full Search" and (
        keyword_percentage is None or not 0 <= keyword_percentage <= 100
    ):
        return "Invalid percentage. Please enter a value between 0 and 100.", "", ""
    if num_docs is None or num_docs <= 0:
        return "Number of documents must be greater than 0.", "", ""

    # The retrievers slice and call topk with this value, so it must be an int.
    top_k = int(num_docs)
    try:
        if search_type == "Keyword Search":
            results = search(query, method="bm25", top_k=top_k)
        elif search_type == "Semantic Search":
            results = search(query, method="semantic", top_k=top_k)
        elif search_type == "Full Search":
            results = search(
                query,
                method="combined",
                bm25_weight=keyword_percentage / 100,
                semantic_weight=(100 - keyword_percentage) / 100,
                top_k=top_k,
            )
        else:
            raise ValueError(f"Unknown search type: {search_type!r}")

        docs = [node for node, _score in results]
        response, retrieved_docs, sources_text = call_llm(docs, query, api_key)
    except Exception as e:
        # UI boundary: show the failure in the sources box rather than
        # crashing the callback. Textboxes need a string, not the exception.
        print(e)
        return "", "", f"{type(e).__name__}: {e}"
    return response, retrieved_docs, sources_text
# Service description | |
# The search method is picked with a single-choice radio group, so the text
# talks about radio buttons rather than checkboxes.
description = """
## Retrieval-augmented generation by **Vitalii Pikhotskii** 😎 and **Sviatoslav Shainoha** 😉
Enter your LLM access key and a query in the text fields below. Use the radio buttons to choose the search method:
1. **Keyword Search:** Searches by keywords in documents (BM25).
2. **Semantic Search:** Uses semantic similarity for document retrieval.
3. **Full Search:** Combines both methods. Specify the percentage of keyword and semantic retrieval.
"""
# Gradio page: inputs, search-type selection, and the three output boxes.
# NOTE(review): the indentation of this section was lost in the reviewed copy;
# the nesting below is reconstructed from the statements themselves. Confirm
# in particular that the number-of-documents input belongs outside the hidden
# "Full Search" row.
with gr.Blocks() as demo:
    gr.Markdown(description)

    with gr.Row():
        # The key is a secret: mask it instead of echoing it in clear text.
        api_key_input = gr.Textbox(
            label="LLM API Key",
            placeholder="Enter your key",
            type="password",
        )
        query_input = gr.Textbox(
            label="Query",
            placeholder="For example: Who is Mary Cassatt?",
        )

    with gr.Row():
        search_type_selector = gr.Radio(
            choices=["Keyword Search", "Semantic Search", "Full Search"],
            label="Search Type",
            value="Keyword Search",
        )

    # Only relevant for "Full Search", so hidden until that option is chosen.
    with gr.Row(visible=False) as full_search_options:
        keyword_percentage_slider = gr.Slider(
            label="Keyword Search Percentage",
            minimum=0,
            maximum=100,
            step=1,
            value=50,
        )

    # Show or hide the percentage slider whenever the search type changes.
    search_type_selector.change(
        lambda choice: gr.update(visible=choice == "Full Search"),
        inputs=search_type_selector,
        outputs=full_search_options,
    )

    num_docs_input = gr.Number(
        label="Number of Documents",
        value=5,
        precision=0,
        minimum=1,
    )

    with gr.Row():
        submit_btn = gr.Button("Submit")

    with gr.Row():
        response_output = gr.Textbox(label="System Response")

    with gr.Row():
        sources_output = gr.Textbox(label="Sources", lines=5)

    with gr.Row():
        retrieved_docs_output = gr.Textbox(label="Documents (Retriever)", lines=5)

    # Output order must match the tuple returned by process_input:
    # (response, retrieved documents, sources).
    submit_btn.click(
        process_input,
        inputs=[
            api_key_input,
            query_input,
            search_type_selector,
            keyword_percentage_slider,
            num_docs_input,
        ],
        outputs=[response_output, retrieved_docs_output, sources_output],
    )

# Launching the app
demo.launch()