| import asyncio
|
| import random
|
| import re
|
| import aiohttp
|
| from bs4 import BeautifulSoup
|
| from typing import Dict, Any, List, Optional, Tuple
|
| import logging
|
| from urllib.parse import urljoin, quote
|
| from dotenv import load_dotenv
|
| import os
|
|
|
| from .utils.http_utils import fetch_page
|
| from .utils.image_utils import filter_logo_images, is_logo_image
|
| from .utils.html_utils import extract_rating_from_element, extract_images_from_soup
|
| from .utils.google_search_utils import fetch_hotel_images_from_google
|
|
|
|
|
load_dotenv()  # load .env so Google API credentials are visible via os.getenv



# Module-level logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)
|
|
|
class BookingService:
    """Service for scraping hotel data from Booking.com.

    Rotates browser user agents on every request to reduce the chance of
    being blocked, and can fall back to the Google Custom Search API for
    hotel images when scraping yields none (requires GOOGLE_SEARCH_API_KEY
    and GOOGLE_SEARCH_ENGINE_ID in the environment).
    """

    def __init__(self):
        # Pool of realistic desktop/mobile user agents; one is chosen at
        # random for each outgoing request (see get_page).
        self.user_agents = [
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Macintosh; Intel Mac OS X 14_5) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.4 Safari/605.1.15",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
            "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/123.0.0.0 Safari/537.36",
            "Mozilla/5.0 (iPhone; CPU iPhone OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (iPad; CPU OS 17_4 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) CriOS/123.0.0.0 Mobile/15E148 Safari/604.1",
            "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Edge/123.0.0.0 Safari/537.36",
            "Mozilla/5.0 (Windows NT 10.0; WOW64; Trident/7.0; rv:11.0) like Gecko",
        ]

        # Base headers shared by all requests; the User-Agent key is filled
        # in per call by get_page.
        self.headers = {
            "Accept-Language": "en-US,en;q=0.9",
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8",
            "Connection": "keep-alive",
            "Upgrade-Insecure-Requests": "1",
            "sec-ch-ua": '"Google Chrome";v="123", "Not:A-Brand";v="99"',
            "sec-ch-ua-mobile": "?0",
            "sec-ch-ua-platform": '"Windows"',
        }

        # The Google image fallback is only usable when both credentials
        # are present in the environment.
        self.google_api_available = bool(
            os.getenv("GOOGLE_SEARCH_API_KEY") and os.getenv("GOOGLE_SEARCH_ENGINE_ID")
        )

        # Fixed: removed a hard-coded timestamp/author that was baked into
        # the log line — it was stale and misleading.
        logger.info("BookingService initialized")
        if self.google_api_available:
            logger.info("Google Custom Search API configured as fallback for hotel images")
        else:
            logger.warning("Google Custom Search API credentials not found - fallback will not be available")

    async def get_page(self, session: aiohttp.ClientSession, url: str) -> Optional[str]:
        """Fetch *url* via fetch_page, injecting a randomly chosen User-Agent.

        Returns the page HTML, or whatever fetch_page returns on failure
        (expected to be None).
        """
        current_headers = self.headers.copy()
        current_headers["User-Agent"] = random.choice(self.user_agents)

        logger.debug(f"Using user agent: {current_headers['User-Agent'][:30]}...")
        return await fetch_page(session, url, current_headers)

    async def extract_amenities(self, session: aiohttp.ClientSession, hotel_element, hotel_url: Optional[str] = None) -> List[str]:
        """Extract the "Most popular facilities" list from a hotel detail page.

        *hotel_element* is currently unused but kept for interface
        compatibility with callers. Returns a (deduplicated, unordered)
        list of facility names; empty when nothing could be extracted.
        """
        unique_amenities = set()

        if hotel_url:
            try:
                html = await self.get_page(session, hotel_url)
                if html:
                    soup = BeautifulSoup(html, 'html.parser')

                    # Anchor on the literal section heading, then walk outward
                    # to find the container that actually holds the facility
                    # icons (svg/img) and their labels.
                    popular_heading = soup.find(string=lambda text: text and text.strip() == "Most popular facilities")

                    if popular_heading:
                        current = popular_heading.parent
                        container = None

                        # Climb at most three ancestor levels, checking the
                        # node itself, its parent, and its next sibling for
                        # icon markup at each step.
                        for _ in range(3):
                            if not current:
                                break

                            if current.select("svg") or current.select("img"):
                                container = current
                                break

                            parent = current.parent
                            if parent and (parent.select("svg") or parent.select("img")):
                                container = parent
                                break

                            sibling = current.find_next_sibling()
                            if sibling and (sibling.select("svg") or sibling.select("img")):
                                container = sibling
                                break

                            current = parent

                        # Last resort: take the element right after the heading.
                        if not container:
                            heading_parent = popular_heading.parent
                            if heading_parent:
                                container = heading_parent.find_next_sibling()

                        if container:
                            facility_items = container.select("span") or container.select("div")

                            for item in facility_items:
                                text = item.get_text().strip()
                                # Length cap filters out concatenated/garbage
                                # text nodes that are not facility labels.
                                if text and text != "Most popular facilities" and len(text) < 30:
                                    unique_amenities.add(text)

                    # Fallback: known Booking.com class name, or the sibling
                    # of the heading container located via :-soup-contains.
                    if not unique_amenities:
                        try:
                            rows = soup.select(".f6b6d2a959") or soup.select_one("div:-soup-contains('Most popular facilities')").parent.find_next_sibling().select("span")

                            for item in rows:
                                text = item.get_text().strip()
                                if text and text != "Most popular facilities" and len(text) < 30:
                                    unique_amenities.add(text)
                        except AttributeError:
                            # select_one returned None (heading absent) — the
                            # chained attribute access above is best-effort.
                            logger.debug("Could not find facilities using fallback selector")
            except Exception as e:
                logger.error(f"Error extracting amenities: {e}")

        return list(unique_amenities)

    async def get_room_images_from_detail_page(self, session: aiohttp.ClientSession, url: str) -> List[str]:
        """Collect up to five property/room image URLs from a hotel detail page.

        Tries gallery-specific CSS selectors first, then pads the result
        with any sufficiently large non-logo <img> on the page. Returns
        absolute URLs; empty list on failure.
        """
        all_images = []

        try:
            html = await self.get_page(session, url)
            if html:
                soup = BeautifulSoup(html, 'html.parser')

                # Known gallery/carousel selectors across Booking.com layouts.
                selectors = [
                    ".bui-carousel__item img", ".bh-photo-grid img",
                    ".hp-gallery img", ".hotel-photos img",
                    ".room-gallery img", ".hotel-room-photographs-slides img",
                    "img.active-image", ".gallery-mosaic img", ".tour-360__image img",
                    "img[width='300'], img[width='350'], img[width='400'], img[width='500']",
                ]

                all_images = extract_images_from_soup(soup, url, selectors)

                # Pad with generic page images if the galleries came up short.
                if len(all_images) < 5:
                    for img in soup.select("img"):
                        # Skip tiny images (icons). Fixed: non-numeric widths
                        # such as "100%" used to raise ValueError and abort
                        # the whole scan; now they are simply not filtered.
                        width = img.get("width")
                        if width and str(width).isdigit() and int(width) < 100:
                            continue

                        src = img.get("src") or img.get("data-src")
                        if src and not is_logo_image(src) and src not in all_images:
                            if not src.startswith("http"):
                                src = urljoin(url, src)
                            all_images.append(src)
                        if len(all_images) >= 5:
                            break

                return filter_logo_images(all_images)[:5]

        except Exception as e:
            logger.error(f"Error getting hotel images: {e}", exc_info=True)

        # Reached on fetch failure or exception: return whatever was gathered.
        return all_images[:5] if all_images else []

    async def extract_rating_from_detail_page(self, session: aiohttp.ClientSession, url: str) -> Optional[float]:
        """Extract the guest-review score from a hotel detail page.

        Tries several selectors/heuristics in order; accepts both '.' and
        ',' decimal separators. Returns the score as float, or None.
        """
        try:
            html = await self.get_page(session, url)
            if not html:
                return None

            soup = BeautifulSoup(html, 'html.parser')

            # Primary: the "Guest reviews" section and its score badge.
            guest_reviews_section = soup.find("h2", string="Guest reviews")
            if guest_reviews_section:
                rating_div = soup.select_one("div[aria-label*='Scored'] strong") or soup.select_one(".b5cd09854e")
                if rating_div:
                    text = rating_div.get_text().strip()
                    match = re.search(r"(\d+[.,]\d+)", text)
                    if match:
                        return float(match.group(1).replace(',', '.'))

                # Scan divs near the heading for a bare "8.7"-style value.
                # Fixed: guard against a detached heading with no parent,
                # which previously raised inside the broad except and killed
                # the remaining fallbacks below.
                section_parent = guest_reviews_section.parent
                if section_parent is not None:
                    for elem in section_parent.select("div"):
                        text = elem.get_text().strip()
                        if re.match(r"^\d+[.,]\d+$", text):
                            return float(text.replace(',', '.'))

            # Secondary: score badge anywhere on the page.
            score_elements = soup.select(".review-score-badge, .b5cd09854e")
            for elem in score_elements:
                text = elem.get_text().strip()
                match = re.search(r"(\d+[.,]\d+)", text)
                if match:
                    return float(match.group(1).replace(',', '.'))

            # Last resort: any text node mentioning "Review score".
            review_text = soup.find(string=lambda text: text and ("Review score" in text))
            if review_text:
                parent_text = review_text.parent.get_text() if review_text.parent else ""
                match = re.search(r"(\d+[.,]\d+)", parent_text)
                if match:
                    return float(match.group(1).replace(',', '.'))

        except Exception as e:
            logger.error(f"Error extracting rating: {e}")

        return None

    def extract_rating(self, hotel_element) -> Optional[float]:
        """Extract the rating from a search-result hotel card element."""
        return extract_rating_from_element(hotel_element)

    def is_name_similar(self, name1: str, name2: str) -> bool:
        """Return True when two hotel names are similar enough to match.

        Case-insensitive. Matches when one name contains the other, or when
        at least half of the words of the shorter name appear in the other.
        """
        if not name1 or not name2:
            return False

        name1 = name1.lower()
        name2 = name2.lower()

        if name1 in name2 or name2 in name1:
            return True

        words1 = set(re.findall(r'\w+', name1))
        words2 = set(re.findall(r'\w+', name2))

        if not words1 or not words2:
            return False

        common_words = words1.intersection(words2)
        similarity = len(common_words) / min(len(words1), len(words2))

        return similarity >= 0.5

    async def search_hotel(self, session: aiohttp.ClientSession, destination: str, hotel_name: str) -> Dict[str, Any]:
        """Search Booking.com for a specific hotel and assemble its data.

        Returns a dict with 'destination', 'hotel_name' and either an
        'error' message or a 'data' dict (name, rating, images, amenities,
        booking_link).
        """
        search_query = f"{hotel_name} {destination}"
        search_url = f"https://www.booking.com/search.html?ss={quote(search_query)}"

        html = await self.get_page(session, search_url)

        if not html:
            return {
                "destination": destination,
                "hotel_name": hotel_name,
                "error": "Failed to retrieve search results"
            }

        soup = BeautifulSoup(html, 'html.parser')
        hotel_cards = soup.select("[data-testid='property-card'], .sr_property_block, .sr_item")

        if not hotel_cards:
            return {
                "destination": destination,
                "hotel_name": hotel_name,
                "error": "No hotels found"
            }

        # Pick the card whose name best matches; fall back to the first hit.
        hotel_card = None
        for card in hotel_cards:
            name_elem = card.select_one("[data-testid='title'], .sr-hotel__name, .hotel_name")
            if name_elem:
                card_hotel_name = name_elem.text.strip()
                if self.is_name_similar(card_hotel_name, hotel_name):
                    hotel_card = card
                    break

        if not hotel_card:
            hotel_card = hotel_cards[0]

        name_elem = hotel_card.select_one("[data-testid='title'], .sr-hotel__name, .hotel_name")
        name = name_elem.text.strip() if name_elem else hotel_name
        rating = self.extract_rating(hotel_card)

        link_elem = hotel_card.select_one("a[href*='hotel'], a.hotel_name_link")
        hotel_url = ""
        if link_elem and 'href' in link_elem.attrs:
            href = link_elem['href']
            hotel_url = urljoin("https://www.booking.com", href) if not href.startswith('http') else href

        # Fixed: images/amenities were undefined (NameError) when no detail
        # URL was found, and were discarded whenever the detail page yielded
        # no rating. They now default to empty and survive a missing rating.
        images: List[str] = []
        amenities: List[str] = []

        if hotel_url:
            # Fetch rating, images and amenities from the detail page
            # concurrently.
            detail_rating, images, amenities = await asyncio.gather(
                self.extract_rating_from_detail_page(session, hotel_url),
                self.get_room_images_from_detail_page(session, hotel_url),
                self.extract_amenities(session, hotel_card, hotel_url),
            )

            # Prefer the detail-page rating; keep the search-card rating
            # otherwise.
            if detail_rating is not None:
                rating = detail_rating

        # Image fallback via Google Custom Search when scraping found none.
        if not images and self.google_api_available:
            logger.info(f"No images found via scraping for {hotel_name} in {destination}. Using Google API as fallback.")
            images = await fetch_hotel_images_from_google(session, hotel_name, destination)

        return {
            "destination": destination,
            "hotel_name": hotel_name,
            "data": {
                "name": name,
                "rating": rating,
                "images": images,
                "amenities": amenities,
                "booking_link": hotel_url
            }
        }