File size: 8,690 Bytes
51fe9d2
 
 
 
 
 
60017a4
51fe9d2
 
 
02556c2
51fe9d2
 
 
 
 
60017a4
59359cb
 
51fe9d2
02556c2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51fe9d2
 
 
 
 
 
02556c2
59359cb
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51fe9d2
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
02556c2
 
 
 
 
 
 
 
 
 
 
 
 
51fe9d2
 
59359cb
51fe9d2
59359cb
51fe9d2
 
02556c2
59359cb
02556c2
59359cb
02556c2
59359cb
 
51fe9d2
59359cb
51fe9d2
 
 
a96d162
 
51fe9d2
 
cda0f94
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
51fe9d2
 
a96d162
51fe9d2
59359cb
 
 
 
 
02556c2
 
 
51fe9d2
02556c2
51fe9d2
60017a4
59359cb
 
02556c2
51fe9d2
 
 
 
02556c2
51fe9d2
 
 
59359cb
 
 
 
 
 
 
51fe9d2
60017a4
 
51fe9d2
59359cb
60017a4
cda0f94
 
59359cb
 
 
 
 
 
 
 
60017a4
51fe9d2
 
 
 
 
 
59359cb
 
51fe9d2
 
 
 
 
59359cb
51fe9d2
 
 
 
 
 
 
59359cb
51fe9d2
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores.faiss import FAISS
from langchain import OpenAI
from langchain.chains.qa_with_sources import load_qa_with_sources_chain
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.llms import OpenAI
from langchain.chat_models import ChatOpenAI
from langchain.docstore.document import Document
from langchain.vectorstores import FAISS, VectorStore
import docx2txt
from typing import List, Dict, Any, Union, Text, Tuple, Iterable
import re
from io import BytesIO
import streamlit as st
from .prompts import STUFF_PROMPT
from pypdf import PdfReader
from langchain.memory import ConversationBufferWindowMemory
import openai


class PDFFile:
    """A PDF file class for typing purposes."""

    @staticmethod
    def is_pdf(file: Any) -> bool:
        """Return True when ``file.name`` ends with ``.pdf``.

        Declared as a static method: the helper takes the file object as its
        only argument, so binding the class as the first argument (as a
        classmethod does) would make ``PDFFile.is_pdf(file)`` fail.
        """
        return file.name.endswith(".pdf")
    
class DocxFile:
    """A Docx file class for typing purposes."""

    @staticmethod
    def is_docx(file: Any) -> bool:
        """Return True when ``file.name`` ends with ``.docx``.

        Declared as a static method: the helper takes the file object as its
        only argument, so binding the class as the first argument (as a
        classmethod does) would make ``DocxFile.is_docx(file)`` fail.
        """
        return file.name.endswith(".docx")
    
class TxtFile:
    """A Txt file class for typing purposes."""

    @staticmethod
    def is_txt(file: Any) -> bool:
        """Return True when ``file.name`` ends with ``.txt``.

        Declared as a static method: the helper takes the file object as its
        only argument, so binding the class as the first argument (as a
        classmethod does) would make ``TxtFile.is_txt(file)`` fail.
        """
        return file.name.endswith(".txt")
    
class CodeFile:
    """A scripting-file class for typing purposes."""

    @staticmethod
    def is_code(file: Any) -> bool:
        """Return True when ``file.name`` ends with a supported code/markup extension.

        Supported extensions: ``.py``, ``.json``, ``.html``, ``.css``, ``.md``.

        The suffix is matched with ``str.endswith`` so that names containing
        several dots (e.g. ``data.backup.json``) are classified by their last
        extension, and names without any dot simply return False.
        """
        return file.name.endswith((".py", ".json", ".html", ".css", ".md"))


class HashDocument(Document):
    """A document that is hashable through its page content and metadata values."""

    def __hash__(self):
        """Return a hash built from the page content followed by every metadata value.

        Metadata values are converted with ``str`` before being concatenated,
        because they are not necessarily strings (this module stores integer
        ``page`` and ``chunk`` entries) and ``str.join`` rejects non-strings.
        """
        metadata_text = "".join(str(value) for value in self.metadata.values())
        return hash(self.page_content + metadata_text)


@st.cache_data
def check_openai_api_key(api_key:str)->bool:
    """Check whether the given OpenAI API key is usable.

    The key first has to pass a format check (it must start with ``sk-`` and be
    exactly 51 characters long); when it does not, an error is shown through
    ``st.error`` and False is returned. Otherwise the key is assigned to
    ``openai.api_key`` and a tiny completion request is issued: an
    ``AuthenticationError`` from that request yields False, a successful call
    yields True. Any other exception raised by the request is not handled here.
    """
    has_expected_format = api_key.startswith('sk-') and len(api_key) == 51
    if not has_expected_format:
        st.error("Invalid OpenAI API key! Please provide a valid key.")
        return False

    # The key is stored on the openai module so the probe request below uses it
    openai.api_key = api_key
    try:
        # The response itself is irrelevant: only whether the call authenticates matters
        openai.Completion.create(
            engine="davinci",
            prompt="This is a call test to test out the API Key.",
            max_tokens=5
        )
    except openai.error.AuthenticationError:
        return False
    else:
        return True

@st.cache_data
def parse_docx(file: BytesIO) -> str:
    """Extract the text of a .docx file and normalise its blank lines.

    The text is obtained with ``docx2txt.process``; every run made of a
    newline, optional whitespace and another newline is then replaced by
    exactly two newlines.
    """
    extracted = docx2txt.process(file)
    # Collapse repeated / whitespace-only blank lines into a single blank line
    return re.sub(r"\n\s*\n", "\n\n", extracted)


@st.cache_data
def parse_pdf(file: BytesIO) -> List[str]:
    """Extract and clean the text of every page of a PDF.

    Returns a list with one cleaned string per page, in page order.
    """

    def _clean(page_text):
        # Re-join words that were hyphenated across a line break
        dehyphenated = re.sub(r"(\w+)-\n(\w+)", r"\1\2", page_text)
        # Turn single newlines in the middle of sentences into spaces
        unwrapped = re.sub(r"(?<!\n\s)\n(?!\s\n)", " ", dehyphenated.strip())
        # Collapse runs of blank lines into a single blank line
        return re.sub(r"\n\s*\n", "\n\n", unwrapped)

    reader = PdfReader(file)
    return [_clean(page.extract_text()) for page in reader.pages]


@st.cache_data
def parse_txt(file: BytesIO) -> str:
    """Read a text file as UTF-8 and normalise its blank lines.

    The whole stream is read and decoded as UTF-8; every run made of a
    newline, optional whitespace and another newline is then replaced by
    exactly two newlines.
    """
    decoded = file.read().decode("utf-8")
    # Collapse repeated / whitespace-only blank lines into a single blank line
    return re.sub(r"\n\s*\n", "\n\n", decoded)

@st.cache_data
def get_text_splitter(
    chunk_size:int=500,
    chunk_overlap:int=50,
    separators:Iterable[Text]= ["\n\n", "\n", ".", "!", "?", ",", " ", ""])->RecursiveCharacterTextSplitter:
    """Build a RecursiveCharacterTextSplitter configured with the given parameters.

    The result is wrapped in ``st.cache_data`` so repeated calls with the same
    arguments can reuse it.

    Args:
        chunk_size: maximum size of each chunk; smaller chunks give more precise answers.
        chunk_overlap: overlap between consecutive chunks, so that meaning spanning
            a chunk boundary is not lost.
        separators: separators the text is split on.
    """
    return RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        separators=separators,
    )

@st.cache_data
def text_to_docs(pages: Union[Text, Tuple[Text]], **kwargs) -> List[HashDocument]:
    """
    Converts a string or a tuple of page contents to a list of chunked HashDocuments
    (hashable, for efficient caching) with metadata.

    Args:
        pages: either a single string (treated as one page) or a tuple of strings,
            one per page.
        **kwargs: only ``file_name`` is read (defaults to ""); it is stored in the
            metadata of every document.

    Returns:
        One HashDocument per chunk. Every page is split once for each chunk size in
        (50, 250, 500, 750), with an overlap of one tenth of the chunk size. Each
        chunk carries ``file_name``, ``page`` (1-based), ``chunk`` (0-based index
        within its page, for that chunk size) and a ``source`` string combining them.

    Raises:
        ValueError: if ``pages`` is neither a string nor a tuple.
    """
    if isinstance(pages, str):
        # Take a single string as one page - make it a tuple so that it is hashable
        pages = (pages, )
    elif not isinstance(pages, tuple):
        raise ValueError(f"Text must be either a string or a tuple of strings. Got: {type(pages)}")

    file_name = kwargs.get("file_name", "")
    # Map each page into a document instance carrying its 1-based page number
    page_docs = [
        HashDocument(page_content=page, metadata={"page": page_number, "file_name": file_name})
        for page_number, page in enumerate(pages, start=1)
    ]

    # Split pages into chunks, once per chunk size, so several granularities are available
    doc_chunks = []
    for chunk_size in (50, 250, 500, 750):
        text_splitter = get_text_splitter(chunk_size=chunk_size, chunk_overlap=chunk_size // 10)
        for doc in page_docs:
            for chunk_index, chunk in enumerate(text_splitter.split_text(doc.page_content)):
                # Create a new document for each individual chunk
                new_doc = HashDocument(
                    page_content=chunk,
                    metadata={
                        "file_name": doc.metadata["file_name"],
                        "page": doc.metadata["page"],
                        "chunk": chunk_index,
                    },
                )
                # Add sources to metadata for retrieval later on
                new_doc.metadata["source"] = \
                    f"{new_doc.metadata['file_name']}/Page-{new_doc.metadata['page']}/Chunk-{new_doc.metadata['chunk']}/Chunksize-{chunk_size}"
                doc_chunks.append(new_doc)

    return doc_chunks

@st.cache_data
def embed_docs(file_name:Text, _docs: Tuple[Document]) -> VectorStore:
    """
    Embeds the given Documents with OpenAI embeddings and returns the FAISS index
    built from them.

    ``file_name`` is not used in the body: it is a dummy argument that permits
    caching (Streamlit does not hash parameters whose name starts with an
    underscore, such as ``_docs``). The OpenAI key is read from
    ``st.session_state["OPENAI_API_KEY"]``.
    """
    api_key = st.session_state.get("OPENAI_API_KEY")
    embedder = OpenAIEmbeddings(openai_api_key=api_key)
    # from_documents receives a list, so the tuple of documents is converted first
    return FAISS.from_documents(list(_docs), embedder)


# Caching removed - consider reintroducing it later if performance requires it
# @st.cache_data 
def search_docs(_index: VectorStore, query: str, k:int=5) -> List[Document]:
    """Return the ``k`` chunks of the index that are most similar to ``query``.

    The lookup is delegated to the index's ``similarity_search`` method and
    its result is returned unchanged.
    """
    return _index.similarity_search(query, k=k)


# Caching removed - consider reintroducing it later if performance requires it
# @st.cache_data 
def get_answer(
        _docs: List[Document],
        query: str,
        model: str="gpt-4",
        stream_answer:bool=True) -> Dict[str, Any]:
    """Answer ``query`` using the given Documents as context.

    A "stuff" question-answering-with-sources chain is built around a
    ChatOpenAI model (temperature 0, key read from
    ``st.session_state["OPENAI_API_KEY"]``) and invoked with the documents and
    the question. The chain's result is returned as is.
    """
    llm = ChatOpenAI(
        temperature=0,
        openai_api_key=st.session_state.get("OPENAI_API_KEY"),
        model=model,
        streaming=stream_answer,
    )
    # Chain used in this specific setting. A conversational memory was
    # previously sketched here and is kept disabled for reference:
    # chain_type_kwargs={
    #     "verbose": True,
    #     "prompt": query,
    #     "memory": ConversationBufferWindowMemory(
    #         k=5,
    #         memory_key="history",
    #         input_key="question"),
    # }
    qa_chain = load_qa_with_sources_chain(
        llm,
        chain_type="stuff",
        prompt=STUFF_PROMPT,
        verbose=True,
    )
    # The chain is called with both the source documents and the question
    chain_inputs = {"input_documents": _docs, "question": query}
    return qa_chain(chain_inputs)

# Caching removed - consider reintroducing it later if performance requires it
# @st.cache_data 
def get_sources(answer: Dict[str, Any], docs: List[Document]) -> List[Document]:
    """Gets the source documents for an answer.

    The text following the last ``SOURCES:`` marker of ``answer["output_text"]``
    is read as a comma-separated list of source identifiers. Each identifier is
    stripped of surrounding whitespace, so a trailing newline or irregular
    spacing around the commas does not prevent a match.

    Args:
        answer: chain output; its ``"output_text"`` entry is parsed.
        docs: candidate documents, each exposing a ``metadata`` mapping.

    Returns:
        The documents of ``docs`` (in their original order) whose
        ``metadata["source"]`` is among the cited identifiers. Documents without
        a ``"source"`` entry are skipped.
    """
    # When the marker is absent, rpartition leaves the whole text in the last slot
    sources_text = answer["output_text"].rpartition("SOURCES:")[2]
    # A set gives constant-time membership tests; empty fragments are dropped
    source_keys = {key.strip() for key in sources_text.split(",")} - {""}

    return [doc for doc in docs if doc.metadata.get("source") in source_keys]

# this function could be removed - it is not used anymore
def wrap_text_in_html(text: str) -> str:
    """Wrap every newline-separated line of ``text`` in ``<p>`` tags.

    A list of page texts is also accepted: its items are first joined with a
    horizontal rule (``<hr/>``) on a line of its own between consecutive pages.
    """
    # Pages given as a list are merged into one string, separated by horizontal rules
    merged = "\n<hr/>\n".join(text) if isinstance(text, list) else text
    paragraphs = (f"<p>{line}</p>" for line in merged.split("\n"))
    return "".join(paragraphs)