|
import requests |
|
from bs4 import BeautifulSoup |
|
import pandas as pd |
|
from urllib.parse import quote |
|
import streamlit as st |
|
import json |
|
import time |
|
|
|
from selenium import webdriver |
|
from selenium.common.exceptions import WebDriverException |
|
from selenium.webdriver.common.by import By |
|
|
|
@st.cache_data |
|
def scrape_klikindomaret(nama_barang, num_items): |
|
products = [] |
|
page = 1 |
|
query = quote(nama_barang) |
|
progress_text = "Scraping in progress. Please wait." |
|
my_bar = st.progress(0, text=progress_text) |
|
|
|
|
|
for percent in range(num_items): |
|
if len (products) > num_items : |
|
products = products[:num_items] |
|
break |
|
|
|
prop = min(len(products)/num_items, 1) |
|
my_bar.progress(prop, text=progress_text) |
|
|
|
url = f"https://www.klikindomaret.com/search/?key={query}&categories=&productbrandid=&sortcol=&pagesize=54&page={page}&startprice=&endprice=&attributes=&ShowItem=" |
|
response = requests.get(url) |
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
|
|
product_list = soup.find_all('a', href=True) |
|
|
|
for product in product_list: |
|
|
|
product_href = product['href'] |
|
if '/product/' in product_href: |
|
product_name = product.find('div', class_='title').text.strip() |
|
product_price = product.find('span', class_='normal price-value').text.strip() |
|
|
|
|
|
discount_element = product.find('span', class_='strikeout disc-price') |
|
discount_percentage = "" |
|
original_price = "" |
|
if discount_element: |
|
discount_percentage = discount_element.find('span', class_='discount').text.strip() |
|
original_price = discount_element.text.replace(discount_percentage, '').strip() |
|
else: |
|
|
|
discount_percentage = "0%" |
|
original_price = product_price |
|
|
|
product_link = f"https://www.klikindomaret.com{product_href}" |
|
products.append({ |
|
'product': product_name, |
|
'original_price': original_price, |
|
'discount_percentage': discount_percentage, |
|
'price': product_price, |
|
'link': product_link |
|
}) |
|
|
|
page += 1 |
|
|
|
time.sleep(1) |
|
my_bar.empty() |
|
return products |
|
|
|
@st.cache_data |
|
def scrape_shopee(nama_barang, num_items): |
|
products = [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
options = webdriver.ChromeOptions() |
|
options.add_argument('--no-sandbox') |
|
options.add_argument('--headless') |
|
options.add_argument('--disable-notifications') |
|
options.add_argument('--disable-infobars') |
|
options.add_argument('--disable-dev-shm-usage') |
|
|
|
|
|
try : |
|
page = 1 |
|
query = quote(nama_barang) |
|
|
|
driver = webdriver.Chrome(options = options) |
|
url = f'https://shopee.co.id/search?keyword={query}&page={page}' |
|
driver.get(url) |
|
time.sleep(10) |
|
|
|
|
|
html_element = driver.find_element(By.TAG_NAME, "html") |
|
|
|
|
|
html = html_element.get_attribute("innerHTML") |
|
|
|
soup = BeautifulSoup(html, "html.parser") |
|
|
|
for i in soup.find_all('div', class_ = "ie3A+n bM+7UW Cve6sh" ): |
|
products.append(i.text) |
|
|
|
|
|
except WebDriverException as e: |
|
st.error(f"An error occurred: {e}") |
|
finally: |
|
if driver: |
|
driver.quit() |
|
|
|
return products |
|
|
|
@st.cache_data |
|
def scrape_tokped(nama_barang, num_items): |
|
products = [] |
|
page = 1 |
|
query = quote(nama_barang) |
|
|
|
while len(products) < num_items : |
|
url = f'https://www.tokopedia.com/search?navsource=&page={page}&q={query}&srp_component_id=02.01.00.00&srp_page_id=&srp_page_title=&st=' |
|
|
|
headers = { |
|
'User-Agent' : "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36", |
|
'Accept-Language' : 'en-US, en;q-0.5', |
|
'Accept-Encoding' : 'grip, deflate, bt', |
|
'Connection': 'keep-alive' |
|
} |
|
timeout = 10 |
|
try : |
|
|
|
response = requests.get(url, headers = headers, timeout = timeout) |
|
response.raise_for_status() |
|
|
|
soup = BeautifulSoup(response.text, 'html.parser') |
|
product_container_list = soup.find_all('a', class_="pcv3__info-content css-gwkf0u", href = True) |
|
|
|
for product_info in product_container_list: |
|
link = product_info['href'] |
|
title = product_info.find('div', class_="prd_link-product-name css-3um8ox" ).text.strip() |
|
harga = product_info.find('div', class_="prd_link-product-price css-h66vau").text.strip() |
|
|
|
terjual_element = product_info.find('span', class_="prd_label-integrity css-1sgek4h").text.strip() |
|
terjual = terjual_element if terjual_element else None |
|
|
|
rating_element = product_info.find('span', class_='prd_rating-average-text css-t70v7i') |
|
rating = rating_element.text if rating_element else None |
|
|
|
toko = product_info.find('span', class_="prd_link-shop-name css-1kdc32b flip").text.strip() |
|
asal_product = product_info.find('span', class_="prd_link-shop-loc css-1kdc32b flip" ).text.strip() |
|
|
|
products.append({ |
|
'link': link, |
|
'produk' : title, |
|
'harga' : harga, |
|
'terjual' : terjual, |
|
'rating' : rating, |
|
'toko' : toko, |
|
'asal_product' : asal_product, |
|
}) |
|
if len(products) >= num_items: |
|
products = products[:num_items] |
|
break |
|
|
|
except requests.exceptions.RequestException as e: |
|
st.error("Terjadi kesalahan") |
|
break |
|
except requests.exceptions.HTTPError as e: |
|
st.error("HTTP Error :", str(e)) |
|
break |
|
page += 1 |
|
return products |
|
|
|
|
|
|
|
st.title("Scraping E-Commerce") |
|
|
|
with st.expander("Settings :"): |
|
|
|
selected_site = st.selectbox("Pilih Situs Web :", ["klikindomaret.com", "shopee.co.id", "tokopedia.com"]) |
|
|
|
nama_barang = st.text_input("Masukkan Nama Barang :") |
|
num_items = st.number_input("Masukkan Estimasi Banyak Data :", min_value = 1, step = 1, placeholder="Type a number...") |
|
|
|
download_format = st.selectbox("Pilih Format Unduhan :", ["XLSX", "CSV", "JSON"]) |
|
st.info('Tekan "Mulai Scraping" kembali jika tampilan menghilang ', icon="ℹ️") |
|
|
|
|
|
hidden_data = [] |
|
|
|
scraping_done = False |
|
|
|
if selected_site == "klikindomaret.com": |
|
if st.button("Mulai Scraping"): |
|
if not nama_barang: |
|
st.error("Mohon isi Nama Barang.") |
|
else: |
|
scraped_products = scrape_klikindomaret(nama_barang, num_items) |
|
hidden_data = scraped_products |
|
scraping_done = True |
|
|
|
if selected_site == "shopee.co.id": |
|
|
|
if st.button("Mulai Scraping"): |
|
if not nama_barang: |
|
st.error("Mohon isi Nama Barang.") |
|
else: |
|
scraped_products = scrape_shopee(nama_barang, num_items) |
|
hidden_data = scraped_products |
|
scraping_done = True |
|
|
|
if selected_site == "tokopedia.com": |
|
|
|
if st.button("Mulai Scraping"): |
|
if not nama_barang: |
|
st.error("Mohon isi Nama Barang.") |
|
else: |
|
scraped_products = scrape_tokped(nama_barang, num_items) |
|
hidden_data = scraped_products |
|
scraping_done = True |
|
|
|
|
|
output_file = f"scraped_{selected_site}_{nama_barang}.xlsx" |
|
output_file_csv = f"scraped_{selected_site}_{nama_barang}.csv" |
|
output_file_json = f"scraped_{selected_site}_{nama_barang}.json" |
|
|
|
|
|
|
|
|
|
|
|
if scraping_done: |
|
if hidden_data: |
|
|
|
with st.expander(f"Hasil Scraping {selected_site} :"): |
|
st.write(pd.DataFrame(scraped_products)) |
|
if download_format == "XLSX": |
|
df = pd.DataFrame(scraped_products) |
|
df.to_excel(output_file, index=False) |
|
st.download_button(label=f"Unduh XLSX ({len(hidden_data)} data)", data=open(output_file, "rb").read(), key="xlsx_download", file_name=output_file) |
|
elif download_format == "CSV": |
|
df = pd.DataFrame(scraped_products) |
|
csv = df.to_csv(index=False) |
|
st.download_button(label=f"Unduh CSV ({len(hidden_data)} data)", data=csv, key="csv_download", file_name=output_file_csv) |
|
elif download_format == "JSON": |
|
json_data = pd.DataFrame(scraped_products).to_json(orient="records") |
|
st.download_button(label=f"Unduh JSON ({len(hidden_data)} data)", data=json_data, key="json_download", file_name=output_file_json) |
|
elif not hidden_data: |
|
st.warning(f"Tidak ada data pada query '{nama_barang}'", icon="⚠️") |
|
|
|
if not scraping_done: |
|
st.write("Tidak ada data untuk diunduh.") |
|
|
|
st.divider() |
|
github_link = "https://github.com/naufalnashif/" |
|
st.markdown(f"GitHub: [{github_link}]({github_link})") |
|
instagram_link = "https://www.instagram.com/naufal.nashif/" |
|
st.markdown(f"Instagram: [{instagram_link}]({instagram_link})") |
|
st.write('Terima kasih telah mencoba demo ini!') |
|
|