import re
import string
from collections import Counter

import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
import spacy
from sentence_transformers import SentenceTransformer, util
from sklearn.preprocessing import MinMaxScaler
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize

def truncate_text(text, max_length=1024):
    """Truncate text to at most max_length whitespace-separated tokens."""
    tokens = text.split()
    if len(tokens) > max_length:
        return ' '.join(tokens[:max_length])
    return text

class RankingNN(nn.Module):
    """Small feed-forward network that maps a feature vector to a single relevance score."""

    def __init__(self, input_size=7):
        super(RankingNN, self).__init__()
        self.fc1 = nn.Linear(input_size, 64)
        self.fc2 = nn.Linear(64, 32)
        self.fc3 = nn.Linear(32, 16)
        self.fc4 = nn.Linear(16, 1)
        self.dropout = nn.Dropout(0.2)

    def forward(self, x):
        x = torch.relu(self.fc1(x))
        x = self.dropout(x)
        x = torch.relu(self.fc2(x))
        x = self.dropout(x)
        x = torch.relu(self.fc3(x))
        x = self.fc4(x)
        return x

# Shared components: a sentence encoder for embeddings, the ranking network,
# its optimizer and loss, and a scaler for feature normalization.
transformer_model = SentenceTransformer('all-MiniLM-L6-v2')
ranking_model = RankingNN()
optimizer = optim.Adam(ranking_model.parameters(), lr=0.001, weight_decay=1e-5)
criterion = nn.MSELoss()
scaler = MinMaxScaler()

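# Quick shape check (illustrative only): the untrained network maps a batch of
# 7-dimensional feature vectors to one score per row.
# >>> ranking_model(torch.rand(2, 7)).shape
# torch.Size([2, 1])
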
# Download the NLTK resources needed for tokenization, stopword removal, and lemmatization.
import nltk
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')

stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()
nlp = spacy.load("en_core_web_sm")

def preprocess_text(text):
    """
    Preprocess the input text by lowercasing, removing punctuation, and filtering out stopwords.
    Lemmatization is applied as well.
    """
    text = text.lower()
    # Replace punctuation with spaces; re.escape keeps regex metacharacters literal.
    text = re.sub('[' + re.escape(string.punctuation) + ']', ' ', text)
    words = word_tokenize(text)
    processed_words = [lemmatizer.lemmatize(word) for word in words if word.isalpha() and word not in stop_words]
    return processed_words

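# Example (illustrative): preprocess_text("The cats are running fast!") returns
# something like ['cat', 'running', 'fast'] -- stopwords and punctuation are
# dropped, "cats" is lemmatized to "cat", and the default noun lemmatization
# leaves "running" unchanged.
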
def extract_named_entities(text):
    """
    Extract named entities (e.g., people, organizations, locations) from the text.
    """
    doc = nlp(text)
    named_entities = [ent.text for ent in doc.ents if ent.label_ in {"PERSON", "ORG", "GPE", "LOC"}]
    return named_entities

def extract_keywords_tfidf(corpus, text, n=5):
    """
    Extract keywords from the text using TF-IDF, combined with Named Entity Recognition and lemmatization.
    """
    preprocessed_texts = [' '.join(preprocess_text(doc)) for doc in corpus]
    preprocessed_text = ' '.join(preprocess_text(text))

    named_entities = extract_named_entities(text)

    # Fit TF-IDF on the corpus, then score the target text against that vocabulary.
    vectorizer = TfidfVectorizer(max_features=1000)
    vectorizer.fit_transform(preprocessed_texts)
    feature_names = vectorizer.get_feature_names_out()

    response = vectorizer.transform([preprocessed_text])
    tfidf_scores = zip(feature_names, response.toarray()[0])

    # Keep the n highest-scoring terms, then append any named entities.
    sorted_tfidf = sorted(tfidf_scores, key=lambda x: x[1], reverse=True)
    keywords = [word for word, score in sorted_tfidf[:n]]
    combined_keywords = keywords + named_entities

    return combined_keywords[:n]

def extract_keywords(text, corpus, n=5):
    """
    Wrapper function that combines preprocessing, TF-IDF, and Named Entity Recognition to extract top N keywords.
    """
    if not text.strip():
        return []

    keywords = extract_keywords_tfidf(corpus, text, n)

    # Fall back to simple frequency counts if TF-IDF/NER produced nothing.
    if not keywords:
        return extract_fallback_keywords(text, n)

    return keywords

def extract_fallback_keywords(text, n=5):
    """
    Fallback method to extract keywords based on word frequency in case TF-IDF or NER fails.
    """
    words = preprocess_text(text)
    word_freq = Counter(words)
    return [word for word, _ in word_freq.most_common(n)]

def calculate_keyword_overlap(query_keywords, result_keywords):
    """Return the fraction of query keywords that also appear among the result keywords."""
    if len(query_keywords) == 0:
        return 0
    return len(set(query_keywords) & set(result_keywords)) / len(query_keywords)

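# Worked example (illustrative):
# calculate_keyword_overlap(['neural', 'ranking'], ['ranking', 'model']) == 0.5,
# since one of the two query keywords also appears among the result keywords.
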
def train_ranking_model(query, results, corpus=None, epochs=1):
    """Fit the ranking network on the given results and return them sorted by predicted score."""
    query = truncate_text(query)
    if not results:
        print("No results available. Skipping training.")
        return []

    # Build the TF-IDF corpus from the result contents unless one was supplied.
    if corpus is None:
        corpus = [truncate_text(result['content']) for result in results if 'content' in result]

    query_embedding = transformer_model.encode(query)
    query_keywords = extract_keywords(query, corpus)

    training_data = []
    target_scores = []

    for result in results:
        # Embed the content, title, and meta description of each result.
        content = truncate_text(result.get('content', ''))
        content_embedding = transformer_model.encode(content)

        title = truncate_text(result.get('title', ''))
        title_embedding = transformer_model.encode(title)

        meta_description = truncate_text(result.get('meta', {}).get('description', ''))
        meta_description_embedding = transformer_model.encode(meta_description)

        # Cosine similarities between the query and each field.
        content_similarity = util.pytorch_cos_sim(query_embedding, content_embedding).item()
        title_similarity = util.pytorch_cos_sim(query_embedding, title_embedding).item()
        meta_description_similarity = util.pytorch_cos_sim(query_embedding, meta_description_embedding).item()

        # Structural and keyword-based features.
        content_length = result.get('meta', {}).get('content_length', 0)
        total_links = result.get('meta', {}).get('total_links', 0)

        result_keywords = extract_keywords(content, corpus)
        keyword_overlap = calculate_keyword_overlap(query_keywords, result_keywords)
        domain_authority = get_domain_authority(result.get('link', ''))

        features = [
            content_similarity, title_similarity, meta_description_similarity,
            content_length, total_links, keyword_overlap, domain_authority
        ]
        training_data.append(features)

        # Heuristic relevance target: a weighted blend of the similarity and keyword features.
        target_score = (0.4 * content_similarity + 0.3 * title_similarity +
                        0.2 * meta_description_similarity + 0.1 * keyword_overlap)
        target_scores.append(target_score)

    # Normalize the features and convert everything to tensors.
    training_data = scaler.fit_transform(training_data)
    training_data_tensor = torch.tensor(training_data, dtype=torch.float32)
    target_scores_tensor = torch.tensor(target_scores, dtype=torch.float32).unsqueeze(1)

    ranking_model.train()
    for epoch in range(epochs):
        optimizer.zero_grad()
        predicted_scores = ranking_model(training_data_tensor)
        loss = criterion(predicted_scores, target_scores_tensor)
        loss.backward()
        optimizer.step()

        if (epoch + 1) % 5 == 0:
            print(f"Epoch {epoch+1}/{epochs}, Loss: {loss.item():.4f}")

    # Score the results with dropout disabled and attach the predictions.
    ranking_model.eval()
    with torch.no_grad():
        final_scores = ranking_model(training_data_tensor).squeeze().tolist()

    # squeeze() collapses a single-result batch to a scalar; keep it a list.
    if isinstance(final_scores, float):
        final_scores = [final_scores]

    for result, score in zip(results, final_scores):
        result['predicted_score'] = score

    ranked_results = sorted(results, key=lambda x: x['predicted_score'], reverse=True)
    return ranked_results

def get_domain_authority(url):
    """Rough domain-authority proxy based on a small whitelist of known domains."""
    high_authority_domains = ['arxiv.org', 'ncbi.nlm.nih.gov', 'nature.com', 'science.org']
    medium_authority_domains = ['wikipedia.org', 'stackexchange.com', 'github.com']

    for domain in high_authority_domains:
        if domain in url:
            return 1.0
    for domain in medium_authority_domains:
        if domain in url:
            return 0.7
    return 0.5
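

if __name__ == "__main__":
    # Minimal usage sketch (illustrative): the dictionaries below are hypothetical
    # stand-ins for scraped search results; the real pipeline is expected to supply
    # the 'content', 'title', 'meta', and 'link' fields that train_ranking_model reads.
    sample_results = [
        {
            'content': "PyTorch is an open source machine learning framework for deep learning.",
            'title': "PyTorch",
            'meta': {'description': "An open source machine learning framework.",
                     'content_length': 120, 'total_links': 14},
            'link': "https://github.com/pytorch/pytorch",
        },
        {
            'content': "Bananas are an edible fruit produced by several kinds of large herbaceous plants.",
            'title': "Banana",
            'meta': {'description': "An edible fruit.",
                     'content_length': 95, 'total_links': 5},
            'link': "https://en.wikipedia.org/wiki/Banana",
        },
    ]

    ranked = train_ranking_model("open source deep learning framework", sample_results, epochs=10)
    for r in ranked:
        print(f"{r['predicted_score']:.3f}  {r['title']}")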