|
import requests
from lxml import html
from collections import deque
import json
import time
import os

from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException, NoSuchElementException, WebDriverException

from PIL import Image
from io import BytesIO
|
|
def set_screenshot(driver, images=None):
    """Appends a screenshot of the driver's current page to `images`."""
    # Use None as the default: a mutable default list is shared across
    # calls and would leak screenshots between unrelated crawls.
    if images is None:
        images = []
    png = driver.get_screenshot_as_png()
    image = Image.open(BytesIO(png))
    images.append(image)
    return images
|
|
def get_chrome_options():
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    return options
|
|
def set_driver():
    options = get_chrome_options()
    try:
        web_driver = webdriver.Chrome(options=options)
        web_driver.set_window_size(1080, 720)
    except WebDriverException as e:
        # Return None on failure so callers can test `if not driver`;
        # returning a placeholder 1x1 PIL Image here would mask the error
        # and break that check.
        print(f"Error initializing ChromeDriver: {e}")
        return None
    return web_driver
|
|
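# A minimal sketch of an alternative driver factory for pages that serve a
# Cloudflare challenge (see the warning printed in the crawl loop below).
# It assumes the third-party `undetected_chromedriver` package is installed
# (`pip install undetected-chromedriver`); this helper is hypothetical and
# is not called anywhere in this script.
def set_undetected_driver():
    import undetected_chromedriver as uc  # assumed dependency
    driver = uc.Chrome(headless=True)  # patched ChromeDriver, harder to fingerprint
    driver.set_window_size(1080, 720)
    return driver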
|
|
def create_webdriver_instance(browser_type="chrome"):
    """Alternative WebDriver factory that pins an explicit chromedriver binary."""
    if browser_type.lower() == "chrome":
        chrome_options = get_chrome_options()
        try:
            service = Service(executable_path="/usr/bin/chromedriver")
            driver = webdriver.Chrome(service=service, options=chrome_options)
            return driver
        except WebDriverException as e:
            print(f"Error initializing ChromeDriver. Error: {e}")
            return None
    else:
        raise ValueError("Unsupported browser type.")
|
|
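# Note: on Selenium 4.6+ the explicit Service path above is optional;
# calling webdriver.Chrome(options=...) lets Selenium Manager resolve a
# matching chromedriver automatically, which is what set_driver() above
# relies on.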
|
|
def save_crawl_state(to_visit_deque, visited_set, song_urls_list, state_filename="crawl_state.json",
                     song_pages_json_file="pagalgana_song_pages.json"):
    """Saves the current state of the crawler to JSON files."""
    try:
        with open(song_pages_json_file, 'w', encoding='utf-8') as f:
            json.dump(song_urls_list, f, indent=4)

        crawl_state_data = {
            "to_visit": list(to_visit_deque),
            "visited_urls": list(visited_set)
        }
        with open(state_filename, 'w', encoding='utf-8') as f:
            json.dump(crawl_state_data, f, indent=4)
        print(
            f"--- Crawl state saved. URLs to visit: {len(to_visit_deque)}, Visited: {len(visited_set)}, Song pages found: {len(song_urls_list)} ---")
    except IOError as e:
        print(f"Error saving crawl state: {e}")
    except Exception as e:
        print(f"An unexpected error occurred while saving state: {e}")
|
|
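# A minimal sketch of a crash-safer save, assuming the same JSON layout as
# save_crawl_state above: write to a temporary sibling file, then atomically
# replace the target so an interrupted run can never leave a truncated file
# (load_crawl_state below treats corrupt JSON as "start fresh", losing
# progress). `atomic_json_dump` is a hypothetical helper, not wired in.
def atomic_json_dump(data, filename):
    tmp_name = filename + ".tmp"  # hypothetical temp-file suffix
    with open(tmp_name, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4)
    os.replace(tmp_name, filename)  # atomic rename on the same filesystem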
|
|
def load_crawl_state(state_filename="crawl_state.json", song_pages_json_file="pagalgana_song_pages.json"):
    """Loads previous crawl state if files exist."""
    to_visit_deque = deque()
    visited_set = set()
    song_urls_list = []

    if os.path.exists(song_pages_json_file):
        try:
            with open(song_pages_json_file, 'r', encoding='utf-8') as f:
                song_urls_list = json.load(f)
            print(f"Loaded {len(song_urls_list)} song URLs from '{song_pages_json_file}'.")
        except json.JSONDecodeError:
            print(f"Warning: '{song_pages_json_file}' corrupted or empty. Starting fresh song list.")
            song_urls_list = []
        except Exception as e:
            print(f"Error loading '{song_pages_json_file}': {e}")

    if os.path.exists(state_filename):
        try:
            with open(state_filename, 'r', encoding='utf-8') as f:
                crawl_state_data = json.load(f)
            # JSON round-trips the (url, depth) tuples as lists; convert them
            # back to tuples so membership tests like `(url, 0) not in to_visit`
            # keep working after a resume.
            to_visit_deque = deque(tuple(item) for item in crawl_state_data.get("to_visit", []))
            visited_set = set(crawl_state_data.get("visited_urls", []))
            print(f"Loaded crawl state: {len(to_visit_deque)} URLs to visit, {len(visited_set)} visited.")
        except json.JSONDecodeError:
            print(f"Warning: '{state_filename}' corrupted or empty. Starting fresh state.")
            to_visit_deque = deque()
            visited_set = set()
        except Exception as e:
            print(f"Error loading '{state_filename}': {e}")

    return to_visit_deque, visited_set, song_urls_list
|
|
def crawl_pagalgana_site(base_url: str, song_pages_json_file: str, max_crawl_depth: int, state_filename: str,
                         save_interval: int, images: list):
    """
    Crawls Pagalgana.com to find and save song page URLs.
    Supports resuming a crawl.
    """
    driver = set_driver()
    if not driver:
        print("Failed to initialize WebDriver. Exiting.")
        # Return the same (urls, images) shape as the normal path so the
        # caller's tuple unpacking does not fail.
        return [], images

    to_visit, visited_urls, song_page_urls = load_crawl_state(state_filename, song_pages_json_file)

    if not to_visit and not visited_urls:
        print("No previous crawl state found. Starting fresh.")
        to_visit.append((base_url, 0))
    else:
        print("Resuming crawl from previous state.")
        if base_url not in visited_urls and (base_url, 0) not in to_visit:
            to_visit.appendleft((base_url, 0))

    AUDIO_CONTAINER_XPATH = '//*[@id="audio-container"]'
    LOAD_MORE_BUTTON_XPATH = '//a[@class="button" and contains(@onclick, "loadMoreCategory")]'

    print(f"Starting/Resuming crawl with base: {base_url}, max depth: {max_crawl_depth}")
    print(
        f"Initial Queue size: {len(to_visit)}, Initial Visited size: {len(visited_urls)}, Song page URLs: {len(song_page_urls)}")

    processed_count = 0
    while to_visit:
        current_url, current_depth = to_visit.popleft()

        if current_url in visited_urls:
            continue

        if current_depth > max_crawl_depth:
            print(f"Skipping {current_url} - max depth reached ({max_crawl_depth})")
            continue

        print(f"\n--- Visiting ({current_depth}): {current_url} ---")
        visited_urls.add(current_url)
        processed_count += 1

        try:
            driver.get(current_url)
            time.sleep(3)  # give the page (and any JS) a moment to settle

            print(f" Page title: {driver.title}")
            print(f" Current URL after load: {driver.current_url}")
            images = set_screenshot(driver=driver, images=images)

            if "Attention Required" in driver.title or "cloudflare" in driver.page_source.lower():
                print(
                    " --> Cloudflare challenge detected! Try switching to undetected_chromedriver or add a longer sleep.")
                print(" --> Skipping current URL due to Cloudflare challenge.")
                images = set_screenshot(driver=driver, images=images)
                continue

            audio_container_elements = driver.find_elements(By.XPATH, AUDIO_CONTAINER_XPATH)
            if audio_container_elements:
                print(f" --> FOUND AUDIO CONTAINER! This is a song page: {current_url}")
                if current_url not in song_page_urls:
                    song_page_urls.append(current_url)

            # Keep clicking "Load More" until no new content appears.
            load_more_found_and_clicked = False
            while True:
                try:
                    load_more_button = WebDriverWait(driver, 15).until(
                        EC.element_to_be_clickable((By.XPATH, LOAD_MORE_BUTTON_XPATH))
                    )

                    last_height = driver.execute_script("return document.body.scrollHeight")

                    print(" Clicking 'Load More' button...")
                    load_more_button.click()
                    load_more_found_and_clicked = True

                    # Poll until the page height grows, or give up after 7 checks.
                    new_height = last_height
                    scroll_attempts = 0
                    while new_height == last_height and scroll_attempts < 7:
                        time.sleep(2)
                        new_height = driver.execute_script("return document.body.scrollHeight")
                        scroll_attempts += 1

                    if new_height == last_height:
                        print(" No more content loaded after click, or button disappeared.")
                        break

                except (NoSuchElementException, TimeoutException):
                    if not load_more_found_and_clicked:
                        print(" 'Load More' button not found or not clickable.")
                    else:
                        print(" 'Load More' button no longer present (all content likely loaded).")
                    break
                except Exception as e:
                    print(f" Error clicking 'Load More': {e}")
                    break

            tree = html.fromstring(driver.page_source)

            links = tree.xpath('//a/@href')
            print(f" Found {len(links)} raw links on the page.")

            links_added_to_queue = 0
            for link in links:
                absolute_url = requests.compat.urljoin(current_url, link)

                if "pagalgana.com" in absolute_url and "#" not in absolute_url and "?" not in absolute_url:
                    if not (absolute_url.endswith(
                            ('.mp3', '.zip', '.rar', '.jpg', '.png', '.gif', '.pdf', '.txt', '.xml', '.css', '.js'))):
                        if absolute_url not in visited_urls and (absolute_url, current_depth + 1) not in to_visit:
                            if absolute_url not in song_page_urls:
                                to_visit.append((absolute_url, current_depth + 1))
                                links_added_to_queue += 1
            print(f" Added {links_added_to_queue} new links to the queue.")

        except Exception as e:
            print(f" An unexpected error occurred for {current_url}: {e}")
        finally:
            if processed_count % save_interval == 0:
                print(f"--- Processed {processed_count} pages. Saving current crawl state... ---")
                save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)

    print("\n--- Crawl finished. Performing final save of song page URLs. ---")
    save_crawl_state(to_visit, visited_urls, song_page_urls, state_filename, song_pages_json_file)
    print(f"\nCrawl complete. Total {len(song_page_urls)} song pages found and saved to '{song_pages_json_file}'.")
    # Take the final screenshot before quitting: screenshotting an already
    # quit driver raises a WebDriverException.
    images = set_screenshot(driver=driver, images=images)
    driver.quit()
    return song_page_urls, images
|
|
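# A minimal sketch of a stricter link filter than the substring checks used
# in the crawl loop above: urllib.parse (stdlib) compares the hostname
# directly, whereas `"pagalgana.com" in absolute_url` would also accept a
# foreign URL that merely contains that string in its path. `is_crawlable`
# is a hypothetical helper, not wired into the crawler.
from urllib.parse import urlparse

def is_crawlable(url):
    parsed = urlparse(url)
    if parsed.netloc not in ("pagalgana.com", "www.pagalgana.com"):
        return False
    if parsed.query or parsed.fragment:  # mirrors the "?" / "#" exclusions
        return False
    return not parsed.path.endswith(
        ('.mp3', '.zip', '.rar', '.jpg', '.png', '.gif', '.pdf', '.txt', '.xml', '.css', '.js'))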
|
|
if __name__ == "__main__":
    images = []
    discovered_urls, images = crawl_pagalgana_site(
        base_url="https://pagalgana.com/category/bollywood-mp3-songs.html",
        song_pages_json_file="bollywood_song_pages.json",
        state_filename="bollywood_crawl_state.json",
        max_crawl_depth=2,
        save_interval=5,
        images=images
    )
    print(f"Crawler finished. Discovered {len(discovered_urls)} song URLs.")
|
|