|
import json
import logging
import re
from datetime import date

import dateparser
from bs4 import BeautifulSoup

from .database import search_tours_db, get_available_locations
from .llm import llm
from .prompts import ner_prompt
|
|
|
# Last non-empty result of get_available_locations(), shared by all callers.
_cached_locations = None

# Calendar date on which _cached_locations was last refreshed successfully.
_locations_fetched_date = None


def fetch_locations_tool():
    """Return the available locations, refreshing the cache at most once a day.

    The result of ``get_available_locations()`` is kept in module-level state
    and reused for the rest of the calendar day.

    Only a non-empty result is cached.  If the lookup raises or returns
    nothing, the previously cached value (if any) is served instead and the
    lookup is retried on the next call, so a transient backend failure does
    not leave callers without locations for the rest of the day.

    Returns:
        The cached locations object, or an empty list when none are known.
        The cached object itself is returned, not a copy.
    """
    global _cached_locations, _locations_fetched_date

    today = date.today()
    if _cached_locations and _locations_fetched_date == today:
        return _cached_locations

    try:
        fresh_locations = get_available_locations()
    except Exception:
        # Best-effort like the other tools in this module: record the failure
        # and fall back to whatever was cached before.
        logging.getLogger(__name__).exception(
            "Failed to fetch available locations"
        )
        fresh_locations = None

    if fresh_locations:
        _cached_locations = fresh_locations
        _locations_fetched_date = today

    return _cached_locations if _cached_locations else []
|
|
|
def get_available_tours_for_destination(destination: str, limit: int = 5) -> list:
    """Return upcoming available tours that visit ``destination``.

    Used to show proactive tour recommendations when the user mentions a
    place they want to go.

    The query reads departures joined to their tours, keeps only rows where
    both the tour and the departure are available, and left-joins promotions
    that are active and whose date range covers the departure's start date.
    Rows are ordered by departure start date, then tour title.  ``limit``
    caps the number of joined rows, not the number of distinct tours.

    Args:
        destination: Destination name; it is matched via array overlap
            against ``t.destination``.
        limit: Maximum number of rows to return.

    Returns:
        The rows passed through ``format_itineraries``, or an empty list when
        the destination is blank, the limit is not positive, nothing matches,
        or the lookup fails (failures are logged, never raised).
    """
    if not isinstance(destination, str) or not destination.strip():
        return []
    # A zero or negative LIMIT can only yield nothing (or a SQL error), so
    # skip the round trip.  Non-int values are passed through untouched.
    if isinstance(limit, int) and limit <= 0:
        return []

    query = """
        SELECT
            t.tour_id,
            t.title,
            t.duration,
            t.departure_location,
            t.destination,
            t.region,
            t.itinerary,
            t.max_participants,
            d.departure_id,
            d.start_date,
            d.price_adult,
            d.price_child_120_140,
            d.price_child_100_120,
            p.promotion_id,
            p.name AS promotion_name,
            p.type AS promotion_type,
            p.discount AS promotion_discount,
            p.start_date AS promotion_start_date,
            p.end_date AS promotion_end_date
        FROM Departure d
        JOIN Tour t ON d.tour_id = t.tour_id
        LEFT JOIN Tour_Promotion tp ON t.tour_id = tp.tour_id
        LEFT JOIN Promotion p ON tp.promotion_id = p.promotion_id
            AND d.start_date BETWEEN p.start_date AND p.end_date
            AND p.status = 'active'
        WHERE t.availability = true AND d.availability = true
            AND t.destination && %s::text[]
        ORDER BY d.start_date ASC, t.title
        LIMIT %s;
    """

    try:
        # Imported at call time and inside the try, so a failing import also
        # degrades to an empty result instead of raising.
        from .database import execute_query

        # The destination is wrapped in a list because the query casts the
        # parameter to text[].
        results = execute_query(query, ([destination], limit))
        if not results:
            return []
        return format_itineraries(results)
    except Exception:
        logging.getLogger(__name__).exception(
            "Failed to load tours for destination %r", destination
        )
        return []
|
|
|
def _html_to_text(html):
    """Return ``html`` with markup removed; empty/None becomes ''.

    If the HTML parser fails for any reason the raw value is returned
    unchanged, so formatting never aborts because of one bad description.
    """
    if not html:
        return ''
    try:
        return BeautifulSoup(html, 'html.parser').get_text(separator=' ')
    except Exception:
        return html


def _day_sort_key(day):
    """Return a numeric sort key for a day dict; unusable day numbers sort as 0."""
    try:
        return float(day.get('day_number', 0))
    except (TypeError, ValueError):
        return 0.0


def _render_itinerary(days):
    """Render a list of day dicts as plain text, ordered by day number."""
    # Entries that are not dicts cannot be rendered and are ignored.
    ordered_days = sorted(
        (day for day in days if isinstance(day, dict)), key=_day_sort_key
    )

    sections = []
    for day in ordered_days:
        day_number = day.get('day_number')
        label = '' if day_number is None else day_number
        title = day.get('title', '')
        description_text = _html_to_text(day.get('description'))
        sections.append(f"Ngày {label}: {title}\n{description_text}")

    return "\n\n".join(sections).strip()


def format_itineraries(tours_array):
    """Flatten each tour's structured itinerary into a plain-text string.

    For every tour whose ``'itinerary'`` value is a list, the days are sorted
    by ``'day_number'`` and rendered as ``"Ngày <n>: <title>"`` followed by
    the description with HTML markup stripped; days are separated by a blank
    line.  Itineraries that are not lists are left untouched.

    Args:
        tours_array: Iterable of tour dicts (may be None or empty).

    Returns:
        A new list holding the same tour dicts, minus any tour without a
        truthy ``'tour_id'``.  The dicts are modified in place: their
        ``'itinerary'`` entry is replaced by the rendered text.
    """
    if not tours_array:
        return []

    formatted_tours = []
    for tour in tours_array:
        if not tour.get('tour_id'):
            continue

        itinerary = tour.get('itinerary')
        if isinstance(itinerary, list):
            tour['itinerary'] = _render_itinerary(itinerary)

        formatted_tours.append(tour)

    return formatted_tours
|
|
|
# Greedy match from the first "{" to the last "}", used to pull a JSON object
# out of a reply that wraps it in extra prose.
_JSON_OBJECT_RE = re.compile(r'\{.*\}', re.DOTALL)


def _strip_code_fence(text):
    """Remove a surrounding Markdown code fence (``` or ```json) from ``text``."""
    text = text.strip()
    if text.startswith("```"):
        text = text[3:]
        if text[:4].lower() == "json":
            text = text[4:]
    if text.endswith("```"):
        text = text[:-3]
    return text.strip()


def _parse_entities(content):
    """Parse the cleaned LLM reply into a dict, or return an error dict."""
    invalid = {"error": "Invalid JSON response from LLM", "raw_output": content}

    try:
        entities = json.loads(content)
    except json.JSONDecodeError:
        # The reply may embed the JSON object in surrounding text.
        match = _JSON_OBJECT_RE.search(content)
        if match is None:
            return invalid
        try:
            entities = json.loads(match.group(0))
        except json.JSONDecodeError:
            return invalid

    # Valid JSON that is not an object (list, string, number) does not
    # satisfy the declared dict return type.
    return entities if isinstance(entities, dict) else invalid


def extract_entities_tool(user_query: str, current_date_str: str) -> dict:
    """Ask the LLM to extract search entities from ``user_query``.

    The NER prompt is filled with the current date, the known locations
    (comma-separated) and the user's question, then sent to the LLM.  The
    reply is expected to be a JSON object, optionally wrapped in a Markdown
    code fence or surrounded by extra text.

    Args:
        user_query: The user's question.
        current_date_str: Current date, inserted verbatim into the prompt.

    Returns:
        The parsed entity dict on success.  On failure a dict with an
        ``"error"`` key: ``"LLM not available"``, ``"Invalid JSON response
        from LLM"`` (with the cleaned reply under ``"raw_output"``), or the
        text of any other exception raised while calling the LLM.
    """
    locations = fetch_locations_tool()

    prompt = ner_prompt.format(
        current_date=current_date_str,
        # str() keeps the join from failing on non-string location values.
        locations=", ".join(str(location) for location in locations),
        question=user_query,
    )

    try:
        # Re-imported at call time, as the original code did; presumably so a
        # client initialised after this module was loaded is picked up.
        from .llm import llm

        if llm is None:
            return {"error": "LLM not available"}

        ai_message = llm.invoke(prompt)
        content = _strip_code_fence(ai_message.content)
        return _parse_entities(content)
    except Exception as exc:
        logging.getLogger(__name__).exception("Entity extraction failed")
        return {"error": str(exc)}
|
|
|
# Entity keys that count as a usable search criterion.
_SEARCHABLE_KEYS = frozenset(
    ('region', 'destination', 'duration', 'time', 'budget', 'number_of_people')
)


def search_tours_tool(entities: dict) -> list:
    """Search tours matching the extracted ``entities``.

    No search is attempted when ``entities`` is not a dict, carries an
    ``"error"`` key, or contains none of the searchable keys (region,
    destination, duration, time, budget, number_of_people).  Only key
    presence is checked, not the values.

    Args:
        entities: Entity dict, e.g. as returned by ``extract_entities_tool``.

    Returns:
        The results of ``search_tours_db`` passed through
        ``format_itineraries``, or an empty list when no search is attempted,
        nothing is found, or the search fails (failures are logged, never
        raised).
    """
    if not isinstance(entities, dict) or "error" in entities:
        return []

    if _SEARCHABLE_KEYS.isdisjoint(entities):
        return []

    try:
        search_results = search_tours_db(entities)
        if not search_results:
            return []
        return format_itineraries(search_results)
    except Exception:
        logging.getLogger(__name__).exception("Tour search failed")
        return []