Spaces:
Sleeping
Sleeping
import xml.etree.ElementTree as ET | |
from elasticsearch import Elasticsearch, helpers | |
import torch | |
from transformers import CLIPProcessor, CLIPModel | |
import numpy as np | |
from server.utils.database import get_db | |
from server.utils.model import get_clip_model | |
from server.models.database import DocumentModel | |
# Load the CLIP model and its processor once, at import time, so that the
# indexing and search functions below share the same pair.
clip_model, clip_processor = get_clip_model()
def insert_data(bulk_data, db=None, index_name="patents"):
    """Bulk-index already prepared actions into Elasticsearch.

    Args:
        bulk_data: Sized collection of bulk actions in the format accepted by
            ``elasticsearch.helpers.bulk``.
        db: Elasticsearch client. When omitted, a client is obtained from
            ``get_db()`` at call time (not at import time).
        index_name: Default index for actions that do not carry their own
            ``_index`` key; actions with an explicit ``_index`` keep it.

    Returns:
        A human-readable status message.
    """
    # Nothing to send: answer without needing a client at all.
    if not bulk_data:
        return "No patent records found to insert."

    # Resolve the client lazily; a call in the parameter default would run
    # once at import time and be shared by every later call.
    if db is None:
        db = get_db()

    # Extra keyword arguments are forwarded to the bulk API, so ``index``
    # acts as the fallback target for actions lacking ``_index``.
    helpers.bulk(db, bulk_data, index=index_name)
    return f"Inserted {len(bulk_data)} patent records with embeddings into Elasticsearch."
def search_data(embedding: list[float] = None, db=None, top_k=5, index_name="patents"):
    """Return the patents whose stored embedding is closest to ``embedding``.

    Every document of ``index_name`` is scored with
    ``cosineSimilarity(query, 'embedding') + 1.0`` and the ``top_k`` best hits
    are returned.

    Args:
        embedding: Query vector; must contain exactly 512 values.
        db: Elasticsearch client. When omitted, a client is obtained from
            ``get_db()`` at call time (not at import time).
        top_k: Maximum number of hits to return.
        index_name: Index to search.

    Returns:
        A list of dicts with the keys ``patent_id``, ``invention_title``,
        ``assignors``, ``assignees``, ``conveyance_text``, ``reel_no`` and
        ``frame_no``. ``patent_id`` is the first entry of the hit's
        ``doc_numbers`` or ``''`` when there is none.

    Raises:
        ValueError: If ``embedding`` is ``None`` or not 512 items long.
    """
    # Validate before touching the client so bad input never opens a connection.
    if embedding is None or len(embedding) != 512:
        raise ValueError("Embedding must be a list of 512 floats.")

    # Resolve the client lazily; a call in the parameter default would run
    # once at import time and be shared by every later call.
    if db is None:
        db = get_db()

    body = {
        "size": top_k,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    # +1.0 keeps the score non-negative.
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": embedding},
                },
            }
        },
    }
    res = db.search(index=index_name, body=body)

    results = []
    for hit in res['hits']['hits']:
        doc = hit['_source']
        doc_numbers = doc.get('doc_numbers')
        results.append({
            "patent_id": doc_numbers[0] if doc_numbers else '',
            "invention_title": doc.get('invention_title', ''),
            "assignors": doc.get('assignors', ''),
            "assignees": doc.get('assignees', ''),
            "conveyance_text": doc.get('conveyance_text', ''),
            "reel_no": doc.get('reel_no', ''),
            "frame_no": doc.get('frame_no', ''),
        })
    return results
# CRUD: Insert patent data from XML | |
def insert_patent_data(xml_file, index_name: str = "patents"):
    """Parse a patent-assignment XML file and bulk-index it with embeddings.

    For every ``patent-assignment`` element that has an ``assignment-record``
    child, one document is built from the reel/frame numbers, conveyance text,
    assignor and assignee names, invention title and document numbers, plus an
    embedding of the invention title. The index is created with its mapping
    first if it does not exist yet.

    Args:
        xml_file: Path or file object accepted by ``ET.parse``.
        index_name: Target Elasticsearch index.

    Returns:
        A human-readable status message.
    """
    embedding_dims = 512  # single source for the mapping and the fallback vector

    tree = ET.parse(xml_file)
    root = tree.getroot()
    es = get_db()

    # Create index if not exists
    if not es.indices.exists(index=index_name):
        es.indices.create(index=index_name, body={
            "mappings": {
                "properties": {
                    "reel_no": {"type": "keyword"},
                    "frame_no": {"type": "keyword"},
                    "assignors": {"type": "text"},
                    "assignees": {"type": "text"},
                    "invention_title": {"type": "text"},
                    "conveyance_text": {"type": "text"},
                    "doc_numbers": {"type": "keyword"},
                    "raw_text": {"type": "text"},
                    "embedding": {
                        "type": "dense_vector",
                        "dims": embedding_dims,
                        "index": True,
                        "similarity": "cosine",
                    },
                }
            }
        })

    bulk_data = []
    for assignment in root.findall('.//patent-assignment'):
        record = assignment.find('assignment-record')
        if record is None:
            # Without an assignment record there is nothing to index.
            continue

        invention_title, doc_numbers = _collect_properties(assignment)
        doc = {
            "reel_no": _element_text(record.find('reel-no')),
            "frame_no": _element_text(record.find('frame-no')),
            "assignors": _joined_party_names(assignment, 'patent-assignor'),
            "assignees": _joined_party_names(assignment, 'patent-assignee'),
            "invention_title": invention_title,
            "conveyance_text": _element_text(record.find('conveyance-text')),
            "doc_numbers": doc_numbers,
            "raw_text": invention_title,
            "embedding": _embed_title(invention_title, embedding_dims),
        }
        bulk_data.append({"_index": index_name, "_source": doc})

    if not bulk_data:
        return "No patent records found to insert."
    helpers.bulk(es, bulk_data)
    return f"Inserted {len(bulk_data)} patent records with embeddings into Elasticsearch."


def _element_text(element):
    """Return the stripped text of ``element``, or "" if it is missing or empty."""
    if element is None or not element.text:
        return ""
    return element.text.strip()


def _joined_party_names(assignment, party_tag):
    """Join the non-empty ``name`` texts of all ``party_tag`` descendants with ", "."""
    names = (
        _element_text(party.find('name'))
        for party in assignment.findall(f'.//{party_tag}')
    )
    return ", ".join(name for name in names if name)


def _collect_properties(assignment):
    """Return ``(invention_title, doc_numbers)`` gathered from ``patent-property`` descendants."""
    invention_title = ""
    doc_numbers = []
    for prop in assignment.findall('.//patent-property'):
        title = prop.find('invention-title')
        if title is not None:
            # The last property that carries a title element wins.
            invention_title = _element_text(title)
        for document_id in prop.findall('document-id'):
            doc_num = _element_text(document_id.find('doc-number'))
            if doc_num:
                doc_numbers.append(doc_num)
    return invention_title, doc_numbers


def _embed_title(title, dims):
    """Return the CLIP text embedding of ``title``, or a zero vector of ``dims`` floats if it is empty."""
    if not title:
        # NOTE(review): the mapping uses "similarity": "cosine", and the
        # Elasticsearch dense_vector docs say cosine does not allow
        # zero-magnitude vectors - confirm that untitled records are really
        # accepted by the cluster in use. Omitting the field instead would
        # require the script_score searches to skip documents without it.
        return [0.0] * dims
    inputs = clip_processor(text=[title], return_tensors="pt", padding=True, truncation=True)
    with torch.no_grad():
        features = clip_model.get_text_features(**inputs)
    return features[0].cpu().numpy().astype(np.float32).tolist()
# CRUD: Search patents by text or image | |
def search_patents(query=None, image_path=None, top_k=5, index_name="patents"):
    """Search indexed patents by a text query or by an image.

    The query text (preferred when both are given) or the image is embedded
    with the module-level CLIP model, and every document of ``index_name`` is
    scored with ``cosineSimilarity(query, 'embedding') + 1.0``.

    Args:
        query: Text to search for. Takes precedence over ``image_path``.
        image_path: Path of an image to search with when ``query`` is empty.
        top_k: Maximum number of hits to return.
        index_name: Index to search.

    Returns:
        A list of dicts with the keys ``patent_id``, ``invention_title``,
        ``assignors``, ``assignees``, ``conveyance_text``, ``reel_no`` and
        ``frame_no``; an empty list when neither ``query`` nor ``image_path``
        is given.
    """
    # Nothing to search with: return before acquiring a client or the model.
    if not query and not image_path:
        return []

    es = get_db()

    if query:
        inputs = clip_processor(text=[query], return_tensors="pt", padding=True, truncation=True)
        with torch.no_grad():
            features = clip_model.get_text_features(**inputs)
    else:
        from PIL import Image

        # Close the underlying file as soon as the pixels are converted;
        # convert() returns an independent, fully loaded image.
        with Image.open(image_path) as source_image:
            image = source_image.convert("RGB")
        inputs = clip_processor(images=image, return_tensors="pt")
        with torch.no_grad():
            features = clip_model.get_image_features(**inputs)
    embedding = features[0].cpu().numpy().astype(np.float32).tolist()

    body = {
        "size": top_k,
        "query": {
            "script_score": {
                "query": {"match_all": {}},
                "script": {
                    # +1.0 keeps the score non-negative.
                    "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0",
                    "params": {"query_vector": embedding},
                },
            }
        },
    }
    res = es.search(index=index_name, body=body)

    results = []
    for hit in res['hits']['hits']:
        doc = hit['_source']
        doc_numbers = doc.get('doc_numbers')
        results.append({
            "patent_id": doc_numbers[0] if doc_numbers else '',
            "invention_title": doc.get('invention_title', ''),
            "assignors": doc.get('assignors', ''),
            "assignees": doc.get('assignees', ''),
            "conveyance_text": doc.get('conveyance_text', ''),
            "reel_no": doc.get('reel_no', ''),
            "frame_no": doc.get('frame_no', ''),
        })
    return results