Spaces:
Sleeping
Sleeping
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/media_stores.ipynb. | |
# %% auto 0 | |
__all__ = ['rawtext_to_doc_split', 'files_to_text', 'youtube_to_text', 'save_text', 'get_youtube_transcript', | |
'website_to_text_web', 'website_to_text_unstructured', 'get_document_segments', 'create_local_vector_store'] | |
# %% ../nbs/media_stores.ipynb 3 | |
# import libraries here | |
import os | |
import itertools | |
from langchain.embeddings import OpenAIEmbeddings | |
from langchain.text_splitter import RecursiveCharacterTextSplitter | |
from langchain.document_loaders.unstructured import UnstructuredFileLoader | |
from langchain.document_loaders.generic import GenericLoader | |
from langchain.document_loaders.parsers import OpenAIWhisperParser | |
from langchain.document_loaders.blob_loaders.youtube_audio import YoutubeAudioLoader | |
from langchain.document_loaders import WebBaseLoader, UnstructuredURLLoader | |
from langchain.docstore.document import Document | |
from langchain.vectorstores import Chroma | |
from langchain.chains import RetrievalQAWithSourcesChain | |
# %% ../nbs/media_stores.ipynb 8 | |
def rawtext_to_doc_split(text, chunk_size=1500, chunk_overlap=150):
    """Split raw strings or ``Document`` objects into chunked ``Document`` segments.

    Args:
        text: A string, a ``Document``, or a list of either. A non-list value
            is wrapped in a single-element list. The type of the first element
            decides how the whole list is handled.
        chunk_size: Chunk size handed to ``RecursiveCharacterTextSplitter``.
        chunk_overlap: Chunk overlap handed to ``RecursiveCharacterTextSplitter``.

    Returns:
        A flat list of ``Document`` segments; an empty list when there is
        nothing to split.
    """
    # Quick type checking
    if not isinstance(text, list):
        text = [text]
    # Nothing to split: bail out before indexing into the list below.
    if not text:
        return []

    # Create splitter
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        add_start_index=True,
    )

    # Split into doc segments
    if isinstance(text[0], Document):
        doc_segments = text_splitter.split_documents(text)
    else:
        # create_documents() already splits each string into chunks, so no
        # second split_documents() pass is needed (re-splitting would
        # recompute ``start_index`` against each chunk instead of the
        # original text).
        doc_segments = text_splitter.create_documents(text)

    # Make into one big list; the emptiness guard avoids an IndexError when
    # the splitter produced no segments (e.g. for an empty string).
    if doc_segments and isinstance(doc_segments[0], list):
        doc_segments = list(itertools.chain.from_iterable(doc_segments))
    return doc_segments
# %% ../nbs/media_stores.ipynb 16 | |
## A single File | |
def _file_to_text(single_file, chunk_size=1000, chunk_overlap=150):
    """Load a single file and return it as a list of split document segments.

    The file is read with ``UnstructuredFileLoader`` and chunked by a
    ``RecursiveCharacterTextSplitter`` configured with ``chunk_size``,
    ``chunk_overlap`` and ``add_start_index=True``.
    """
    file_loader = UnstructuredFileLoader(single_file)
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        add_start_index=True,
    )
    return file_loader.load_and_split(splitter)
## Multiple files | |
def files_to_text(files_list, chunk_size=1000, chunk_overlap=150):
    """Load one or more files and return all their segments as one flat list.

    Args:
        files_list: A single file or a list of files. A non-list value is
            wrapped in a single-element list.
        chunk_size: Chunk size forwarded to ``_file_to_text``.
        chunk_overlap: Chunk overlap forwarded to ``_file_to_text``.

    Returns:
        A flat list with the segments of every file, in input order; an empty
        list when ``files_list`` is empty.
    """
    # Quick type checking
    if not isinstance(files_list, list):
        files_list = [files_list]

    # This is currently a fix because the UnstructuredFileLoader expects a
    # list of files yet can't split them correctly yet: load file by file.
    per_file_segments = (
        _file_to_text(single_file, chunk_size=chunk_size, chunk_overlap=chunk_overlap)
        for single_file in files_list
    )
    # chain.from_iterable copes with zero files, where indexing the first
    # element of the collected results would raise IndexError.
    return list(itertools.chain.from_iterable(per_file_segments))
# %% ../nbs/media_stores.ipynb 20 | |
def youtube_to_text(urls, save_dir="content"):
    """Transcribe one or more YouTube videos to text documents.

    Args:
        urls: A single URL or a list of URLs. A non-list value is wrapped in
            a single-element list.
        save_dir: Directory to save audio files.

    Returns:
        The documents produced by a ``GenericLoader`` that combines
        ``YoutubeAudioLoader`` with ``OpenAIWhisperParser``.
    """
    url_list = urls if isinstance(urls, list) else [urls]
    audio_loader = YoutubeAudioLoader(url_list, save_dir)
    transcriber = GenericLoader(audio_loader, OpenAIWhisperParser())
    return transcriber.load()
# %% ../nbs/media_stores.ipynb 24 | |
def save_text(text, text_name=None, save_dir="/content"):
    """Write ``text`` to ``<save_dir>/<text_name>.txt`` and return that path.

    Args:
        text: The string to write.
        text_name: File name without extension. When empty or ``None``, the
            first 20 characters of ``text`` are used, with path separators
            replaced by ``_`` so the name cannot point into another directory.
        save_dir: Directory in which the file is created. Defaults to
            ``/content``, the location previously hard-coded here.

    Returns:
        The location at which the text is saved.

    Raises:
        FileExistsError: If the target file already exists (the file is
            opened in exclusive-creation mode ``"x"``).
    """
    if not text_name:
        text_name = text[:20]
        # A name derived from free text may contain path separators.
        for separator in (os.sep, os.altsep):
            if separator:
                text_name = text_name.replace(separator, "_")

    text_path = os.path.join(save_dir, text_name + ".txt")
    # Explicit UTF-8 so non-ASCII text does not depend on the platform's
    # default encoding.
    with open(text_path, "x", encoding="utf-8") as f:
        f.write(text)
    # Return the location at which the transcript is saved
    return text_path
# %% ../nbs/media_stores.ipynb 25 | |
def get_youtube_transcript(yt_url, save_transcript=False, temp_audio_dir="sample_data"):
    """Transcribe YouTube video(s) and optionally save the combined transcript.

    Args:
        yt_url: URL(s) passed on to ``youtube_to_text``.
        save_transcript: When true, the page contents of all returned
            documents are joined with spaces and written via ``save_text``,
            named after the ``source`` metadata of the first document.
        temp_audio_dir: Directory passed to ``youtube_to_text`` as
            ``save_dir`` for the audio files.

    Returns:
        A tuple ``(youtube_docs, save_path)``. ``save_path`` is ``None`` when
        nothing was saved (saving not requested, or no documents returned).
    """
    youtube_docs = youtube_to_text(yt_url, save_dir=temp_audio_dir)

    save_path = None
    # Only build the combined text and file name when they are needed, so an
    # empty result does not raise IndexError when saving was not requested.
    if save_transcript and youtube_docs:
        # Combine doc
        combined_text = " ".join(doc.page_content for doc in youtube_docs)
        # Name the transcript after the first document's source file
        video_path = youtube_docs[0].metadata["source"]
        youtube_name = os.path.splitext(os.path.basename(video_path))[0]
        save_path = save_text(combined_text, youtube_name)
    return youtube_docs, save_path
# %% ../nbs/media_stores.ipynb 27 | |
def website_to_text_web(url, chunk_size=1500, chunk_overlap=100):
    """Load web page(s) with ``WebBaseLoader`` and split them into segments.

    Args:
        url: Passed unchanged to ``WebBaseLoader``; can be a single string
            or a list.
        chunk_size: Chunk size forwarded to ``rawtext_to_doc_split``.
        chunk_overlap: Chunk overlap forwarded to ``rawtext_to_doc_split``.

    Returns:
        The segments returned by ``rawtext_to_doc_split``.
    """
    loaded_pages = WebBaseLoader(url).load()
    return rawtext_to_doc_split(
        loaded_pages,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
# %% ../nbs/media_stores.ipynb 33 | |
def website_to_text_unstructured(web_urls, chunk_size=1500, chunk_overlap=100):
    """Load web page(s) with ``UnstructuredURLLoader`` and split them into segments.

    Args:
        web_urls: A single URL or a list of URLs. A non-list value is wrapped
            in a single-element list before loading.
        chunk_size: Chunk size forwarded to ``rawtext_to_doc_split``.
        chunk_overlap: Chunk overlap forwarded to ``rawtext_to_doc_split``.

    Returns:
        The segments returned by ``rawtext_to_doc_split``.
    """
    url_list = web_urls if isinstance(web_urls, list) else [web_urls]
    loaded_pages = UnstructuredURLLoader(url_list).load()
    return rawtext_to_doc_split(
        loaded_pages,
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
    )
# %% ../nbs/media_stores.ipynb 45 | |
def get_document_segments(context_info, data_type, chunk_size=1500, chunk_overlap=100):
    """Load ``context_info`` according to ``data_type`` and return its segments.

    Args:
        context_info: The content or its location (raw text, URL(s), or
            file(s)), interpreted according to ``data_type``.
        data_type: ``'text'`` uses ``rawtext_to_doc_split``, ``'web_page'``
            uses ``website_to_text_unstructured``, ``'youtube_video'`` uses
            ``youtube_to_text``; any other value is treated as file input and
            uses ``files_to_text``.
        chunk_size: Chunk size used when splitting.
        chunk_overlap: Chunk overlap used when splitting.

    Returns:
        The list of document segments produced by the selected loader.
    """
    chunk_params = {'chunk_size': chunk_size, 'chunk_overlap': chunk_overlap}

    if data_type == 'youtube_video':
        # youtube_to_text() accepts no chunking arguments, so forwarding them
        # raised TypeError. Transcribe first, then split the transcript with
        # the requested chunking.
        youtube_docs = youtube_to_text(context_info)
        if not youtube_docs:
            return youtube_docs
        return rawtext_to_doc_split(youtube_docs, **chunk_params)

    # Define function used to do the loading
    if data_type == 'text':
        load_fcn = rawtext_to_doc_split
    elif data_type == 'web_page':
        load_fcn = website_to_text_unstructured
    else:
        load_fcn = files_to_text

    # Get the document segments
    return load_fcn(context_info, **chunk_params)
# %% ../nbs/media_stores.ipynb 47 | |
def create_local_vector_store(document_segments, **retriever_kwargs):
    """Build a Chroma store from the segments and return it with a retriever.

    Args:
        document_segments: Documents passed to ``Chroma.from_documents``
            together with an ``OpenAIEmbeddings`` instance.
        **retriever_kwargs: Forwarded unchanged to ``as_retriever``.

    Returns:
        A tuple ``(db, retriever)``.
    """
    vector_db = Chroma.from_documents(document_segments, OpenAIEmbeddings())
    return vector_db, vector_db.as_retriever(**retriever_kwargs)