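"""Streamlit app for scraping product listings from Indonesian e-commerce
sites: klikindomaret.com, shopee.co.id (under development), and tokopedia.com."""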
import requests
from bs4 import BeautifulSoup
import pandas as pd
from urllib.parse import quote
import streamlit as st
import json
import time
import logging
from selenium import webdriver
from selenium.common.exceptions import WebDriverException
from selenium.webdriver.common.by import By
logging.basicConfig(level=logging.DEBUG)
@st.cache_data
def scrape_klikindomaret(nama_barang, num_items):
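    """Scrape product listings from klikindomaret.com search results.

    Fetches search pages (54 results per page) until about num_items products
    are collected, updating a Streamlit progress bar along the way. Returns a
    list of dicts with product name, original price, discount, price, and link.
    """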
    products = []
    page = 1
    query = quote(nama_barang)
    progress_text = "Scraping in progress. Please wait."
    my_bar = st.progress(0, text=progress_text)

    # Each search page returns up to 54 results, so num_items iterations
    # is a safe upper bound on the number of pages to fetch.
    for _ in range(num_items):
        if len(products) >= num_items:
            products = products[:num_items]
            break
        prop = min(len(products) / num_items, 1)
        my_bar.progress(prop, text=progress_text)

        url = f"https://www.klikindomaret.com/search/?key={query}&categories=&productbrandid=&sortcol=&pagesize=54&page={page}&startprice=&endprice=&attributes=&ShowItem="
        response = requests.get(url)
        soup = BeautifulSoup(response.text, 'html.parser')
        product_list = soup.find_all('a', href=True)

        for product in product_list:
            product_href = product['href']
            if '/product/' in product_href:
                product_name = product.find('div', class_='title').text.strip()
                product_price = product.find('span', class_='normal price-value').text.strip()

                # Check whether there is a pre-discount price and a discount percentage
                discount_element = product.find('span', class_='strikeout disc-price')
                discount_percentage = ""
                original_price = ""
                if discount_element:
                    discount_percentage = discount_element.find('span', class_='discount').text.strip()
                    original_price = discount_element.text.replace(discount_percentage, '').strip()
                else:
                    # No discount: report "0%" and use the regular price as the original price
                    discount_percentage = "0%"
                    original_price = product_price

                product_link = f"https://www.klikindomaret.com{product_href}"
                products.append({
                    'product': product_name,
                    'original_price': original_price,
                    'discount_percentage': discount_percentage,
                    'price': product_price,
                    'link': product_link
                })

        page += 1
        time.sleep(1)

    my_bar.empty()
    return products
@st.cache_data
def scrape_shopee(nama_barang, num_items):
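    """Scrape product names from shopee.co.id search results.

    Uses headless Selenium Chrome because Shopee renders results client-side.
    Currently only the first results page is fetched and num_items is not yet
    enforced. Returns a list of raw text entries, one per product card.
    """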
    products = []
    driver = None  # pre-set so the finally block is safe if Chrome fails to start

    # Local-development setup (kept for reference):
    # path = ''
    # chrome_options = Options()
    # chrome_options.add_argument('--no-sandbox')
    # chrome_options.add_argument('--headless')
    # chrome_options.add_argument('--disable-notifications')
    # chrome_options.add_argument('--disable-infobars')
    # driver = webdriver.Chrome(executable_path=path, options=chrome_options)

    # Chrome options for a headless server environment (e.g. Hugging Face Spaces)
    options = webdriver.ChromeOptions()
    options.add_argument('--no-sandbox')
    options.add_argument('--headless')
    options.add_argument('--disable-notifications')
    options.add_argument('--disable-infobars')
    options.add_argument('--disable-dev-shm-usage')

    try:
        page = 1
        query = quote(nama_barang)
        driver = webdriver.Chrome(options=options)
        url = f'https://shopee.co.id/search?keyword={query}&page={page}'
        driver.get(url)
        time.sleep(10)  # give the client-side rendering time to finish

        # Grab the fully rendered HTML from the <html> element
        html_element = driver.find_element(By.TAG_NAME, "html")
        html = html_element.get_attribute("innerHTML")
        soup = BeautifulSoup(html, "html.parser")
        for i in soup.find_all('div', class_="ie3A+n bM+7UW Cve6sh"):
            products.append(i.text)
    except WebDriverException as e:
        # Must be caught before the generic handler: WebDriverException is a subclass of Exception
        st.error(f"An error occurred: {e}")
    except Exception as e:
        logging.error(f"An unknown error occurred: {e}")
    finally:
        if driver:
            driver.quit()
    return products
@st.cache_data
def scrape_tokped(nama_barang, num_items):
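    """Scrape product listings from tokopedia.com search results.

    Sends plain HTTP requests with a browser-like User-Agent, parses each
    results page with BeautifulSoup, and stops once num_items products are
    collected. Returns a list of dicts (link, name, price, units sold,
    rating, shop name, shop location).
    """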
    products = []
    page = 1
    query = quote(nama_barang)

    while len(products) < num_items:
        url = f'https://www.tokopedia.com/search?navsource=&page={page}&q={query}&srp_component_id=02.01.00.00&srp_page_id=&srp_page_title=&st='
        headers = {
            'User-Agent': "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.36",
            'Accept-Language': 'en-US, en;q=0.5',
            'Accept-Encoding': 'gzip, deflate, br',
            'Connection': 'keep-alive'
        }
        timeout = 10

        try:
            response = requests.get(url, headers=headers, timeout=timeout)
            response.raise_for_status()
            soup = BeautifulSoup(response.text, 'html.parser')

            product_container_list = soup.find_all('a', class_="pcv3__info-content css-gwkf0u", href=True)
            for product_info in product_container_list:
                link = product_info['href']
                title_element = product_info.find('div', class_="prd_link-product-name css-3um8ox")
                title = title_element.text.strip() if title_element else None
                harga_element = product_info.find('div', class_="prd_link-product-price css-h66vau")
                harga = harga_element.text.strip() if harga_element else None
                terjual_element = product_info.find('span', class_="prd_label-integrity css-1sgek4h")
                terjual = terjual_element.text if terjual_element else None
                rating_element = product_info.find('span', class_='prd_rating-average-text css-t70v7i')
                rating = rating_element.text if rating_element else None
                toko_element = product_info.find('span', class_="prd_link-shop-name css-1kdc32b flip")
                toko = toko_element.text.strip() if toko_element else None
                asal_product_element = product_info.find('span', class_="prd_link-shop-loc css-1kdc32b flip")
                asal_product = asal_product_element.text.strip() if asal_product_element else None

                products.append({
                    'link': link,
                    'produk': title,
                    'harga': harga,
                    'terjual': terjual,
                    'rating': rating,
                    'toko': toko,
                    'asal_product': asal_product,
                })

            if len(products) >= num_items:
                products = products[:num_items]
                break
        # HTTPError is a subclass of RequestException, so it must be caught first
        except requests.exceptions.HTTPError as e:
            logging.error(f"HTTP Error: {e}")
            break
        except requests.exceptions.RequestException as e:
            logging.error(f"An error occurred while sending the request: {e}")
            break
        except Exception as e:
            logging.error(f"An unknown error occurred: {e}")
            break

        page += 1

    return products
#---------------------------------------------------User Interface----------------------------------------------------------------------
# Streamlit UI
st.title("Scraping E-Commerce")
with st.expander("Settings:"):
    # Options for choosing the target website
    selected_site = st.selectbox("Choose a website:", ["klikindomaret.com", "shopee.co.id", "tokopedia.com"])
    nama_barang = st.text_input("Enter a product name:")
    num_items = st.number_input("Enter the approximate number of items:", min_value=1, step=1, placeholder="Type a number...")
    download_format = st.selectbox("Choose a download format:", ["XLSX", "CSV", "JSON"])
    st.info('Press "Start Scraping" again if the results disappear', icon="ℹ️")

# Hidden variables that hold the scraping results
hidden_data = []
scraping_done = False
if selected_site == "klikindomaret.com":
    if st.button("Start Scraping"):
        if not nama_barang:
            st.error("Please enter a product name.")
        else:
            scraped_products = scrape_klikindomaret(nama_barang, num_items)
            hidden_data = scraped_products  # store the results in the hidden variable
            scraping_done = True

if selected_site == "shopee.co.id":
    st.error("Under development. Please choose another site.")
    #if st.button("Start Scraping"):
    #    if not nama_barang:
    #        st.error("Please enter a product name.")
    #    else:
    #        scraped_products = scrape_shopee(nama_barang, num_items)
    #        hidden_data = scraped_products  # store the results in the hidden variable
    #        scraping_done = True

if selected_site == "tokopedia.com":
    if st.button("Start Scraping"):
        if not nama_barang:
            st.error("Please enter a product name.")
        else:
            scraped_products = scrape_tokped(nama_barang, num_items)
            hidden_data = scraped_products  # store the results in the hidden variable
            scraping_done = True
# Output file names for each download format
output_file = f"scraped_{selected_site}_{nama_barang}.xlsx"
output_file_csv = f"scraped_{selected_site}_{nama_barang}.csv"
output_file_json = f"scraped_{selected_site}_{nama_barang}.json"
#---------------------------------------------------Download Files & Scraping Results----------------------------------------------------------------------
# Display the scraping results
if scraping_done:
    if hidden_data:
        # Show the scraped data in an expandable box
        with st.expander(f"Scraping results for {selected_site}:"):
            st.write(pd.DataFrame(hidden_data))

        if download_format == "XLSX":
            df = pd.DataFrame(hidden_data)
            df.to_excel(output_file, index=False)
            with open(output_file, "rb") as f:
                st.download_button(label=f"Download XLSX ({len(hidden_data)} rows)", data=f.read(), key="xlsx_download", file_name=output_file)
        elif download_format == "CSV":
            df = pd.DataFrame(hidden_data)
            csv = df.to_csv(index=False)
            st.download_button(label=f"Download CSV ({len(hidden_data)} rows)", data=csv, key="csv_download", file_name=output_file_csv)
        elif download_format == "JSON":
            json_data = pd.DataFrame(hidden_data).to_json(orient="records")
            st.download_button(label=f"Download JSON ({len(hidden_data)} rows)", data=json_data, key="json_download", file_name=output_file_json)
    else:
        st.warning(f"No data found for query '{nama_barang}'", icon="⚠️")

if not scraping_done:
    st.write("No data to download.")
st.divider()
github_link = "https://github.com/naufalnashif/"
st.markdown(f"GitHub: [{github_link}]({github_link})")
instagram_link = "https://www.instagram.com/naufal.nashif/"
st.markdown(f"Instagram: [{instagram_link}]({instagram_link})")
st.write('Thank you for trying this demo!')