import psycopg2
from psycopg2.extras import RealDictCursor
import re
import json
from typing import List, Optional
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from pydantic import BaseModel, Field
from bs4 import BeautifulSoup


class TourRecommendationRequest(BaseModel):
    user_id: Optional[int] = Field(None)
    tour_id: Optional[int] = Field(None)
    limit: int = Field(3, ge=1, le=10)


class TourSummary(BaseModel):
    tour_id: int
    title: str
    duration: Optional[str] = None
    departure_location: Optional[str] = None
    destination: Optional[List[str]] = None
    region: Optional[int] = None
    description: Optional[str] = None
    similarity_score: Optional[float] = None


class TourRecommendationResponse(BaseModel):
    recommendations: List[TourSummary]
    recommendation_type: Optional[str] = None


class ContentBasedRecommender:
    """Content-based tour recommender built on TF-IDF text similarity."""

    def __init__(self, conn):
        self.conn = conn
        # Common Vietnamese function words plus tour-domain boilerplate terms.
        vietnamese_stop_words = [
            "và", "là", "của", "trong", "được", "có", "không", "cho", "với",
            "tại", "bằng", "để", "này", "khi", "một", "những", "các", "đã",
            "rồi", "lại", "nếu", "vì", "thì", "từ", "ra", "đến", "trên", "dưới",
            "quý", "khách", "tham", "quan", "du", "lịch", "tour", "ngày", "đêm",
            "ăn", "sáng", "trưa", "tối", "nghỉ", "khách", "sạn", "tự", "túc"
        ]

        self.vectorizer = TfidfVectorizer(
            max_features=8000,
            stop_words=vietnamese_stop_words,
            ngram_range=(1, 3),
            min_df=1,
            max_df=0.8,
            token_pattern=r'[a-zA-ZÀ-ỹ]+',
            lowercase=True
        )

        # Relative importance of each tour field when building its feature text.
        self.field_weights = {
            'title': 0.20,
            'destination': 0.30,
            'description': 0.15,
            'departure_location': 0.10,
            'region': 0.15,
            'itinerary': 0.10,
            'duration': 0.05,
            'attractions': 0.15
        }

        # Pairwise proximity scores between region codes (1-3).
        self.region_proximity = {
            1: {1: 1.0, 2: 0.6, 3: 0.3},
            2: {1: 0.6, 2: 1.0, 3: 0.7},
            3: {1: 0.3, 2: 0.7, 3: 1.0}
        }

    def clean_html(self, text):
        """Strip HTML tags and collapse whitespace into single spaces."""
        if not text:
            return ""
        try:
            soup = BeautifulSoup(text, 'html.parser')
            clean_text = soup.get_text()
            clean_text = re.sub(r'\s+', ' ', clean_text).strip()
            return clean_text
        except Exception:
            return str(text)

    def preprocess_text(self, text):
        """Lowercase, strip HTML and punctuation, and drop very short tokens."""
        if not text:
            return ""

        text = self.clean_html(text)
        text = str(text).lower()
        text = re.sub(r'[^\w\sÀ-ỹ]', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()

        words = text.split()
        words = [word for word in words if len(word) >= 2]

        return " ".join(words)

    def preprocess_list(self, items):
        """Preprocess each list item and join the results into a single string."""
        if not items:
            return ""
        processed_items = []
        for item in items:
            cleaned = self.preprocess_text(item)
            if cleaned:
                processed_items.append(cleaned)
        return " ".join(processed_items)

    def extract_attractions_from_itinerary(self, itinerary):
        """Pull highlighted attraction names (bold or coloured text) out of the itinerary JSON."""
        if not itinerary:
            return ""

        try:
            if isinstance(itinerary, str):
                data = json.loads(itinerary)
            else:
                data = itinerary

            attractions = []

            if isinstance(data, list):
                for day in data:
                    if isinstance(day, dict):
                        description = day.get('description', '')
                        if description:
                            soup = BeautifulSoup(description, 'html.parser')
                            strong_tags = soup.find_all('strong')
                            for tag in strong_tags:
                                attractions.append(tag.get_text())

                            colored_spans = soup.find_all('span', style=lambda x: x and 'color' in x)
                            for span in colored_spans:
                                attractions.append(span.get_text())

            clean_attractions = []
            for attraction in attractions:
                cleaned = self.preprocess_text(attraction)
                if cleaned and len(cleaned) > 3:
                    clean_attractions.append(cleaned)

            return " ".join(clean_attractions)

        except Exception as e:
            print(f"Error extracting attractions: {e}")
            return ""

    def preprocess_json(self, json_data):
        """Flatten the text-bearing fields of a JSON document into a single string."""
        if not json_data:
            return ""
        try:
            if isinstance(json_data, str):
                data = json.loads(json_data)
            else:
                data = json_data

            text_values = []

            def extract_values(obj):
                if isinstance(obj, dict):
                    for key, val in obj.items():
                        if key.lower() in ['title', 'description', 'name', 'location']:
                            if val:
                                clean_val = self.clean_html(str(val))
                                if clean_val:
                                    text_values.append(clean_val)
                        else:
                            extract_values(val)
                elif isinstance(obj, list):
                    for item in obj:
                        extract_values(item)
                elif obj and len(str(obj)) > 3:
                    clean_val = self.clean_html(str(obj))
                    if clean_val:
                        text_values.append(clean_val)

            extract_values(data)
            return " ".join(text_values)
        except Exception as e:
            print(f"Error preprocessing JSON: {e}")
            return ""

    def get_all_tours(self):
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    t.tour_id,
                    t.title,
                    t.duration,
                    t.departure_location,
                    t.description,
                    t.destination,
                    t.region,
                    t.itinerary,
                    t.max_participants,
                    MIN(d.price_adult) as min_price,
                    MAX(d.price_adult) as max_price,
                    AVG(d.price_adult) as avg_price
                FROM
                    Tour t
                LEFT JOIN
                    Departure d ON t.tour_id = d.tour_id AND d.availability = true
                WHERE
                    t.availability = true
                GROUP BY
                    t.tour_id, t.title, t.duration, t.departure_location,
                    t.description, t.destination, t.region, t.itinerary, t.max_participants
            """)
            return cursor.fetchall()

    def get_user_history(self, user_id):
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    h.tour_id,
                    COUNT(*) as interaction_count,
                    MAX(h.timestamp) as last_interaction
                FROM
                    History h
                WHERE
                    h.user_id = %s
                GROUP BY
                    h.tour_id
                ORDER BY
                    interaction_count DESC, last_interaction DESC
            """, (user_id,))
            return cursor.fetchall()

    def get_tour_by_id(self, tour_id):
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    t.tour_id,
                    t.title,
                    t.duration,
                    t.departure_location,
                    t.description,
                    t.destination,
                    t.region,
                    t.itinerary,
                    t.max_participants,
                    MIN(d.price_adult) as min_price,
                    MAX(d.price_adult) as max_price,
                    AVG(d.price_adult) as avg_price
                FROM
                    Tour t
                LEFT JOIN
                    Departure d ON t.tour_id = d.tour_id AND d.availability = true
                WHERE
                    t.tour_id = %s
                GROUP BY
                    t.tour_id, t.title, t.duration, t.departure_location,
                    t.description, t.destination, t.region, t.itinerary, t.max_participants
            """, (tour_id,))
            return cursor.fetchone()

    def extract_duration_days(self, duration):
        if not duration:
            return 0

        numbers = re.findall(r'\d+', duration)
        if numbers:
            return int(numbers[0])
        return 0

    def calculate_price_similarity(self, price1, price2):
        if not price1 or not price2:
            return 0.5

        price1 = float(price1)
        price2 = float(price2)

        max_price = max(price1, price2)
        min_price = min(price1, price2)

        if max_price == 0:
            return 1.0

        ratio = min_price / max_price
        return ratio

    def create_tour_features(self, tours):
        """Build one weighted text document per tour for TF-IDF vectorisation."""
        tour_features = {}

        for tour in tours:
            title = self.preprocess_text(tour.get('title', ''))
            description = self.preprocess_text(tour.get('description', ''))
            departure_location = self.preprocess_text(tour.get('departure_location', ''))
            destination = self.preprocess_list(tour.get('destination', []))
            region = self.preprocess_text(str(tour.get('region', '')))
            duration = self.preprocess_text(tour.get('duration', ''))

            itinerary = self.preprocess_json(tour.get('itinerary'))
            attractions = self.extract_attractions_from_itinerary(tour.get('itinerary'))

            # Field weights are applied by repeating each field's text
            # int(weight * 20) times, so heavier fields dominate term frequencies.
            combined_features = (
                f"{title} " * int(self.field_weights['title'] * 20) +
                f"{destination} " * int(self.field_weights['destination'] * 20) +
                f"{description} " * int(self.field_weights['description'] * 20) +
                f"{departure_location} " * int(self.field_weights['departure_location'] * 20) +
                f"{region} " * int(self.field_weights['region'] * 20) +
                f"{itinerary} " * int(self.field_weights['itinerary'] * 20) +
                f"{duration} " * int(self.field_weights['duration'] * 20) +
                f"{attractions} " * int(self.field_weights['attractions'] * 20)
            )

            tour_features[tour['tour_id']] = combined_features.strip()

        return tour_features

    def calculate_enhanced_similarity(self, tours):
        """Combine TF-IDF text similarity with region, duration and price signals."""
        tour_features = self.create_tour_features(tours)

        tour_ids = list(tour_features.keys())
        feature_texts = [tour_features[tour_id] for tour_id in tour_ids]

        if not feature_texts or all(not text.strip() for text in feature_texts):
            return {}

        try:
            tfidf_matrix = self.vectorizer.fit_transform(feature_texts)
            text_similarity = cosine_similarity(tfidf_matrix, tfidf_matrix)
        except Exception as e:
            print(f"Error in TF-IDF calculation: {e}")
            return {}

        tour_lookup = {tour['tour_id']: tour for tour in tours}

        similarity_dict = {}

        for i, tour_id in enumerate(tour_ids):
            similarity_dict[tour_id] = {}
            tour_i = tour_lookup[tour_id]

            for j, other_tour_id in enumerate(tour_ids):
                if i == j:
                    continue

                tour_j = tour_lookup[other_tour_id]

                text_sim = text_similarity[i][j]

                region_i = tour_i.get('region', 1)
                region_j = tour_j.get('region', 1)
                region_sim = self.region_proximity.get(region_i, {}).get(region_j, 0.3)

                duration_i = self.extract_duration_days(tour_i.get('duration'))
                duration_j = self.extract_duration_days(tour_j.get('duration'))
                duration_sim = 1.0 if duration_i == duration_j else 0.7 if abs(duration_i - duration_j) <= 1 else 0.3

                price_i = tour_i.get('avg_price')
                price_j = tour_j.get('avg_price')
                price_sim = self.calculate_price_similarity(price_i, price_j)

                # Weighted blend: text 60%, region 20%, duration 10%, price 10%.
                final_similarity = (
                    text_sim * 0.6 +
                    region_sim * 0.2 +
                    duration_sim * 0.1 +
                    price_sim * 0.1
                )

                similarity_dict[tour_id][other_tour_id] = final_similarity

        return similarity_dict

    def recommend_similar_tours(self, tour_id, limit=3):
        """Return the tours most similar to a given tour."""
        all_tours = self.get_all_tours()
        target_tour = None

        for tour in all_tours:
            if tour.get('tour_id') == tour_id:
                target_tour = tour
                break

        if not target_tour:
            return []

        similarity_dict = self.calculate_enhanced_similarity(all_tours)

        if tour_id in similarity_dict:
            similar_tours = sorted(
                similarity_dict[tour_id].items(),
                key=lambda x: x[1],
                reverse=True
            )[:limit]

            recommended_tours = []
            for similar_tour_id, similarity_score in similar_tours:
                for tour in all_tours:
                    if tour.get('tour_id') == similar_tour_id:
                        tour_copy = dict(tour)
                        tour_copy['similarity_score'] = float(similarity_score)
                        recommended_tours.append(tour_copy)
                        break

            return recommended_tours

        return []

    def recommend_for_user(self, user_id, limit=3):
        """Recommend unseen tours scored against the user's interaction history."""
        user_history = self.get_user_history(user_id)

        if not user_history:
            return self.recommend_popular_tours(limit)

        all_tours = self.get_all_tours()
        similarity_dict = self.calculate_enhanced_similarity(all_tours)

        tour_scores = {}
        total_interactions = sum(h['interaction_count'] for h in user_history)

        for tour in all_tours:
            tour_id = tour.get('tour_id')
            # Skip tours the user has already interacted with.
            if tour_id is None or any(h['tour_id'] == tour_id for h in user_history):
                continue

            total_similarity = 0
            total_weight = 0

            for history_item in user_history:
                history_tour_id = history_item['tour_id']
                interaction_weight = history_item['interaction_count'] / total_interactions

                if (history_tour_id in similarity_dict and
                        tour_id in similarity_dict[history_tour_id]):
                    similarity = similarity_dict[history_tour_id][tour_id]
                    total_similarity += similarity * interaction_weight
                    total_weight += interaction_weight

            if total_weight > 0:
                tour_scores[tour_id] = total_similarity / total_weight

        # Regions the user has already visited in their history.
        user_regions = set()
        for history_item in user_history:
            for tour in all_tours:
                if tour['tour_id'] == history_item['tour_id']:
                    user_regions.add(tour.get('region'))
                    break

        # Slightly boost tours in regions the user has not explored yet.
        for tour_id, score in tour_scores.items():
            for tour in all_tours:
                if tour['tour_id'] == tour_id:
                    if tour.get('region') not in user_regions:
                        tour_scores[tour_id] = score * 1.1
                    break

        top_tours = sorted(
            tour_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:limit]

        recommended_tours = []
        for tour_id, similarity_score in top_tours:
            for tour in all_tours:
                if tour['tour_id'] == tour_id:
                    tour_copy = dict(tour)
                    tour_copy['similarity_score'] = float(similarity_score)
                    recommended_tours.append(tour_copy)
                    break

        return recommended_tours

    def recommend_popular_tours(self, limit=3):
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    t.tour_id,
                    t.title,
                    t.duration,
                    t.departure_location,
                    t.description,
                    t.destination,
                    t.region,
                    COUNT(DISTINCT b.booking_id) as booking_count,
                    AVG(r.average_rating) as avg_rating,
                    COUNT(DISTINCT r.review_id) as review_count
                FROM
                    Tour t
                LEFT JOIN
                    Departure d ON t.tour_id = d.tour_id
                LEFT JOIN
                    Booking b ON d.departure_id = b.departure_id
                LEFT JOIN
                    Review r ON t.tour_id = r.tour_id
                WHERE
                    t.availability = true
                GROUP BY
                    t.tour_id, t.title, t.duration, t.departure_location,
                    t.description, t.destination, t.region
                ORDER BY
                    (COUNT(DISTINCT b.booking_id) * 0.6 +
                     COALESCE(AVG(r.average_rating), 3.0) * COUNT(DISTINCT r.review_id) * 0.4) DESC
                LIMIT %s
            """, (limit,))

            popular_tours = cursor.fetchall()
            for tour in popular_tours:
                tour['similarity_score'] = None
            return popular_tours

    def get_recommendations(self, user_id=None, tour_id=None, limit=3):
        if tour_id:
            return self.recommend_similar_tours(tour_id, limit)
        elif user_id:
            return self.recommend_for_user(user_id, limit)
        else:
            return self.recommend_popular_tours(limit)


def get_db_connection():
    """Get a connection from the shared pool, falling back to a direct connection."""
    try:
        from src.database import conn_pool
        return conn_pool.getconn()
    except Exception as e:
        print(f"Error getting connection from pool: {e}")
        try:
            try:
                from src.config import DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME
                conn = psycopg2.connect(
                    user=DB_USER,
                    password=DB_PASSWORD,
                    host=DB_HOST,
                    port=DB_PORT,
                    dbname=DB_NAME
                )
            except ImportError:
                import os
                conn = psycopg2.connect(
                    user=os.getenv("DB_USER"),
                    password=os.getenv("DB_PASSWORD"),
                    host=os.getenv("DB_HOST"),
                    port=os.getenv("DB_PORT"),
                    dbname=os.getenv("DB_NAME")
                )
            return conn
        except Exception as e2:
            print(f"Error creating direct connection: {e2}")
            raise


def return_db_connection(conn):
    """Return a connection to the pool, or close it if the pool is unavailable."""
    try:
        from src.database import conn_pool
        conn_pool.putconn(conn)
    except Exception as e:
        print(f"Error returning connection to pool: {e}")
        try:
            conn.close()
        except Exception:
            pass


def convert_to_tour_summary(tour):
    return TourSummary(
        tour_id=tour.get('tour_id'),
        title=tour.get('title', ''),
        duration=tour.get('duration'),
        departure_location=tour.get('departure_location'),
        destination=tour.get('destination'),
        region=tour.get('region'),
        description=tour.get('description'),
        similarity_score=tour.get('similarity_score')
    )


def get_tour_recommendations(user_id=None, tour_id=None, limit=3):
    conn = None
    try:
        conn = get_db_connection()
        recommender = ContentBasedRecommender(conn)
        recommended_tours = recommender.get_recommendations(user_id, tour_id, limit)
        tour_summaries = [convert_to_tour_summary(tour) for tour in recommended_tours]
        response = TourRecommendationResponse(
            recommendations=tour_summaries,
            recommendation_type="content-based"
        )
        return response
    except Exception as e:
        print(f"Error getting recommendations: {e}")
        raise
    finally:
        if conn:
            return_db_connection(conn)
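

# Minimal usage sketch (not part of the module's original API): assumes a reachable
# PostgreSQL database configured via src.config or the DB_* environment variables
# that get_db_connection() falls back to, and that a tour with id 1 exists.
if __name__ == "__main__":
    demo = get_tour_recommendations(tour_id=1, limit=3)
    for rec in demo.recommendations:
        print(rec.tour_id, rec.title, rec.similarity_score)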