"""

testing my own vectors



list comprehension whenever possible

main function

if name == main

reusable functions that do just one specific task

type checking

def my_function(in_one: str, in_two: int) -> None:

pip install mypy for static typechecking.



O Gebäudebetrieb

Reinigung





FM Prozesse nicht für klassifizierung

Phase auch nicht. IMMER 53!!



VISION: AUTOMATISCHE BENENNUNG BEI ECODOMUS UPLOAD

Automatische metadatenzuodrdnung









"""
import json
import os
import time
from typing import Any, Dict

import streamlit as st
from PIL import Image

import ingest
import my_1_writer
import my_2_sim_search
import my_new_openai
import my_vectors
import setup_db
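

# A minimal sketch illustrating the conventions listed in the module docstring:
# a reusable, single-purpose function with type hints (checkable via mypy) and a
# list comprehension. The function name is a hypothetical example and is not used
# by the app below.
def _square_even_numbers(numbers: list[int]) -> list[int]:
    """Return the squares of the even values in `numbers`."""
    return [n * n for n in numbers if n % 2 == 0]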


def read_json_file(file_path: str) -> Dict[str, Any]:
    """

    Diese Funktion liest den Inhalt einer JSON-Datei und gibt ihn als Wörterbuch zurück.



    Argumente:

    file_path (str): Der Dateipfad zur JSON-Datei.



    Rückgabewert:

    Dict[str, Any]: Der Inhalt der JSON-Datei als DICT

    ANY ist oft ein VECTOR = list[float]

    """
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = json.load(file)
        return content
    except Exception as e:
        return {"error": str(e)}


# TODO: test this
def extract_tables_from_page_advanced(page):
    """Extract simple table structures based on the text blocks of a page."""
    text_blocks = page.get_text("blocks")
    text_blocks = sorted(text_blocks, key=lambda block: (block[1], block[0]))  # sort by Y, then by X

    # Group blocks by their X start points (within a tolerance) to identify columns
    column_threshold = 10  # minimum horizontal distance between distinct columns
    columns = {}
    for block in text_blocks:
        x_start = block[0]
        found_column = False
        for col in columns.keys():
            if abs(x_start - col) < column_threshold:
                columns[col].append(block)
                found_column = True
                break
        if not found_column:
            columns[x_start] = [block]

    # Extract table rows based on the identified columns
    tables = []
    for col, blocks in columns.items():
        table = []
        for block in sorted(blocks, key=lambda block: block[1]):  # sort by Y
            table.append(block[4].strip())  # append the block's text
        tables.append(table)

    return tables
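

def _extract_tables_from_pdf_sketch(pdf_path: str) -> list:
    """Hedged usage sketch for extract_tables_from_page_advanced (not called by the app).

    Assumes the `page` objects come from PyMuPDF (the `fitz` package), whose
    Page.get_text("blocks") matches the call used above; `fitz` is therefore
    imported locally as an assumed, optional dependency.
    """
    import fitz  # PyMuPDF; assumed dependency for this sketch only

    tables = []
    with fitz.open(pdf_path) as doc:
        for page in doc:
            tables.extend(extract_tables_from_page_advanced(page))
    return tables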



def merge_indices(index1, index2):
    """Merge two indices into a new index, assuming both have the same type and dimensionality.

    Not implemented yet.
    """
    pass
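

def _merge_flat_faiss_indices_sketch(index1, index2):
    """Hedged sketch of what merge_indices could do for flat FAISS indices.

    Assumes both arguments are flat FAISS indices (e.g. faiss.IndexFlatL2) of the
    same dimensionality, matching the faiss_index.index files loaded elsewhere in
    this module; faiss is not a confirmed module-level dependency, so it is
    imported locally and this helper is not called anywhere in the app.
    """
    import faiss  # assumed dependency for this sketch only

    merged = faiss.IndexFlatL2(index1.d)
    merged.add(index1.reconstruct_n(0, index1.ntotal))
    merged.add(index2.reconstruct_n(0, index2.ntotal))
    return merged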


def handle_userinput(user_question):
    """Placeholder: answering questions against the loaded vectorstore is not implemented yet."""
    pass
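

def _handle_userinput_sketch(user_question):
    """Hedged sketch of what handle_userinput could do (not called anywhere yet).

    Assumes sst.vectorstore holds a dict mapping chunk text to embedding vectors;
    vectorize_data() and sim_search_fly() are the same helpers already used in
    main() below, and sst is the session-state alias set in the __main__ block.
    """
    question_vec = my_new_openai.vectorize_data(user_question)
    ranked = my_2_sim_search.sim_search_fly(vec_table=sst.vectorstore, term=question_vec)
    st.write(f"Most similar chunk to '{user_question}':")
    st.write(next(iter(ranked)))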


def save_uploaded_file(uploaded_file):
    try:
        # Create a static folder if it doesn't exist
        if not os.path.exists('static'):
            os.makedirs('static')

        # Write the uploaded file to a new file in the static directory
        with open(os.path.join('static', uploaded_file.name), "wb") as f:
            f.write(uploaded_file.getbuffer())
        return True
    except Exception as e:
        print(e)
        return False
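

# Hypothetical usage sketch pairing save_uploaded_file with a Streamlit uploader
# (the function is defined above but not wired into main() yet):
#
#     uploaded = st.file_uploader("Upload a file")
#     if uploaded is not None and save_uploaded_file(uploaded):
#         st.success(f"saved to static/{uploaded.name}")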


def main():
    st.set_page_config(page_title="Anna Seiler Haus KI-Assistent", page_icon=":hospital:")
    # Initialise all st.session_state keys used by the app
    if "conversation" not in sst:
        sst.conversation = None
    if "chat_history" not in sst:
        sst.chat_history = None
    if "page" not in sst:
        sst.page = "home"
    if "openai" not in sst:
        sst.openai = True
    if "login" not in sst:
        sst.login = False
    if 'submitted_user_query' not in sst:
        sst.submitted_user_query = ''
    if 'submitted_user_safe' not in sst:
        sst.submitted_user_safe = ''
    if 'submitted_user_load' not in sst:
        sst.submitted_user_load = ''
    if 'widget_user_load' not in sst:
        sst.widget_user_load = 'U3_alle'  # default vectorstore to load
    if 'vectorstore' not in sst:
        sst.vectorstore = None

    def submit_user_query():
        sst.submitted_user_query = sst.widget_user_query
        sst.widget_user_query = ''

    def submit_user_safe():
        sst.submitted_user_safe = sst.widget_user_safe
        sst.widget_user_safe = ''
        if sst.vectorstore is not None:
            my_vectors.save_local(sst.vectorstore, path=sst.submitted_user_safe)
            st.sidebar.success("saved")
        else:
            st.sidebar.warning("No embeddings to save. Please process documents first.")

    def submit_user_load():
        sst.submitted_user_load = sst.widget_user_load
        sst.widget_user_load = ''
        if os.path.exists(sst.submitted_user_load):
            new_db = my_vectors.load_local(f"{sst.submitted_user_load}/faiss_index.index")
            if sst.vectorstore is not None:
                if new_db is not None:  # Check if this is working
                    st.sidebar.success("Vectors loaded")
            else:
                if new_db is not None:  # Check if this is working
                    sst.vectorstore = new_db
                    st.sidebar.success("Vectors loaded")
        else:
            st.sidebar.warning("Couldn't load/find embeddings")

    st.header("Anna Seiler Haus KI-Assistent ASH :hospital:")
    if st.toggle("show README"):

        st.subheader("Funktion: ")
        st.write("dieses proof-of-concept von Elia Wäfler demonstriert das Potential von RAG (Retrival Augmented Generation) für BIM2FM Dokumentenablagen am Beispiel Dokumente U3 ASH (Anna Seiler Haus, Inselspital Bern). chatte mit den Dokumenten, oder lade selber ein oder mehrere PDF-Dokumente hoch, um RAG auszuprobieren. die vektoren werden lokal oder im st.session_state gespeichert. Feedback und Bugs gerne an elia.waefler@insel.ch")
        st.write("Vielen Dank.")
        st.write("")

        st.subheader("Licence and credits")
        st.write("THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.")
        st.write("special thanks to OpenAI, STREAMLIT, HUGGINGFACE, LANGCHAIN and alejandro-ao")
        l, r = st.columns(2)
        with l:
            st.subheader("Limitationen: ")
            st.write("bisher nur Text aus PDFs")
            st.write("macht Fehler, kann falsche Informationen geben")
            st.write("prompts werden bisher nicht geprüft")
            st.write("")
        with r:
            st.subheader("geplante Erweiterungen:")
            st.write("Tabellen, Bilder werden auch vektorisiert, um die retrival qualität zu verbessern")
            st.write("on premise anwendung mit mistral 7b oder vergleichbar")
            st.write("Ecodomus API einbinden, um alle Dokumente einzubinden.")
            st.write("")

    if sst.login:
        if st.toggle("RAG / classifier"):
            #user_question = st.text_input("Ask a question about your documents:", key="user_query", on_change=handle_query)
            st.text_input('Ask a question about your documents:', key='widget_user_query', on_change=submit_user_query)
            #sst.openai = st.toggle(label="use openai?")
            if sst.submitted_user_query:
                if sst.vectorstore is not None:
                    handle_userinput(sst.submitted_user_query)
                    sst.submitted_user_query = ''  # reset after handling
                else:
                    st.warning("no vectorstore loaded.")

            with st.sidebar:
                st.subheader("Your documents")
                pdf_docs = st.file_uploader("Upload your PDFs here and click on 'Process'", accept_multiple_files=True)
                if st.button("Process"):
                    with st.spinner("Processing"):
                        vec = ingest.get_text_chunks(ingest.get_pdf_text(pdf_docs))
                        st.warning("only text")
                        sst.vectorstore = vec
                        sst.conversation = vec
                    st.success("embedding complete")
                st.text_input('Save Embeddings to: (copy path of folder)', key='widget_user_safe',
                              on_change=submit_user_safe)
                st.text_input('Load Embeddings from: (copy path of folder)', key='widget_user_load',
                              on_change=submit_user_load)
                if st.toggle("reset vectorstore?"):
                    if st.button("Yes, reset"):
                        sst.vectorstore = None
                        st.warning("vectorstore reset complete")
                    else:
                        st.warning("unsaved embeddings will be lost.")
        else:
            #vec_store = setup_db.load_vectorstore_from_excel("data/KBOB_Klassifizierung.xlsx")
            #my_1_writer.safe_my_dict_as_json("data/KBOB_klassen_codes.json", vec_store)
            vec_store = read_json_file("data/KBOB_klassen_codes.json")

            sst.page = "home"
            file = st.file_uploader("upload file", accept_multiple_files=False)
            if st.button("classify me!"):
                with st.spinner("Classifying..."):
                    query_vecs = []
                    if file is not None and file.type == "application/pdf":
                        one, two, three, four, five = st.columns(5)
                        text = ingest.get_pdf_text(file)
                        with one:
                            st.success("text")
                        # ONE FILE ONLY OR MULTIPLE AT THE SAME TIME?
                        images = ingest.get_pdf_images(file.getvalue())
                        if not isinstance(images, list):
                            images = [images]
                        for img in images:
                            text += my_new_openai.img_to_text(img_base64=my_new_openai.image_bytes_to_base64(img))
                        with two:
                            st.success("images")

                        tabs = ingest.get_pdf_tables(file.getvalue())

                        if not isinstance(tabs, list):
                            tabs = [tabs]
                        for tab in tabs:
                            text += my_new_openai.table_to_text(table=tab)
                        with three:
                            st.success("tabs")

                        # ONE VECTOR PER PDF OR MULTIPLE (CHUNKS IMGS ...) IS THE QUESTION
                        full_search = my_new_openai.vectorize_data(text)
                        detail_search = [my_new_openai.vectorize_data(_) for _ in ingest.get_text_chunks(text)]
                        with four:
                             st.success("embedded document")
                        st.write(len(list(vec_store.keys())))
                        with one:
                            sorted_vec_table = my_2_sim_search.sim_search_fly(
                                vec_table=vec_store, term=full_search)
                            st.write(f"len of list of categories {len(list(sorted_vec_table.keys()))}")
                            st.write(f"the most fitting category is {next(iter(sorted_vec_table))}")
                        with two:
                            sorted_vecs_two = my_2_sim_search.sim_search_fly(
                                vec_table=read_json_file("vecs/Fachbereiche_vecs.json"), term=full_search)
                            st.write(f"len of list of categories {len(list(sorted_vecs_two.keys()))}")
                            st.write(f"the most fitting Fachbereich is {next(iter(sorted_vecs_two))}")
                        with three:
                            sorted_vecs_three = my_2_sim_search.sim_search_fly(
                                vec_table=read_json_file("vecs/SIA-PHASEN 1-5 OUTPUT_vecs.json"), term=full_search)
                            st.write(f"len of list of categories {len(list(sorted_vecs_three.keys()))}")
                            st.write(f"the most fitting SIA-Phase is {next(iter(sorted_vecs_three))}")
                        for vec in detail_search:
                            pass
                        with four:
                            st.success("classification complete")
                    else:
                        st.error("Please upload a PDF file first.")
    else:
        user_pw = st.text_input("ASK_ASH_PASSWORD: ", type="password")
        if st.button("check"):
            time.sleep(0.5)
            if user_pw == ASK_ASH_PASSWORD:
                sst.login = True
                if "first_load" not in sst:
                    submit_user_load()
                    sst.first_load = True
                    st.rerun()



if __name__ == '__main__':
    # Credentials and the session-state alias are set up once at startup
    OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
    OPENAI_ORG_ID = os.environ["OPENAI_ORG_ID"]
    HUGGINGFACEHUB_API_TOKEN = os.environ["HUGGINGFACEHUB_API_TOKEN"]
    ASK_ASH_PASSWORD = os.environ["ASK_ASH_PASSWORD"]
    sst = st.session_state  # shorthand used throughout this module
    main()