Travel_AI / src /recommendation_api.py
ayayaya12's picture
Update Recommend API
795f8d5
import psycopg2
from psycopg2.extras import RealDictCursor
import re
import json
from typing import List, Dict, Any, Optional, Union
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np
from pydantic import BaseModel, Field
from bs4 import BeautifulSoup
import math
class TourRecommendationRequest(BaseModel):
    """Request payload for the recommendation endpoint.

    Both identifiers are optional and default to ``None``; ``limit`` is
    validated to the inclusive range 1..10 and defaults to 3.
    """

    # Optional user identifier.
    user_id: Optional[int] = Field(default=None)
    # Optional tour identifier.
    tour_id: Optional[int] = Field(default=None)
    # Maximum number of recommendations to return (1..10).
    limit: int = Field(default=3, ge=1, le=10)
class TourSummary(BaseModel):
    """Compact description of one recommended tour.

    Only ``tour_id`` and ``title`` are required; every other field is
    optional and defaults to ``None``.
    """

    # Required identification.
    tour_id: int
    title: str

    # Optional descriptive fields.
    duration: Optional[str] = None
    departure_location: Optional[str] = None
    destination: Optional[List[str]] = None
    region: Optional[int] = None
    description: Optional[str] = None

    # Optional ranking score attached to the tour.
    similarity_score: Optional[float] = None
class TourRecommendationResponse(BaseModel):
    """Response payload: the ordered list of recommended tours.

    ``recommendation_type`` is declared explicitly because callers in this
    module construct the response with
    ``recommendation_type="content-based"``. Without a declared field the
    model has no attribute to hold that value. The field is optional with a
    ``None`` default, so existing constructions that omit it keep working.
    """

    recommendations: List[TourSummary]
    # Label describing how the recommendations were produced.
    recommendation_type: Optional[str] = None
class ContentBasedRecommender:
    """Recommend tours by content similarity, reading data through ``conn``.

    Pairwise tour similarity blends four signals:

    * TF-IDF cosine similarity over a weighted bag of text fields (60%),
    * a region proximity lookup (20%),
    * closeness of the duration in days (10%),
    * ratio of average adult prices (10%).

    Failures in text/JSON processing are reported with ``print`` and degrade
    to empty results rather than raising.
    """

    def __init__(self, conn):
        """Store the connection and configure the vectorizer and score tables.

        Args:
            conn: Open database connection. The query methods call
                ``conn.cursor(cursor_factory=RealDictCursor)`` on it.
        """
        self.conn = conn
        # Function words plus travel boilerplate ("tour", "ngày", "đêm", ...)
        # that would otherwise dominate the TF-IDF vocabulary.
        vietnamese_stop_words = [
            "và", "là", "của", "trong", "được", "có", "không", "cho", "với",
            "tại", "bằng", "để", "này", "khi", "một", "những", "các", "đã",
            "rồi", "lại", "nếu", "vì", "thì", "từ", "ra", "đến", "trên", "dưới",
            "quý", "khách", "tham", "quan", "du", "lịch", "tour", "ngày", "đêm",
            "ăn", "sáng", "trưa", "tối", "nghỉ", "khách", "sạn", "tự", "túc"
        ]
        self.vectorizer = TfidfVectorizer(
            max_features=8000,
            stop_words=vietnamese_stop_words,
            ngram_range=(1, 3),
            min_df=1,
            max_df=0.8,
            # Letters only (ASCII plus the À-ỹ range); digits never become tokens.
            token_pattern=r'[a-zA-ZÀ-ỹ]+',
            lowercase=True
        )
        # Relative importance of each text field. The values are only used
        # to derive repeat counts in create_tour_features, so they need not
        # (and do not) sum to 1.
        self.field_weights = {
            'title': 0.20,
            'destination': 0.30,
            'description': 0.15,
            'departure_location': 0.10,
            'region': 0.15,
            'itinerary': 0.10,
            'duration': 0.05,
            'attractions': 0.15
        }
        # Symmetric similarity between region codes 1..3.
        # NOTE(review): presumably 1/2/3 are geographic zones ordered so that
        # 2 lies between 1 and 3 - confirm against the Tour.region schema.
        self.region_proximity = {
            1: {1: 1.0, 2: 0.6, 3: 0.3},
            2: {1: 0.6, 2: 1.0, 3: 0.7},
            3: {1: 0.3, 2: 0.7, 3: 1.0}
        }

    def clean_html(self, text):
        """Strip HTML markup from ``text`` and collapse whitespace runs.

        Returns ``""`` for falsy input. If parsing fails for any reason the
        input is returned as ``str(text)`` (best-effort, never raises).
        """
        if not text:
            return ""
        try:
            soup = BeautifulSoup(text, 'html.parser')
            return re.sub(r'\s+', ' ', soup.get_text()).strip()
        except Exception:
            # Best-effort: fall back to the raw text rather than failing the
            # whole recommendation. ``Exception`` (not a bare except) so that
            # KeyboardInterrupt/SystemExit still propagate.
            return str(text)

    def preprocess_text(self, text):
        """Normalise free text for vectorisation.

        Removes HTML, lower-cases, replaces punctuation with spaces and drops
        tokens shorter than two characters. Returns ``""`` for falsy input.
        """
        if not text:
            return ""
        text = str(self.clean_html(text)).lower()
        text = re.sub(r'[^\w\sÀ-ỹ]', ' ', text)
        text = re.sub(r'\s+', ' ', text).strip()
        return " ".join(word for word in text.split() if len(word) >= 2)

    def preprocess_list(self, items):
        """Normalise a list of strings and join the non-empty results.

        A bare string is treated as a single item. Iterating it would walk
        its characters, and every one would then be dropped by the
        two-character minimum in ``preprocess_text``.
        """
        if not items:
            return ""
        if isinstance(items, str):
            return self.preprocess_text(items)
        cleaned_items = (self.preprocess_text(item) for item in items)
        return " ".join(cleaned for cleaned in cleaned_items if cleaned)

    def extract_attractions_from_itinerary(self, itinerary):
        """Collect highlighted phrases from itinerary day descriptions.

        ``itinerary`` may be a JSON string or an already-decoded object. For
        each dict in a top-level list, the text of ``<strong>`` tags and of
        ``<span>`` tags whose ``style`` mentions ``color`` is taken from the
        ``description`` HTML. Phrases are normalised with ``preprocess_text``
        and kept only when longer than three characters.

        Returns the phrases joined by spaces, or ``""`` on empty input or
        any error (the error is printed).
        """
        if not itinerary:
            return ""
        try:
            data = json.loads(itinerary) if isinstance(itinerary, str) else itinerary
            attractions = []
            if isinstance(data, list):
                for day in data:
                    if not isinstance(day, dict):
                        continue
                    description = day.get('description', '')
                    if not description:
                        continue
                    soup = BeautifulSoup(description, 'html.parser')
                    attractions.extend(tag.get_text() for tag in soup.find_all('strong'))
                    colored_spans = soup.find_all('span', style=lambda x: x and 'color' in x)
                    attractions.extend(span.get_text() for span in colored_spans)
            clean_attractions = []
            for attraction in attractions:
                cleaned = self.preprocess_text(attraction)
                if cleaned and len(cleaned) > 3:
                    clean_attractions.append(cleaned)
            return " ".join(clean_attractions)
        except Exception as e:
            print(f"Error extracting attractions: {e}")
            return ""

    def preprocess_json(self, json_data):
        """Flatten a JSON document into one string of HTML-stripped text.

        ``json_data`` may be a JSON string or a decoded object. Values under
        the keys ``title``/``description``/``name``/``location`` (matched
        case-insensitively) are taken as-is; other dict values and list items
        are walked recursively; remaining scalars are kept when their string
        form is longer than three characters.

        Returns ``""`` on empty input or any error (the error is printed).
        """
        if not json_data:
            return ""
        try:
            data = json.loads(json_data) if isinstance(json_data, str) else json_data
            text_values = []

            def extract_values(obj):
                if isinstance(obj, dict):
                    for key, val in obj.items():
                        if key.lower() in ['title', 'description', 'name', 'location']:
                            if val:
                                clean_val = self.clean_html(str(val))
                                if clean_val:
                                    text_values.append(clean_val)
                        else:
                            extract_values(val)
                elif isinstance(obj, list):
                    for item in obj:
                        extract_values(item)
                elif obj and len(str(obj)) > 3:
                    clean_val = self.clean_html(str(obj))
                    if clean_val:
                        text_values.append(clean_val)

            extract_values(data)
            return " ".join(text_values)
        except Exception as e:
            print(f"Error preprocessing JSON: {e}")
            return ""

    def get_all_tours(self):
        """Return every available tour with min/max/avg adult price.

        Prices are aggregated over the tour's available departures only;
        tours without such departures still appear (LEFT JOIN) with NULL
        prices. Rows are dict-like (``RealDictCursor``).
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    t.tour_id,
                    t.title,
                    t.duration,
                    t.departure_location,
                    t.description,
                    t.destination,
                    t.region,
                    t.itinerary,
                    t.max_participants,
                    MIN(d.price_adult) as min_price,
                    MAX(d.price_adult) as max_price,
                    AVG(d.price_adult) as avg_price
                FROM
                    Tour t
                LEFT JOIN
                    Departure d ON t.tour_id = d.tour_id AND d.availability = true
                WHERE
                    t.availability = true
                GROUP BY
                    t.tour_id, t.title, t.duration, t.departure_location,
                    t.description, t.destination, t.region, t.itinerary, t.max_participants
            """)
            return cursor.fetchall()

    def get_user_history(self, user_id):
        """Return the user's history aggregated per tour.

        Each row has ``tour_id``, ``interaction_count`` and
        ``last_interaction``, ordered by count then recency (both descending).
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    h.tour_id,
                    COUNT(*) as interaction_count,
                    MAX(h.timestamp) as last_interaction
                FROM
                    History h
                WHERE
                    h.user_id = %s
                GROUP BY
                    h.tour_id
                ORDER BY
                    interaction_count DESC, last_interaction DESC
            """, (user_id,))
            return cursor.fetchall()

    def get_tour_by_id(self, tour_id):
        """Return one tour (same columns as ``get_all_tours``) or ``None``.

        Unlike ``get_all_tours`` this does not filter on the tour's own
        availability, only on that of the departures used for prices.
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    t.tour_id,
                    t.title,
                    t.duration,
                    t.departure_location,
                    t.description,
                    t.destination,
                    t.region,
                    t.itinerary,
                    t.max_participants,
                    MIN(d.price_adult) as min_price,
                    MAX(d.price_adult) as max_price,
                    AVG(d.price_adult) as avg_price
                FROM
                    Tour t
                LEFT JOIN
                    Departure d ON t.tour_id = d.tour_id AND d.availability = true
                WHERE
                    t.tour_id = %s
                GROUP BY
                    t.tour_id, t.title, t.duration, t.departure_location,
                    t.description, t.destination, t.region, t.itinerary, t.max_participants
            """, (tour_id,))
            return cursor.fetchone()

    def extract_duration_days(self, duration):
        """Return the first integer found in ``duration``, or 0 if none.

        Non-string values are converted with ``str`` first so that a numeric
        duration does not raise ``TypeError`` inside ``re.findall``.
        """
        if not duration:
            return 0
        numbers = re.findall(r'\d+', str(duration))
        return int(numbers[0]) if numbers else 0

    def calculate_price_similarity(self, price1, price2):
        """Return ``min/max`` of two prices as a similarity in (0, 1].

        A missing, zero or negative price on either side yields the neutral
        value 0.5, so unknown prices neither help nor hurt a pair and the
        result can never fall outside [0, 1].
        """
        if not price1 or not price2:
            return 0.5
        low, high = sorted((float(price1), float(price2)))
        if low <= 0:
            # Non-positive prices are treated like missing data.
            return 0.5
        return low / high

    def create_tour_features(self, tours):
        """Build one weighted text document per tour.

        Each preprocessed field is repeated ``round(weight * 20)`` times so
        that higher-weighted fields contribute more term frequency.

        Args:
            tours: Iterable of dict-like tour rows containing ``tour_id``.

        Returns:
            Dict mapping ``tour_id`` to its combined feature string.
        """
        tour_features = {}
        for tour in tours:
            region_value = tour.get('region')
            itinerary_raw = tour.get('itinerary')
            field_texts = {
                'title': self.preprocess_text(tour.get('title', '')),
                'destination': self.preprocess_list(tour.get('destination', [])),
                'description': self.preprocess_text(tour.get('description', '')),
                'departure_location': self.preprocess_text(tour.get('departure_location', '')),
                # A NULL region must not become the literal token "none".
                # Single-digit codes yield no text either way (two-character
                # minimum in preprocess_text); region closeness is scored
                # numerically in calculate_enhanced_similarity.
                'region': self.preprocess_text(str(region_value)) if region_value is not None else "",
                'itinerary': self.preprocess_json(itinerary_raw),
                'duration': self.preprocess_text(tour.get('duration', '')),
                'attractions': self.extract_attractions_from_itinerary(itinerary_raw),
            }
            parts = []
            for field, text in field_texts.items():
                if not text:
                    continue
                # round() rather than int(): a product such as 2.9999999 must
                # not be truncated to one repeat fewer than intended.
                repeats = max(0, round(self.field_weights[field] * 20))
                parts.extend([text] * repeats)
            tour_features[tour['tour_id']] = " ".join(parts)
        return tour_features

    def calculate_enhanced_similarity(self, tours):
        """Compute the blended similarity between every ordered pair of tours.

        Returns:
            ``{tour_id: {other_tour_id: score}}`` (self-pairs omitted), or
            ``{}`` when there is no usable text or TF-IDF fitting fails.
        """
        tour_features = self.create_tour_features(tours)
        tour_ids = list(tour_features.keys())
        feature_texts = [tour_features[tour_id] for tour_id in tour_ids]
        if not feature_texts or all(not text.strip() for text in feature_texts):
            return {}
        try:
            tfidf_matrix = self.vectorizer.fit_transform(feature_texts)
            text_similarity = cosine_similarity(tfidf_matrix, tfidf_matrix)
        except Exception as e:
            print(f"Error in TF-IDF calculation: {e}")
            return {}

        tour_lookup = {tour['tour_id']: tour for tour in tours}
        ordered_tours = [tour_lookup[tour_id] for tour_id in tour_ids]
        # Parse each duration once instead of once per pair.
        durations = [self.extract_duration_days(tour.get('duration')) for tour in ordered_tours]

        similarity_dict = {}
        for i, tour_id in enumerate(tour_ids):
            tour_i = ordered_tours[i]
            region_row = self.region_proximity.get(tour_i.get('region', 1), {})
            price_i = tour_i.get('avg_price')
            row = {}
            for j, other_tour_id in enumerate(tour_ids):
                if i == j:
                    continue
                tour_j = ordered_tours[j]
                # Unknown region combinations fall back to the lowest proximity.
                region_sim = region_row.get(tour_j.get('region', 1), 0.3)
                day_gap = abs(durations[i] - durations[j])
                if day_gap == 0:
                    duration_sim = 1.0
                elif day_gap <= 1:
                    duration_sim = 0.7
                else:
                    duration_sim = 0.3
                price_sim = self.calculate_price_similarity(price_i, tour_j.get('avg_price'))
                row[other_tour_id] = (
                    text_similarity[i][j] * 0.6 +
                    region_sim * 0.2 +
                    duration_sim * 0.1 +
                    price_sim * 0.1
                )
            similarity_dict[tour_id] = row
        return similarity_dict

    @staticmethod
    def _attach_scores(ranked, tours_by_id):
        """Copy each ranked tour row and add its score as ``similarity_score``."""
        results = []
        for ranked_id, score in ranked:
            tour = tours_by_id.get(ranked_id)
            if tour is None:
                continue
            tour_copy = dict(tour)
            tour_copy['similarity_score'] = float(score)
            results.append(tour_copy)
        return results

    def recommend_similar_tours(self, tour_id, limit=3):
        """Return up to ``limit`` available tours most similar to ``tour_id``.

        Returns ``[]`` when the tour is not among the available tours or no
        similarities could be computed.
        """
        all_tours = self.get_all_tours()
        tours_by_id = {tour.get('tour_id'): tour for tour in all_tours}
        if not tours_by_id.get(tour_id):
            return []
        neighbours = self.calculate_enhanced_similarity(all_tours).get(tour_id)
        if neighbours is None:
            return []
        ranked = sorted(neighbours.items(), key=lambda x: x[1], reverse=True)[:limit]
        return self._attach_scores(ranked, tours_by_id)

    def recommend_for_user(self, user_id, limit=3):
        """Recommend unseen tours similar to those in the user's history.

        Each candidate's score is the interaction-weighted mean of its
        similarity to the history tours; candidates in a region absent from
        the user's history then receive a 10% boost. Users without history
        get ``recommend_popular_tours(limit)`` instead.
        """
        user_history = self.get_user_history(user_id)
        if not user_history:
            return self.recommend_popular_tours(limit)

        all_tours = self.get_all_tours()
        similarity_dict = self.calculate_enhanced_similarity(all_tours)
        tours_by_id = {tour.get('tour_id'): tour for tour in all_tours}
        seen_ids = {h['tour_id'] for h in user_history}
        total_interactions = sum(h['interaction_count'] for h in user_history)

        tour_scores = {}
        for tour in all_tours:
            candidate_id = tour.get('tour_id')
            if candidate_id is None or candidate_id in seen_ids:
                continue
            total_similarity = 0
            total_weight = 0
            for history_item in user_history:
                neighbours = similarity_dict.get(history_item['tour_id'])
                if neighbours is None or candidate_id not in neighbours:
                    continue
                interaction_weight = history_item['interaction_count'] / total_interactions
                total_similarity += neighbours[candidate_id] * interaction_weight
                total_weight += interaction_weight
            if total_weight > 0:
                tour_scores[candidate_id] = total_similarity / total_weight

        # Regions of the history tours that are still available.
        user_regions = {
            tours_by_id[seen_id].get('region')
            for seen_id in seen_ids
            if seen_id in tours_by_id
        }
        for scored_id in tour_scores:
            if tours_by_id[scored_id].get('region') not in user_regions:
                tour_scores[scored_id] *= 1.1

        ranked = sorted(tour_scores.items(), key=lambda x: x[1], reverse=True)[:limit]
        return self._attach_scores(ranked, tours_by_id)

    def recommend_popular_tours(self, limit=3):
        """Return the ``limit`` most popular available tours.

        Ranking is ``bookings * 0.6 + avg_rating * review_count * 0.4``, with
        a NULL average rating replaced by 3.0. Rows carry ``booking_count``,
        ``avg_rating`` and ``review_count`` but no itinerary or price columns;
        ``similarity_score`` is set to ``None`` on each row.
        """
        with self.conn.cursor(cursor_factory=RealDictCursor) as cursor:
            cursor.execute("""
                SELECT
                    t.tour_id,
                    t.title,
                    t.duration,
                    t.departure_location,
                    t.description,
                    t.destination,
                    t.region,
                    COUNT(DISTINCT b.booking_id) as booking_count,
                    AVG(r.average_rating) as avg_rating,
                    COUNT(DISTINCT r.review_id) as review_count
                FROM
                    Tour t
                LEFT JOIN
                    Departure d ON t.tour_id = d.tour_id
                LEFT JOIN
                    Booking b ON d.departure_id = b.departure_id
                LEFT JOIN
                    Review r ON t.tour_id = r.tour_id
                WHERE
                    t.availability = true
                GROUP BY
                    t.tour_id, t.title, t.duration, t.departure_location,
                    t.description, t.destination, t.region
                ORDER BY
                    (COUNT(DISTINCT b.booking_id) * 0.6 +
                    COALESCE(AVG(r.average_rating), 3.0) * COUNT(DISTINCT r.review_id) * 0.4) DESC
                LIMIT %s
            """, (limit,))
            popular_tours = cursor.fetchall()
            for tour in popular_tours:
                tour['similarity_score'] = None
            return popular_tours

    def get_recommendations(self, user_id=None, tour_id=None, limit=3):
        """Dispatch to the right strategy.

        ``tour_id`` takes precedence over ``user_id``; with neither, popular
        tours are returned. Identifiers are tested for truthiness, so ``0``
        is treated the same as ``None``.
        """
        if tour_id:
            return self.recommend_similar_tours(tour_id, limit)
        if user_id:
            return self.recommend_for_user(user_id, limit)
        return self.recommend_popular_tours(limit)
def get_db_connection():
    """Obtain a database connection, preferring the shared pool.

    First tries ``src.database.conn_pool.getconn()``. If that fails for any
    reason, the error is printed and a direct ``psycopg2`` connection is
    opened instead, using settings from ``src.config`` or, when that import
    fails, from the ``DB_*`` environment variables.

    Raises:
        Exception: Whatever the direct connection attempt raised, after
            printing it.
    """
    try:
        from src.database import conn_pool
        return conn_pool.getconn()
    except Exception as e:
        print(f"Error getting connection from pool: {e}")
        try:
            try:
                from src.config import DB_USER, DB_PASSWORD, DB_HOST, DB_PORT, DB_NAME
                direct_conn = psycopg2.connect(
                    user=DB_USER,
                    password=DB_PASSWORD,
                    host=DB_HOST,
                    port=DB_PORT,
                    dbname=DB_NAME,
                )
            except ImportError:
                # No config module available: read the same settings from
                # the process environment.
                import os
                env_names = {
                    "user": "DB_USER",
                    "password": "DB_PASSWORD",
                    "host": "DB_HOST",
                    "port": "DB_PORT",
                    "dbname": "DB_NAME",
                }
                direct_conn = psycopg2.connect(
                    **{param: os.getenv(env) for param, env in env_names.items()}
                )
            return direct_conn
        except Exception as e2:
            print(f"Error creating direct connection: {e2}")
            raise
def return_db_connection(conn):
    """Give ``conn`` back to the pool, or close it if that is not possible.

    Best-effort cleanup: ordinary errors are printed and never propagate,
    so this is safe to call from a ``finally`` block. Interpreter-exit
    signals (``KeyboardInterrupt``, ``SystemExit``) are no longer swallowed.

    Args:
        conn: Connection previously obtained from ``get_db_connection``.
    """
    try:
        from src.database import conn_pool
        conn_pool.putconn(conn)
    except Exception as e:
        # Reached when the pool is unavailable or rejects the connection
        # (e.g. it was opened directly); close it ourselves.
        print(f"Error returning connection to pool: {e}")
        try:
            conn.close()
        except Exception as close_error:
            print(f"Error closing connection: {close_error}")
def convert_to_tour_summary(tour):
    """Build a ``TourSummary`` from a dict-like tour row.

    ``title`` falls back to ``''`` when the key is absent; every other
    field is read with ``tour.get`` and is ``None`` when absent.
    """
    optional_fields = (
        'duration',
        'departure_location',
        'destination',
        'region',
        'description',
        'similarity_score',
    )
    summary_kwargs = {name: tour.get(name) for name in optional_fields}
    return TourSummary(
        tour_id=tour.get('tour_id'),
        title=tour.get('title', ''),
        **summary_kwargs,
    )
def get_tour_recommendations(user_id=None, tour_id=None, limit=3):
    """Entry point: fetch recommendations and wrap them in a response model.

    Acquires a connection, delegates to
    ``ContentBasedRecommender.get_recommendations`` and converts each row
    with ``convert_to_tour_summary``. The connection is always handed to
    ``return_db_connection`` afterwards.

    Raises:
        Exception: Any error from the steps above, re-raised after being
            printed.
    """
    conn = None
    try:
        conn = get_db_connection()
        tours = ContentBasedRecommender(conn).get_recommendations(user_id, tour_id, limit)
        return TourRecommendationResponse(
            recommendations=list(map(convert_to_tour_summary, tours)),
            recommendation_type="content-based",
        )
    except Exception as e:
        print(f"Error getting recommendations: {e}")
        raise
    finally:
        # conn stays None if acquiring the connection itself failed.
        if conn:
            return_db_connection(conn)