Spaces:
Sleeping
Sleeping
import gradio as gr | |
import os | |
import csv | |
import json | |
import time | |
from bs4 import BeautifulSoup | |
from selenium.webdriver.chrome.options import Options | |
from selenium.webdriver.common.by import By | |
from selenium.webdriver.support.ui import WebDriverWait | |
from selenium.webdriver.support import expected_conditions as EC | |
from selenium.common.exceptions import NoSuchElementException, TimeoutException, StaleElementReferenceException, ElementClickInterceptedException | |
import re | |
from urllib.parse import urlparse, urljoin | |
import traceback | |
import io | |
import contextlib | |
from datetime import datetime | |
import threading | |
import pandas as pd | |
import tempfile | |
# --- WebDriverの選択 --- | |
IN_COLAB = 'google.colab' in str(globals().get('get_ipython', '')) | |
if IN_COLAB: | |
print("Google Colab環境を検出。google_colab_selenium を使用します。") | |
try: import google_colab_selenium as gs | |
except ImportError: print("google_colab_seleniumが見つかりません。!pip install google-colab-selenium を実行してください。"); gs = None | |
else: | |
print("ローカル環境を検出。通常の selenium webdriver を使用します。") | |
from selenium import webdriver | |
gs = None | |
try: | |
from selenium.webdriver.chrome.service import Service as ChromeService | |
from webdriver_manager.chrome import ChromeDriverManager | |
except ImportError: | |
print("webdriver-manager が見つかりません。 `pip install webdriver-manager` を実行してください。") | |
ChromeService = None | |
ChromeDriverManager = None | |
# --- 中断フラグ --- | |
interrupt_event = threading.Event() | |
# --- Helper Functions (From Script 1) --- | |
def find_prefixed_data_string(data_structure): | |
"""データ構造内から ")]}'\n" で始まる文字列を見つける(再帰的検索)""" | |
if isinstance(data_structure, str) and data_structure.startswith(")]}'\n"): | |
return data_structure | |
elif isinstance(data_structure, list): | |
for item in data_structure: | |
if interrupt_event.is_set(): return None # 中断チェック | |
found = find_prefixed_data_string(item) | |
if found: | |
return found | |
elif isinstance(data_structure, dict): | |
for value in data_structure.values(): | |
if interrupt_event.is_set(): return None # 中断チェック | |
found = find_prefixed_data_string(value) | |
if found: | |
return found | |
return None | |
def find_details_data_by_id_or_heuristic(data_list, place_id=None): | |
""" | |
JSONデータリストから詳細情報を含む可能性のあるリストを特定する。 | |
place_idがあればそれを優先し、なければヒューリスティック(住所形式など)で探す。 | |
""" | |
if not isinstance(data_list, list): | |
return None | |
if interrupt_event.is_set(): return None # 中断チェック | |
potential_candidates = [] | |
for item in data_list: | |
if interrupt_event.is_set(): return None # 中断チェック | |
# 詳細データは通常、要素数が比較的多いリスト形式 | |
if not isinstance(item, list) or len(item) < 30: | |
continue | |
is_candidate = False | |
# place_id が指定されていれば、リスト内にそのIDが含まれるかチェック | |
if place_id and place_id in str(item): | |
is_candidate = True | |
# place_id がない場合は、住所らしき情報が含まれるかヒューリスティックにチェック | |
elif not place_id: | |
has_address_like = any( | |
isinstance(sub, str) and | |
("〒" in sub or | |
any(k in sub for k in ["都", "道", "府", "県", "市", "区", "町", "村", "丁目", "番地", "号"]) or | |
re.search(r'\d+-\d+-\d+', sub)) | |
for sub in item | |
) | |
if has_address_like: | |
is_candidate = True | |
if is_candidate: | |
potential_candidates.append(item) | |
if not potential_candidates: | |
return None | |
# 候補が1つならそれを返す | |
if len(potential_candidates) == 1: | |
return potential_candidates[0] | |
# 候補が複数ある場合、スコアリングで最もそれらしいものを選ぶ | |
best_candidate = None | |
max_score = -1 | |
for candidate in potential_candidates: | |
if interrupt_event.is_set(): return None # 中断チェック | |
score = len(candidate) # 要素数が多いほど詳細情報の可能性が高い | |
try: | |
# 特定のインデックスにリストが存在するか(構造的な特徴) | |
if any(isinstance(candidate[idx], list) and candidate[idx] for idx in [7, 13, 178] if idx < len(candidate)): | |
score += 50 | |
# URLらしき文字列が含まれるか | |
if 7 < len(candidate) and isinstance(candidate[7], list) and len(candidate[7]) > 0 and isinstance(candidate[7][0], str) and candidate[7][0].startswith('http'): | |
score += 50 | |
# 別の構造的な特徴 | |
if 34 < len(candidate) and isinstance(candidate[34], list) and candidate[34]: | |
score += 30 | |
except Exception: | |
# スコアリング中のエラーは無視 | |
pass | |
if score > max_score: | |
max_score = score | |
best_candidate = candidate | |
return best_candidate | |
def is_domain_like(text): | |
"""文字列がドメイン名らしい形式か簡易的に判定""" | |
if not isinstance(text, str): return False | |
text = text.strip().lower() | |
common_tlds = ['.com', '.jp', '.co.jp', '.net', '.org', '.info', '.biz'] | |
# URLスキーマ、パス、特殊文字、全角文字、IPアドレス形式、前後のドット、連続ドットは除外 | |
if re.search(r'^(https?|ftp)://|[/\\?#\s\u3000-\uFFFF:;@!$%^*()=+]', text): return False | |
if re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', text): return False | |
if text.startswith('.') or text.endswith('.') or '..' in text: return False | |
# ドットを含み、一般的なTLDで終わるかチェック | |
return '.' in text and any(text.endswith(tld) for tld in common_tlds) | |
def safe_get(data, index, default=None): | |
"""ネストされたリストや辞書から安全に値を取得する""" | |
if isinstance(index, int): | |
try: | |
return data[index] if isinstance(data, list) and index < len(data) else default | |
except IndexError: | |
return default | |
elif isinstance(index, list): # インデックスのリストでネストされた要素を取得 | |
current = data | |
for idx in index: | |
if interrupt_event.is_set(): return default # 中断チェック | |
try: | |
if isinstance(current, list) and isinstance(idx, int) and idx < len(current): | |
current = current[idx] | |
elif isinstance(current, dict) and idx in current: | |
current = current[idx] | |
else: | |
return default # 途中でリスト/辞書でない、またはインデックス/キーが存在しない場合 | |
except (IndexError, KeyError, TypeError): | |
return default # その他の予期せぬエラー | |
return current | |
elif isinstance(index, str): # 文字列インデックスは辞書のキーとして扱う | |
return data.get(index, default) if isinstance(data, dict) else default | |
return default | |
# --- 中断チェック付き時間待機関数 --- | |
def interruptible_sleep(duration): | |
"""指定された時間待機するが、中断イベントが発生したら即座に終了する""" | |
interrupt_event.wait(timeout=duration) | |
# waitはタイムアウトするかイベントがセットされると戻る | |
# 呼び出し元で interrupt_event.is_set() をチェックする必要がある | |
# --- HTML抽出関数 (本文抽出を span.wiI7pd 優先に変更、中断チェック追加) --- | |
def extract_details_and_reviews_from_html(html_content): | |
"""詳細HTMLから基本情報と口コミ情報を抽出 (本文は span.wiI7pd 優先、中断チェックあり)""" | |
# この関数はスクレイピング処理中に呼び出される | |
print(" [HTML Extractor - Details & Reviews (wiI7pd priority)] 開始") | |
soup = BeautifulSoup(html_content, 'lxml' if 'lxml' in globals() else 'html.parser') | |
details = {"name": "N/A", "url": "", "phone": "N/A", "address": "N/A", "links": {}, "reviews": [], "extraction_error": None} | |
try: | |
# --- 基本情報の抽出 --- | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
main_container_selector = '.aIFcqe' | |
main_container = soup.select_one(main_container_selector) | |
search_root = soup # デフォルトはページ全体 | |
if main_container: | |
# print(f" '{main_container_selector}' コンテナ発見。基本情報を抽出。") | |
search_root = main_container | |
# else: | |
# print(f" 警告: '{main_container_selector}' コンテナが見つかりません。ページ全体から基本情報を抽出。") | |
# 名前 (h1タグを探す) | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
name_tag = search_root.find('h1') | |
if name_tag: | |
details['name'] = name_tag.get_text(strip=True) | |
elif details['name'] == 'N/A': # フォールバックで<title>から取得 | |
title_tag = soup.find('title') | |
if title_tag and title_tag.string: | |
title_text = title_tag.string.replace('- Google マップ', '').strip() | |
if title_text.lower() != "google マップ": details["name"] = title_text | |
# 電話、住所、ウェブサイトなどの情報を抽出 | |
selectors_map = { | |
"phone": ['button[data-item-id^="phone:tel:"]', 'div.Io6YTe', 'button[aria-label*="電話番号"]'], | |
"address": ['button[data-item-id="address"]', 'div.rogA2c', 'button[aria-label*="住所"]'], | |
"website": ['a[data-item-id="authority"][href^="http"]', 'button[data-item-id="authority"]', 'a[aria-label*="ウェブサイト"][href^="http"]'], | |
"other_link": ['a.CsEnBe[href^="http"]'] # 公式サイト以外のリンク | |
} | |
for info_type, selectors in selectors_map.items(): | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
found_val = None | |
for selector in selectors: | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
element = search_root.select_one(selector) | |
# コンテナ内で見つからなければページ全体で再検索 | |
if not element and search_root != soup: | |
element = soup.select_one(selector) | |
if element: | |
data_item_id = element.get('data-item-id', '') | |
aria_label = element.get('aria-label', '') | |
element_text = element.get_text(strip=True) | |
href = element.get('href') | |
if info_type == "phone": | |
phone_num = None | |
if data_item_id.startswith('phone:tel:'): phone_num = data_item_id.split(':')[-1] | |
elif "電話番号:" in aria_label: phone_num = re.search(r'([\d-]+)', aria_label.split("電話番号:")[-1]) | |
elif element.name == 'div' and re.match(r'^[\d\s-]+$', element_text): phone_num = element_text | |
# 電話番号形式の整形と検証 | |
if isinstance(phone_num, str): phone_num = phone_num.strip() | |
elif hasattr(phone_num, 'group'): phone_num = phone_num.group(1).strip() | |
if phone_num and re.match(r'^[\d-]+$', phone_num.replace('ー','-')): | |
found_val = phone_num.replace('ー','-') | |
break # 電話番号が見つかったらループ脱出 | |
elif info_type == "address": | |
addr_text = None | |
if data_item_id == 'address': addr_text = element_text | |
elif "住所:" in aria_label: addr_text = aria_label.split("住所:")[-1].split('(新しいウィンドウで開きます)')[0].strip() | |
elif element.name == 'div' and ("〒" in element_text or any(k in element_text for k in ["都","道","府","県","市","区","町","村"])): addr_text = element_text | |
# 住所らしき文字列か簡易チェック | |
if addr_text and len(addr_text) > 5: # ある程度の長さがあるか | |
found_val = addr_text | |
break # 住所が見つかったらループ脱出 | |
elif info_type == "website" or info_type == "other_link": | |
if href and href.startswith('http') and 'google.com' not in urlparse(href).netloc: # Google自身のリンクは除外 | |
link_name = "N/A"; is_website = False | |
# リンクの種類を判別 | |
if data_item_id == 'authority' or "ウェブサイト" in aria_label: | |
link_name = element_text if is_domain_like(element_text) else "ウェブサイト" | |
is_website = True | |
elif info_type == "other_link": | |
link_name = f"リンク ({element_text})" if element_text else "外部リンク" | |
elif is_domain_like(element_text): # ドメイン名らしきテキストの場合 | |
link_name = element_text | |
if link_name != "N/A": | |
normalized_url = href.rstrip('/') | |
# 重複を避けて links 辞書に追加 | |
if not any(existing_url.rstrip('/') == normalized_url for existing_url in details["links"].values()): | |
details["links"][link_name] = href | |
# website タイプで見つかったものを優先的にメインURL候補へ (まだ未設定の場合) | |
if is_website and details["url"] == "": | |
details["url"] = href | |
# website タイプならこのセレクタでの探索は終了 | |
if info_type == "website": | |
found_val = href # 見つかったことを示す | |
break # websiteセレクタのループ脱出 | |
# 各タイプの最初の有効な値を details に格納 (other_link は除く) | |
if found_val and info_type in details and info_type != "other_link": | |
details[info_type] = found_val | |
# メインURLがまだ決まっていない場合、links 辞書から探す | |
if details["url"] == "": | |
priority = ["ウェブサイト", "authority"] # 公式サイトらしき名前を優先 | |
found_url_in_links = False | |
for p_word in priority: | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
for name, url in details["links"].items(): | |
if p_word in name.lower(): | |
details["url"] = url | |
found_url_in_links = True | |
break | |
if found_url_in_links: | |
break | |
# それでも見つからなければ、ドメイン名らしきリンク > 最初のリンク | |
if not found_url_in_links: | |
domain_link = next((url for name, url in details["links"].items() if is_domain_like(name)), None) | |
if domain_link: | |
details["url"] = domain_link | |
elif details["links"]: # linksに何かあれば最初のものをURLとする | |
details["url"] = next(iter(details["links"].values())) | |
# print(f" 基本情報抽出完了: Name='{details['name']}'") | |
# --- 口コミ情報の抽出 --- | |
# print(" 口コミ情報抽出開始 (span.wiI7pd 優先)...") | |
review_container_selector = 'div.GHT2ce.NsCY4' | |
review_container = soup.select_one(review_container_selector) | |
if review_container: | |
# print(f" '{review_container_selector}' 口コミコンテナ発見。") | |
# 口コミカードの特定 (jftiEf or MyEned) | |
review_card_selectors = ['div.jftiEf', 'div.MyEned'] | |
review_cards = [] | |
for sel in review_card_selectors: | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
review_cards = review_container.select(sel) | |
if review_cards: | |
# print(f" 口コミカードセレクタ '{sel}' で {len(review_cards)} 件発見。") | |
break | |
# if not review_cards: | |
# print(" 警告: 口コミコンテナ内で口コミカードが見つかりません。") | |
extracted_reviews = [] | |
for card_idx, card in enumerate(review_cards): | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析中に中断リクエスト") | |
try: | |
review_text = "N/A"; reviewer_name = "N/A"; rating = "N/A" | |
# 口コミ本文抽出 (span.wiI7pd 優先) | |
text_span_wiI7pd = card.select_one('span.wiI7pd') | |
if text_span_wiI7pd: | |
review_text = text_span_wiI7pd.get_text(strip=True) | |
else: | |
# フォールバック: span[jscontroller="MZnM8e"] | |
full_text_span = card.select_one('span[jscontroller="MZnM8e"]') | |
if full_text_span: | |
review_text = full_text_span.get_text(strip=True) | |
# 投稿者名 (.d4r55) | |
name_el = card.select_one('.d4r55'); | |
if name_el: reviewer_name = name_el.get_text(strip=True) | |
# 評価 (.kvMYJc aria-label) | |
rating_el = card.select_one('.kvMYJc'); | |
if rating_el: | |
aria_label = rating_el.get('aria-label', ''); | |
match = re.search(r'星 (\d+(\.\d+)?)', aria_label) # "星 5.0" などを想定 | |
if match: rating = match.group(1) | |
# 情報が一部でも取れていれば追加 | |
if review_text != "N/A" or reviewer_name != "N/A": | |
extracted_reviews.append({"reviewer": reviewer_name, "rating": rating, "text": review_text if review_text != "N/A" else ""}) | |
except Exception as e_card: | |
print(f" 口コミカード {card_idx+1} の解析中にエラー: {e_card}") | |
extracted_reviews.append({"reviewer": "Error", "rating": "N/A", "text": f"解析エラー: {e_card}"}) | |
details['reviews'] = extracted_reviews | |
# print(f" 口コミ抽出完了: {len(details['reviews'])} 件") | |
# else: | |
# print(f" 警告: '{review_container_selector}' 口コミコンテナが見つかりません。") | |
except InterruptedError as e_interrupt: # 中断エラーをキャッチ | |
print(f" HTML解析処理が中断されました: {e_interrupt}") | |
details['extraction_error'] = "Interrupted" | |
details['status'] = 'Interrupted' # ステータスも中断にする | |
except Exception as e_extract: | |
print(f"★★★★★ HTML抽出処理中にエラーが発生しました ★★★★★") | |
error_trace = traceback.format_exc() | |
print(error_trace) | |
details['extraction_error'] = f"Type: {type(e_extract).__name__}, Msg: {e_extract}\nTrace: {error_trace}" | |
# print(f" [HTML Extractor - Details & Reviews (wiI7pd priority)] 完了: Name='{details['name']}'") | |
return details | |
# --- CSV Loading Function (From Script 1, 中断チェック追加) --- | |
def load_queries(csv_path): | |
"""CSVファイルを読み込み、1列目のクエリをリストとして返す(中断チェックあり)""" | |
queries = [] | |
encodings_to_try = ['utf-8-sig', 'utf-8', 'cp932', 'shift_jis'] # 試すエンコーディングリスト | |
file_encoding = None | |
print(f"CSVファイル読み込み開始: {os.path.basename(csv_path)}") | |
if not csv_path or not os.path.exists(csv_path): | |
print("エラー: CSVファイルが見つかりません。") | |
return [] | |
# ファイルのエンコーディングを特定 | |
for encoding in encodings_to_try: | |
if interrupt_event.is_set(): print("CSV読み込み中に中断リクエスト検出"); return [] # 中断チェック | |
try: | |
with open(csv_path, 'r', encoding=encoding, errors='strict') as f: | |
f.read(1024) # ファイルの一部を読んでエンコーディングを確認 | |
file_encoding = encoding | |
print(f" エンコーディング '{encoding}' で読み込み試行...") | |
break | |
except (UnicodeDecodeError, LookupError): | |
continue # 次のエンコーディングを試す | |
except Exception as e_enc: | |
print(f" '{encoding}' 試行中に予期せぬエラー: {e_enc}") | |
continue | |
if not file_encoding: | |
print(f"エラー: ファイル '{os.path.basename(csv_path)}' を読み込めるエンコーディングが見つかりません。") | |
return [] | |
line_num = 0 | |
try: | |
with open(csv_path, 'r', encoding=file_encoding, newline='') as f: | |
reader = csv.reader(f) | |
try: | |
if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック | |
header = next(reader) # 最初の行を読み込む | |
line_num += 1 | |
print(f" 1行目 (ヘッダー可能性あり): {header}") | |
except StopIteration: | |
print("情報: CSVファイルが空です。") | |
return [] # ファイルが空なら終了 | |
except InterruptedError as e_interrupt: | |
print(e_interrupt) | |
return [] | |
# 1行目がヘッダーかどうかを判定 (簡易的) | |
header_keywords = ['query', 'search', 'keyword', 'クエリ', '検索', 'キーワード', '店舗', '会社'] | |
first_col_header = header[0].strip().lower() if header else "" | |
is_header = any(hkw in first_col_header for hkw in header_keywords) | |
# 1行目がヘッダーでなく、かつ内容があればクエリとして追加 | |
if not is_header and header and header[0].strip(): | |
queries.append(header[0].strip()) | |
elif is_header: | |
print(" 1行目はヘッダーと判断しスキップします。") | |
# 2行目以降を処理 | |
for row in reader: | |
if interrupt_event.is_set(): raise InterruptedError("CSV読み込み中に中断リクエスト") # 中断チェック | |
line_num += 1 | |
# 1列目にデータがあればクエリとして追加 | |
if row and row[0].strip(): | |
queries.append(row[0].strip()) | |
# 1列目が空でも他の列にデータがあれば警告を表示 (スキップ対象) | |
elif any(cell.strip() for cell in row): | |
print(f"警告: 行 {line_num} の1列目が空です: {row}。スキップします。") | |
print(f" CSVから {len(queries)} 件の有効なクエリを抽出しました。") | |
except InterruptedError as e_interrupt: # 中断をキャッチ | |
print(e_interrupt) | |
print(f"中断リクエストにより、{len(queries)} 件のクエリまで読み込みました。") | |
return queries # 途中までのクエリを返す | |
except Exception as e: | |
# CSV処理中のエラーハンドリング | |
print(f"★★★★★ CSVファイル処理中にエラー (行 {line_num}) ★★★★★") | |
print(f"エラータイプ: {type(e).__name__}") | |
print(f"エラーメッセージ: {e}") | |
print("--- スタックトレース ---") | |
print(traceback.format_exc()) | |
print("----------------------") | |
return [] # エラー発生時は空リストを返す | |
return queries | |
# --- Single Query Processing Function (From Script 1, 中断チェック強化) --- | |
def process_single_query_full_list(driver, query, query_index, output_dir, wait_config): | |
"""単一クエリ処理: 検索→リストスクロール→リンク抽出→詳細ページ→口コミタブ→口コミスクロール→「もっと見る」クリック→HTML取得→解析 (中断チェックあり)""" | |
print(f"\n--- クエリ処理開始 [Index:{query_index}] ---: {query}") | |
results_list = [] | |
safe_query_part = re.sub(r'[\\/*?:"<>|]', '_', query)[:30].strip() or "empty_query" | |
base_url = "https://www.google.com/maps/" | |
# 待機時間設定 | |
WAIT_TIME_BASE = wait_config['base'] | |
WAIT_TIME_DETAIL = wait_config['detail'] | |
WAIT_TIME_SEARCH = wait_config['search'] | |
# スクロール設定 | |
SCROLL_PAUSE_TIME = max(1.5, WAIT_TIME_BASE * 0.5) | |
MAX_SCROLL_ATTEMPTS = 30 | |
SCROLL_PAUSE_TIME_REVIEW = max(1.0, WAIT_TIME_BASE * 0.3) | |
MAX_SCROLL_ATTEMPTS_REVIEW = 500 # 口コミは多い場合があるので回数を増やす | |
REVIEW_SCROLL_STUCK_LIMIT = 5 # 口コミスクロール停止判定の閾値 | |
try: | |
# --- 中断チェック --- | |
if interrupt_event.is_set(): raise InterruptedError("処理開始前に中断リクエスト") | |
# 1. 検索実行とリスト表示待機 | |
search_url = f"https://www.google.com/maps/search/{query.replace(' ', '+')}" | |
print(f" URLにアクセス: {search_url}") | |
driver.get(search_url) | |
if interrupt_event.is_set(): raise InterruptedError("ページ読み込み後に中断リクエスト") | |
print(f" 検索結果リスト表示待機 (最大{WAIT_TIME_SEARCH}秒)...") | |
list_container_selector = 'div[role="feed"], div[aria-label*="の検索結果"]' | |
try: | |
# WebDriverWait も中断可能にするのは難しいので、ここではそのまま | |
list_container = WebDriverWait(driver, WAIT_TIME_SEARCH).until( | |
EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) | |
) | |
WebDriverWait(driver, 10).until( | |
EC.visibility_of_element_located((By.CSS_SELECTOR, f'{list_container_selector} a[href*="/maps/place/"]')) | |
) | |
print(" 検索結果リスト表示を確認。") | |
except TimeoutException as e_timeout: | |
print(f" エラー: 検索結果リストの表示タイムアウト。URL: {search_url}\n{e_timeout}") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Timeout)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Timeout'}) | |
return results_list | |
except Exception as e_wait: | |
print(f"★★★★★ リスト待機中に予期せぬエラー ★★★★★\nURL: {search_url}\n{type(e_wait).__name__}: {e_wait}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': search_url, 'html_filename': 'N/A', 'name': f'Error (List Wait Exception)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: List Wait Exception'}) | |
return results_list | |
# 2. 検索リストのスクロール | |
print(" 検索リストをスクロールして全結果を表示...") | |
last_height = driver.execute_script("return arguments[0].scrollHeight", list_container) | |
scroll_attempts = 0 | |
stuck_count = 0 | |
while scroll_attempts < MAX_SCROLL_ATTEMPTS: | |
if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 中断チェック | |
try: | |
driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', list_container) | |
interruptible_sleep(SCROLL_PAUSE_TIME) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック | |
new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) | |
end_markers = driver.find_elements(By.XPATH, "//span[contains(text(), '結果は以上です')] | //p[contains(text(), '結果は以上です')]") | |
if any(el.is_displayed() for el in end_markers): | |
print(" 「結果は以上です」表示確認。検索リストスクロール終了。") | |
break | |
if new_height == last_height: | |
stuck_count += 1 | |
# print(f" 検索リストスクロール高さ変化なし ({stuck_count}回目)。再試行...") | |
interruptible_sleep(SCROLL_PAUSE_TIME * 1.5) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("検索リストスクロール中に中断リクエスト") # 待機後にもチェック | |
new_height = driver.execute_script("return arguments[0].scrollHeight", list_container) | |
if new_height == last_height and stuck_count >= 3: | |
print(" 高さ変化なしが続いたため、検索リストスクロール終了と判断。") | |
break | |
else: | |
stuck_count = 0 | |
last_height = new_height | |
except Exception as e_scroll: | |
if interrupt_event.is_set(): raise InterruptedError("検索リストスクロールエラー処理中に中断リクエスト") # エラー処理中もチェック | |
print(f"★★★★★ 検索リストスクロール中にエラー ★★★★★\n{type(e_scroll).__name__}: {e_scroll}") | |
print(" スクロールエラー発生。可能な範囲で続行します。") | |
scroll_attempts += 1 | |
if scroll_attempts >= MAX_SCROLL_ATTEMPTS: | |
print(f" 検索リスト最大スクロール回数 ({MAX_SCROLL_ATTEMPTS}) 到達。") | |
# 3. リンク抽出 | |
if interrupt_event.is_set(): raise InterruptedError("リンク抽出前に中断リクエスト") # 中断チェック | |
print(" 検索結果リストからリンクを抽出...") | |
unique_place_links = set() | |
result_card_selector = '.hfpxzc' | |
try: | |
list_container_updated = WebDriverWait(driver, 10).until( | |
EC.presence_of_element_located((By.CSS_SELECTOR, list_container_selector)) | |
) | |
result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) | |
# print(f" '{result_card_selector}' 要素を {len(result_cards)} 件発見。") | |
if not result_cards: | |
# print(f" 警告: '{result_card_selector}' が見つかりません。代替セレクタ 'a.hfpxzc' で試行...") | |
result_card_selector = 'a.hfpxzc' | |
result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) | |
# print(f" 代替セレクタで {len(result_cards)} 件発見。") | |
if not result_cards: | |
# print(f" 警告: 代替セレクタ 'a.Nv2PK' で試行...") | |
result_card_selector = 'a.Nv2PK' | |
result_cards = list_container_updated.find_elements(By.CSS_SELECTOR, result_card_selector) | |
# print(f" 代替セレクタで {len(result_cards)} 件発見。") | |
link_extraction_errors = 0 | |
for card_idx, card in enumerate(result_cards): | |
if interrupt_event.is_set(): raise InterruptedError("リンク抽出ループ中に中断リクエスト") # 中断チェック | |
try: | |
link_element = None | |
if card.tag_name == 'a': link_element = card | |
else: | |
try: link_element = card.find_element(By.TAG_NAME, 'a') | |
except NoSuchElementException: continue | |
if link_element: | |
href = link_element.get_attribute('href') | |
if href and "/maps/place/" in href and not href.startswith("javascript:"): | |
absolute_href = urljoin(base_url, href) | |
unique_place_links.add(absolute_href) | |
except StaleElementReferenceException: | |
link_extraction_errors += 1 | |
continue | |
except Exception as e_extract_link: | |
print(f"★★★★★ カード {card_idx+1} からのリンク抽出エラー ★★★★★\n{type(e_extract_link).__name__}: {e_extract_link}") | |
link_extraction_errors += 1 | |
if link_extraction_errors > 0: | |
print(f" リンク抽出中に {link_extraction_errors} 件のエラーが発生しました。") | |
print(f" 抽出したユニークリンク数: {len(unique_place_links)}") | |
except Exception as e_find_links: | |
print(f"★★★★★ リンク抽出プロセス全体でエラー ★★★★★\n使用したセレクタ: '{result_card_selector}'\n{type(e_find_links).__name__}: {e_find_links}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': f'Error (Link Extraction Fail)', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Link Extraction Fail'}) | |
return results_list | |
if not unique_place_links: | |
print(" 有効な詳細ページリンクが見つかりませんでした。このクエリの結果はありません。") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': driver.current_url, 'html_filename': 'N/A', 'name': 'No Results Found', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Success: No Results'}) | |
return results_list | |
# 4. 各リンクの詳細ページを処理 | |
print(f" {len(unique_place_links)} 件の詳細情報を取得...") | |
link_list = sorted(list(unique_place_links)) | |
processed_urls = set() | |
for i, place_url in enumerate(link_list, 1): | |
if interrupt_event.is_set(): raise InterruptedError("詳細ページ処理ループ開始前に中断リクエスト") # 中断チェック | |
if place_url in processed_urls: continue | |
processed_urls.add(place_url) | |
print(f"\n --- 詳細取得 [Query:{query_index}, Result:{i}/{len(link_list)}] ---") | |
result_details = {'query_index': query_index, 'original_query': query, 'result_rank': i, 'place_url': place_url, 'html_filename': 'N/A', 'name': 'N/A', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Pending', 'extraction_error': None} | |
try: | |
print(f" 詳細ページに遷移: {place_url}") | |
driver.get(place_url) | |
if interrupt_event.is_set(): raise InterruptedError("詳細ページ読み込み後に中断リクエスト") | |
WebDriverWait(driver, WAIT_TIME_DETAIL).until( | |
EC.visibility_of_element_located((By.CSS_SELECTOR, 'h1')) | |
) | |
interruptible_sleep(WAIT_TIME_BASE * 0.2) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("詳細ページ待機後に中断リクエスト") | |
# --- 口コミタブをクリック --- | |
review_tab_text = "クチコミ" | |
review_tab_xpath = f"//button[@role='tab'][contains(., '{review_tab_text}') or contains(@aria-label, '{review_tab_text}')]" | |
review_tab_clicked = False | |
review_scroll_element = None | |
try: | |
# print(f" {review_tab_text}タブ クリック試行...") | |
review_tab = WebDriverWait(driver, 10).until( | |
EC.element_to_be_clickable((By.XPATH, review_tab_xpath)) | |
) | |
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", review_tab) | |
interruptible_sleep(0.3) | |
if interrupt_event.is_set(): raise InterruptedError("口コミタブクリック前に中断リクエスト") | |
driver.execute_script("arguments[0].click();", review_tab) | |
review_tab_clicked = True | |
print(f" {review_tab_text}タブをクリックしました。口コミコンテナ表示待機...") | |
review_container_selector = 'div.GHT2ce.NsCY4' | |
first_review_card_selector = f'{review_container_selector} div.jftiEf:first-of-type, {review_container_selector} div.MyEned:first-of-type' | |
review_scroll_element = WebDriverWait(driver, WAIT_TIME_DETAIL).until( | |
EC.visibility_of_element_located((By.CSS_SELECTOR, review_container_selector)) | |
) | |
WebDriverWait(driver, 5).until( | |
EC.visibility_of_element_located((By.CSS_SELECTOR, first_review_card_selector)) | |
) | |
print(f" 口コミコンテナ表示確認、スクロール要素取得。") | |
interruptible_sleep(WAIT_TIME_BASE * 0.5) | |
if interrupt_event.is_set(): raise InterruptedError("口コミコンテナ待機後に中断リクエスト") | |
except TimeoutException: print(f" 警告: {review_tab_text}タブまたは口コミコンテナの表示タイムアウト。") | |
except ElementClickInterceptedException: print(f" 警告: {review_tab_text}タブのクリックが遮られました。") | |
except NoSuchElementException: print(f" 警告: {review_tab_text}タブが見つかりません。") | |
except Exception as e_click_review: print(f"★★★★★ {review_tab_text}タブ処理中に予期せぬエラー ★★★★★\n{type(e_click_review).__name__}: {e_click_review}") | |
# --- 口コミエリアのスクロール処理 --- | |
if review_scroll_element: | |
print(" 口コミエリアをスクロールして全件表示試行...") | |
review_last_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) | |
review_scroll_attempts = 0 | |
review_stuck_count = 0 | |
while review_scroll_attempts < MAX_SCROLL_ATTEMPTS_REVIEW: | |
if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 中断チェック | |
try: | |
driver.execute_script('arguments[0].scrollTop = arguments[0].scrollHeight', review_scroll_element) | |
interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック | |
review_new_height = driver.execute_script("return arguments[0].scrollHeight", review_scroll_element) | |
if review_new_height == review_last_height: | |
review_stuck_count += 1 | |
if review_stuck_count >= REVIEW_SCROLL_STUCK_LIMIT: | |
print(f" 口コミスクロール高さが{REVIEW_SCROLL_STUCK_LIMIT}回変化なし。スクロール終了と判断。") | |
break | |
else: | |
interruptible_sleep(SCROLL_PAUSE_TIME_REVIEW * 2) # 中断可能な待機 | |
if interrupt_event.is_set(): raise InterruptedError("口コミスクロール中に中断リクエスト") # 待機後にもチェック | |
else: | |
review_stuck_count = 0 | |
review_last_height = review_new_height | |
except Exception as e_review_scroll: | |
if interrupt_event.is_set(): raise InterruptedError("口コミスクロールエラー処理中に中断リクエスト") | |
print(f"★★★★★ 口コミスクロール中にエラー ★★★★★\n{type(e_review_scroll).__name__}: {e_review_scroll}") | |
print(" 口コミスクロールエラー発生。可能な範囲で続行します。") | |
break | |
review_scroll_attempts += 1 | |
if review_scroll_attempts >= MAX_SCROLL_ATTEMPTS_REVIEW: | |
print(f" 最大口コミスクロール回数 ({MAX_SCROLL_ATTEMPTS_REVIEW}) 到達。") | |
print(" 口コミエリアのスクロール完了。") | |
elif review_tab_clicked: print(" 警告: 口コミスクロール要素が見つからなかったため、口コミスクロールをスキップします。") | |
# --- 「もっと見る」ボタンをクリック --- | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック前に中断リクエスト") | |
if review_tab_clicked and review_scroll_element: | |
print(" 「もっと見る」ボタンを検索してクリック試行...") | |
more_buttons_xpath = "//button[contains(text(), 'もっと見る')]" | |
clicked_count = 0 | |
click_attempts = 0 | |
max_click_attempts = 3 | |
while click_attempts < max_click_attempts: | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」ループ中に中断リクエスト") # 中断チェック | |
buttons_found_this_round = 0 | |
try: | |
more_buttons = driver.find_elements(By.XPATH, more_buttons_xpath) | |
if not more_buttons: | |
# if click_attempts == 0: print(" 「もっと見る」ボタンが見つかりませんでした。") | |
# else: print(f" 追加の「もっと見る」ボタンは見つかりませんでした (試行 {click_attempts+1}/{max_click_attempts})。") | |
break | |
# print(f" 「もっと見る」ボタンを {len(more_buttons)} 個発見 (試行 {click_attempts+1}/{max_click_attempts})。クリック開始...") | |
for btn_idx, button in enumerate(more_buttons): | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") # 中断チェック | |
try: | |
if button.is_displayed() and button.is_enabled(): | |
driver.execute_script("arguments[0].scrollIntoView({block: 'center'});", button) | |
interruptible_sleep(0.2) | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") | |
driver.execute_script("arguments[0].click();", button) | |
clicked_count += 1 | |
buttons_found_this_round += 1 | |
interruptible_sleep(0.3) | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック中に中断リクエスト") | |
except ElementClickInterceptedException: print(f" ボタン {btn_idx+1} のクリックが遮られました。スキップします。") | |
except StaleElementReferenceException: print(f" ボタン {btn_idx+1} が古くなりました。スキップします。") | |
except Exception as e_click_more: print(f" ボタン {btn_idx+1} のクリック中にエラー: {e_click_more}") | |
# print(f" 今回の試行で {buttons_found_this_round} 個の「もっと見る」ボタンをクリックしました。") | |
if buttons_found_this_round == 0: | |
# print(" これ以上クリックできる「もっと見る」ボタンはありませんでした。") | |
break | |
except Exception as e_find_more: | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」検索エラー処理中に中断リクエスト") | |
print(f"★★★★★ 「もっと見る」ボタン検索中にエラー ★★★★★\n{type(e_find_more).__name__}: {e_find_more}") | |
break | |
click_attempts += 1 | |
if click_attempts < max_click_attempts: | |
interruptible_sleep(1.0) | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」試行間待機中に中断リクエスト") | |
if clicked_count > 0: print(f" 合計 {clicked_count} 個の「もっと見る」ボタンをクリックしました。") | |
# else: print(" クリックされた「もっと見る」ボタンはありませんでした。") | |
interruptible_sleep(WAIT_TIME_BASE * 0.5) | |
if interrupt_event.is_set(): raise InterruptedError("「もっと見る」クリック後に中断リクエスト") | |
# --- HTML取得と保存 --- | |
print(" ページのHTMLを取得・保存中...") | |
detail_html_content = "" | |
try: | |
if interrupt_event.is_set(): raise InterruptedError("HTML取得前に中断リクエスト") | |
detail_html_content = driver.page_source | |
temp_name = 'N/A' | |
try: temp_name = driver.find_element(By.TAG_NAME, 'h1').text | |
except: pass | |
safe_place_name_part = re.sub(r'[\\/*?:"<>|]', '_', temp_name)[:20].strip() or "no_name" | |
tab_suffix = "_reviews_expanded" if review_tab_clicked else "_overview" | |
# クエリごとのサブディレクトリを作成 | |
query_subdir = os.path.join(output_dir, f"Q{query_index:03d}_{safe_query_part}") | |
os.makedirs(query_subdir, exist_ok=True) | |
detail_html_fname = f"R{i:03d}_{safe_place_name_part}{tab_suffix}.html" | |
detail_html_path = os.path.join(query_subdir, detail_html_fname) | |
with open(detail_html_path, 'w', encoding='utf-8') as f: | |
f.write(detail_html_content) | |
# 相対パスを保存 | |
result_details['html_filename'] = os.path.join(f"Q{query_index:03d}_{safe_query_part}", detail_html_fname) | |
print(f" HTMLを保存しました: {result_details['html_filename']}") | |
except Exception as e_save_html: | |
print(f" HTML取得/保存エラー: {e_save_html}") | |
result_details['html_filename'] = 'Error Saving HTML' | |
# --- HTML解析 --- | |
if detail_html_content: | |
print(" HTMLを解析して情報を抽出中...") | |
if interrupt_event.is_set(): raise InterruptedError("HTML解析前に中断リクエスト") | |
extracted_info = extract_details_and_reviews_from_html(detail_html_content) | |
result_details.update(extracted_info) | |
# 抽出関数内で中断された場合、ステータスが'Interrupted'になっているはず | |
if result_details.get('status') != 'Interrupted': | |
if result_details.get('extraction_error'): | |
result_details['status'] = f"Warning: HTML Extraction Error" | |
else: | |
result_details['status'] = 'Success' | |
print(" HTML解析完了。") | |
else: | |
print(" エラー: HTMLコンテンツが空のため、情報抽出をスキップします。") | |
result_details['status'] = 'Error: Empty HTML Content' | |
except TimeoutException as e_timeout_detail: | |
print(f"★★★★★ 詳細ページ読み込みタイムアウト ★★★★★\nURL: {place_url}") | |
result_details['status'] = f'Error: Detail Page Timeout'; result_details['name'] = f"Error (Timeout R:{i})" | |
except NoSuchElementException as e_nse: | |
print(f"★★★★★ 詳細ページで必須要素(h1など)が見つかりません ★★★★★\nURL: {place_url}") | |
result_details['status'] = f'Error: Detail Page Missing Element (e.g., h1)'; result_details['name'] = f"Error (ElementNotFound R:{i})" | |
except Exception as e_detail: | |
if interrupt_event.is_set(): raise InterruptedError("詳細ページ例外処理中に中断リクエスト") # 例外処理中もチェック | |
print(f"★★★★★ 詳細ページ処理中に予期せぬエラー ★★★★★\nURL: {place_url}\n{type(e_detail).__name__}: {e_detail}") | |
result_details['status'] = f'Error: Detail Page Exception - {type(e_detail).__name__}'; result_details['name'] = f"Error (Exception R:{i})" | |
finally: | |
# 中断された場合、ステータスを上書き | |
if interrupt_event.is_set() and result_details.get('status') != 'Interrupted': | |
result_details['status'] = 'Interrupted' | |
results_list.append(result_details) | |
except InterruptedError as e_interrupt: # クエリ処理全体で中断をキャッチ | |
print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理中に中断リクエスト: {e_interrupt} ★★★★★") | |
# 中断されたことを示す結果を追加 | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 'N/A', 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Interrupted Query {query_index}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': 'Interrupted'}) | |
# ★重要★ 中断例外を再度発生させ、run_scraping関数に中断を伝える | |
raise e_interrupt | |
except Exception as e_main_query: | |
print(f"★★★★★ クエリ '{query}' [Index:{query_index}] の処理全体でエラー ★★★★★\n{type(e_main_query).__name__}: {e_main_query}\n--- Traceback ---\n{traceback.format_exc()}\n--- End Traceback ---") | |
results_list.append({'query_index': query_index, 'original_query': query, 'result_rank': 0, 'place_url': 'N/A', 'html_filename': 'N/A', 'name': f'Error (Overall Query {query_index})', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'status': f'Error: Query Level Exception - {type(e_main_query).__name__}'}) | |
finally: | |
status_msg = "中断" if interrupt_event.is_set() else "完了" | |
print(f"--- クエリ処理{status_msg} [Index:{query_index}] - {len(results_list)} 件の結果 ---") | |
return results_list | |
# --- 中断リクエスト用関数 (From Script 1) --- | |
def request_interrupt(): | |
"""中断フラグをセットする""" | |
if not interrupt_event.is_set(): | |
print("\n!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!") | |
print("!!! 中断リクエストを受け付けました。 !!!") | |
print("!!! 現在のスクレイピング処理が完了次第、停止します... !!!") | |
print("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!\n") | |
interrupt_event.set() | |
else: | |
print("\n--- 中断は既にリクエストされています ---") | |
return "[中断リクエスト受信]" | |
# --- Gradio Processing Function (From Script 1, 中断処理対応, 途中ダウンロード削除) --- | |
def run_scraping(input_csv_file, output_dir_name, output_csv_name, csv_encoding, | |
wait_time_base, wait_time_detail, wait_time_search, headless_mode, progress=gr.Progress()): | |
"""Gradioインターフェースから呼び出されるスクレイピング処理関数""" | |
log_stream = io.StringIO() # ログ出力用 | |
start_time_total = time.time() # 全体処理時間計測開始 | |
driver = None # WebDriverオブジェクト初期化 | |
processed_query_count = 0 # 処理済みクエリ数 | |
total_results_count = 0 # CSV書き込み総行数 | |
total_queries = 0 # 総クエリ数 | |
output_csv_path = None # 出力CSVファイルパス | |
html_base_output_dir = None # HTML出力ベースディレクトリ | |
interrupted_flag = False # 処理が中断されたかを示すフラグ | |
# --- 中断フラグをリセット --- | |
interrupt_event.clear() | |
print("中断フラグをリセットしました。", file=log_stream) | |
# 標準出力と標準エラー出力をログストリームにリダイレクト | |
with contextlib.redirect_stdout(log_stream), contextlib.redirect_stderr(log_stream): | |
try: | |
print("=== スクレイピング処理開始 ===") | |
print(f"開始時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
# 入力ファイルチェック | |
if input_csv_file is None: | |
print("エラー: クエリCSVファイルが選択されていません。処理を中断します。") | |
yield log_stream.getvalue(), None, None # ログ, 結果CSV, HTMLフォルダパス | |
return | |
yield log_stream.getvalue(), None, None # 初期ログをUIに反映 | |
# パラメータ設定 | |
SEARCH_QUERIES_CSV_PATH = input_csv_file.name | |
OUTPUT_DIR = output_dir_name.strip() or "gmap_scraping_output" | |
OUTPUT_CSV_FILENAME = output_csv_name.strip() or "scraping_results.csv" | |
CSV_ENCODING = csv_encoding | |
try: | |
wait_config = { | |
'base': max(1.0, float(wait_time_base)), | |
'detail': max(10.0, float(wait_time_detail)), | |
'search': max(5.0, float(wait_time_search)) | |
} | |
except ValueError: | |
print("警告: 待機時間に無効な値が入力されました。デフォルト値を使用します。") | |
wait_config = {'base': 4.0, 'detail': 25.0, 'search': 15.0} | |
print(f"待機時間設定: 基本={wait_config['base']}秒, 詳細/口コミ={wait_config['detail']}秒, 検索={wait_config['search']}秒") | |
yield log_stream.getvalue(), None, None | |
# 出力ディレクトリ設定と作成 | |
if not os.path.isabs(OUTPUT_DIR): | |
OUTPUT_DIR = os.path.join(os.getcwd(), OUTPUT_DIR) | |
html_base_output_dir = os.path.join(OUTPUT_DIR, "html_files") # HTML保存用サブディレクトリ | |
output_csv_path = os.path.join(OUTPUT_DIR, OUTPUT_CSV_FILENAME) # CSVはメインディレクトリに | |
print(f"HTML出力先ベースディレクトリ: {html_base_output_dir}") | |
print(f"CSV出力先ファイル: {output_csv_path}") | |
os.makedirs(OUTPUT_DIR, exist_ok=True) | |
os.makedirs(html_base_output_dir, exist_ok=True) # HTML用サブディレクトリも作成 | |
yield log_stream.getvalue(), None, html_base_output_dir # HTMLフォルダパスを返す | |
# CSVからクエリ読み込み (中断チェックあり) | |
queries = load_queries(SEARCH_QUERIES_CSV_PATH) | |
yield log_stream.getvalue(), None, html_base_output_dir | |
if interrupt_event.is_set(): # 読み込み中に中断されたかチェック | |
print("CSV読み込み中に中断されたため、処理を終了します。") | |
interrupted_flag = True | |
raise InterruptedError("CSV loading interrupted") # 処理を中断フローへ | |
if not queries: | |
print("エラー: CSVから処理可能なクエリが見つかりませんでした。処理を終了します。") | |
yield log_stream.getvalue(), None, html_base_output_dir | |
return | |
total_queries = len(queries) | |
print(f"{total_queries} 件のクエリを処理します。") | |
yield log_stream.getvalue(), None, html_base_output_dir | |
# --- 中断チェック --- | |
if interrupt_event.is_set(): raise InterruptedError("WebDriver初期化前に中断リクエスト") | |
# WebDriver初期化 | |
progress(0, desc="WebDriver初期化中...") | |
print("\nWebDriver初期化中...") | |
yield log_stream.getvalue(), None, html_base_output_dir | |
options = Options() | |
options.add_argument('--no-sandbox') | |
options.add_argument('--disable-dev-shm-usage') | |
options.add_argument('--lang=ja-JP') | |
options.add_argument("--window-size=1920,1080") | |
options.add_argument('--disable-extensions') | |
options.add_argument('--disable-blink-features=AutomationControlled') | |
options.add_argument('--disable-gpu') | |
options.add_experimental_option('excludeSwitches', ['enable-automation']) | |
options.add_experimental_option('useAutomationExtension', False) | |
options.add_experimental_option("prefs", { | |
"credentials_enable_service": False, | |
"profile.password_manager_enabled": False | |
}) | |
if headless_mode: | |
print(" ヘッドレスモードで実行します。") | |
options.add_argument('--headless=new') | |
else: | |
print(" 通常モード (非ヘッドレス) で実行します。") | |
try: | |
if IN_COLAB and gs: | |
print(" Colab環境でgoogle_colab_seleniumを使用します。") | |
driver = gs.Chrome(options=options) | |
elif not IN_COLAB and ChromeService and ChromeDriverManager: | |
try: | |
print(" webdriver-managerを使用してChromeDriverパスを解決します...") | |
service = ChromeService(ChromeDriverManager().install()) | |
driver = webdriver.Chrome(service=service, options=options) | |
print(" ChromeDriver (webdriver-manager) 起動成功。") | |
except Exception as e_wdm: | |
print(f" webdriver-managerでの初期化エラー: {e_wdm}") | |
print(" PATH上のChromeDriverで試行します...") | |
driver = webdriver.Chrome(options=options) | |
print(" ChromeDriver (PATH) 起動成功。") | |
elif not IN_COLAB: | |
print(" PATH上のChromeDriverを使用します...") | |
driver = webdriver.Chrome(options=options) | |
print(" ChromeDriver (PATH) 起動成功。") | |
else: | |
raise Exception("WebDriverを初期化できませんでした。適切なWebDriver設定が見つかりません。") | |
driver.implicitly_wait(3) | |
print("WebDriver初期化完了。") | |
except Exception as e_wd_init: | |
print(f"★★★★★ WebDriver初期化失敗 ★★★★★") | |
print(f"エラータイプ: {type(e_wd_init).__name__}") | |
print(f"エラーメッセージ: {e_wd_init}") | |
print("--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") | |
yield log_stream.getvalue(), None, html_base_output_dir | |
return | |
yield log_stream.getvalue(), None, html_base_output_dir | |
# --- 中断チェック --- | |
if interrupt_event.is_set(): raise InterruptedError("CSV処理開始前に中断リクエスト") | |
# CSVヘッダーを定義 (口コミ本文も個別列にするか検討 → 結合文字列でよさそう) | |
csv_header = ['QueryIndex', 'OriginalQuery', 'ResultRank', 'Status', 'ExtractedName', | |
'ExtractedWebsite', 'ExtractedPhone', 'ExtractedAddress', 'ReviewCount', 'ReviewsCombined', | |
'ExtractionError', 'PlaceURL', 'DetailHTMLFilename'] | |
file_exists = os.path.exists(output_csv_path) | |
file_mode = 'a' if file_exists and os.path.getsize(output_csv_path) > 0 else 'w' | |
print(f"結果CSVファイルを '{file_mode}' モードで開きます (パス: {output_csv_path}, エンコーディング: {CSV_ENCODING})。") | |
yield log_stream.getvalue(), None, html_base_output_dir | |
try: | |
with open(output_csv_path, file_mode, newline='', encoding=CSV_ENCODING, errors='replace') as csv_file: | |
writer = csv.writer(csv_file) | |
if file_mode == 'w': | |
print(" 新規CSVファイルのためヘッダー行を書き込みます。") | |
writer.writerow(csv_header) | |
csv_file.flush() | |
elif file_exists: | |
print(f" 既存ファイル '{os.path.basename(output_csv_path)}' に追記します。") | |
for i, query in enumerate(queries, 1): | |
# --- ループ開始時に中断チェック --- | |
if interrupt_event.is_set(): | |
print(f"\n===== クエリ {i}/{total_queries} の処理開始前に中断リクエストを検出 =====") | |
interrupted_flag = True | |
break # ループを抜ける | |
progress(i / total_queries, desc=f"クエリ {i}/{total_queries} 処理中: {query[:30]}...") | |
start_time_query = time.time() | |
print(f"\n===== クエリ {i}/{total_queries} 開始: '{query}' =====") | |
yield log_stream.getvalue(), None, html_base_output_dir | |
results = [] | |
try: | |
# --- 単一クエリのスクレイピング処理実行 (中断例外をキャッチ) --- | |
# HTML保存先として html_base_output_dir を渡す | |
results = process_single_query_full_list(driver, query, i, html_base_output_dir, wait_config) | |
except InterruptedError as e_interrupt_query: | |
print(f"クエリ {i} の処理が中断されました: {e_interrupt_query}") | |
interrupted_flag = True # メインループに中断を伝える | |
if not any(r['status'] == 'Interrupted' for r in results): | |
results.append({'query_index': i, 'original_query': query, 'result_rank': 'N/A', 'status': 'Interrupted', 'name': f'Interrupted Query {i}', 'url': '', 'phone': 'N/A', 'address': 'N/A', 'reviews': [], 'extraction_error': str(e_interrupt_query), 'place_url': 'N/A', 'html_filename': 'N/A'}) | |
yield log_stream.getvalue(), None, html_base_output_dir | |
# --- 取得結果をCSVに書き込み --- | |
written_count_query = 0 | |
print(f" クエリ {i} の結果をCSVに書き込み中...") | |
for result_data in results: | |
try: | |
reviews_list = result_data.get('reviews', []) | |
review_count = 0 | |
formatted_reviews = "" | |
if isinstance(reviews_list, list) and reviews_list: | |
review_texts = [] | |
for idx, review_item in enumerate(reviews_list): | |
if isinstance(review_item, dict): | |
r_text = str(review_item.get('text', '')).replace('\n', ' ').replace('\r', '') | |
reviewer = review_item.get('reviewer', 'N/A') | |
rating = review_item.get('rating', 'N/A') | |
review_texts.append(f"[{idx+1}] {reviewer} ({rating}): {r_text}") | |
elif isinstance(review_item, str): | |
review_texts.append(f"[{idx+1}] {review_item.replace('n', ' ').replace('r', '')}") | |
formatted_reviews = " || ".join(review_texts) # 区切り文字で結合 | |
review_count = len(reviews_list) | |
elif isinstance(reviews_list, str): # 文字列の場合(エラーメッセージなど) | |
formatted_reviews = reviews_list.replace('\n', ' ').replace('\r', '') | |
extraction_error_msg = result_data.get('extraction_error', '') | |
if extraction_error_msg and len(extraction_error_msg) > 500: | |
extraction_error_msg = extraction_error_msg[:250] + "..." + extraction_error_msg[-250:] | |
# CSVヘッダーに合わせてデータを準備 | |
row_data = [ | |
result_data.get('query_index', i), result_data.get('original_query', query), | |
result_data.get('result_rank', 'N/A'), result_data.get('status', 'Unknown'), | |
result_data.get('name', 'N/A'), result_data.get('url', ''), | |
result_data.get('phone', 'N/A'), result_data.get('address', 'N/A'), | |
review_count, # レビュー数 | |
formatted_reviews, # 結合されたレビュー文字列 | |
extraction_error_msg, | |
result_data.get('place_url', 'N/A'), | |
# HTMLファイル名は output_dir からの相対パス | |
result_data.get('html_filename', 'N/A') | |
] | |
writer.writerow(row_data) | |
written_count_query += 1 | |
except Exception as e_write: | |
print(f"★★★★★ CSV書き込み中にエラーが発生しました (行スキップ) ★★★★★") | |
print(f"エラーデータ (一部): {str(result_data)[:200]}...") | |
print(f"エラータイプ: {type(e_write).__name__}: {e_write}") | |
csv_file.flush() | |
total_results_count += written_count_query | |
processed_query_count += 1 | |
end_time_query = time.time() | |
query_status_msg = "中断" if result_data.get('status') == 'Interrupted' else "完了" | |
print(f"===== クエリ {i}/{total_queries} {query_status_msg} - {written_count_query}件書き込み, 所要時間: {end_time_query - start_time_query:.2f} 秒 =====") | |
# ここで部分的なCSVを yield しないように変更 | |
yield log_stream.getvalue(), None, html_base_output_dir | |
# 中断フラグが立っていたら、ループを終了 | |
if interrupted_flag: | |
print("\n中断リクエストに従い、次のクエリへ進まず処理を終了します。") | |
break | |
# --- クエリ間の待機 (中断可能) --- | |
if i < total_queries and not interrupted_flag: | |
sleep_duration = wait_config['base'] * 1.5 + (hash(query + str(i)) % (wait_config['base'] * 1.5)) | |
sleep_duration = max(wait_config['base'] * 0.8, min(sleep_duration, wait_config['base'] * 4.0)) | |
print(f"次のクエリまで {sleep_duration:.2f} 秒待機します...") | |
yield log_stream.getvalue(), None, html_base_output_dir | |
interruptible_sleep(sleep_duration) | |
# 待機後にも中断チェック | |
if interrupt_event.is_set(): | |
print("待機中に中断リクエストを検出。処理を終了します。") | |
interrupted_flag = True | |
break # ループを抜ける | |
elif interrupted_flag: | |
pass # 中断されたら待機しない | |
else: | |
print("\n全クエリの処理が完了しました。") | |
except IOError as e_io: | |
print(f"★★★★★ CSVファイル '{output_csv_path}' のオープン/書き込み中にIOエラー ★★★★★") | |
print(f"エラータイプ: {type(e_io).__name__}: {e_io}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") | |
print("ファイルが他のプログラムで開かれていないか、書き込み権限があるか確認してください。") | |
output_csv_path = None # 結果ファイルパスを無効化 | |
except Exception as e_csv_loop: | |
print(f"★★★★★ CSV処理ループ中に予期せぬエラー ★★★★★") | |
print(f"エラータイプ: {type(e_csv_loop).__name__}: {e_csv_loop}\n--- Traceback ---\n{traceback.format_exc()}\n----------------------") | |
except InterruptedError: # run_scraping全体で中断をキャッチ | |
print("\n★★★★★ スクレイピング処理がユーザーによって中断されました ★★★★★") | |
interrupted_flag = True # 中断フラグを立てる | |
except Exception as e_main: | |
print(f"\n★★★★★ メイン処理 (run_scraping) 中に予期せぬエラーが発生しました ★★★★★") | |
print(f"エラータイプ: {type(e_main).__name__}: {e_main}") | |
print("\n--- スタックトレース ---\n", traceback.format_exc(), "\n----------------------") | |
finally: | |
# --- 終了処理 --- | |
if driver: | |
print("\nWebDriver終了処理中...") | |
try: | |
driver.quit() | |
print("WebDriver正常終了。") | |
except Exception as e_quit: | |
print(f"★★★★★ WebDriver終了時にエラー ★★★★★") | |
print(f"エラータイプ: {type(e_quit).__name__}: {e_quit}") | |
end_time_total = time.time() | |
total_duration_seconds = end_time_total - start_time_total | |
final_status = "中断" if interrupted_flag else "完了" | |
print(f"\n=== スクレイピング全処理終了 ({final_status}) ===") | |
print(f"終了時刻: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") | |
print(f"処理{final_status}クエリ数: {processed_query_count}/{total_queries if total_queries > 0 else 'N/A'} 件") | |
print(f"CSV書き込み総行数: {total_results_count} 件") | |
print(f"総処理時間: {total_duration_seconds:.2f} 秒 ({total_duration_seconds/60:.2f} 分)") | |
if interrupted_flag: | |
print("*** スクレイピング処理は途中で中断されました ***") | |
final_log = log_stream.getvalue() | |
# プログレスバーを完了状態にする | |
progress(1.0, desc=f"スクレイピング処理 {final_status}") | |
# 最終的なCSVファイルのパスを返す | |
final_csv_output = None | |
if output_csv_path and os.path.exists(output_csv_path) and os.path.getsize(output_csv_path) > 0: | |
print(f"結果CSVファイル: {output_csv_path}") | |
final_csv_output = gr.File(value=output_csv_path, label=f"スクレイピング結果CSV ({final_status})") | |
elif output_csv_path: | |
print(f"警告: 結果CSVファイル '{output_csv_path}' は空または存在しません。") | |
else: | |
print("結果CSVファイルは生成されませんでした。") | |
# HTMLフォルダパスも返す | |
yield final_log, final_csv_output, html_base_output_dir | |
# --- Helper Functions (From Script 2) --- | |
def normalize_folder_path(folder_path): | |
"""フォルダパスを正規化する関数""" | |
try: | |
if not isinstance(folder_path, str): return None | |
# 引用符や余分なスペースを削除 | |
folder_path = folder_path.strip().strip('"').strip("'") | |
# バックスラッシュをスラッシュに変換し、正規化 | |
folder_path = os.path.normpath(folder_path).replace("\\", "/") | |
return folder_path | |
except Exception as e: | |
print(f"フォルダパス正規化エラー: {e}") | |
return None | |
def extract_shop_name_from_html_filename(filename): | |
"""HTMLファイル名から店名を抽出する関数 (拡張)""" | |
try: | |
# 例: R001_店舗名_reviews_expanded.html | |
# 例: R001_店舗名_overview.html | |
# 例: Q001_R001_店舗名_クエリ_detail_reviews_expanded.html (古い形式も考慮) | |
base = os.path.basename(filename) | |
# まず拡張子と既知の接尾辞を削除 | |
base = re.sub(r'(_reviews_expanded|_overview)?\.html$', '', base) | |
# ランキング部分 (Rxxx_ または Qxxx_Rxxx_) を削除 | |
base = re.sub(r'^(Q\d+_)?R\d+_', '', base) | |
# 古い形式の可能性のある接尾辞を削除 | |
base = re.sub(r'_detail_overview$|_detail_reviews_expanded$|_detailRESS$', '', base) | |
# 残った部分を店名とする (前後のアンダースコアや空白トリム) | |
shop_name = base.replace('_', ' ').strip() | |
return shop_name if shop_name else filename | |
except: | |
return filename # エラー時は元のファイル名を返す | |
def collect_reviews_from_html(folder_path, progress=gr.Progress()): | |
"""指定フォルダ内のHTMLから口コミデータを収集する関数""" | |
reviews_data = [] | |
log_stream = io.StringIO() | |
# フォルダパスを正規化 | |
folder_path = normalize_folder_path(folder_path) | |
if not folder_path: | |
print("エラー: 無効なHTMLフォルダパスです。", file=log_stream) | |
return pd.DataFrame(), log_stream.getvalue() | |
# フォルダの存在確認 | |
if not os.path.exists(folder_path): | |
print(f"エラー: フォルダ '{folder_path}' が見つかりません。", file=log_stream) | |
return pd.DataFrame(), log_stream.getvalue() | |
# フォルダ内のすべてのHTMLファイルを処理対象とする | |
try: | |
all_files = [] | |
# 再帰的にサブディレクトリも探索 | |
for root, _, files in os.walk(folder_path): | |
for filename in files: | |
if filename.lower().endswith(".html"): | |
all_files.append(os.path.join(root, filename)) | |
if not all_files: | |
print(f"警告: '{folder_path}' 以下にHTMLファイルが見つかりません。", file=log_stream) | |
return pd.DataFrame(), log_stream.getvalue() | |
print(f"処理対象のHTMLファイル数: {len(all_files)}", file=log_stream) | |
total_files = len(all_files) | |
for i, file_path in enumerate(all_files): | |
progress(i / total_files, desc=f"HTML解析中 {i}/{total_files}") | |
filename = os.path.basename(file_path) | |
relative_path = os.path.relpath(file_path, folder_path) # ベースフォルダからの相対パス | |
# 店名をファイル名から抽出 | |
shop_name = extract_shop_name_from_html_filename(filename) | |
# HTMLファイルを読み込む | |
try: | |
with open(file_path, "r", encoding="utf-8") as file: | |
html_content = file.read() | |
except Exception as e: | |
print(f"ファイル '{relative_path}' の読み込みエラー: {e}", file=log_stream) | |
continue | |
# BeautifulSoupでパース | |
soup = BeautifulSoup(html_content, 'lxml' if 'lxml' in globals() else 'html.parser') | |
# 基本情報も念のため抽出 (h1があれば店名として優先) | |
h1_tag = soup.find('h1') | |
if h1_tag: | |
shop_name = h1_tag.get_text(strip=True) | |
# 口コミカードを取得 (Script 1の抽出ロジックに合わせる: jftiEf or MyEned) | |
review_card_selectors = ['div.jftiEf', 'div.MyEned'] | |
review_cards = [] | |
for sel in review_card_selectors: | |
review_cards = soup.select(sel) | |
if review_cards: | |
break | |
if not review_cards: | |
# print(f"ファイル '{relative_path}' に口コミデータが見つかりません (jftiEf/MyEned)。", file=log_stream) | |
# 口コミがなくても、店舗情報だけは記録するかもしれない(オプション) | |
# reviews_data.append({ | |
# "ファイル名": relative_path, "店名": shop_name, "投稿者": "N/A", | |
# "投稿者情報": "N/A", "評価": "N/A", "投稿時期": "N/A", | |
# "口コミ本文": "口コミなし", "オーナーからの返信": "N/A" | |
# }) | |
continue | |
for card_idx, card in enumerate(review_cards): | |
try: | |
# 投稿者名 (.d4r55) | |
reviewer_el = card.select_one('.d4r55') | |
reviewer_name = reviewer_el.get_text(strip=True) if reviewer_el else "不明" | |
# 投稿者情報(ローカルガイド情報など .RfnDt) | |
reviewer_info_el = card.select_one('.RfnDt') | |
reviewer_details = reviewer_info_el.get_text(strip=True) if reviewer_info_el else "なし" | |
# 評価 (.kvMYJc aria-label) | |
rating_el = card.select_one('.kvMYJc') | |
rating_value = "不明" | |
if rating_el and rating_el.get('aria-label'): | |
match = re.search(r'星 (\d+(\.\d+)?)', rating_el['aria-label']) | |
if match: rating_value = f"星 {match.group(1)}" # "星 X.X" 形式で保存 | |
# 投稿時期 (.rsqaWe) | |
post_time_el = card.select_one('.rsqaWe') | |
post_time_value = post_time_el.get_text(strip=True) if post_time_el else "不明" | |
# 口コミ本文 (span.wiI7pd 優先) | |
review_text_el = card.select_one('span.wiI7pd') | |
if not review_text_el: | |
review_text_el = card.select_one('span[jscontroller="MZnM8e"]') # フォールバック | |
review_content = review_text_el.get_text(strip=True) if review_text_el else "なし" | |
# オーナーからの返信 (.CDe7pd) - 注意: これは古いセレクタかもしれない | |
# 新しい構造では返信は別のdiv構造になっている可能性がある | |
# 簡単のため、一旦 .CDe7pd を試す | |
owner_response_el = card.select_one('.CDe7pd') | |
owner_response_text = owner_response_el.get_text(strip=True) if owner_response_el else "なし" | |
# データを辞書形式で保存 | |
review_data = { | |
"ファイル名": relative_path, | |
"店名": shop_name, | |
"投稿者": reviewer_name, | |
"投稿者情報": reviewer_details, | |
"評価": rating_value, | |
"投稿時期": post_time_value, | |
"口コミ本文": review_content, | |
"オーナーからの返信": owner_response_text | |
} | |
reviews_data.append(review_data) | |
except Exception as e_card: | |
print(f"ファイル '{relative_path}' の口コミカード {card_idx+1} の解析中にエラー: {e_card}", file=log_stream) | |
progress(1.0, desc="HTML解析完了") | |
except Exception as e_folder: | |
print(f"フォルダ '{folder_path}' の処理中に予期せぬエラー: {e_folder}", file=log_stream) | |
print(traceback.format_exc(), file=log_stream) | |
return pd.DataFrame(), log_stream.getvalue() | |
# DataFrameに変換 | |
df = pd.DataFrame(reviews_data) | |
if df.empty: | |
print("警告: 口コミデータが収集できませんでした。", file=log_stream) | |
else: | |
print(f"収集された口コミデータ件数: {len(df)}", file=log_stream) | |
# print("DataFrameの列:", df.columns.tolist(), file=log_stream) # デバッグ用 | |
return df, log_stream.getvalue() | |
# --- Functions for Review Search Tab --- | |
def get_csv_columns_safe(csv_file_obj): | |
"""アップロードされたCSVファイルから安全に列名を取得する""" | |
if csv_file_obj is None: | |
return gr.Dropdown(choices=[], label="検索対象の列 (CSVをアップロードしてください)") | |
try: | |
# pandasで読み込んで列名を取得 | |
# TODO: エンコーディング自動判別を追加した方が良いかも | |
df_peek = pd.read_csv(csv_file_obj.name, nrows=5) # 先頭数行だけ読む | |
columns = df_peek.columns.tolist() | |
# "ReviewsCombined" や "口コミ" など、検索に適した列をデフォルトで選択させる候補 | |
default_col = next((c for c in columns if c.lower() in ['reviewscombined', '口コミ', '口コミ本文', 'text', 'review']), columns[0] if columns else None) | |
return gr.Dropdown(choices=columns, value=default_col, label="検索対象の列") | |
except Exception as e: | |
print(f"CSV列名取得エラー: {e}") | |
return gr.Dropdown(choices=[], label=f"列名取得エラー: {e}") | |
def search_reviews_controller(search_source, html_folder_path, uploaded_csv_file, search_column, keyword, progress=gr.Progress()): | |
"""口コミ検索のコントローラー関数""" | |
log_stream = io.StringIO() | |
df = pd.DataFrame() | |
search_results_df = pd.DataFrame() | |
temp_csv_path = None | |
results_text = "" | |
print(f"検索ソース: {search_source}", file=log_stream) | |
try: | |
if search_source == "HTMLフォルダから検索": | |
if not html_folder_path: | |
results_text = "エラー: HTMLフォルダパスを入力してください。" | |
return results_text, None, None, log_stream.getvalue() | |
print(f"HTMLフォルダから口コミを収集中: {html_folder_path}", file=log_stream) | |
df, collect_log = collect_reviews_from_html(html_folder_path, progress) | |
log_stream.write(collect_log) | |
search_col_actual = "口コミ本文" # HTMLからの場合はこの列を検索 | |
if df.empty: | |
results_text = f"エラー: フォルダ '{html_folder_path}' から口コミデータが収集できませんでした。" | |
elif search_col_actual not in df.columns: | |
results_text = f"エラー: 収集したデータに '{search_col_actual}' 列が見つかりません。" | |
elif search_source == "CSVファイルから検索": | |
if uploaded_csv_file is None: | |
results_text = "エラー: 検索対象のCSVファイルをアップロードしてください。" | |
return results_text, None, None, log_stream.getvalue() | |
if not search_column: | |
results_text = "エラー: 検索対象の列を選択してください。" | |
return results_text, None, None, log_stream.getvalue() | |
print(f"アップロードされたCSVから検索: {os.path.basename(uploaded_csv_file.name)}, 列: {search_column}", file=log_stream) | |
try: | |
# TODO: エンコーディングを考慮 | |
df = pd.read_csv(uploaded_csv_file.name) | |
search_col_actual = search_column | |
if search_col_actual not in df.columns: | |
results_text = f"エラー: アップロードされたCSVに列 '{search_col_actual}' が見つかりません。" | |
except Exception as e_csv: | |
results_text = f"エラー: CSVファイルの読み込みに失敗しました。ファイル形式やエンコーディングを確認してください。\n{e_csv}" | |
else: | |
results_text = "エラー: 不明な検索ソースです。" | |
# データフレームと検索列が有効かチェック | |
if results_text: # 上記のいずれかでエラーが発生した場合 | |
pass | |
elif df.empty: | |
if search_source == "HTMLフォルダから検索": | |
results_text = "情報: 収集された口コミデータがありませんでした。" | |
else: | |
results_text = "エラー: CSVからデータを読み込めませんでした。" | |
elif not keyword or keyword.strip() == "": | |
results_text = "情報: キーワードが入力されていません。全件表示します。" | |
search_results_df = df # キーワード空欄時は全件 | |
else: | |
keyword = keyword.strip() | |
print(f"キーワード '{keyword}' で列 '{search_col_actual}' を検索中...", file=log_stream) | |
try: | |
# NaNを空文字列に変換してから検索 | |
search_results_df = df[df[search_col_actual].fillna('').astype(str).str.contains(keyword, case=False, na=False)] | |
count = len(search_results_df) | |
if count > 0: | |
results_text = f"キーワード '{keyword}' を含む口コミが {count} 件見つかりました。" | |
print(results_text, file=log_stream) | |
else: | |
results_text = f"キーワード '{keyword}' を含む口コミは見つかりませんでした。" | |
print(results_text, file=log_stream) | |
except KeyError: | |
results_text = f"エラー: DataFrameに検索対象列 '{search_col_actual}' が見つかりません。" | |
print(results_text, file=log_stream) | |
except Exception as e_search: | |
results_text = f"検索中にエラーが発生しました: {e_search}" | |
print(results_text, file=log_stream) | |
print(traceback.format_exc(), file=log_stream) | |
# 結果をCSVに保存 (検索結果がある場合) | |
if not search_results_df.empty: | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix=".csv", mode="w", encoding="utf-8-sig") as temp_file: | |
search_results_df.to_csv(temp_file.name, index=False) | |
temp_csv_path = temp_file.name | |
print(f"検索結果を一時CSVファイルに保存しました: {temp_csv_path}", file=log_stream) | |
except Exception as e_csv_save: | |
print(f"検索結果のCSV保存中にエラー: {e_csv_save}", file=log_stream) | |
results_text += "\n警告: 検索結果のCSV保存に失敗しました。" | |
# テーブル表示用に列を絞る(存在しない列は無視) | |
display_columns = ['店名', '投稿者', '評価', '投稿時期', '口コミ本文', 'オーナーからの返信', 'ファイル名'] | |
if search_source == "CSVファイルから検索" and not search_results_df.empty: | |
# CSVからの場合、元の列名を優先しつつ、なければHTML由来の列名も試す | |
available_cols = search_results_df.columns.tolist() | |
display_columns = [col for col in available_cols if col in display_columns or col == search_column] # 検索列も表示 | |
# 存在しない列を除外してDataFrameを返す | |
display_df = search_results_df[[col for col in display_columns if col in search_results_df.columns]] if not search_results_df.empty else pd.DataFrame() | |
except Exception as e_controller: | |
error_msg = f"口コミ検索コントローラーで予期せぬエラー: {e_controller}" | |
print(error_msg, file=log_stream) | |
print(traceback.format_exc(), file=log_stream) | |
results_text = error_msg | |
return results_text, display_df, gr.File(value=temp_csv_path if temp_csv_path else None), log_stream.getvalue() | |
def export_all_reviews_controller(search_source, html_folder_path, uploaded_csv_file, progress=gr.Progress()): | |
"""全口コミデータをCSVにエクスポートするコントローラー関数""" | |
log_stream = io.StringIO() | |
df = pd.DataFrame() | |
temp_csv_path = None | |
results_text = "" | |
print(f"全件エクスポート開始。ソース: {search_source}", file=log_stream) | |
try: | |
if search_source == "HTMLフォルダから検索": | |
if not html_folder_path: | |
results_text = "エラー: HTMLフォルダパスを入力してください。" | |
return results_text, None, log_stream.getvalue() | |
print(f"HTMLフォルダから全口コミを収集中: {html_folder_path}", file=log_stream) | |
df, collect_log = collect_reviews_from_html(html_folder_path, progress) | |
log_stream.write(collect_log) | |
if df.empty: | |
results_text = f"情報: フォルダ '{html_folder_path}' から収集できる口コミデータがありませんでした。" | |
elif search_source == "CSVファイルから検索": | |
if uploaded_csv_file is None: | |
results_text = "エラー: 対象のCSVファイルをアップロードしてください。" | |
return results_text, None, log_stream.getvalue() | |
print(f"アップロードされたCSVをエクスポート対象として読み込み中: {os.path.basename(uploaded_csv_file.name)}", file=log_stream) | |
try: | |
# アップロードされたCSVをそのままデータフレームとする | |
df = pd.read_csv(uploaded_csv_file.name) | |
if df.empty: | |
results_text = "情報: アップロードされたCSVは空です。" | |
except Exception as e_csv: | |
results_text = f"エラー: CSVファイルの読み込みに失敗しました。\n{e_csv}" | |
else: | |
results_text = "エラー: 不明な検索ソースです。" | |
# データフレームが有効で、空でない場合にCSVエクスポート | |
if not df.empty: | |
try: | |
with tempfile.NamedTemporaryFile(delete=False, suffix="_all.csv", mode="w", encoding="utf-8-sig") as temp_file: | |
df.to_csv(temp_file.name, index=False) | |
temp_csv_path = temp_file.name | |
results_text = f"全 {len(df)} 件のデータをCSVファイルにエクスポートしました。" | |
print(results_text, file=log_stream) | |
print(f"エクスポートファイル: {temp_csv_path}", file=log_stream) | |
except Exception as e_csv_save: | |
results_text = f"全件CSVエクスポート中にエラー: {e_csv_save}" | |
print(results_text, file=log_stream) | |
print(traceback.format_exc(), file=log_stream) | |
except Exception as e_controller: | |
error_msg = f"全件エクスポートコントローラーで予期せぬエラー: {e_controller}" | |
print(error_msg, file=log_stream) | |
print(traceback.format_exc(), file=log_stream) | |
results_text = error_msg | |
return results_text, gr.File(value=temp_csv_path if temp_csv_path else None), log_stream.getvalue() | |
# --- Gradio UI 定義 (統合版) --- | |
with gr.Blocks(theme=gr.themes.Soft()) as demo: | |
gr.Markdown("# Google Maps スクレイピング & 口コミ検索ツール") | |
gr.Markdown( | |
""" | |
**タブ1:** Google Mapsから店舗情報をスクレイピングし、結果をCSVとHTMLファイルに出力します。 | |
**タブ2:** スクレイピングで保存されたHTMLフォルダ、またはアップロードしたCSVファイルから口コミ情報を検索・エクスポートします。 | |
""" | |
) | |
with gr.Tabs(): | |
with gr.TabItem("① スクレイピング実行"): | |
gr.Markdown("### Google Maps スクレイピング設定") | |
gr.Markdown( | |
""" | |
CSVクエリで検索し、詳細ページで「クチコミ」タブをクリック後、口コミエリアを**最後までスクロール**し、 | |
さらに**「もっと見る」ボタンを全てクリック**して全件表示を試みます。 | |
その後、基本情報と口コミ情報を抽出し、結果CSVとHTMLファイル群を出力します。 | |
HTMLファイルはクエリごとにサブディレクトリに保存されます。 | |
**「処理中断」ボタン**で進行中のスクレイピング処理を安全に停止できます(現在のクエリ完了後)。 | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=2): | |
gr.Markdown("#### 入力ファイルと出力設定") | |
input_csv_file_scrape = gr.File(label="検索クエリCSVファイル (1列目のみ使用)", file_types=[".csv"]) | |
output_dir_name_scrape = gr.Textbox(label="出力先ベースディレクトリ名", value="gmap_scraping_output") | |
output_csv_name_scrape = gr.Textbox(label="出力CSVファイル名 (ベースディレクトリ内)", value="scraping_results.csv") | |
csv_encoding_scrape = gr.Dropdown(label="出力CSVエンコーディング", choices=['utf-8-sig', 'cp932'], value='utf-8-sig') | |
headless_mode_scrape = gr.Checkbox(label="ヘッドレスモードで実行 (エラー発生時はOFF推奨)", value=True) | |
with gr.Column(scale=1): | |
gr.Markdown("#### 待機時間設定 (秒)") | |
wait_time_base_scrape = gr.Number(label="基本待機", minimum=1, maximum=20, step=0.5, value=4) | |
wait_time_detail_scrape = gr.Number(label="詳細/口コミ最大待機", minimum=10, maximum=60, step=1, value=25) | |
wait_time_search_scrape = gr.Number(label="検索リスト最大待機", minimum=5, maximum=60, step=1, value=15) | |
with gr.Row(): | |
start_button_scrape = gr.Button("スクレイピング開始", variant="primary", size="lg", scale=3) | |
stop_button_scrape = gr.Button("処理中断", variant="stop", size="lg", scale=1) | |
gr.Markdown("#### 処理ステータスとエラーログ") | |
progress_bar_scrape = gr.Progress(track_tqdm=True) | |
status_textbox_scrape = gr.Textbox(label="ログ", lines=15, interactive=False, autoscroll=True, max_lines=2000) | |
gr.Markdown("#### 結果") | |
output_csv_download_scrape = gr.File(label="結果CSVダウンロード", interactive=False) | |
# HTMLフォルダパスを表示するためのテキストボックス (読み取り専用) | |
html_output_folder_path_display = gr.Textbox(label="HTML保存先フォルダパス (口コミ検索タブで使用)", interactive=False) | |
with gr.TabItem("② 口コミ検索"): | |
gr.Markdown("### 口コミ検索・エクスポート") | |
gr.Markdown( | |
""" | |
**検索ソース**を選択し、HTMLフォルダパスまたはCSVファイルを指定して口コミを検索・エクスポートします。 | |
- **HTMLフォルダから検索:** タブ1で出力されたHTMLファイル群が含まれる**ベースディレクトリ内の `html_files` フォルダ**、または他のHTMLファイル群を含むフォルダを指定してください。 | |
- **CSVファイルから検索:** タブ1で出力された結果CSV、または同様の形式のCSVファイルをアップロードしてください。 | |
""" | |
) | |
with gr.Row(): | |
with gr.Column(scale=1): | |
search_source_review = gr.Radio( | |
choices=["HTMLフォルダから検索", "CSVファイルから検索"], | |
label="検索ソースを選択", | |
value="HTMLフォルダから検索" | |
) | |
html_folder_path_review = gr.Textbox( | |
label="HTMLフォルダパス", | |
placeholder="例: gmap_scraping_output/html_files", | |
visible=True # 初期表示 | |
) | |
uploaded_csv_review = gr.File( | |
label="検索対象CSVファイル", | |
file_types=[".csv"], | |
visible=False # 初期非表示 | |
) | |
search_column_review = gr.Dropdown( | |
label="検索対象の列 (CSV選択時)", | |
choices=[], | |
interactive=True, | |
visible=False # 初期非表示 | |
) | |
keyword_review = gr.Textbox(label="検索キーワード (空欄で全件)") | |
search_button_review = gr.Button("検索実行", variant="primary") | |
export_all_button_review = gr.Button("全件CSVエクスポート") | |
with gr.Column(scale=2): | |
gr.Markdown("#### 検索/エクスポート結果") | |
status_textbox_review = gr.Textbox(label="処理状況", lines=5, interactive=False) | |
output_table_review = gr.Dataframe(label="検索結果(テーブル)") | |
search_csv_output_review = gr.File(label="検索結果CSVダウンロード", interactive=False) | |
all_reviews_csv_output_review = gr.File(label="全件エクスポートCSVダウンロード", interactive=False) | |
progress_bar_review = gr.Progress(track_tqdm=True) # 口コミ収集/エクスポート用 | |
# --- イベントハンドラ定義 --- | |
# --- タブ1: スクレイピング --- | |
start_button_scrape.click( | |
fn=run_scraping, | |
inputs=[input_csv_file_scrape, output_dir_name_scrape, output_csv_name_scrape, csv_encoding_scrape, | |
wait_time_base_scrape, wait_time_detail_scrape, wait_time_search_scrape, headless_mode_scrape], | |
outputs=[status_textbox_scrape, output_csv_download_scrape, html_output_folder_path_display], | |
# progress 引数は Gradio 側で自動的に渡される (show_progress='full' の場合) | |
) | |
stop_button_scrape.click(fn=request_interrupt, inputs=None, outputs=status_textbox_scrape) # ログに中断リクエストを表示 | |
# --- タブ2: 口コミ検索 --- | |
# 検索ソースの選択に応じてUI表示を切り替え | |
def update_review_source_ui(source): | |
if source == "HTMLフォルダから検索": | |
return { | |
html_folder_path_review: gr.Textbox(visible=True), | |
uploaded_csv_review: gr.File(visible=False, value=None), # クリア | |
search_column_review: gr.Dropdown(visible=False, value=None, choices=[]) # クリア | |
} | |
elif source == "CSVファイルから検索": | |
return { | |
html_folder_path_review: gr.Textbox(visible=False, value=""), # クリア | |
uploaded_csv_review: gr.File(visible=True), | |
search_column_review: gr.Dropdown(visible=True) # 列選択を表示 | |
} | |
else: | |
return { # デフォルト | |
html_folder_path_review: gr.Textbox(visible=True), | |
uploaded_csv_review: gr.File(visible=False, value=None), | |
search_column_review: gr.Dropdown(visible=False, value=None, choices=[]) | |
} | |
search_source_review.change( | |
fn=update_review_source_ui, | |
inputs=search_source_review, | |
outputs=[html_folder_path_review, uploaded_csv_review, search_column_review] | |
) | |
# CSVアップロード時に列名を取得してドロップダウンを更新 | |
uploaded_csv_review.upload( | |
fn=get_csv_columns_safe, | |
inputs=uploaded_csv_review, | |
outputs=search_column_review | |
) | |
# 検索ボタンのクリック | |
search_button_review.click( | |
fn=search_reviews_controller, | |
inputs=[search_source_review, html_folder_path_review, uploaded_csv_review, search_column_review, keyword_review], | |
outputs=[status_textbox_review, output_table_review, search_csv_output_review, status_textbox_scrape] # 最後のログはデバッグ用に入れておく | |
) | |
# 全件エクスポートボタンのクリック | |
export_all_button_review.click( | |
fn=export_all_reviews_controller, | |
inputs=[search_source_review, html_folder_path_review, uploaded_csv_review], | |
outputs=[status_textbox_review, all_reviews_csv_output_review, status_textbox_scrape] # 最後のログはデバッグ用 | |
) | |
# --- UI起動 --- | |
print("Gradio UIを起動します...") | |
# queue()で複数ユーザー対応、share=Trueで共有リンク生成 (Colabでは自動的に共有リンク) | |
# launch() に debug=True をつけるとリロードなどが有効になるが、不安定になることもある | |
demo.queue().launch(share=False, debug=False) |