import json
import time
from collections import Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests


class OverpassGeocoder:
    """Collect named places around a location and resolve their addresses.

    Workflow (see ``run``): forward-geocode a "City, Country" string with
    Nominatim, query the Overpass API for tagged features within a radius,
    reverse-geocode every named feature with Nominatim, then write the
    resulting addresses to a text file and a JSON file in ``output_dir``.
    """

    # Overpass tag filters queried for both nodes and ways.
    _TAG_FILTERS = (
        '["amenity"]',
        '["shop"]',
        '["building"]["name"]',
        '["healthcare"]',
        '["leisure"]',
        '["tourism"]',
        '["office"]',
    )

    # Address keys consulted, in priority order, to decide a place's city.
    _CITY_KEYS = ('city', 'town', 'village', 'municipality')

    def __init__(self, output_dir: str = "output"):
        """Store API endpoints and headers and create the output directory.

        Args:
            output_dir: Directory that result files are written to. It is
                created (including missing parents) if it does not exist.
        """
        self.overpass_url = "https://overpass-api.de/api/interpreter"
        self.nominatim_url = "https://nominatim.openstreetmap.org"
        # Sent with every Nominatim request.
        self.headers = {'User-Agent': 'jhbhbbvsio0'}
        self.output_dir = Path(output_dir)
        self.output_dir.mkdir(parents=True, exist_ok=True)

    def geocode_location(self, location: str) -> Optional[Tuple[float, float]]:
        """Resolve a free-form place name to coordinates via Nominatim search.

        Args:
            location: Query string, e.g. "Victoria, Seychelles".

        Returns:
            ``(lat, lon)`` of the first search hit, or ``None`` when the
            request fails, the status is not 200, or there are no results.
        """
        print(f"Đang tìm tọa độ cho: {location}")

        params = {'q': location, 'format': 'json', 'limit': 1}

        try:
            response = requests.get(
                f"{self.nominatim_url}/search",
                params=params,
                headers=self.headers,
                timeout=10,
            )
            if response.status_code == 200:
                data = response.json()
                if data:
                    lat = float(data[0]['lat'])
                    lon = float(data[0]['lon'])
                    print(f"✓ Tìm thấy: {lat}, {lon}")
                    return (lat, lon)
        except (requests.RequestException, ValueError, KeyError,
                IndexError, TypeError) as exc:
            # Network failures, undecodable JSON, or an unexpected payload
            # shape all mean "no coordinates"; report and fall through.
            print(f"Lỗi geocode: {exc}")

        return None

    def get_all_places(self, lat: float, lon: float, radius_meters: int = 3000) -> List[Dict]:
        """Fetch every tagged node/way within a radius from the Overpass API.

        Args:
            lat: Latitude of the search centre.
            lon: Longitude of the search centre.
            radius_meters: Search radius in metres.

        Returns:
            The ``elements`` list of the Overpass JSON response, or an empty
            list when the request fails or the payload cannot be decoded.
        """
        around = f"(around:{radius_meters},{lat},{lon})"
        statements = "\n".join(
            f"  {kind}{tag_filter}{around};"
            for tag_filter in self._TAG_FILTERS
            for kind in ("node", "way")
        )
        # "out center" makes Overpass attach a centre point to each way.
        overpass_query = (
            "[out:json][timeout:60];\n"
            "(\n"
            f"{statements}\n"
            ");\n"
            "out center body;\n"
        )

        print(f"Đang truy vấn Overpass API (bán kính {radius_meters}m)...")

        try:
            response = requests.post(
                self.overpass_url,
                data={'data': overpass_query},
                timeout=120,
            )
            response.raise_for_status()
            data = response.json()
        except (requests.RequestException, ValueError) as exc:
            print(f"Lỗi Overpass API: {exc}")
            return []

        elements = data.get('elements', []) if isinstance(data, dict) else []
        print(f"✓ Tìm thấy {len(elements)} địa điểm từ OpenStreetMap")
        return elements

    def reverse_geocode(self, lat: float, lon: float) -> Optional[Dict]:
        """Look up the address for a coordinate via Nominatim reverse search.

        Args:
            lat: Latitude of the point.
            lon: Longitude of the point.

        Returns:
            The decoded JSON response, or ``None`` when the request fails,
            the status is not 200, or the body is not valid JSON.
        """
        params = {
            'lat': lat,
            'lon': lon,
            'format': 'json',
            'addressdetails': 1,
            'zoom': 18,
        }

        try:
            response = requests.get(
                f"{self.nominatim_url}/reverse",
                params=params,
                headers=self.headers,
                timeout=10,
            )
            if response.status_code == 200:
                return response.json()
        except (requests.RequestException, ValueError) as exc:
            # Best-effort lookup: report the failure instead of hiding it,
            # and let the caller fall back to the seed city.
            print(f"Lỗi reverse geocode ({lat}, {lon}): {exc}")

        return None

    def get_coordinates(self, element: Dict) -> Optional[Tuple[float, float]]:
        """Extract ``(lat, lon)`` from an Overpass element.

        Nodes carry their coordinates directly; ways are read from their
        ``center`` entry.

        Returns:
            ``(lat, lon)``, or ``None`` when the element type is unsupported
            or either coordinate is missing.
        """
        kind = element.get('type')
        if kind == 'node':
            source = element
        elif kind == 'way':
            source = element.get('center')
        else:
            source = None

        if not source:
            return None

        lat, lon = source.get('lat'), source.get('lon')
        if lat is None or lon is None:
            return None
        return (lat, lon)

    @classmethod
    def _resolve_city(cls, address: Dict, seed_city: str) -> str:
        """Return the first populated city-like field, else ``seed_city``."""
        for key in cls._CITY_KEYS:
            value = address.get(key)
            if value:
                return value
        return seed_city

    def format_address(self, name: str, reverse_data: Dict, seed_city: str, country: str) -> str:
        """Build a one-line address from reverse-geocoding data.

        Args:
            name: Display name of the place.
            reverse_data: Nominatim reverse response; may be empty or None.
            seed_city: City used when the response names no city.
            country: Country appended as the last component.

        Returns:
            ``"name, [house_number] road, city, suburb, country"`` with
            missing components omitted; ``"name, seed_city, country"`` when
            there is no reverse data.
        """
        if not reverse_data:
            return f"{name}, {seed_city}, {country}"

        address = reverse_data.get('address', {})
        road = address.get('road', '')
        house_number = address.get('house_number', '')
        suburb = address.get('suburb', '')
        city = self._resolve_city(address, seed_city)

        parts = [name]
        if road:
            parts.append(f"{house_number} {road}" if house_number else road)
        if city:
            parts.append(city)
        if suburb and suburb != city:
            parts.append(suburb)
        parts.append(country)

        return ', '.join(parts)

    def format_element(self, element: Dict) -> Optional[Dict]:
        """Normalise an Overpass element into a flat place record.

        Returns:
            A dict with ``id``, ``type``, ``name``, ``category``, ``lat``,
            ``lon`` and ``tags``; ``None`` when the element has no usable
            name or no coordinates.
        """
        tags = element.get('tags', {})

        name = (tags.get('name') or
                tags.get('name:en') or
                tags.get('brand') or
                tags.get('operator'))
        if not name:
            return None

        category = (tags.get('amenity') or
                    tags.get('shop') or
                    tags.get('building') or
                    tags.get('healthcare') or
                    tags.get('leisure') or
                    tags.get('tourism') or
                    tags.get('office') or
                    'unknown')

        # get_coordinates returns None for missing values, so a latitude or
        # longitude of exactly 0 is kept as a valid coordinate.
        coords = self.get_coordinates(element)
        if coords is None:
            return None

        return {
            'id': element.get('id'),
            'type': element.get('type'),
            'name': name,
            'category': category,
            'lat': coords[0],
            'lon': coords[1],
            'tags': tags,
        }

    def process_places(self, elements: List[Dict], seed_city: str, country: str,
                       only_seed_city: bool = True, delay: float = 1.0,
                       max_places: Optional[int] = 100) -> List[Dict]:
        """Reverse-geocode each uniquely named place and attach its address.

        Args:
            elements: Raw Overpass elements.
            seed_city: City the search was centred on.
            country: Country appended to every address.
            only_seed_city: If True, drop places whose reverse-geocoded city
                differs (case-insensitively) from ``seed_city``.
            delay: Seconds to wait after each reverse-geocoding request.
            max_places: Maximum number of unique places to look up; ``None``
                processes all of them.

        Returns:
            Place records (see ``format_element``) extended with ``address``
            and ``reverse_data`` keys.
        """
        # Keep the first element seen for each name.
        unique_places: Dict[str, Dict] = {}
        for elem in elements:
            formatted = self.format_element(elem)
            if formatted:
                unique_places.setdefault(formatted['name'], formatted)

        places = list(unique_places.values())
        total = len(places)

        print(f"\nĐang lấy địa chỉ cho {total} địa điểm duy nhất...")
        # Apply the cap before any request so no lookup is wasted.
        if max_places is not None and total > max_places:
            places = places[:max(0, max_places)]
            print(f" → Chỉ xử lý {len(places)} địa điểm đầu tiên (max_places)")
        print("=" * 80)

        selected = len(places)
        results = []
        filtered_count = 0
        for i, place in enumerate(places, 1):
            print(f"[{i}/{selected}] {place['name'][:50]:<50}", end='\r')

            reverse_data = self.reverse_geocode(place['lat'], place['lon'])
            address_fields = reverse_data.get('address', {}) if reverse_data else {}

            # Without reverse data the place cannot be attributed to another
            # city, so it is kept.
            in_seed_city = True
            if only_seed_city and reverse_data:
                actual_city = self._resolve_city(address_fields, seed_city)
                in_seed_city = actual_city.lower() == seed_city.lower()

            if in_seed_city:
                place['address'] = self.format_address(
                    place['name'], reverse_data, seed_city, country)
                place['reverse_data'] = address_fields
                results.append(place)
            else:
                filtered_count += 1

            if delay > 0:
                time.sleep(delay)

        print(f"\n{'✓ Hoàn thành!':<80}")
        if filtered_count > 0:
            print(f" → Đã lọc bỏ {filtered_count} địa điểm ngoài {seed_city}")

        return results

    def save_results(self, places: List[Dict], location: str) -> Tuple[Path, Path]:
        """Write the addresses (text) and full records (JSON) to the output dir.

        Args:
            places: Records produced by ``process_places``; not modified.
            location: Location string used to derive the file names.

        Returns:
            ``(txt_file, json_file)`` paths of the files written.
        """
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        # Replace separators and characters that are unsafe in file names so
        # the result is always a single path component.
        unsafe = str.maketrans({ch: '_' for ch in ', /\\:*?"<>|'})
        safe_location = location.translate(unsafe)

        # Sort a copy so the caller's list keeps its order.
        ordered = sorted(places, key=lambda p: p['name'])

        txt_file = self.output_dir / f"{safe_location}_addresses_{timestamp}.txt"
        with open(txt_file, 'w', encoding='utf-8') as f:
            f.writelines(f"{place['address']}\n" for place in ordered)
        print(f"✓ Đã lưu {len(ordered)} địa chỉ vào: {txt_file}")

        json_file = self.output_dir / f"{safe_location}_places_{timestamp}.json"
        with open(json_file, 'w', encoding='utf-8') as f:
            json.dump(ordered, f, ensure_ascii=False, indent=2)
        print(f"✓ Đã lưu chi tiết vào: {json_file}")

        return txt_file, json_file

    def print_statistics(self, places: List[Dict], city: str):
        """Print how many addresses mention ``city`` and the top 10 categories."""
        print("\n" + "=" * 80)
        print("THỐNG KÊ:")
        print("=" * 80)

        with_city = sum(1 for p in places if city in p['address'])
        print(f"Có '{city}': {with_city}/{len(places)}")

        category_count = Counter(place['category'] for place in places)

        print("\nTheo loại:")
        for cat, count in category_count.most_common(10):
            print(f" {cat:30s}: {count:3d}")

    def run(self, location: str, radius_meters: int = 3000, delay: float = 1.0,
            max_places: Optional[int] = 100):
        """Run the whole pipeline: geocode, query, reverse-geocode, save, report.

        Args:
            location: "City, Country" string (e.g. "Victoria, Seychelles").
                The text before the first comma is the seed city and the
                text after the last comma is the country.
            radius_meters: Search radius in metres (default 3000).
            delay: Seconds to wait between reverse-geocoding requests
                (default 1.0).
            max_places: Maximum number of unique places to reverse-geocode;
                ``None`` for no limit.
        """
        print("=" * 80)
        print(f"GEOCODER - {location.upper()}")
        print("=" * 80)

        coords = self.geocode_location(location)
        if not coords:
            print("✗ Không tìm thấy tọa độ cho địa điểm này!")
            return

        lat, lon = coords
        city = location.split(',')[0].strip()
        country = location.split(',')[-1].strip()

        elements = self.get_all_places(lat, lon, radius_meters)
        if not elements:
            print("✗ Không tìm thấy địa điểm nào!")
            return

        # Pass delay by keyword: the fourth positional parameter of
        # process_places is only_seed_city, not delay.
        places = self.process_places(
            elements, city, country, delay=delay, max_places=max_places)

        self.save_results(places, location)
        self.print_statistics(places, city)

        print("\n" + "=" * 80)
        print("✓ HOÀN THÀNH!")
        print("=" * 80)


if __name__ == "__main__":
    # Example invocation: collect places within 3 km of Grytviken, pausing
    # one second between reverse-geocoding requests; results go to ./output.
    OverpassGeocoder(output_dir="output").run(
        "Grytviken, South Georgia and the South Sandwich Islands",
        radius_meters=3000,
        delay=1.0,
    )