File size: 6,574 Bytes
4e00df7 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 |
import os
from typing import (
Any,
Union,
)
import zipfile
import streamlit as st
from streamlit.runtime.uploaded_file_manager import (
UploadedFile,
UploadedFileRec,
UploadedFileManager,
)
from streamlit.runtime.scriptrunner import get_script_run_ctx
from supabase.client import Client
from langchain.vectorstores.supabase import SupabaseVectorStore
from components_keys import ComponentsKeys
from loaders.audio import process_audio
from loaders.txt import process_txt
from loaders.csv import process_csv
from loaders.markdown import process_markdown
from loaders.pdf import process_pdf
from loaders.html import (
create_html_file,
delete_tempfile,
get_html,
process_html,
)
from loaders.powerpoint import process_powerpoint
from loaders.docx import process_docx
from utils import compute_sha1_from_content
# Script-run context and upload manager, both created once at import time.
# `file_to_uploaded_file` uses them to register files that did not come
# straight from the upload widget.
ctx = get_script_run_ctx()
manager = UploadedFileManager()

# Dispatch table: file extension (with leading dot) -> loader callable.
# The key order is preserved because the keys are also handed to the upload
# widget as the list of accepted extensions.
file_processors = {
    ".txt": process_txt,
    ".csv": process_csv,
    ".md": process_markdown,
    ".markdown": process_markdown,
    # Every audio/video container goes through the same audio loader.
    **dict.fromkeys(
        (".m4a", ".mp3", ".webm", ".mp4", ".mpga", ".wav", ".mpeg"),
        process_audio,
    ),
    ".pdf": process_pdf,
    ".html": process_html,
    ".pptx": process_powerpoint,
    ".docx": process_docx,
}
def file_uploader(supabase, vector_store):
    """Render the upload widget and ingest the selected files on request.

    Parameters
    ----------
    supabase
        The supabase client, forwarded to `filter_file`.
    vector_store
        The vector store, forwarded to `filter_file`.
    """
    # Zip archives are only offered when `st.secrets.self_hosted` is "true":
    # an archive can contain several files, which would circumvent the demo's
    # limit of one uploaded file at a time.
    allow_many = st.secrets.self_hosted == "true"
    allowed_extensions = [*file_processors]
    if allow_many:
        allowed_extensions.append(".zip")

    files = st.file_uploader(
        "**Upload a file**",
        accept_multiple_files=allow_many,
        type=allowed_extensions,
        key=ComponentsKeys.FILE_UPLOADER,
    )
    if st.secrets.self_hosted == "false":
        st.markdown("**In demo mode, the max file size is 1MB**")

    if not st.button("Add to Database"):
        return

    # Normalise the single-file and multi-file results to one list; any other
    # value (nothing selected) leaves the list empty.
    if isinstance(files, UploadedFile):
        pending = [files]
    elif isinstance(files, list):
        pending = files
    else:
        pending = []
    for upload in pending:
        filter_file(upload, supabase, vector_store)
def file_already_exists(supabase, file):
    """Tell whether a document with the same content hash is already stored.

    Parameters
    ----------
    supabase
        The supabase client used to query the "documents" table.
    file
        The uploaded file; its full content is read with `getvalue()`.

    Returns
    -------
    bool
        `True` if at least one row has a `metadata->>file_sha1` equal to the
        hash returned by `compute_sha1_from_content` for this file.
    """
    digest = compute_sha1_from_content(file.getvalue())
    query = supabase.table("documents").select("id")
    matches = query.eq("metadata->>file_sha1", digest).execute()
    return len(matches.data) > 0
def file_to_uploaded_file(file: Any) -> Union[None, UploadedFile]:
    """Wrap a file object in a streamlit `UploadedFile`.

    Files extracted from an archive can then be handled exactly like files
    that came from the file uploader widget.

    Parameters
    ----------
    file : Any
        A file object exposing `name` and `read()`.

    Returns
    -------
    Union[None, UploadedFile]
        The wrapped file, or `None` when no script context is available.
    """
    # NOTE(review): `ctx` is the module-level context captured at import
    # time, not one fetched per call -- confirm that this is intended.
    if ctx is None:
        print("script context not found, skipping uploading file:", file.name)
        return None

    name = file.name
    extension = os.path.splitext(name)[-1]
    # The file manager will automatically assign an ID so pass `None`
    # Reference: https://github.com/streamlit/streamlit/blob/9a6ce804b7977bdc1f18906d1672c45f9a9b3398/lib/streamlit/runtime/uploaded_file_manager.py#LL98C6-L98C6
    record = UploadedFileRec(None, name, extension, file.read())
    stored_record = manager.add_file(
        ctx.session_id,
        ComponentsKeys.FILE_UPLOADER,
        record,
    )
    return UploadedFile(stored_record)
def filter_zip_file(
    file: UploadedFile,
    supabase: Client,
    vector_store: SupabaseVectorStore,
) -> None:
    """Unzip the archive and filter each file it contains.

    Directory entries are skipped: they have no content of their own, and
    the files inside them appear in the archive as separate members.

    Parameters
    ----------
    file : UploadedFile
        The uploaded zip file from the file uploader.
    supabase : Client
        The supabase client.
    vector_store : SupabaseVectorStore
        The vector store in the database.
    """
    with zipfile.ZipFile(file, "r") as archive:
        for member in archive.infolist():
            # A folder would otherwise go through the pipeline as an empty
            # "file", costing a database lookup and a misleading message.
            if member.is_dir():
                continue
            with archive.open(member, "r") as extracted:
                filter_file(extracted, supabase, vector_store)
def filter_file(file, supabase, vector_store):
    """Validate one file and, if it is acceptable, hand it to its loader.

    Parameters
    ----------
    file
        An `UploadedFile`, or any file object that `file_to_uploaded_file`
        can convert into one.
    supabase
        The supabase client.
    vector_store
        The vector store in the database.

    Returns
    -------
    bool
        `True` if the file was a zip archive (its members are filtered
        individually) or was passed to a loader; `False` if it could not be
        converted, is already stored, is empty, or has an unsupported
        extension.
    """
    # Streamlit file uploads are of type `UploadedFile` which has the
    # necessary methods and attributes for this app to work.
    if not isinstance(file, UploadedFile):
        file = file_to_uploaded_file(file)
        if file is None:
            # Conversion is skipped when no script context is available;
            # without this guard `file.name` below raises AttributeError.
            return False

    # Compare extensions case-insensitively so "REPORT.PDF" is handled the
    # same way as "report.pdf".
    file_extension = os.path.splitext(file.name)[-1].lower()
    if file_extension == ".zip":
        filter_zip_file(file, supabase, vector_store)
        return True

    # NOTE(review): the "π", "π¨" and "β" prefixes in the messages below look
    # like mis-decoded emoji. They are left untouched because the original
    # characters cannot be determined from this file -- confirm and restore.
    if file_already_exists(supabase, file):
        st.write(f"π {file.name} is already in the database.")
        return False

    if file.size < 1:
        st.write(f"π¨ {file.name} is empty.")
        return False

    processor = file_processors.get(file_extension)
    if processor is None:
        st.write(f"β {file.name} is not a valid file type.")
        return False

    # Usage statistics go to supabase only when `self_hosted` is "false".
    stats_db = supabase if st.secrets.self_hosted == "false" else None
    processor(vector_store, file, stats_db=stats_db)
    # This literal was split across two lines by a mis-decoded "✅"
    # (UTF-8 E2 9C 85, whose last byte decodes to a line break), which made
    # the statement a syntax error.
    st.write(f"✅ {file.name} ")
    return True
def url_uploader(supabase, vector_store):
    """Render the URL form and ingest the page behind the submitted URL.

    The page is fetched with `get_html`, written to a temporary HTML file,
    run through `filter_file`, and the temporary file is then handed to
    `delete_tempfile` together with the outcome.

    Parameters
    ----------
    supabase
        The supabase client, forwarded to `filter_file`.
    vector_store
        The vector store, forwarded to `filter_file`.
    """
    url = st.text_area("**Add an url**",placeholder="https://www.quivr.app")
    if not st.button("Add the URL to the database"):
        return

    if st.session_state["overused"]:
        st.write("You have reached your daily limit. Please come back later or self host the solution.")
        return

    # NOTE(review): the leading "β" in the two messages below looks like a
    # mis-decoded emoji; left untouched -- confirm against the original file.
    html = get_html(url)
    if not html:
        st.write(f"β Failed to access to {url} .")
        return

    st.write(f"Getting content ... {url} ")
    try:
        file, temp_file_path = create_html_file(url, html)
    except UnicodeEncodeError as e:
        # Calling `create_html_file` again with the same arguments would
        # raise the same error, this time uncaught, so report it and stop.
        st.write(f"β Error encoding character: {e}")
        return

    ret = filter_file(file, supabase, vector_store)
    delete_tempfile(temp_file_path, url, ret)