import re
from io import BytesIO
from typing import List, Dict, Any, Union, Text, Tuple, Iterable

import docx2txt
import openai
import streamlit as st
from pypdf import PdfReader

from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.chains.qa_with_sources import load_qa_with_sources_chain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.docstore.document import Document
from langchain.vectorstores import FAISS, VectorStore
from langchain.memory import ConversationBufferWindowMemory

from .prompts import STUFF_PROMPT


class PDFFile:
    """A PDF file class for typing purposes."""

    @staticmethod
    def is_pdf(file: Any) -> bool:
        return file.name.endswith(".pdf")


class DocxFile:
    """A Docx file class for typing purposes."""

    @staticmethod
    def is_docx(file: Any) -> bool:
        return file.name.endswith(".docx")


class TxtFile:
    """A Txt file class for typing purposes."""

    @staticmethod
    def is_txt(file: Any) -> bool:
        return file.name.endswith(".txt")


class CodeFile:
    """A scripting-file class for typing purposes."""

    @staticmethod
    def is_code(file: Any) -> bool:
        # endswith handles file names containing several dots correctly
        return file.name.endswith((".py", ".json", ".html", ".css", ".md"))
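
# Typical use with a Streamlit upload widget (illustrative sketch, not part of the app logic):
#   uploaded = st.file_uploader("Upload a document")
#   if uploaded is not None and PDFFile.is_pdf(uploaded):
#       pages = parse_pdf(BytesIO(uploaded.getvalue()))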


class HashDocument(Document):
    """A document that uses the page content and metadata as the hash."""

    def __hash__(self):
        # metadata values may be non-strings (e.g. page numbers), so cast them to str
        content = self.page_content + "".join(str(self.metadata[k]) for k in self.metadata)
        return hash(content)


@st.cache_data
def check_openai_api_key(api_key: str) -> bool:
    """Checks the given OpenAI API key and returns True if it is valid, False otherwise.
    The check is performed by issuing a minimal test completion request."""
    # quick sanity check on the key format
    if not (api_key.startswith("sk-") and len(api_key) == 51):
        st.error("Invalid OpenAI API key! Please provide a valid key.")
        return False
    # set the openai api key to the given value
    openai.api_key = api_key
    try:
        _ = openai.Completion.create(
            engine="davinci",
            prompt="This is a call test to test out the API Key.",
            max_tokens=5,
        )
    except openai.error.AuthenticationError:
        return False
    return True
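
# Note: openai.Completion and openai.error are interfaces of the pre-1.0 openai
# package; this check assumes an openai<1.0 release is installed.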


@st.cache_data
def parse_docx(file: BytesIO) -> str:
    """Extracts the text of a .docx file."""
    text = docx2txt.process(file)
    # Remove multiple newlines
    text = re.sub(r"\n\s*\n", "\n\n", text)
    return text


@st.cache_data
def parse_pdf(file: BytesIO) -> List[str]:
    """Extracts the text of a .pdf file, returning one cleaned-up string per page."""
    pdf = PdfReader(file)
    output = []
    for page in pdf.pages:
        text = page.extract_text()
        # Merge hyphenated words split across lines
        text = re.sub(r"(\w+)-\n(\w+)", r"\1\2", text)
        # Fix newlines in the middle of sentences
        text = re.sub(r"(?<!\n\s)\n(?!\s\n)", " ", text.strip())
        # Remove multiple newlines
        text = re.sub(r"\n\s*\n", "\n\n", text)
        output.append(text)
    return output
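
# Illustrative effect of the clean-up above (assuming typical pypdf output):
#   raw page text:  "This is a sen-\ntence split across\nlines.\n\n\nNext paragraph."
#   cleaned result: "This is a sentence split across lines.\n\nNext paragraph."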


@st.cache_data
def parse_txt(file: BytesIO) -> str:
    """Decodes a .txt file as UTF-8 and normalises its newlines."""
    text = file.read().decode("utf-8")
    # Remove multiple newlines
    text = re.sub(r"\n\s*\n", "\n\n", text)
    return text


@st.cache_data
def get_text_splitter(
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        separators: Iterable[Text] = ["\n\n", "\n", ".", "!", "?", ",", " ", ""]) -> RecursiveCharacterTextSplitter:
    """Returns a text splitter instance with the given parameters. Cached for performance."""
    # text splitter to split the text into chunks
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,        # a limited chunk size ensures smaller chunks and more precise answers
        separators=separators,        # a list of separators to split the text on
        chunk_overlap=chunk_overlap,  # minimal overlap to capture semantic overlap across chunks
    )
    return text_splitter
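
# Illustrative usage (assumption: chunk sizes are measured in characters, the default
# length function of RecursiveCharacterTextSplitter):
#   splitter = get_text_splitter(chunk_size=100, chunk_overlap=10)
#   chunks = splitter.split_text(long_text)  # strings of at most ~100 characters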


@st.cache_data
def text_to_docs(pages: Union[Text, Tuple[Text]], **kwargs) -> List[HashDocument]:
    """
    Converts a string or tuple of page contents to a list of HashDocuments
    (for efficient caching) with metadata.
    """
    # sanity check on the input provided
    if not isinstance(pages, (str, tuple)):
        raise ValueError(f"Pages must be either a string or a tuple of strings. Got: {type(pages)}")
    elif isinstance(pages, str):
        # Take a single string as one page - make it a tuple so that it is hashable
        pages = (pages,)
    # at this point pages is guaranteed to be a tuple of strings
    # map each page into a document instance
    page_docs = [HashDocument(page_content=page) for page in pages]
    # Add page numbers and the file name as metadata
    for i, doc in enumerate(page_docs):
        doc.metadata["page"] = i + 1
        doc.metadata["file_name"] = kwargs.get("file_name", "")
    # Split pages into chunks at several chunk sizes
    doc_chunks = []
    for ntokens in [50, 250, 500, 750]:
        # Get the text splitter
        text_splitter = get_text_splitter(chunk_size=ntokens, chunk_overlap=ntokens // 10)
        for doc in page_docs:
            # this splits the page into chunks
            chunks = text_splitter.split_text(doc.page_content)
            for i, chunk in enumerate(chunks):
                # Create a new document for each individual chunk
                new_doc = HashDocument(
                    page_content=chunk,
                    metadata={"file_name": doc.metadata["file_name"], "page": doc.metadata["page"], "chunk": i}
                )
                # Add sources to metadata for retrieval later on
                new_doc.metadata["source"] = (
                    f"{new_doc.metadata['file_name']}/Page-{new_doc.metadata['page']}"
                    f"/Chunk-{new_doc.metadata['chunk']}/Chunksize-{ntokens}"
                )
                doc_chunks.append(new_doc)
    return doc_chunks
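
# Example of a resulting source identifier (hypothetical file name):
#   "report.pdf/Page-2/Chunk-0/Chunksize-250"
# get_sources() below matches these identifiers against the SOURCES list in the model answer.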


@st.cache_data
def embed_docs(file_name: Text, _docs: Tuple[Document]) -> VectorStore:
    """
    Embeds a tuple of Documents and returns a FAISS index.
    The file_name argument only acts as a cache key: Streamlit does not hash
    parameters whose name starts with an underscore, such as _docs.
    """
    # Embed the chunks
    embeddings = OpenAIEmbeddings(openai_api_key=st.session_state.get("OPENAI_API_KEY"))
    index = FAISS.from_documents(list(_docs), embeddings)
    return index
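
# Illustrative call (hypothetical file name; doc_chunks comes from text_to_docs above):
#   index = embed_docs("example.pdf", tuple(doc_chunks))
# The tuple conversion keeps the documents hashable, while the underscore prefix on
# _docs tells Streamlit not to hash them; file_name alone identifies the cache entry.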


# caching removed - consider reintroducing it later, depending on performance
# @st.cache_data
def search_docs(_index: VectorStore, query: str, k: int = 5) -> List[Document]:
    """Searches a FAISS index for chunks similar to the query
    and returns a list of Documents."""
    # Search for similar chunks
    docs = _index.similarity_search(query, k=k)
    return docs


# caching removed - consider reintroducing it later, depending on performance
# @st.cache_data
def get_answer(
        _docs: List[Document],
        query: str,
        model: str = "gpt-4",
        stream_answer: bool = True) -> Dict[str, Any]:
    """Gets an answer to a question from a list of Documents."""
    # Create the chain to be used in this specific setting
    chain = load_qa_with_sources_chain(
        ChatOpenAI(
            temperature=0,
            openai_api_key=st.session_state.get("OPENAI_API_KEY"),
            model=model,
            streaming=stream_answer,
        ),
        chain_type="stuff",
        prompt=STUFF_PROMPT,
        verbose=True,
        # chain_type_kwargs={
        #     "verbose": True,
        #     "prompt": query,
        #     "memory": ConversationBufferWindowMemory(
        #         k=5,
        #         memory_key="history",
        #         input_key="question"),
        # }
    )
    # the chain also returns the text of the source documents used to form the answer
    answer = chain(
        {"input_documents": _docs, "question": query}
    )
    return answer


# caching removed - consider reintroducing it later, depending on performance
# @st.cache_data
def get_sources(answer: Dict[str, Any], docs: List[Document]) -> List[Document]:
    """Gets the source documents for an answer."""
    # Get the source identifiers listed at the end of the answer
    source_keys = [s for s in answer["output_text"].split("SOURCES: ")[-1].split(", ")]
    # Retrieve the documents the listed sources refer to
    source_docs = []
    for doc in docs:
        if doc.metadata["source"] in source_keys:
            source_docs.append(doc)
    return source_docs
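
# Illustrative example (hypothetical answer text):
#   answer["output_text"] == "Paris is the capital.\nSOURCES: guide.pdf/Page-1/Chunk-0/Chunksize-250"
#   get_sources(answer, docs) returns every chunk whose metadata["source"] appears in the
#   comma-separated list after "SOURCES: ".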


# this function could be removed - it is not used anymore
def wrap_text_in_html(text: Union[Text, List[Text]]) -> str:
    """Wraps each text block separated by newlines in <p> tags."""
    if isinstance(text, list):
        # Add horizontal rules between pages
        text = "\n<hr/>\n".join(text)
    return "".join([f"<p>{line}</p>" for line in text.split("\n")])
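

# ---------------------------------------------------------------------------
# Minimal offline usage sketch (illustrative only; the sample text and file name
# are made up). Because of the relative import of STUFF_PROMPT above, run it as a
# module (python -m <package>.<this_module>). Embedding, search and answering
# additionally need st.session_state["OPENAI_API_KEY"] inside a running Streamlit app.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    sample_text = (
        "LangChain splits documents into chunks.\n\n"
        "Each chunk is embedded and indexed with FAISS for retrieval."
    )
    chunks = text_to_docs(sample_text, file_name="example.txt")
    print(f"Created {len(chunks)} chunks; first source id: {chunks[0].metadata['source']}")
    print(wrap_text_in_html(sample_text))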