"""Index the transcripts of a YouTube channel into a Pinecone vector index.

Expects PINECONE_API_KEY, INDEX_NAME and CHANNEL_ID to be set in the
environment (e.g. via a .env file).
"""
import json
import os
from uuid import uuid4

import pandas as pd
import requests
import scrapetube
from dotenv import load_dotenv
from langchain.embeddings.cache import CacheBackedEmbeddings
from langchain.storage import LocalFileStore
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone, ServerlessSpec
from youtube_transcript_api import YouTubeTranscriptApi

load_dotenv()

BATCH_LIMIT = 100
INDEX_NAME = os.getenv("INDEX_NAME")


def get_youtube_data(video_id):
    """Fetch the transcript and title for a single YouTube video."""
    url = f"https://www.youtube.com/watch?v={video_id}"
    try:
        raw = YouTubeTranscriptApi.get_transcript(video_id)
    except Exception:
        print(f"No transcript found for {url}")
        return None

    # Get the video title via the noembed oEmbed service
    response = requests.get(
        f"https://noembed.com/embed?dataType=json&url={url}")
    data = json.loads(response.content)
    title = data["title"]
    # ' is a reserved character
    title = title.replace("'", "")

    df = pd.DataFrame(raw)

    # Generate the transcript string, tagging each snippet with its start timestamp
    transcript = " ".join(
        f"{row['text']}<{row['start']}>" for _, row in df.iterrows())

    return transcript, title


def index_video(video_id, embedder, index):
    """Chunk a video's transcript, embed the chunks and upsert them to Pinecone."""
    try:
        print(f"Getting transcript & text for video: {video_id}")
        result = get_youtube_data(video_id)
    except Exception as e:
        print(f"Error getting transcript for video {video_id}: {e}")
        return False
    if result is None:
        return False
    transcript, title = result

    url = f"https://www.youtube.com/watch?v={video_id}"

    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1000,
        chunk_overlap=100,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )

    metadata = {
        "source_document": title,
        "link": url,
    }

    texts = text_splitter.split_text(transcript)
    print(f"Split transcript into {len(texts)} chunks")
    metadatas = [{"chunk": j, "text": text, **metadata}
                 for j, text in enumerate(texts)]

    ids = [str(uuid4()) for _ in range(len(texts))]
    embeds = embedder.embed_documents(texts)

    print(f"Uploading {len(texts)} chunks to Pinecone...")
    vectors = list(zip(ids, embeds, metadatas))
    try:
        # Upsert in batches so a single request never exceeds BATCH_LIMIT vectors
        for i in range(0, len(vectors), BATCH_LIMIT):
            index.upsert(vectors=vectors[i:i + BATCH_LIMIT])
    except Exception as e:
        print(f"Error upserting data to Pinecone: {e}")
        return False

    return True


def index_channel(channel_id, embedder, index):
    """Index every video of a YouTube channel."""
    print("Indexing channel...")
    videos = scrapetube.get_channel(channel_id)
    for video in videos:
        print(f"Ready to process {video['videoId']}")
        index_video(video["videoId"], embedder, index)


def configure_vector_database():
    """Create the Pinecone index if needed; return a cached embedder and the index."""
    print("Configuring Pinecone...")
    pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
    if INDEX_NAME not in pc.list_indexes().names():
        pc.create_index(
            name=INDEX_NAME,
            metric="cosine",
            dimension=1536,
            spec=ServerlessSpec(
                cloud="aws",
                region="us-east-1",
            ),
        )
    index = pc.Index(INDEX_NAME)

    # Cache embeddings on disk so re-runs don't re-embed identical chunks
    store = LocalFileStore("./cache/")
    # Default model is text-embedding-ada-002
    core_embeddings_model = OpenAIEmbeddings()
    embedder = CacheBackedEmbeddings.from_bytes_store(
        core_embeddings_model, store, namespace=core_embeddings_model.model
    )
    return embedder, index


if __name__ == "__main__":
    embedder, index = configure_vector_database()
    index_channel(os.getenv("CHANNEL_ID"), embedder, index)