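"""Index PDF documents into Pinecone and interactively match letter text against the stored chunks."""
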
import argparse
import json
import os

import tiktoken
from dotenv import load_dotenv
from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec
from PyPDF2 import PdfReader

load_dotenv()

print("Starting the script...")

parser = argparse.ArgumentParser(description="Process PDFs and match letters.")
parser.add_argument("--test", action="store_true", help="Run in test mode")
args = parser.parse_args()

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
print("OpenAI client set up.")

pc = Pinecone(api_key=os.getenv("PINECONE_API_KEY"))
index_name = "match"
print(f"Pinecone client set up. Using index name: {index_name}")

if index_name not in pc.list_indexes().names():
    print(f"Index '{index_name}' not found. Creating new index...")
    pc.create_index(
        name=index_name,
        dimension=3072,  # text-embedding-3-large produces 3072-dimensional vectors
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-west-2"),
    )
    print(f"Created new index: {index_name}")
else:
    print(f"Index '{index_name}' already exists.")

index = pc.Index(index_name)
print("Pinecone index initialized.")


def get_embedding(text):
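    """Return the embedding vector for `text` from OpenAI's text-embedding-3-large model."""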
    print("Getting embedding for text...")
    response = client.embeddings.create(input=text, model="text-embedding-3-large")
    print("Embedding obtained.")
    return response.data[0].embedding


def split_text(text, max_tokens=8000):
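    """Split `text` into chunks of at most `max_tokens` tokens using the model's tokenizer."""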
    encoding = tiktoken.encoding_for_model("text-embedding-3-large")
    tokens = encoding.encode(text)
    # Slice the token list into fixed-size windows and decode each window back to text.
    chunks = [
        encoding.decode(tokens[i : i + max_tokens])
        for i in range(0, len(tokens), max_tokens)
    ]
    return chunks


def save_pdf_to_pinecone(file_path, file_name):
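    """Extract text from a PDF, split it into chunks, embed each chunk, and upsert the vectors to Pinecone."""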
    print(f"Processing file: {file_path}")
    pdf_reader = PdfReader(file_path)
    content = ""
    for page in pdf_reader.pages:
        content += page.extract_text() or ""  # guard against pages that yield no text

    print(f"Extracted {len(content)} characters from the PDF.")
    chunks = split_text(content)
    print(f"Split content into {len(chunks)} chunks.")

    for i, chunk in enumerate(chunks):
        embedding = get_embedding(chunk)
        chunk_id = f"{file_name}_chunk_{i}"
        print(f"Upserting vector for {chunk_id} to Pinecone...")
        index.upsert(vectors=[
            {
                "id": chunk_id,
                "values": embedding,
                "metadata": {"content": chunk, "file_name": file_name, "chunk_index": i},
            }
        ])
        print(f"Vector for {chunk_id} upserted successfully.")


def match_letter(letter_content):
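    """Embed `letter_content` and return the file name of its closest match in the index, or None."""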
    print("Matching letter content...")
    query_embedding = get_embedding(letter_content)
    search_results = index.query(vector=query_embedding, top_k=1, include_metadata=True)

    if search_results['matches']:
        best_match = search_results['matches'][0]
        print(f"Best match found: {best_match['metadata']['file_name']}")
        return best_match['metadata']['file_name']
    else:
        print("No match found.")
        return None


def process_matches_file(file_path, test_mode=False):
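    """Index every input PDF referenced in the matches file; in test mode, stop after the first entry."""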
    print(f"Processing matches file: {file_path}")
    with open(file_path, 'r') as f:
        matches = json.load(f)

    print(f"Found {len(matches)} matches in the file.")
    for input_file, _ in matches.items():
        input_path = os.path.join('docs', 'in', input_file)

        if test_mode:
            print(f"Test mode: Would process input file: {input_path}")
        else:
            if os.path.exists(input_path):
                print(f"Processing input file: {input_path}")
                save_pdf_to_pinecone(input_path, input_file)
            else:
                print(f"Input file not found: {input_path}")

        if test_mode:
            print("Test mode: Stopping after first match.")
            break


def clear_index():
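    """Delete all vectors from the Pinecone index."""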
    print(f"Clearing all vectors from index '{index_name}'...")
    try:
        index.delete(delete_all=True)
        print(f"All vectors deleted from index '{index_name}'.")
    except Exception as e:
        print(f"Error clearing index: {e}")


def main():
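    """Optionally clear the index, ingest the input PDFs, then run an interactive matching loop."""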
    print("Starting main function...")

    if not args.test:
        clear_db = input("Do you want to clear the database before processing? (y/n): ").lower()
        if clear_db == 'y':
            clear_index()

    process_matches_file('matches.json', args.test)

    print("Enter the content of your letter (or 'q' to quit):")
    while True:
        user_input = input()
        if user_input.lower() == 'q':
            break

        matched_file = match_letter(user_input)
        if matched_file:
            print(f"The best match for your letter is: {matched_file}")
        else:
            print("No match found for your letter.")


if __name__ == "__main__":
    main()