# pt-assistant-demo / youtube_to_docstore.py
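"""Build a Pinecone vector index from the transcripts of a YouTube channel.

For each video on the channel, the script fetches the transcript and title,
splits the transcript into overlapping chunks, embeds the chunks with OpenAI
embeddings (cached on local disk), and upserts the resulting vectors into a
Pinecone index.
"""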
import json
import os
from dotenv import load_dotenv
from langchain.embeddings.cache import CacheBackedEmbeddings
from langchain_openai import OpenAIEmbeddings
from langchain.storage import LocalFileStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
import pandas as pd
from pinecone import Pinecone, ServerlessSpec
import requests
import scrapetube
from uuid import uuid4
from youtube_transcript_api import YouTubeTranscriptApi
load_dotenv()
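# Expected environment variables (names taken from the calls below; OPENAI_API_KEY
# is assumed as well, since OpenAIEmbeddings reads it from the environment):
#   PINECONE_API_KEY=<Pinecone API key>
#   INDEX_NAME=<name of the Pinecone index>
#   CHANNEL_ID=<YouTube channel id to index>
#   OPENAI_API_KEY=<OpenAI API key>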
BATCH_LIMIT = 100
def get_youtube_data(video_id):
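    """Fetch the transcript and title for a YouTube video.

    Returns a (transcript, title) tuple, or None when no transcript is available.
    """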
url = f"https://www.youtube.com/watch?v={video_id}"
    try:
        raw = YouTubeTranscriptApi.get_transcript(video_id)
    except Exception:
        print(f"No transcript found for {url}")
        return None
# Get metadata
response = requests.get(
f"https://noembed.com/embed?dataType=json&url={url}")
data = json.loads(response.content)
title = data["title"]
    # Strip apostrophes from the title: ' is a reserved character
    title = title.replace("'", "")
df = pd.DataFrame(raw)
# Generate the transcript string with timestamps
transcript = ' '.join(
f"{row['text']}<{row['start']}>" for _, row in df.iterrows())
return transcript, title
def index_video(video_id, embedder, index):
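    """Split a video's transcript into chunks, embed them, and upsert them into Pinecone.

    Returns False if the transcript could not be fetched.
    """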
    try:
        print(f"Getting transcript & text for video: {video_id}")
        data = get_youtube_data(video_id)
    except Exception as e:
        print(f"Error getting transcript for video {video_id}: {e}")
        return False
    if data is None:
        return False
    transcript, title = data
url = f"https://www.youtube.com/watch?v={video_id}"
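    # Split the transcript into ~1000-character chunks with 100 characters of overlap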
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000,
chunk_overlap=100,
length_function=len,
separators=["\n\n", "\n", " ", ""]
)
texts = []
metadatas = []
metadata = {
'source_document': title,
'link': url
}
record_texts = text_splitter.split_text(transcript)
print(f"Split documents into {len(record_texts)} chunks")
record_metadatas = [{"chunk": j, "text": text, **metadata}
for j, text in enumerate(record_texts)]
print(f"Uploading {len(record_texts)} chunks to Pinecone...")
texts.extend(record_texts)
metadatas.extend(record_metadatas)
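    # One random UUID per chunk; all chunks are embedded in a single batch call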
ids = [str(uuid4()) for _ in range(len(texts))]
embeds = embedder.embed_documents(texts)
try:
print("Upserting data to pinecone...")
index.upsert(vectors=zip(ids, embeds, metadatas))
except Exception as e:
print(f"Error upserting data to Pinecone: {e}")
if len(texts) >= BATCH_LIMIT:
texts = []
metadatas = []
def index_channel(channel_id, embedder, index):
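    """Index every video published on the given YouTube channel."""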
print("Indexing channel...")
videos = scrapetube.get_channel(channel_id)
for video in videos:
print(f"Ready to process {video['videoId']}")
index_video(video["videoId"], embedder, index)
def configure_vector_database():
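    """Create the Pinecone index if it does not exist and return (embedder, index)."""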
print("Configuring Pinecone...")
pc = Pinecone(
api_key=os.getenv("PINECONE_API_KEY")
)
    if os.getenv("INDEX_NAME") not in pc.list_indexes().names():
        pc.create_index(
            name=os.getenv("INDEX_NAME"),
metric='cosine',
dimension=1536,
spec=ServerlessSpec(
cloud="aws", region="us-east-1"
)
)
index = pc.Index(os.getenv("INDEX_NAME"))
store = LocalFileStore("./cache/")
# default model is text-embedding-ada-002
core_embeddings_model = OpenAIEmbeddings()
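    # Cache embeddings on local disk so repeated runs don't re-embed identical chunks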
embedder = CacheBackedEmbeddings.from_bytes_store(
core_embeddings_model,
store,
namespace=core_embeddings_model.model
)
return embedder, index
if __name__ == "__main__":
    embedder, index = configure_vector_database()
    index_channel(os.getenv("CHANNEL_ID"), embedder, index)