"""FastAPI service that indexes web pages into txtai and answers questions over them."""

import logging
import os
import sqlite3
from contextlib import closing
from itertools import islice

import pandas as pd
from fastapi import FastAPI
from langchain import HuggingFaceHub
from langchain.chains import LLMChain
from langchain.document_loaders import WebBaseLoader
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from txtai.embeddings import Embeddings
from txtai.pipeline import Extractor

logger = logging.getLogger(__name__)

# NOTE - we configure docs_url to serve the interactive Docs at the root path
# of the app. This way, we can use the docs as a landing page for the app on Spaces.
app = FastAPI(docs_url="/")

# SECURITY: this credential is committed in source control and must be treated
# as leaked. Rotate it, provide the new value through the process environment
# (e.g. a Spaces secret) and then delete this fallback. `setdefault` is used so
# a token supplied by the environment is no longer overwritten by the literal.
# NOTE(review): presumably consumed by the langchain HuggingFaceHub client - confirm.
os.environ.setdefault(
    "HUGGINGFACEHUB_API_TOKEN", "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"
)

# Upper bound on the number of chunks sent to the index per indexing call.
_MAX_CHUNKS_PER_CALL = 500


def load_embeddings(
    domain: str = "",
    db_present: bool = True,
    path: str = "sentence-transformers/all-MiniLM-L6-v2",
    index_name: str = "index",
):
    """Create a txtai ``Embeddings`` instance, loading a saved index if present.

    Args:
        domain: Sub-directory of ``index_name`` holding the index. An empty
            string loads ``index_name`` itself.
        db_present: When False no index is loaded and a fresh instance is
            returned.
        path: Model path passed to the ``Embeddings`` configuration.
        index_name: Directory (relative to the working directory) that holds
            the saved index or the per-domain indexes.

    Returns:
        The ``Embeddings`` instance, configured with content storage enabled.
    """
    # Create embeddings model with content support
    embeddings = Embeddings({"path": path, "content": True})

    # Nothing to load when the vector DB does not exist yet.
    if not db_present:
        return embeddings

    if domain == "":
        embeddings.load(index_name)  # change this later
    else:
        embeddings.load(f"{index_name}/{domain}")
    return embeddings


def _check_if_db_exists(db_path: str) -> bool:
    """Return True when ``db_path`` exists on disk."""
    return os.path.exists(db_path)


def _text_splitter(doc):
    """Split loaded documents into 500-character chunks with a 50-character overlap."""
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=500,
        chunk_overlap=50,
        length_function=len,
    )
    return splitter.transform_documents(doc)


def _load_docs(path: str):
    """Load ``path`` with ``WebBaseLoader`` and return the chunked documents."""
    loaded = WebBaseLoader(path).load()
    return _text_splitter(loaded)


def _stream(dataset, limit, index: int = 0):
    """Yield ``(id, text, None)`` tuples for at most ``limit`` rows of ``dataset``.

    Ids are consecutive and start at ``index``. ``limit`` bounds the number of
    rows emitted; previously it was compared against the absolute id, so a
    stream starting at an id >= ``limit`` stopped after a single row.

    Args:
        dataset: Iterable of objects exposing a ``page_content`` attribute.
        limit: Maximum number of rows to emit.
        index: Id assigned to the first emitted row.
    """
    for offset, row in enumerate(islice(dataset, limit)):
        yield (index + offset, row.page_content, None)


def _max_index_id(path):
    """Return ``{"max_index": n}`` with the largest ``indexid`` in ``sections``.

    Only the aggregate is fetched instead of the whole table, and the SQLite
    connection is closed before returning. ``max_index`` is ``-1`` when the
    table is empty, so ``max_index + 1`` is always the next free id.

    Args:
        path: Path of the SQLite database file to inspect.
    """
    with closing(sqlite3.connect(path)) as db:
        frame = pd.read_sql_query(
            "SELECT MAX(indexid) AS max_index FROM sections", db
        )
    value = frame["max_index"].iloc[0]
    return {"max_index": -1 if pd.isna(value) else int(value)}


def _upsert_docs(doc, embeddings, vector_doc_path: str, db_present: bool):
    """Add ``doc`` chunks to ``embeddings`` and save the index to disk.

    When an index already exists the chunks are upserted with ids that
    continue after the current maximum ``indexid``; otherwise a new index is
    built with ids starting at 0.

    Args:
        doc: Iterable of chunked documents (objects with ``page_content``).
        embeddings: ``Embeddings`` instance to update.
        vector_doc_path: Directory the index is saved to; its ``documents``
            file is the SQLite database queried for the maximum id.
        db_present: Whether an index already exists at ``vector_doc_path``.

    Returns:
        ``{"max_index": n}`` read back from the saved database.
    """
    documents_db = f"{vector_doc_path}/documents"

    if db_present:
        # Start one past the current maximum: reusing the maximum itself would
        # make the first new chunk share the id of the last stored one.
        next_id = _max_index_id(documents_db)["max_index"] + 1
        embeddings.upsert(_stream(doc, _MAX_CHUNKS_PER_CALL, next_id))
    else:
        embeddings.index(_stream(doc, _MAX_CHUNKS_PER_CALL, 0))

    embeddings.save(vector_doc_path)

    # Read the maximum after saving so both paths report the stored state.
    max_index = _max_index_id(documents_db)
    logger.info("Saved index to %s (%s)", vector_doc_path, max_index)
    return max_index
@app.get("/index/{domain}/")
def get_domain_file_path(domain: str, file_path: str):
    """Index the page at ``file_path`` into the vector store of ``domain``.

    The per-domain index lives under ``<cwd>/index/<domain>``. If it already
    exists the new chunks are upserted into it, otherwise a new index is
    created and saved there.

    Args:
        domain: Name of the per-domain index directory (path parameter).
        file_path: Location handed to ``_load_docs`` to fetch and chunk.

    Returns:
        The literal string ``"Executed Successfully!!"``.

    Raises:
        HTTPException: 400 when ``domain`` is not a plain directory name.
    """
    # Function-scope import so this handler does not depend on edits to the
    # module's import block.
    from fastapi import HTTPException

    # `domain` comes straight from the request and is interpolated into a
    # filesystem path that is both loaded from and saved to; reject anything
    # that could escape the index directory.
    if domain in {".", ".."} or os.path.basename(domain) != domain:
        raise HTTPException(status_code=400, detail="Invalid domain name")

    index_dir = f"{os.getcwd()}/index/{domain}"
    db_present = _check_if_db_exists(db_path=f"{index_dir}/documents")

    # The two original branches were identical; `db_present` alone selects
    # between loading/upserting and creating a fresh index.
    embeddings = load_embeddings(domain=domain, db_present=db_present)
    doc = _load_docs(file_path)
    _upsert_docs(
        doc=doc,
        embeddings=embeddings,
        vector_doc_path=index_dir,
        db_present=db_present,
    )
    return "Executed Successfully!!"
# NOTE: this repeats the definition of the same name that appears earlier in
# the module; it is retained so the module namespace is unchanged.
def _check_if_db_exists(db_path: str) -> bool:
    """Return True when ``db_path`` exists on disk."""
    return os.path.exists(db_path)


def _load_embeddings_from_db(
    db_present: bool,
    domain: str,
    path: str = "sentence-transformers/nli-mpnet-base-v2",
):
    """Create a txtai ``Embeddings`` instance and load the index of ``domain``.

    Args:
        db_present: When False no index is loaded and a fresh instance is
            returned.
        domain: Per-domain index directory under ``<cwd>/index``. An empty
            string loads the relative ``index`` directory instead.
        path: Model path passed to the ``Embeddings`` configuration.

    Returns:
        The ``Embeddings`` instance, configured with content storage enabled.
    """
    # Create embeddings model with content support
    embeddings = Embeddings({"path": path, "content": True})

    # Nothing to load when the vector DB does not exist.
    if not db_present:
        return embeddings

    if domain == "":
        embeddings.load("index")  # change this later
    else:
        embeddings.load(f"{os.getcwd()}/index/{domain}")
    return embeddings


def _prompt(question):
    """Build the extractor prompt that wraps ``question``."""
    return f"""Answer the following question using only the context below. Say 'Could not find answer within the context' when the question can't be answered. Question: {question} Context: """


def _search(query, extractor, question=None):
    """Run ``extractor`` for ``query`` and return the extracted answer.

    Args:
        query: Text used by the extractor to retrieve context.
        extractor: Callable taking a list of
            ``(name, query, question, snippet)`` tuples.
        question: Question placed in the prompt; defaults to ``query``.

    Returns:
        The second element of the first result produced by ``extractor``.
    """
    # Default question to query if empty
    if not question:
        question = query

    # Invoke the extractor once: the original called it a second time only to
    # print the result, doubling the model inference per request.
    results = extractor([("answer", query, _prompt(question), False)])
    return results[0][1]


class BasePromptContext:
    """Holds the grading prompt template and the names of its input variables."""

    def __init__(self):
        # Placeholders that appear in `base_template`.
        self.variables_list = ["question", "answer", "context"]
        # The template text is sent to the model verbatim; the continuation
        # lines deliberately start at column 0.
        self.base_template = """Please act as an impartial judge and evaluate the quality of the provided answer which attempts to answer the provided question based on a provided context. 
And you'll need to submit your grading for the correctness, comprehensiveness and readability of the answer, using JSON format with the 2 items in parenthesis: ("score": [your score number for the correctness of the answer], "reasoning": [your one line step by step reasoning about the correctness of the answer]) Below is your grading rubric: - Correctness: If the answer correctly answer the question, below are the details for different scores: - Score 0: the answer is completely incorrect, doesn’t mention anything about the question or is completely contrary to the correct answer. - For example, when asked “How to terminate a databricks cluster”, the answer is empty string, or content that’s completely irrelevant, or sorry I don’t know the answer. - Score 4: the answer provides some relevance to the question and answer one aspect of the question correctly. - Example: - Question: How to terminate a databricks cluster - Answer: Databricks cluster is a cloud-based computing environment that allows users to process big data and run distributed data processing tasks efficiently. - Or answer: In the Databricks workspace, navigate to the "Clusters" tab. And then this is a hard question that I need to think more about it - Score 7: the answer mostly answer the question but is missing or hallucinating on one critical aspect. - Example: - Question: How to terminate a databricks cluster” - Answer: “In the Databricks workspace, navigate to the "Clusters" tab. Find the cluster you want to terminate from the list of active clusters. And then you’ll find a button to terminate all clusters at once” - Score 10: the answer correctly answer the question and not missing any major aspect - Example: - Question: How to terminate a databricks cluster - Answer: In the Databricks workspace, navigate to the "Clusters" tab. Find the cluster you want to terminate from the list of active clusters. Click on the down-arrow next to the cluster name to open the cluster details. 
Click on the "Terminate" button. A confirmation dialog will appear. Click "Terminate" again to confirm the action.” Provided question: {question} Provided answer: {answer} Provided context: {context} Please provide your grading for the correctness and explain you gave the particular grading"""


class Evaluater:
    """Grades an answer by sending the filled-in grading prompt to a hosted LLM."""

    def __init__(self, item):
        """Store the evaluation inputs and create the LLM client.

        Args:
            item: Mapping with the keys ``question``, ``answer``, ``domain``
                and ``context``.
        """
        self.question = item["question"]
        self.answer = item["answer"]
        self.domain = item["domain"]
        self.context = item["context"]
        self.llm = HuggingFaceHub(
            repo_id="google/flan-t5-xxl",
            model_kwargs={"temperature": 1, "max_length": 1000000},
        )

    def get_prompt_template(self):
        """Return a ``PromptTemplate`` built from ``BasePromptContext``."""
        prompt = BasePromptContext()
        return PromptTemplate(
            input_variables=prompt.variables_list,
            template=prompt.base_template,
        )

    def evaluate(self):
        """Format the grading prompt and return the LLM's response."""
        prompt = self.get_prompt_template().format(
            question=self.question,
            answer=self.answer,
            context=self.context,
        )
        return self.llm(prompt)


def _create_evaluation_scenario(item):
    """Return ``{"input": item, "score": <LLM grading of item>}``."""
    return {
        "input": item,
        "score": Evaluater(item).evaluate(),
    }


@app.get("/rag")
def rag(domain: str, question: str, evaluate: bool):
    """Answer ``question`` from the index of ``domain``, optionally grading it.

    Args:
        domain: Per-domain index directory under ``<cwd>/index``; an empty
            string selects the relative ``index`` directory.
        question: Question to answer.
        evaluate: When True the answer is graded through
            ``_create_evaluation_scenario``.

    Returns:
        A dict with the keys ``question``, ``answer``, ``context`` and
        ``score``. ``score`` is the fixed string ``"Evaluation is Turned OFF"``
        when ``evaluate`` is False.

    Raises:
        HTTPException: 400 when ``domain`` is not a plain directory name.
    """
    # Function-scope import so this handler does not depend on edits to the
    # module's import block.
    from fastapi import HTTPException

    # `domain` is an untrusted query parameter used to build the path the
    # index is loaded from; reject values that could leave the index directory.
    # An empty string passes this check and is handled by the loader.
    if domain in {".", ".."} or os.path.basename(domain) != domain:
        raise HTTPException(status_code=400, detail="Invalid domain name")

    db_exists = _check_if_db_exists(
        db_path=f"{os.getcwd()}/index/{domain}/documents"
    )
    embeddings = _load_embeddings_from_db(db_exists, domain)

    # Create extractor instance
    extractor = Extractor(embeddings, "google/flan-t5-base")
    answer = _search(question, extractor)

    # Rebuild the context shown to the caller from the search hits, ordered by id.
    context_list = sorted(embeddings.search(question), key=lambda x: x["id"])
    context = "\n".join(x["text"] for x in context_list)

    if evaluate:
        score = _create_evaluation_scenario(
            {
                "question": question,
                "answer": answer,
                "domain": domain,
                "context": context,
            }
        )["score"]
    else:
        score = "Evaluation is Turned OFF"

    return {
        "question": question,
        "answer": answer,
        "context": context,
        "score": score,
    }