MedChatBot / question_processing.py
Pedrampedram's picture
Upload 4 files
37e6628
# Import necessary libraries
import os
import textwrap
import pandas as pd
from langchain import HuggingFaceHub
from langchain.document_loaders import TextLoader
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.text_splitter import CharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain.chains.question_answering import load_qa_chain
from transformers import AutoTokenizer
def wrap_text_preserve_newlines(text, width=110):
lines = text.split('\n')
wrapped_lines = [textwrap.fill(line, width=width) for line in lines]
wrapped_text = '\n'.join(wrapped_lines)
return wrapped_text
def split_into_chunks(text, tokenizer, max_tokens=500):
tokens = tokenizer.encode(text, return_tensors="pt").squeeze()
token_chunks = []
current_chunk = []
current_chunk_len = 0
for token in tokens:
token_len = len(tokenizer.decode(token.item()))
if current_chunk_len + token_len + 1 > max_tokens:
token_chunks.append(tokenizer.decode(current_chunk))
current_chunk = []
current_chunk_len = 0
current_chunk.append(token.item())
current_chunk_len += token_len + 1
if current_chunk:
token_chunks.append(tokenizer.decode(current_chunk))
return token_chunks
tokenizer = AutoTokenizer.from_pretrained("google/flan-t5-xxl")
class TextDocument:
def __init__(self, content, id, metadata=None):
self.page_content = content
self.metadata = metadata if metadata is not None else {}
self.metadata['id'] = id
os.environ["HUGGINGFACEHUB_API_TOKEN"] = "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"
data_frame = pd.read_csv("dataset.tsv", sep="\t", nrows=1000)
data = data_frame.to_dict(orient="records")
documents = [TextDocument(content=str(item["answer"]), id=item["id"]) for item in data]
text_splitter = CharacterTextSplitter(chunk_size=500, chunk_overlap=0)
docs = text_splitter.split_documents(documents)
embeddings = HuggingFaceEmbeddings()
db = FAISS.from_documents(docs, embeddings)
llm = HuggingFaceHub(repo_id="google/flan-t5-xxl", model_kwargs={"temperature": 0.75, "max_length": 2048})
chain = load_qa_chain(llm, chain_type="refine")
def truncate_answer(answer, question, tokenizer, max_total_tokens=1000):
special_tokens = 2
question_tokens = len(tokenizer.encode(question, return_tensors="pt").squeeze())
max_answer_tokens = max_total_tokens - question_tokens - special_tokens
answer_tokens = tokenizer.encode(answer, return_tensors="pt").squeeze()
truncated_answer = tokenizer.decode(answer_tokens[:max_answer_tokens])
return truncated_answer
def combined_length_exceeds_limit(question, answer, tokenizer, model_token_limit=1024):
special_tokens = 2
question_tokens = len(tokenizer.encode(question, return_tensors="pt").squeeze())
answer_tokens = len(tokenizer.encode(answer, return_tensors="pt").squeeze())
return question_tokens + answer_tokens > (model_token_limit - special_tokens)
def process_question(query):
answers = []
docs = db.similarity_search(query)
most_similar_doc = docs[0]
print(f"Most similar answer: \n{wrap_text_preserve_newlines(str(most_similar_doc.page_content))}\n")
query_chunks = split_into_chunks(query, tokenizer, max_tokens=500)
for query_chunk in query_chunks:
if combined_length_exceeds_limit(query_chunk, str(docs[0].page_content), tokenizer):
print("The combined length of the question and answer exceeds the model's token limit.")
else:
truncated_answer = truncate_answer(str(docs[0].page_content), query_chunk, tokenizer, max_total_tokens=500)
result = chain.run(input_documents=[TextDocument(content=truncated_answer, id=docs[0].metadata['id'])], question=query_chunk)
answers.append(result)
return answers