|
import numpy as np |
|
import torch |
|
import json |
|
import pandas as pd |
|
from tqdm import tqdm |
|
from typing import List, Dict, Tuple, Set, Union, Optional |
|
from langchain.docstore.document import Document |
|
from langchain_community.vectorstores import FAISS |
|
from langchain_community.vectorstores.faiss import DistanceStrategy |
|
from langchain_core.embeddings.embeddings import Embeddings |
|
from FlagEmbedding import BGEM3FlagModel |
|
|
|
def setup_gpu_info() -> None: |
|
print(f"Số lượng GPU khả dụng: {torch.cuda.device_count()}") |
|
print(f"GPU hiện tại: {torch.cuda.current_device()}") |
|
print(f"Tên GPU: {torch.cuda.get_device_name(0)}") |
|
|
|
def load_model(model_name: str, use_fp16: bool = False) -> BGEM3FlagModel: |
|
return BGEM3FlagModel(model_name, use_fp16=use_fp16) |
|
|
|
def load_json_file(file_path: str) -> dict: |
|
with open(file_path, 'r', encoding='utf-8') as f: |
|
return json.load(f) |
|
|
|
def load_jsonl_file(file_path: str) -> List[Dict]: |
|
corpus = [] |
|
with open(file_path, "r", encoding="utf-8") as file: |
|
for line in file: |
|
data = json.loads(line.strip()) |
|
corpus.append(data) |
|
return corpus |
|
|
|
def extract_corpus_from_legal_documents(legal_data: dict) -> List[Dict]: |
|
corpus = [] |
|
for document in legal_data: |
|
for article in document['articles']: |
|
chunk = { |
|
"law_id": document['law_id'], |
|
"article_id": article['article_id'], |
|
"title": article['title'], |
|
"text": article['title'] + '\n' + article['text'] |
|
} |
|
corpus.append(chunk) |
|
return corpus |
|
|
|
def convert_corpus_to_documents(corpus: List[Dict[str, str]]) -> List[Document]: |
|
documents = [] |
|
for i in tqdm(range(len(corpus)), desc="Converting corpus to documents"): |
|
context = corpus[i]['text'] |
|
metadata = { |
|
'law_id': corpus[i]['law_id'], |
|
'article_id': corpus[i]['article_id'], |
|
'title': corpus[i]['title'] |
|
} |
|
documents.append(Document(page_content=context, metadata=metadata)) |
|
return documents |
|
|
|
class CustomEmbedding(Embeddings): |
|
"""Custom embedding class that uses the BGEM3FlagModel.""" |
|
|
|
def __init__(self, model: BGEM3FlagModel, batch_size: int = 1): |
|
self.model = model |
|
self.batch_size = batch_size |
|
|
|
def embed_documents(self, texts: List[str]) -> List[List[float]]: |
|
embeddings = [] |
|
for i in tqdm(range(0, len(texts), self.batch_size), desc="Embedding documents"): |
|
batch_texts = texts[i:i+self.batch_size] |
|
batch_embeddings = self._get_batch_embeddings(batch_texts) |
|
embeddings.extend(batch_embeddings) |
|
torch.cuda.empty_cache() |
|
return np.vstack(embeddings) |
|
|
|
def embed_query(self, text: str) -> List[float]: |
|
embedding = self.model.encode(text, max_length=256)['dense_vecs'] |
|
return embedding |
|
|
|
def _get_batch_embeddings(self, texts: List[str]) -> List[List[float]]: |
|
with torch.no_grad(): |
|
outputs = self.model.encode(texts, batch_size=self.batch_size, max_length=2048)['dense_vecs'] |
|
batch_embeddings = outputs |
|
del outputs |
|
return batch_embeddings |
|
|
|
|
|
class VectorDB: |
|
"""Vector database for document retrieval.""" |
|
|
|
def __init__( |
|
self, |
|
documents: List[Document], |
|
embedding: Embeddings, |
|
vector_db=FAISS, |
|
index_path: Optional[str] = None |
|
) -> None: |
|
self.vector_db = vector_db |
|
self.embedding = embedding |
|
self.index_path = index_path |
|
self.db = self._build_db(documents) |
|
|
|
def _build_db(self, documents: List[Document]): |
|
if self.index_path: |
|
db = self.vector_db.load_local( |
|
self.index_path, |
|
self.embedding, |
|
allow_dangerous_deserialization=True |
|
) |
|
else: |
|
db = self.vector_db.from_documents( |
|
documents=documents, |
|
embedding=self.embedding, |
|
distance_strategy=DistanceStrategy.DOT_PRODUCT |
|
) |
|
return db |
|
|
|
def get_retriever(self, search_type: str = "similarity", search_kwargs: dict = {"k": 10}): |
|
retriever = self.db.as_retriever(search_type=search_type, search_kwargs=search_kwargs) |
|
return retriever |
|
|
|
def save_local(self, folder_path: str) -> None: |
|
self.db.save_local(folder_path) |
|
|
|
|
|
def process_sample(sample: dict, retriever) -> List[int]: |
|
question = sample['question'] |
|
docs = retriever.invoke(question) |
|
retrieved_article_full_ids = [ |
|
docs[i].metadata['law_id'] + "#" + docs[i].metadata['article_id'] |
|
for i in range(len(docs)) |
|
] |
|
indexes = [] |
|
for article in sample['relevant_articles']: |
|
article_full_id = article['law_id'] + "#" + article['article_id'] |
|
if article_full_id in retrieved_article_full_ids: |
|
idx = retrieved_article_full_ids.index(article_full_id) + 1 |
|
indexes.append(idx) |
|
else: |
|
indexes.append(0) |
|
return indexes |
|
|
|
def calculate_metrics(all_indexes: List[List[int]], num_samples: int, selected_keys: Set[str]) -> Dict[str, float]: |
|
count = [len(indexes) for indexes in all_indexes] |
|
result = {} |
|
|
|
for thres in [1, 3, 5, 10, 100]: |
|
found = [[y for y in x if 0 < y <= thres] for x in all_indexes] |
|
found_count = [len(x) for x in found] |
|
acc = sum(1 for i in range(num_samples) if found_count[i] > 0) / num_samples |
|
rec = sum(found_count[i] / count[i] for i in range(num_samples)) / num_samples |
|
pre = sum(found_count[i] / thres for i in range(num_samples)) / num_samples |
|
mrr = sum(1 / min(x) if x else 0 for x in found) / num_samples |
|
|
|
if f"Accuracy@{thres}" in selected_keys: |
|
result[f"Accuracy@{thres}"] = acc |
|
if f"MRR@{thres}" in selected_keys: |
|
result[f"MRR@{thres}"] = mrr |
|
|
|
return result |
|
|
|
|
|
def save_results(result: Dict[str, float], output_path: str) -> None: |
|
with open(output_path, "w", encoding="utf-8") as f: |
|
json.dump(result, f, indent=4, ensure_ascii=False) |
|
print(f"Results saved to {output_path}") |
|
|
|
|
|
def main(): |
|
setup_gpu_info() |
|
model = load_model('AITeamVN/Vietnamese_Embedding', use_fp16=False) |
|
samples = load_json_file('zalo_kaggle/train_question_answer.json')['items'] |
|
legal_data = load_json_file('zalo_kaggle/legal_corpus.json') |
|
|
|
corpus = extract_corpus_from_legal_documents(legal_data) |
|
documents = convert_corpus_to_documents(corpus) |
|
embedding = CustomEmbedding(model, batch_size=1) |
|
vectordb = VectorDB( |
|
documents=documents, |
|
embedding=embedding, |
|
vector_db=FAISS, |
|
index_path=None |
|
) |
|
retriever = vectordb.get_retriever(search_type="similarity", search_kwargs={"k": 100}) |
|
all_indexes = [] |
|
for sample in tqdm(samples, desc="Processing samples"): |
|
all_indexes.append(process_sample(sample, retriever)) |
|
selected_keys = {"Accuracy@1", "Accuracy@3", "Accuracy@5", "Accuracy@10", "MRR@10", "Accuracy@100"} |
|
result = calculate_metrics(all_indexes, len(samples), selected_keys) |
|
print(result) |
|
save_results(result, "zalo_kaggle/Vietnamese_Embedding.json") |
|
if __name__ == "__main__": |
|
main() |