# NOTE: "Spaces: Sleeping" was Hugging Face Spaces web-page residue captured
# alongside this app's source; preserved here as a comment so the file parses.
import gradio as gr
import PyPDF2
import io
import os
from dotenv import load_dotenv
from pinecone import Pinecone, ServerlessSpec
from openai import OpenAI
import uuid
import re

# Load environment variables (OPENAI_API_KEY, PINECONE_API_KEY,
# PINECONE_ENVIRONMENT) from a local .env file.
load_dotenv()

# Initialize OpenAI client
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

# Pinecone configuration.
PINECONE_API_KEY = os.getenv("PINECONE_API_KEY")
PINECONE_ENVIRONMENT = os.getenv("PINECONE_ENVIRONMENT")
INDEX_NAME = "ghana"
EMBEDDING_MODEL = "text-embedding-3-large"
EMBEDDING_DIMENSION = 3072  # output dimension of text-embedding-3-large

# Initialize Pinecone
pc = Pinecone(api_key=PINECONE_API_KEY)

# Create the index if it does not exist yet; otherwise verify its dimension.
if INDEX_NAME not in pc.list_indexes().names():
    if not PINECONE_ENVIRONMENT:
        raise ValueError(
            "PINECONE_ENVIRONMENT is not set; expected '<cloud>-<region>', "
            "e.g. 'gcp-starter' or 'aws-us-east-1'."
        )
    # Environment is assumed to be '<cloud>-<region>'. partition (rather than
    # split('-')[1]) keeps the full region for multi-part names such as
    # 'aws-us-east-1' -> cloud='aws', region='us-east-1'.
    cloud, _, region = PINECONE_ENVIRONMENT.partition('-')
    pc.create_index(
        name=INDEX_NAME,
        dimension=EMBEDDING_DIMENSION,
        metric="cosine",
        spec=ServerlessSpec(cloud=cloud, region=region),
    )
else:
    # Fail fast if the existing index was built for a different embedding size —
    # upserting mismatched vectors would otherwise fail at query/upsert time.
    existing_index = pc.describe_index(INDEX_NAME)
    if existing_index.dimension != EMBEDDING_DIMENSION:
        raise ValueError(f"Existing index '{INDEX_NAME}' has dimension {existing_index.dimension}, expected {EMBEDDING_DIMENSION}. Please choose a different index name or adjust accordingly.")

# Connect to the Pinecone index
index = pc.Index(INDEX_NAME)
def transcribe_pdf(pdf_file):
    """Extract text from an uploaded PDF, chunk it, embed it, and upsert to Pinecone.

    Args:
        pdf_file: Raw PDF bytes (the Gradio File input uses type="binary").

    Returns:
        A human-readable status message describing what was stored.
    """
    # Extract text page by page; extract_text() can return None/'' for
    # image-only pages, so only non-empty pages are appended.
    pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_file))
    text = ""
    for page in pdf_reader.pages:
        page_text = page.extract_text()
        if page_text:
            text += page_text + "\n"

    # Split into overlapping word-based chunks sized for the embedding model.
    chunks = dynamic_chunking(text, max_tokens=500, overlap=50)
    if not chunks:
        # Scanned/image-only PDF: avoid calling the embeddings API with an
        # empty input list, which would raise an error.
        return "No extractable text found in the PDF; nothing was upserted."

    # Embed each chunk, then upsert (id, vector, metadata) triples to Pinecone.
    embeddings = get_embeddings(chunks)
    upsert_data = [
        (str(uuid.uuid4()), emb, {"text": chunk})
        for chunk, emb in zip(chunks, embeddings)
    ]
    index.upsert(vectors=upsert_data)

    return f"Successfully upserted {len(chunks)} chunks to Pinecone index '{INDEX_NAME}'."
def dynamic_chunking(text, max_tokens=500, overlap=50):
    """Split text into overlapping chunks of at most ``max_tokens`` words.

    "Tokens" here are whitespace-delimited words, not model tokens.

    Args:
        text: Input text to split.
        max_tokens: Maximum number of words per chunk.
        overlap: Number of words shared between consecutive chunks.

    Returns:
        List of chunk strings; empty list for empty/whitespace-only text.

    Raises:
        ValueError: If overlap >= max_tokens — the window would never advance,
            which previously caused an infinite loop.
    """
    if overlap >= max_tokens:
        raise ValueError("overlap must be smaller than max_tokens")

    tokens = re.findall(r'\S+', text)
    step = max_tokens - overlap  # guaranteed positive by the guard above
    return [
        ' '.join(tokens[start:start + max_tokens])
        for start in range(0, len(tokens), step)
    ]
def get_embeddings(chunks):
    """Embed each text chunk using OpenAI's embeddings API.

    Args:
        chunks: List of text strings to embed.

    Returns:
        A list of embedding vectors (one per chunk, in input order).
    """
    if not chunks:
        # The API rejects an empty input list; short-circuit instead.
        return []
    response = client.embeddings.create(
        input=chunks,
        model=EMBEDDING_MODEL,
    )
    # response.data items each carry an .embedding vector; assumed to be in
    # input order per the OpenAI embeddings API.
    return [item.embedding for item in response.data]
# Gradio UI: a single binary file-upload input wired to transcribe_pdf, with
# the status message shown in a textbox.
iface = gr.Interface(
    fn=transcribe_pdf,
    inputs=gr.File(label="Upload PDF", type="binary"),
    outputs=gr.Textbox(label="Transcription"),
    title="PDF Transcription and Upsert to Pinecone",
    description="Upload a PDF file to extract its text content, chunk it dynamically, and upsert the chunks to a Pinecone index named 'ghana'.",
)

# Launch the web app only when run as a script (not when imported).
if __name__ == "__main__":
    iface.launch()