# NOTE(review): the three lines below were a Hugging Face Spaces status banner
# ("Spaces: Sleeping") captured together with the source; kept as a comment so
# the file remains valid Python.
# Standard library
import concurrent.futures
from io import BytesIO

# Third-party
import pandas as pd
import requests
import streamlit as st
from bs4 import BeautifulSoup
from pyxlsb import open_workbook as open_xlsb  # noqa: F401 — kept for parity; unused in this file
# Seed the session-state slots on first run so later reads never KeyError:
# 'df' holds the scraped DataFrame, 'is_df' flags whether one exists yet.
for _key, _default in (('df', None), ('is_df', False)):
    if _key not in st.session_state:
        st.session_state[_key] = _default
# Browser-impersonating request headers sent with every HotCopper request.
headers = dict([
    ('authority', 'cdn.jwplayer.com'),
    ('accept', '*/*'),
    ('accept-language', 'en-US,en;q=0.5'),
    ('origin', 'https://hotcopper.com.au'),
    ('referer', 'https://hotcopper.com.au/'),
    ('sec-ch-ua', '"Chromium";v="122", "Not(A:Brand";v="24", "Brave";v="122"'),
    ('sec-ch-ua-mobile', '?0'),
    ('sec-ch-ua-platform', '"Windows"'),
    ('sec-fetch-dest', 'empty'),
    ('sec-fetch-mode', 'cors'),
    ('sec-fetch-site', 'cross-site'),
    ('sec-gpc', '1'),
    ('user-agent', 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36'),
])

# Forum session cookies (auth + tracking) sent with every request.
cookies = dict([
    ('xf_show_post_view', '0'),
    ('xf_threads_terms_conditions_pop', '1'),
    ('hc_user_tracker', 'ZAXlB7LEHd0eT6X4QQfAOFkXms373LrE'),
    ('xf_user', '971779%2C5e49c37ac9fa56c1a2798923703d4978b9f8427d'),
    ('xf_session', '16mn22etenrnkf7aimqacdu026'),
])
# Shared accumulators filled in by the scraping workers.
post_links = []

# One list per output column; pd.DataFrame(post_data) turns this into rows.
_POST_COLUMNS = (
    'username',
    'number_of_posts_by_user',
    'number_of_great_analysis_for_user',
    'post_date',
    'post_time',
    'post_id',
    'number_of_upvotes',
    'stock_pill',
    'stock_pill_link',
    'price_at_posting',
    'sentiment',
    'disclosure',
    'message',
    'reply_post_id',
    'reply_post_url',
)
post_data = {column: [] for column in _POST_COLUMNS}
def get_number_of_posts(company_code, max_page_count=999999999):
    """Return the number of discussion pages for *company_code*.

    Works by requesting a page number far past the end of the thread list;
    the server redirects to the last real page, whose number is read back
    from the redirected URL.

    Parameters:
        company_code: ASX ticker (lowercase) used in the HotCopper URL.
        max_page_count: probe base; the request uses its cube, and the value
            is tripled on each retry if no redirect occurred.

    Returns the page count as an int.
    """
    url = f'https://hotcopper.com.au/asx/{company_code}/discussion/page-{max_page_count**3}'
    # timeout added so a stalled connection cannot hang the app forever
    response = requests.get(url, headers=headers, cookies=cookies, timeout=30)
    if url != response.url:
        # Redirected to the last real page, e.g. ".../page-123" -> 123.
        return int(response.url.split('-')[-1])
    # No redirect: the probe page apparently exists; retry with a larger probe.
    return get_number_of_posts(company_code, max_page_count * 3)
def get_all_posts(url):
    """Scrape one discussion-index page and collect the post links on it.

    Appends the absolute post URLs to the module-level ``post_links`` list
    (shared by the thread-pool workers) and also returns them.

    Parameters:
        url: absolute URL of a ".../discussion/page-N" index page.
    """
    global post_links
    # timeout added so a stalled connection cannot hang a worker thread forever
    response = requests.get(url, headers=headers, cookies=cookies, timeout=30)
    soup = BeautifulSoup(response.text, 'html.parser')
    posts = [f"https://hotcopper.com.au{post['href']}" for post in soup.find_all('a', class_='subject-a')]
    post_links.extend(posts)
    return posts
def get_post(url):
    """Scrape a single post page and append its fields to ``post_data``.

    Each column list in the module-level ``post_data`` dict gets exactly one
    value appended per call, so the columns stay aligned row-wise.

    Parameters:
        url: absolute URL of the post; the text after the final '=' is used
             as the post id.

    Returns the shared ``post_data`` dict (mutated in place).
    """
    # timeout added so a stalled connection cannot hang a worker thread forever
    response = requests.get(url, headers=headers, cookies=cookies, timeout=30)
    soup = BeautifulSoup(response.text, 'html.parser')
    username = soup.find('div', class_='user-username').text.strip()
    number_of_posts_by_user = soup.find('div', class_='user-post-num').text.replace(',', '').replace('Posts.', '').strip()
    try:
        number_of_great_analysis_for_user = soup.find('div', class_='user-ga-count').text.replace(',', '').replace('lightbulb Created with Sketch.', '').strip()
    except AttributeError:
        # element absent when the user has no "great analysis" count
        number_of_great_analysis_for_user = 0
    post_date = soup.find('div', class_='post-metadata-date').text.replace('Posted:', '').strip()
    post_time = soup.find('div', class_='post-metadata-time').text.replace('Time:', '').strip()
    post_id = url.split('=')[-1]
    try:
        number_of_upvotes = soup.find('div', class_='votes-num has-not-voted').text.strip()
    except AttributeError:
        # votes widget absent (or in a different state) on some posts
        number_of_upvotes = 0
    stock_pill_el = soup.find('span', class_='stock-pill')
    stock_pill = stock_pill_el.text.strip()
    stock_pill_link = f"https://hotcopper.com.au{stock_pill_el.find('a')['href']}"
    # Bug fix: default the optional meta fields so a post missing one of the
    # "meta-details" spans no longer raises NameError at the appends below.
    price_at_posting = None
    sentiment = None
    disclosure = None
    for meta_detail in soup.find_all('span', class_='meta-details'):
        if 'Price at posting:' in meta_detail.text:
            price_at_posting = meta_detail.text.replace('Price at posting:', '').strip()
        if 'Sentiment:' in meta_detail.text:
            sentiment = meta_detail.text.replace('Sentiment:', '').strip()
        if 'Disclosure' in meta_detail.text:
            disclosure = meta_detail.text.replace('Disclosure:', '').strip()
    message = soup.find('blockquote', class_='message-text ugc baseHtml').get_text(strip=True)
    if 'β' in message:
        # quoted replies put the original above a 'β' marker; keep only the reply
        message = message.split('β')[1]
    attribution = soup.find('a', class_='AttributionLink')
    if attribution is not None:
        reply_post_id = attribution['data-hash']
        reply_post_url = f"https://hotcopper.com.au/{attribution['href']}"
    else:
        reply_post_id = None
        reply_post_url = None
    post_data['username'].append(username)
    post_data['number_of_posts_by_user'].append(number_of_posts_by_user)
    post_data['number_of_great_analysis_for_user'].append(number_of_great_analysis_for_user)
    post_data['post_date'].append(post_date)
    post_data['post_time'].append(post_time)
    post_data['post_id'].append(post_id)
    post_data['number_of_upvotes'].append(number_of_upvotes)
    post_data['stock_pill'].append(stock_pill)
    post_data['stock_pill_link'].append(stock_pill_link)
    post_data['price_at_posting'].append(price_at_posting)
    post_data['sentiment'].append(sentiment)
    post_data['disclosure'].append(disclosure)
    post_data['message'].append(message)
    post_data['reply_post_id'].append(reply_post_id)
    post_data['reply_post_url'].append(reply_post_url)
    return post_data
def convert_df(df: pd.DataFrame) -> bytes:
    """Serialize *df* to an in-memory .xlsx workbook and return its bytes.

    Bug fix: the original wrote to an empty file path ('') and returned
    ``to_excel``'s None; it now writes into a BytesIO buffer and returns the
    workbook bytes, mirroring ``to_excel`` below.
    """
    buffer = BytesIO()
    df.to_excel(buffer, index=False, engine='openpyxl', sheet_name='Sheet1')
    return buffer.getvalue()
def to_excel(df):
    """Serialize *df* to an in-memory .xlsx workbook and return its bytes.

    Bug fix: ``ExcelWriter.save()`` was removed in pandas 2.0; using the
    writer as a context manager closes (and flushes) it on all versions.
    """
    output = BytesIO()
    with pd.ExcelWriter(output, engine='xlsxwriter') as writer:
        df.to_excel(writer, index=False, sheet_name='Sheet1')
    return output.getvalue()
def track_progress(futures, total, message):
    """Drive a Streamlit progress bar as the given futures complete.

    Blocks until all *futures* finish, updating the bar with a
    'Scraped K/total message...' caption after each completion.
    """
    bar = st.empty()
    for done, _ in enumerate(concurrent.futures.as_completed(futures), start=1):
        bar.progress(done / total, f'Scraped {done}/{total} {message}...')
# ---- Streamlit app ---------------------------------------------------------
st.title('Thread Scraper')
st.sidebar.title('Settings')
company_code = st.sidebar.text_input('Company Code', 'ZIP').lower()
scrape = st.sidebar.button('Scrape')

if st.session_state['is_df']:
    # Offer the previously scraped DataFrame for download on reruns.
    # Bug fix: the old code also rebuilt a DataFrame from the (empty-on-rerun)
    # module-level post_data and serialized it to an unused variable.
    with st.spinner('Creating database ..'):
        st.download_button(
            label="Download Data",
            data=to_excel(st.session_state['df']),
            file_name=f'{company_code.upper()}.xlsx',
            mime='application/vnd.ms-excel',
        )

if scrape:
    with st.spinner('Getting number of post pages to scrape ..'):
        number_of_posts = get_number_of_posts(company_code)
    with st.spinner('Getting all post links ..'):
        # Bug fix: reuse the page count fetched above instead of probing the
        # site a second time with another get_number_of_posts call.
        pages = [
            f'https://hotcopper.com.au/asx/{company_code}/discussion/page-{page_number}'
            for page_number in range(1, number_of_posts + 1)
        ]
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            futures = [executor.submit(get_all_posts, url) for url in pages]
            track_progress(futures, len(pages), 'post pages')
    with st.spinner('Getting all post data ..'):
        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
            futures = [executor.submit(get_post, url) for url in post_links]
            track_progress(futures, len(post_links), 'posts')
    st.session_state['df'] = pd.DataFrame(post_data)
    st.session_state['is_df'] = True