from google.cloud import storage

# The bucket is public, so an anonymous client is enough.
# storage_client = storage.Client()
storage_client = storage.Client.create_anonymous_client()

bucket_name = "docs-axio-clara"

from langchain_community.vectorstores import Annoy
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import CharacterTextSplitter

from climateqa.engine.embeddings import get_embeddings_function
embeddings_function = get_embeddings_function()

import os
import pdfplumber


def get_PDF_Names_from_GCP():
    listName = []
    # List the files stored under sources/ in the GCP bucket.
    blobs = storage_client.list_blobs(bucket_name, prefix='sources/')
    for blob in blobs:
        listName.append(blob.name)
    return listName


def get_PDF_from_GCP(folder_path, pdf_folder="./PDF"):
    # Download the PDF files from GCP storage.
    blobs = storage_client.list_blobs(bucket_name, prefix='sources/')
    for blob in blobs:
        if blob.name.endswith("/"):  # skip folder placeholder objects
            continue
        print("\n" + blob.name + ":")
        print("  <- downloading from GCP")
        blob.download_to_filename(pdf_folder + "/" + blob.name.split("/")[-1])

    # Extract the text from the PDF files.
    print("  >>> PDF extraction")
    for pdf_file in os.listdir(pdf_folder):
        if pdf_file.startswith("."):
            continue
        print("  > " + pdf_folder + "/" + pdf_file)
        with pdfplumber.open(pdf_folder + "/" + pdf_file) as pdf:
            pdf_total_pages = len(pdf.pages)

        # pdfplumber leaks memory on large files; reopening the file
        # every N pages seems to work around the problem.
        N_page = 300
        page_number = 0
        while page_number < pdf_total_pages:
            print("  -- reopening the file for " + str(N_page) + " pages --")
            with pdfplumber.open(pdf_folder + "/" + pdf_file) as pdf:
                npage = 0
                while npage < N_page and page_number < pdf_total_pages:
                    print("  >>> " + str(page_number + 1))
                    # One text file per page, named <pdf name>..:page:..<page number>.
                    with open(folder_path + "/" + pdf_file + "..:page:.." + str(page_number + 1), "w") as f:
                        for char_pdf in pdf.pages[page_number].chars:
                            f.write(char_pdf["text"])
                    npage += 1
                    page_number += 1

        print("  X removing: " + pdf_file)
        os.remove(pdf_folder + "/" + pdf_file)


def build_vectores_stores(folder_path, pdf_folder="./PDF", vectors_path="./vectors"):
    if os.path.isfile(vectors_path + "/index.annoy"):
        return Annoy.load_local(vectors_path, embeddings_function,
                                allow_dangerous_deserialization=True)

    try:
        os.mkdir(vectors_path)
    except OSError:
        pass

    try:
        # Download the prebuilt vectors from GCP storage.
        blobs = storage_client.list_blobs(bucket_name, prefix='testvectors/')
        for blob in blobs:
            print("\n" + blob.name.split("/")[-1] + ":")
            print("  <- downloading from GCP")
            blob.download_to_filename(vectors_path + "/" + blob.name.split("/")[-1])
    except Exception:
        pass

    # TODO: factor this check into a function to avoid code duplication.
    if os.path.isfile(vectors_path + "/index.annoy"):
        return Annoy.load_local(vectors_path, embeddings_function,
                                allow_dangerous_deserialization=True)

    print("MISSING VECTORS")
    exit(0)

    # Local (re)build of the vector store, kept for reference:
    # get_PDF_from_GCP(folder_path, pdf_folder)
    # print(" Vectorisation ...")
    # docs = []
    # vector_store_from_docs = ()  # create a new Annoy object, or reuse the one already initialized in your existing code
    # for filename in os.listdir(folder_path):
    #     if filename.startswith("."):
    #         continue
    #     file_path = os.path.join(folder_path, filename)
    #     if os.path.isfile(file_path):
    #         loader = TextLoader(file_path)
    #         documents = loader.load()
    #
    #         for doc in documents:
    #             if doc.metadata:
    #                 doc.metadata["ax_page"] = doc.metadata['source'].split("..:page:..")[-1]
    #                 doc.metadata["ax_name"] = doc.metadata['source'].split("..:page:..")[0].split("/")[-1]
    #                 doc.metadata["ax_url"] = "https://storage.googleapis.com/docs-axio-clara/sources/" + doc.metadata["ax_name"]
    #
    #         text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=0)
    #         docs += text_splitter.split_documents(documents)
    # vector_store_from_docs = Annoy.from_documents(docs, embeddings_function)
    # vector_store_from_docs.save_local(vectors_path)
    # return vector_store_from_docs
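# A small helper sketch for the TODO above, assuming the duplicated logic is
# the "load the local Annoy index if it already exists" check; the name
# _load_local_annoy is hypothetical and not part of the original code.
def _load_local_annoy(vectors_path):
    """Return the local Annoy store if its index file exists, else None."""
    if os.path.isfile(vectors_path + "/index.annoy"):
        return Annoy.load_local(vectors_path, embeddings_function,
                                allow_dangerous_deserialization=True)
    return None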
# Pinecone
# More info at https://docs.pinecone.io/docs/langchain
# and https://python.langchain.com/docs/integrations/vectorstores/pinecone
# import os
# from pinecone import Pinecone
# from langchain_community.vectorstores import Pinecone as PineconeVectorstore

# LOAD ENVIRONMENT VARIABLES
# try:
#     from dotenv import load_dotenv
#     load_dotenv()
# except ImportError:
#     pass

# def get_pinecone_vectorstore(embeddings, text_key="content"):
#     # Legacy pinecone-client v2 initialization, kept for reference:
#     # pinecone.init(
#     #     api_key=os.getenv("PINECONE_API_KEY"),  # find at app.pinecone.io
#     #     environment=os.getenv("PINECONE_API_ENVIRONMENT"),  # next to api key in console
#     # )
#     # index_name = os.getenv("PINECONE_API_INDEX")
#     # return Pinecone.from_existing_index(index_name, embeddings, text_key=text_key)
#
#     pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
#     index = pc.Index(os.getenv("PINECONE_API_INDEX"))
#     vectorstore = PineconeVectorstore(
#         index, embeddings, text_key,
#     )
#     return vectorstore

# def get_pinecone_retriever(vectorstore, k=10, namespace="vectors", sources=["IPBES", "IPCC"]):
#     assert isinstance(sources, list)
#     # Only keep documents whose source is in the allowed list (e.g. IPCC or IPBES).
#     filter = {
#         "source": {"$in": sources},
#     }
#     retriever = vectorstore.as_retriever(search_kwargs={
#         "k": k,
#         "namespace": namespace,
#         "filter": filter,
#     })
#     return retriever
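# A minimal usage sketch, not part of the original module: it assumes the
# prebuilt vectors are available locally or under testvectors/ in the bucket,
# and "./sources_txt" is a hypothetical folder for the per-page text files.
if __name__ == "__main__":
    vectorstore = build_vectores_stores("./sources_txt")
    # Standard LangChain similarity search over the Annoy store; the query
    # string and k are illustrative only.
    for doc in vectorstore.similarity_search("data quality", k=5):
        # ax_name / ax_page are set by the (commented-out) local build step,
        # so they may be absent from an index downloaded from GCP.
        print(doc.metadata.get("ax_name"), doc.metadata.get("ax_page"))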