Spaces:
Runtime error
Runtime error
from fastapi import FastAPI | |
#from pydantic import BaseModel | |
# from transformers import pipeline | |
from txtai.embeddings import Embeddings | |
from txtai.pipeline import Extractor | |
from langchain.document_loaders import WebBaseLoader | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain import HuggingFaceHub | |
from langchain.prompts import PromptTemplate | |
from langchain.chains import LLMChain | |
from txtai.embeddings import Embeddings | |
from txtai.pipeline import Extractor | |
import pandas as pd | |
import sqlite3 | |
import os | |
# NOTE: docs_url is set to "/" so the interactive API docs are served at the
# root path of the app and double as its landing page on Spaces.
app = FastAPI(docs_url="/")

# SECURITY(review): an access token is hard-coded below and is therefore
# readable by anyone who can see this file. It should be revoked and supplied
# only through the deployment's secret/environment configuration.
# setdefault() keeps the literal purely as a backward-compatible fallback, so a
# token that is already present in the environment is no longer overwritten.
os.environ.setdefault(
    "HUGGINGFACEHUB_API_TOKEN", "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"
)
# pipe = pipeline("text2text-generation", model="google/flan-t5-small") | |
# @app.get("/generate") | |
# def generate(text: str): | |
# """ | |
# Using the text2text-generation pipeline from `transformers`, generate text | |
# from the given input text. The model used is `google/flan-t5-small`, which | |
# can be found [here](https://huggingface.co/google/flan-t5-small). | |
# """ | |
# output = pipe(text) | |
# return {"output": output[0]["generated_text"]} | |
def load_embeddings(
    domain: str = "",
    db_present: bool = True,
    path: str = "sentence-transformers/all-MiniLM-L6-v2",
    index_name: str = "index",
):
    """Create an ``Embeddings`` instance and, if requested, load a saved index.

    Args:
        domain: Sub-directory of ``index_name`` to load. An empty string loads
            ``index_name`` itself.
        db_present: When false nothing is loaded and the freshly created
            instance is returned as is.
        path: Model path placed in the ``Embeddings`` configuration.
        index_name: Directory that holds the saved index (or the per-domain
            sub-directories).

    Returns:
        The ``Embeddings`` instance, configured with ``content`` enabled.
    """
    # Content storage is enabled so the indexed text is kept with the vectors.
    embeddings = Embeddings({"path": path, "content": True})

    if db_present:
        if domain == "":
            index_location = index_name  # change this later
        else:
            print(3)
            index_location = f"{index_name}/{domain}"
        embeddings.load(index_location)

    return embeddings
def _check_if_db_exists(db_path: str) -> bool: | |
return os.path.exists(db_path) | |
def _text_splitter(doc):
    """Split ``doc`` into chunks with ``RecursiveCharacterTextSplitter``.

    The splitter is configured with ``chunk_size=500`` and
    ``chunk_overlap=50``, both measured with ``len``.

    Args:
        doc: Documents to pass to ``transform_documents``.

    Returns:
        Whatever ``transform_documents`` returns for ``doc``.
    """
    splitter_options = {
        "chunk_size": 500,
        "chunk_overlap": 50,
        "length_function": len,
    }
    splitter = RecursiveCharacterTextSplitter(**splitter_options)
    return splitter.transform_documents(doc)
def _load_docs(path: str):
    """Load ``path`` through ``WebBaseLoader`` and return it split into chunks.

    Args:
        path: Location handed to ``WebBaseLoader``.

    Returns:
        The loaded documents after they went through ``_text_splitter``.
    """
    return _text_splitter(WebBaseLoader(path).load())
def _stream(dataset, limit, index: int = 0): | |
for row in dataset: | |
yield (index, row.page_content, None) | |
index += 1 | |
if index >= limit: | |
break | |
def _max_index_id(path): | |
db = sqlite3.connect(path) | |
table = "sections" | |
df = pd.read_sql_query(f"select * from {table}", db) | |
return {"max_index": df["indexid"].max()} | |
def _upsert_docs(
    doc,
    embeddings,
    vector_doc_path: str,
    db_present: bool,
    max_docs: int = 500,
):
    """Add ``doc`` to the vector index and persist it under ``vector_doc_path``.

    When an index already exists the chunks are upserted with ids that
    continue after the highest stored ``indexid``; otherwise a new index is
    built with ids starting at 0. In both cases the index is saved afterwards.

    Args:
        doc: Iterable of chunks exposing ``page_content``.
        embeddings: Embeddings instance providing ``upsert``, ``index`` and
            ``save``.
        vector_doc_path: Directory the index is saved to; its ``documents``
            file is read to determine the current maximum id.
        db_present: Whether an index already exists at ``vector_doc_path``.
        max_docs: Maximum number of chunks ingested per call.

    Returns:
        The ``{"max_index": ...}`` mapping read from the saved ``documents``
        database after the write.
    """
    print(vector_doc_path)
    documents_db = f"{vector_doc_path}/documents"
    # Truncate up front so the per-call cap does not depend on the id offset.
    batch = list(doc)[:max_docs]

    if db_present:
        print(1)
        max_index = _max_index_id(documents_db)
        print(max_index)
        last_id = max_index["max_index"]
        # Continue numbering after the last stored row. Starting at last_id
        # itself would reuse that id and replace the chunk stored under it.
        # NOTE(review): assumes ``indexid`` tracks the ids produced here - confirm.
        start = 0 if pd.isna(last_id) else int(last_id) + 1
        # The cap start + len(batch) is never smaller than the batch size, so
        # none of the already truncated batch can be dropped by it.
        embeddings.upsert(_stream(batch, start + len(batch), start))
        print("Embeddings done!!")
        embeddings.save(vector_doc_path)
        print("Embeddings done - 1!!")
    else:
        print(2)
        embeddings.index(_stream(batch, len(batch), 0))
        embeddings.save(vector_doc_path)

    # Re-read after saving so the returned value reflects what was written.
    max_index = _max_index_id(documents_db)
    print(max_index)
    return max_index
# def prompt(question): | |
# return f"""Answer the following question using only the context below. Say 'no answer' when the question can't be answered. | |
# Question: {question} | |
# Context: """ | |
# def search(query, question=None): | |
# # Default question to query if empty | |
# if not question: | |
# question = query | |
# return extractor([("answer", query, prompt(question), False)])[0][1] | |
# @app.get("/rag") | |
# def rag(question: str): | |
# # question = "what is the document about?" | |
# answer = search(question) | |
# # print(question, answer) | |
# return {answer} | |
# @app.get("/index") | |
# def get_url_file_path(url_path: str): | |
# embeddings = load_embeddings() | |
# doc = _load_docs(url_path) | |
# embeddings, max_index = _upsert_docs(doc, embeddings) | |
# return max_index | |
def get_domain_file_path(domain: str, file_path: str):
    """Index the document at ``file_path`` into the store of ``domain``.

    The store lives under ``<cwd>/index/<domain>``. Whether it already exists
    decides if the existing index is loaded and extended or a new one is
    created.

    Args:
        domain: Name of the sub-directory below ``index`` to write to.
        file_path: Location of the document, forwarded to ``_load_docs``.

    Returns:
        The fixed string ``"Executed Successfully!!"``.
    """
    print(domain, file_path)
    print(os.getcwd())
    bool_value = _check_if_db_exists(db_path=f"{os.getcwd()}/index/{domain}/documents")
    print(bool_value)

    # Both cases run the same pipeline; only the existing-index case also
    # prints the loaded embeddings object.
    embeddings = load_embeddings(domain=domain, db_present=bool_value)
    if bool_value:
        print(embeddings)
    doc = _load_docs(file_path)
    _upsert_docs(
        doc=doc,
        embeddings=embeddings,
        vector_doc_path=f"{os.getcwd()}/index/{domain}",
        db_present=bool_value,
    )
    return "Executed Successfully!!"
def _check_if_db_exists(db_path: str) -> bool: | |
return os.path.exists(db_path) | |
def _load_embeddings_from_db(
    db_present: bool,
    domain: str,
    path: str = "sentence-transformers/nli-mpnet-base-v2",
):
    """Create an ``Embeddings`` instance and load the index of ``domain``.

    Args:
        db_present: When false no index is loaded.
        domain: Sub-directory of ``<cwd>/index`` to load. An empty string
            loads the relative ``index`` directory instead.
        path: Model path placed in the ``Embeddings`` configuration.

    Returns:
        The ``Embeddings`` instance, configured with ``content`` enabled.
    """
    embeddings = Embeddings({"path": path, "content": True})

    if not db_present:
        print("db not present")
    elif domain == "":
        print("domain empty")
        embeddings.load("index")  # change this later
    else:
        print(3)
        embeddings.load(f"{os.getcwd()}/index/{domain}")

    return embeddings
def _prompt(question): | |
return f"""Answer the following question using only the context below. Say 'Could not find answer within the context' when the question can't be answered. | |
Question: {question} | |
Context: """ | |
def _search(query, extractor, question=None):
    """Run ``extractor`` for ``query`` and return the extracted answer.

    Args:
        query: Text used as the extractor query.
        extractor: Callable that takes a list of
            ``(name, query, question, snippet)`` tuples and returns a list of
            result pairs.
        question: Question inserted into the prompt; defaults to ``query``
            when empty.

    Returns:
        The second element of the first result returned by ``extractor``.
    """
    # Default question to query if empty
    if not question:
        question = query

    # Run the extractor once and reuse the result: it was previously invoked a
    # second time solely to print its output, doubling the inference cost.
    results = extractor([("answer", query, _prompt(question), False)])
    print(results)
    return results[0][1]
# class ModelOutputEvaluate(BaseModel): | |
# question: str | |
# answer: str | |
# domain: str | |
# context: str | |
class BasePromptContext:
    """Holds the grading prompt template and the names of its placeholders.

    Instances expose ``variables_list`` (the placeholder names) and
    ``base_template`` (the prompt text containing ``{question}``,
    ``{answer}`` and ``{context}``).
    """

    # Placeholder names used inside the template text.
    _VARIABLES = ("question", "answer", "context")

    # NOTE(review): the template lines are kept flush-left exactly as they
    # appear in the source; confirm the intended nesting of the rubric bullets.
    _BASE_TEMPLATE = """Please act as an impartial judge and evaluate the quality of the provided answer which attempts to answer the provided question based on a provided context.
And you'll need to submit your grading for the correctness, comprehensiveness and readability of the answer, using JSON format with the 2 items in parenthesis:
("score": [your score number for the correctness of the answer], "reasoning": [your one line step by step reasoning about the correctness of the answer])
Below is your grading rubric:
- Correctness: If the answer correctly answer the question, below are the details for different scores:
- Score 0: the answer is completely incorrect, doesn’t mention anything about the question or is completely contrary to the correct answer.
- For example, when asked “How to terminate a databricks cluster”, the answer is empty string, or content that’s completely irrelevant, or sorry I don’t know the answer.
- Score 4: the answer provides some relevance to the question and answer one aspect of the question correctly.
- Example:
- Question: How to terminate a databricks cluster
- Answer: Databricks cluster is a cloud-based computing environment that allows users to process big data and run distributed data processing tasks efficiently.
- Or answer: In the Databricks workspace, navigate to the "Clusters" tab. And then this is a hard question that I need to think more about it
- Score 7: the answer mostly answer the question but is missing or hallucinating on one critical aspect.
- Example:
- Question: How to terminate a databricks cluster”
- Answer: “In the Databricks workspace, navigate to the "Clusters" tab.
Find the cluster you want to terminate from the list of active clusters.
And then you’ll find a button to terminate all clusters at once”
- Score 10: the answer correctly answer the question and not missing any major aspect
- Example:
- Question: How to terminate a databricks cluster
- Answer: In the Databricks workspace, navigate to the "Clusters" tab.
Find the cluster you want to terminate from the list of active clusters.
Click on the down-arrow next to the cluster name to open the cluster details.
Click on the "Terminate" button. A confirmation dialog will appear. Click "Terminate" again to confirm the action.”
Provided question:
{question}
Provided answer:
{answer}
Provided context:
{context}
Please provide your grading for the correctness and explain you gave the particular grading"""

    def __init__(self):
        # A fresh list per instance, so callers may mutate it safely.
        self.variables_list = list(self._VARIABLES)
        self.base_template = self._BASE_TEMPLATE
class Evaluater:
    """Grades a question/answer pair against its context with a hosted LLM.

    The instance stores the fields of ``item`` and a ``HuggingFaceHub`` client
    for ``google/flan-t5-xxl``.
    """

    def __init__(self, item):
        """Store the evaluation input and create the LLM client.

        Args:
            item: Mapping with the keys ``question``, ``answer``, ``domain``
                and ``context``.
        """
        self.question, self.answer = item["question"], item["answer"]
        self.domain, self.context = item["domain"], item["context"]
        hub_kwargs = {"temperature": 1, "max_length": 1000000}
        self.llm = HuggingFaceHub(repo_id="google/flan-t5-xxl", model_kwargs=hub_kwargs)

    def get_prompt_template(self):
        """Return a ``PromptTemplate`` built from ``BasePromptContext``."""
        base = BasePromptContext()
        return PromptTemplate(
            input_variables=base.variables_list,
            template=base.base_template,
        )

    def evaluate(self):
        """Fill the grading prompt with the stored fields and return the LLM output."""
        filled_prompt = self.get_prompt_template().format(
            question=self.question,
            answer=self.answer,
            context=self.context,
        )
        return self.llm(filled_prompt)
def _create_evaluation_scenario(item):
    """Grade ``item`` with ``Evaluater`` and pair the input with its score.

    Args:
        item: Mapping passed unchanged to ``Evaluater``.

    Returns:
        ``{"input": item, "score": <result of Evaluater(item).evaluate()>}``.
    """
    score = Evaluater(item).evaluate()
    return {"input": item, "score": score}
def rag(domain: str, question: str, evaluate: bool):
    """Answer ``question`` from the index of ``domain`` and optionally grade it.

    Args:
        domain: Name of the sub-directory below ``<cwd>/index`` to query.
        question: Question to answer.
        evaluate: When true the answer is graded through
            ``_create_evaluation_scenario``; otherwise the score is the fixed
            text ``"Evaluation is Turned OFF"``.

    Returns:
        A dict with the keys ``question``, ``answer``, ``context`` and
        ``score``.
    """
    print()
    # The existence check is done once; it used to be repeated into a second
    # variable that was only printed.
    db_exists = _check_if_db_exists(db_path=f"{os.getcwd()}/index/{domain}/documents")
    print(db_exists)

    embeddings = _load_embeddings_from_db(db_exists, domain)

    print("before calling extractor")
    extractor = Extractor(embeddings, "google/flan-t5-base")

    print("before doing Q&A")
    answer = _search(question, extractor)

    # Collect the matching passages, ordered by id, as the reported context.
    context_list = sorted(embeddings.search(question), key=lambda x: x["id"])
    context = "\n".join(x["text"] for x in context_list)

    if evaluate:
        scenario = _create_evaluation_scenario(
            {
                "question": question,
                "answer": answer,
                "domain": domain,
                "context": context,
            }
        )
        score = scenario["score"]
    else:
        score = "Evaluation is Turned OFF"

    return {"question": question, "answer": answer, "context": context, "score": score}