# Index YouTube channel transcripts into a Pinecone vector database.
import json
import os
from uuid import uuid4

import pandas as pd
import requests
import scrapetube
from dotenv import load_dotenv
from langchain.embeddings.cache import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone, ServerlessSpec
from youtube_transcript_api import YouTubeTranscriptApi

# Pull configuration (PINECONE_API_KEY, INDEX_NAME, CHANNEL_ID, OpenAI key)
# from a local .env file into the process environment.
load_dotenv()

# Maximum number of chunks embedded and upserted per Pinecone request.
BATCH_LIMIT = 100
def get_youtube_data(video_id):
    """Fetch the transcript and title for a single YouTube video.

    Args:
        video_id: YouTube video id (the ``v=`` query-string value).

    Returns:
        ``(transcript, title)`` on success, where ``transcript`` is the full
        caption text with each caption followed by a ``<start-time>`` marker,
        or ``False`` when no transcript is available.
    """
    url = f"https://www.youtube.com/watch?v={video_id}"
    try:
        raw = YouTubeTranscriptApi.get_transcript(video_id)
    except Exception:  # was a bare except; the API raises several error types
        print(f"No transcript found for {url}")
        return False
    # Fetch the video title via the noembed oEmbed proxy (no API key needed).
    response = requests.get(
        f"https://noembed.com/embed?dataType=json&url={url}")
    data = json.loads(response.content)
    title = data["title"]
    # ' is a reserved character
    title = title.replace("'", "")
    df = pd.DataFrame(raw)
    # Generate the transcript string with timestamps: "caption<start>" pairs
    # keep a time reference inside every later chunk.
    transcript = ' '.join(
        f"{row['text']}<{row['start']}>" for _, row in df.iterrows())
    return transcript, title
def index_video(video_id, embedder, index):
    """Split a video's transcript into chunks, embed them, upsert to Pinecone.

    Args:
        video_id: YouTube video id.
        embedder: embeddings object exposing ``embed_documents(texts)``.
        index: Pinecone index handle exposing ``upsert(vectors=...)``.

    Returns:
        ``False`` when the transcript could not be fetched; ``None`` otherwise.
    """
    print(f"Getting transcript & text for video: {video_id}")
    result = get_youtube_data(video_id)
    # get_youtube_data returns False (not a tuple) on failure; the original
    # unpacked it unconditionally and relied on the broad except to recover.
    if not result:
        print(f"Error getting transcript for video {video_id}")
        return False
    transcript, title = result
    url = f"https://www.youtube.com/watch?v={video_id}"
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=100,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]
    )
    # Shared metadata attached to every chunk of this video.
    metadata = {
        'source_document': title,
        'link': url
    }
    record_texts = text_splitter.split_text(transcript)
    print(f"Split documents into {len(record_texts)} chunks")
    record_metadatas = [{"chunk": j, "text": text, **metadata}
                        for j, text in enumerate(record_texts)]
    print(f"Uploading {len(record_texts)} chunks to Pinecone...")
    # Embed and upsert in batches of BATCH_LIMIT. The original reset its
    # accumulators at the end of the function (dead code), so every call
    # sent one unbounded request; this actually enforces the limit.
    for start in range(0, len(record_texts), BATCH_LIMIT):
        batch_texts = record_texts[start:start + BATCH_LIMIT]
        batch_metadatas = record_metadatas[start:start + BATCH_LIMIT]
        ids = [str(uuid4()) for _ in batch_texts]
        embeds = embedder.embed_documents(batch_texts)
        try:
            print("Upserting data to pinecone...")
            index.upsert(vectors=zip(ids, embeds, batch_metadatas))
        except Exception as e:
            # Best-effort: log and continue with the next batch.
            print(f"Error upserting data to Pinecone: {e}")
def index_channel(channel_id, embedder, index):
    """Index every video of a YouTube channel into Pinecone.

    Args:
        channel_id: YouTube channel id to enumerate with scrapetube.
        embedder: embeddings object passed through to ``index_video``.
        index: Pinecone index handle passed through to ``index_video``.
    """
    print("Indexing channel...")
    for entry in scrapetube.get_channel(channel_id):
        vid = entry["videoId"]
        print(f"Ready to process {vid}")
        index_video(vid, embedder, index)
def configure_vector_database():
    """Connect to (or create) the Pinecone index and build a cached embedder.

    Reads ``PINECONE_API_KEY`` and ``INDEX_NAME`` from the environment.

    Returns:
        ``(embedder, index)``: a ``CacheBackedEmbeddings`` wrapping an OpenAI
        embedding model with a local-file byte store, and the Pinecone index.
    """
    print("Configuring Pinecone...")
    pc = Pinecone(
        api_key=os.getenv("PINECONE_API_KEY")
    )
    # Read once; the original compared against an undefined INDEX_NAME global
    # (NameError at runtime).
    index_name = os.getenv("INDEX_NAME")
    if index_name not in pc.list_indexes().names():
        pc.create_index(
            name=index_name,  # create_index takes `name=`, not `index=`
            metric='cosine',
            dimension=1536,  # output size of text-embedding-ada-002
            spec=ServerlessSpec(
                cloud="aws", region="us-east-1"
            )
        )
    index = pc.Index(index_name)
    # Cache embeddings on disk so repeated chunks are not re-embedded.
    store = LocalFileStore("./cache/")
    # default model is text-embedding-ada-002
    core_embeddings_model = OpenAIEmbeddings()
    embedder = CacheBackedEmbeddings.from_bytes_store(
        core_embeddings_model,
        store,
        namespace=core_embeddings_model.model
    )
    return embedder, index
# Script entry point: build the embedder/index handle, then index the channel.
embedder, index = configure_vector_database()
# os.getenv is a function; the original subscripted it with [] (TypeError).
index_channel(os.getenv("CHANNEL_ID"), embedder, index)