# Vision-based retrieval utilities: text is rendered to a PDF, each page is
# converted to an image, and a ViT encoder turns the page images into dense
# embeddings that are ranked by cosine similarity.
import os
from tqdm import tqdm
import numpy as np
import torch
from transformers import ViTModel, ViTImageProcessor
from PIL import Image
import re
from fpdf import FPDF
from datetime import datetime
import fitz  # PyMuPDF
import joblib
import json

# Load the pretrained ViT backbone and its image processor once at module import
model = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')
processor = ViTImageProcessor.from_pretrained('google/vit-base-patch16-224-in21k')

def create_pdf(input_text):
    # Create an FPDF instance and add a page
    pdf = FPDF()
    pdf.add_page()
    # Set the font
    pdf.set_font("Arial", size=10)
    # multi_cell wraps the text across lines and pages so long documents fit
    pdf.multi_cell(0, 5, txt=input_text)
    # Build a unique file name from the current time (microseconds included so
    # documents processed in quick succession don't overwrite each other)
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f")
    file_name = f"temp/PDFs/{timestamp}.pdf"
    # Create the output directory if it doesn't exist
    os.makedirs(os.path.dirname(file_name), exist_ok=True)
    # Save the PDF and return its path
    pdf.output(file_name)
    return file_name

def pdf_to_image(pdf_path, zoom=2.0):
    # Open the PDF file
    pdf_document = fitz.open(pdf_path)
    # Collect the paths of the rendered page images
    image_paths = []
    # Create an 'Images' directory if it doesn't exist
    os.makedirs("temp/Images", exist_ok=True)
    # Iterate over PDF pages and render each one to a PNG
    for page_num in range(len(pdf_document)):
        page = pdf_document.load_page(page_num)  # Load the page
        # A zoom factor > 1 increases the rendering resolution and improves quality
        mat = fitz.Matrix(zoom, zoom)  # Transformation matrix with the zoom level
        pix = page.get_pixmap(matrix=mat)  # Render the page at the chosen zoom
        image_file = f'temp/Images/{os.path.basename(pdf_path)}_page_{page_num}.png'
        pix.save(image_file)  # Save the image as PNG
        image_paths.append(image_file)
    pdf_document.close()
    # Return the list containing the paths of all page images
    return image_paths

def sanitize_text(text):
    """
    Cleans and standardizes text by keeping only alphanumeric characters and spaces.

    Args:
        text (str): Text to sanitize.

    Returns:
        str: Sanitized text.
    """
    if isinstance(text, str):
        # Keep only alphanumeric characters and whitespace
        text = re.sub(r'[^a-zA-Z0-9\s]', '', text)
        # Collapse runs of whitespace into a single space
        text = re.sub(r'\s+', ' ', text).strip()
    return text

def text_to_images(text):
    # Sanitize the text, render it to a PDF, then convert the PDF pages to images
    text = sanitize_text(text)
    pdf_path = create_pdf(text)
    image_paths = pdf_to_image(pdf_path)
    return image_paths

def documents_to_images(path):
    # Read every file in the directory into memory as a document
    document_set = []
    for filename in os.listdir(path):
        file_path = os.path.join(path, filename)
        if os.path.isfile(file_path):
            with open(file_path, "r") as f:
                content = f.read()
            document_set.append(content)
    # Convert each document to a list of page-image paths
    document_image_paths = []
    for document in document_set:
        image_paths = text_to_images(document)
        document_image_paths.append(image_paths)
    return document_image_paths

def single_unit_embedding(text):
    # Render the text to page images and average the ViT embeddings of all pages
    image_paths = text_to_images(text)
    page_vectors = []
    for image_path in image_paths:
        image = Image.open(image_path)
        inputs = processor(images=image, return_tensors="pt")
        # Inference only, so skip building the autograd graph
        with torch.no_grad():
            outputs = model(**inputs)
        # Mean-pool the patch embeddings into a single vector per page
        vector = outputs.last_hidden_state.mean(dim=1).detach().numpy()
        page_vectors.append(vector)
    return np.mean(np.array(page_vectors), axis=0)


def single_image_embedding(image):
    inputs = processor(images=image, return_tensors="pt")
    with torch.no_grad():
        outputs = model(**inputs)
    vector = outputs.last_hidden_state.mean(dim=1).detach().numpy()
    return vector

def documents_to_vision_embeddings(documents):
    document_vision_embeddings = []
    for document in tqdm(documents):
        vector = single_unit_embedding(document)
        document_vision_embeddings.append(vector)
    return document_vision_embeddings


def queries_to_vision_embeddings(queries):
    query_vision_embeddings = []
    for query in tqdm(queries):
        vector = single_unit_embedding(query)
        query_vision_embeddings.append(vector)
    return query_vision_embeddings
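
# The retrieval pipeline further down expects a precomputed index: a JSON file of
# document embeddings and a pickled list of document ids. The helper below is a
# minimal sketch of that offline indexing step (not part of the original module),
# assuming a directory of plain-text documents whose filenames serve as ids; the
# function name and the corpus argument are illustrative.
def build_vision_index(corpus_path,
                       embeddings_path="Retrieval/savedModels/document-vision-embeddings.json",
                       ids_path="Retrieval/savedModels/ids.pkl"):
    ids = []
    documents = []
    for filename in sorted(os.listdir(corpus_path)):
        file_path = os.path.join(corpus_path, filename)
        if os.path.isfile(file_path):
            with open(file_path, "r") as f:
                documents.append(f.read())
            ids.append(filename)
    # Embed each document, then store the vectors as plain lists so they are JSON-serializable
    embeddings = documents_to_vision_embeddings(documents)
    os.makedirs(os.path.dirname(embeddings_path), exist_ok=True)
    with open(embeddings_path, "w") as f:
        json.dump([embedding.tolist() for embedding in embeddings], f)
    joblib.dump(ids, ids_path)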

def get_documents_from_scores(scores):
    rankings = []
    for score in scores:
        rankings.append(score[0])
    return rankings


def cosine_similarity(v1, v2):
    v1 = np.array(v1)
    v2 = np.array(v2)
    if np.linalg.norm(v1) != 0 and np.linalg.norm(v2) != 0:
        sim = np.dot(v1, v2) / (np.linalg.norm(v1) * np.linalg.norm(v2))
    else:
        sim = 0
    return sim

def vision_rankings(query_embedding, document_embeddings, k):
    # query_embedding = single_unit_embedding(query)
    scores = []
    for idx, embedding in enumerate(document_embeddings):
        scores.append((idx, cosine_similarity(query_embedding[0], embedding[0])))
    # Sort by similarity (descending) and keep the top-k documents
    scores = sorted(scores, key=lambda x: x[1], reverse=True)
    scores = scores[:k]
    rankings = get_documents_from_scores(scores)
    return rankings, scores

def vision_pipeline(query, document_embeddings_path="Retrieval/savedModels/document-vision-embeddings.json", ids_path="Retrieval/savedModels/ids.pkl", k=100):
    # document_embeddings = joblib.load(document_embeddings_path)
    # Load the document ids and the precomputed document embeddings
    ids = joblib.load(ids_path)
    with open(document_embeddings_path, "r") as f:
        raw_embeddings = json.load(f)
    document_vision_embeddings = []
    for embedding in tqdm(raw_embeddings):
        document_vision_embeddings.append(np.array(embedding))
    print("loaded embeddings")
    # Embed the query and rank the documents by cosine similarity
    query_embedding = single_unit_embedding(query)
    rankings, scores = vision_rankings(query_embedding, document_vision_embeddings, k)
    # Map the ranked indices back to document ids
    ranked_ids = []
    for ranking in rankings:
        ranked_ids.append(ids[ranking])
    return ranked_ids
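
# Minimal usage sketch, assuming the index files referenced by the default
# arguments of vision_pipeline already exist; the query string is illustrative.
if __name__ == "__main__":
    top_document_ids = vision_pipeline("example search query", k=10)
    print(top_document_ids)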