# PTT scraper and word-cloud analysis tool (Streamlit app).
# Standard library
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from io import BytesIO

# Third-party
import jieba
import matplotlib as mpl
import matplotlib.font_manager as fm
import matplotlib.pyplot as plt
import pandas as pd
import requests
import streamlit as st
from bs4 import BeautifulSoup
from wordcloud import WordCloud
class PTTScraper:
    """Scrape article lists, article bodies, and push comments from PTT (www.ptt.cc)."""

    # Site root shared by all instances; __init__ copies it onto the instance.
    base_url = "https://www.ptt.cc"

    def __init__(self, _board):
        """Point the scraper at the latest index page of one board (e.g. _board="Stock")."""
        self.base_url = PTTScraper.base_url
        self.url = self.base_url + f"/bbs/{_board}/index.html"

    def get_post_content(self, post_url):
        """Return (full article text, list of push dicts) for a site-relative post URL.

        post_url is a path such as "/bbs/Stock/M.123.A.html".
        """
        soup = PTTScraper.get_soup(self.base_url + post_url)
        content = soup.find(id='main-content').text
        pushes = soup.find_all('div', class_='push')
        # Pushes are independent of each other, so parse them concurrently.
        with ThreadPoolExecutor() as executor:
            push_list = list(executor.map(self.get_push, pushes))
        return content, push_list

    def get_push(self, push):
        """Parse one push <div> into a dict; return {} for unparseable pushes.

        Keys of the returned dict: "Tag", "Userid", "Content", "Ipdatetime".
        """
        # Fix: initialize before the try block so the except path cannot raise
        # UnboundLocalError when parsing fails partway through.
        push_dict = {}
        try:
            if push.find('span', class_='push-tag') is None:
                return dict()
            push_tag = push.find('span', class_='push-tag').text.strip()
            push_userid = push.find('span', class_='push-userid').text.strip()
            push_content = push.find('span', class_='push-content').text.strip().lstrip(":")
            push_ipdatetime = push.find('span', class_='push-ipdatetime').text.strip()
            push_dict = {
                "Tag": push_tag,
                "Userid": push_userid,
                "Content": push_content,
                "Ipdatetime": push_ipdatetime
            }
        except Exception as e:
            st.error(f"解析推文內容時發生錯誤:{e}")
        return push_dict

    @staticmethod
    def get_soup(url):
        """GET a URL (with PTT's over-18 cookie set) and return a BeautifulSoup tree.

        Fix: declared @staticmethod — it takes no self and is always called as
        PTTScraper.get_soup(...); this also makes instance-bound calls safe.
        """
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) "
                          "Chrome/58.0.3029.110 Safari/537.3", }
        cookies = {"over18": "1"}  # bypass PTT's age-confirmation interstitial
        response = requests.get(url, headers=headers, cookies=cookies)
        return BeautifulSoup(response.text, 'html.parser')

    def fetch_post(self, url):
        """Fetch one article and return its metadata, body, and pushes.

        Returns a dict with keys Title/Author/Date/Content/Link/Pushes;
        metadata fields stay None when the page cannot be parsed.
        """
        soup = PTTScraper.get_soup(self.base_url + url)
        content, author, title, date = None, None, None, None
        try:
            if soup.find(id='main-content') is not None:
                content = soup.find(id='main-content').text
                # Keep only the body: drop everything from the "※ 發信站" footer on.
                content = content.split('※ 發信站')[0]
            if soup.find(class_='article-meta-value') is not None:
                # First meta value is the author; the last two are title and
                # date (presumably author/board/title/date order — per PTT markup).
                author = soup.find(class_='article-meta-value').text
                title = soup.find_all(class_='article-meta-value')[-2].text
                date_str = soup.find_all(class_='article-meta-value')[-1].text
                date = datetime.strptime(date_str, '%a %b %d %H:%M:%S %Y')
        except Exception as e:
            st.error(f"抓取文章時發生錯誤:{e}")
            st.error(self.base_url + url)
        pushes = soup.find_all('div', class_='push')
        with ThreadPoolExecutor() as executor:
            push_list = list(executor.map(self.get_push, pushes))
        return {'Title': title, 'Author': author, 'Date': date, 'Content': content, 'Link': url, 'Pushes': push_list}

    def get_latest_posts(self, max_posts=100):
        """Walk index pages from newest to oldest until max_posts articles are collected."""
        data = []
        links_num = 0
        while len(data) < max_posts:
            soup = PTTScraper.get_soup(self.url)
            data_curr, num = self.get_data_current_page(soup, max_posts=max_posts, links_num=links_num)
            data.extend(data_curr)
            if len(data) >= max_posts:
                return data[:max_posts]
            links_num += num
            # Fix: stop cleanly instead of crashing (None['href'] -> TypeError)
            # when the "previous page" link is absent or disabled, e.g. on the
            # board's oldest index page.
            prev = soup.find('a', string='‹ 上頁')
            if prev is None or not prev.get('href'):
                break
            self.url = self.base_url + prev['href']
        return data

    def get_data_current_page(self, soup=None, max_posts=100, links_num=0):
        """Collect post links (newest first) from one index page and fetch them.

        links_num is the count of links already gathered on earlier pages, so
        the max_posts cap applies across pages.
        Returns (list of post dicts, number of links found on this page).
        """
        if soup is None:
            soup = PTTScraper.get_soup(self.url)
        links = []
        for entry in reversed(soup.select('.r-ent')):
            try:
                # Deleted posts keep a title <div> but have no <a>; skip them.
                # (Fix: removed an unused `title` local the original computed here.)
                if entry.find("div", "title").a is None:
                    continue
                links.append(entry.select('.title a')[0]['href'])
                if len(links) + links_num >= max_posts:
                    break
            except Exception as e:
                st.error(f"解析文章連結時發生錯誤:{e}")
        with ThreadPoolExecutor() as executor:
            data = list(executor.map(self.fetch_post, links))
        return data, len(links)
# Step 1: Download a CJK-capable font so the word cloud can render Chinese.
font_url = "https://drive.google.com/uc?id=1eGAsTN1HBpJAkeVM57_C7ccp7hbgSz3_&export=download"
# Step 2: Save the font locally.
font_path = "TaipeiSansTCBeta-Regular.ttf"
try:
    font_response = requests.get(font_url, timeout=30)
    # Fix: fail loudly instead of silently saving an HTML error page as a .ttf.
    font_response.raise_for_status()
    with open(font_path, "wb") as font_file:
        font_file.write(font_response.content)
    # Step 3: Register the font and make it matplotlib's default family.
    fm.fontManager.addfont(font_path)
    mpl.rc('font', family='Taipei Sans TC Beta')
except Exception as e:
    # Fix: a failed download no longer crashes the whole app at startup;
    # the word-cloud step below already has a no-font fallback.
    st.warning(f"下載字體時發生錯誤:{e}")

# Streamlit app
st.title("PTT 爬蟲與分析工具")
board = st.text_input("輸入 PTT 看板名稱:", "Stock")
max_posts = st.number_input("輸入要抓取的文章數量上限:", min_value=1, max_value=1000, value=100)

if st.button("抓取資料"):
    scraper = PTTScraper(board)
    data = scraper.get_latest_posts(max_posts=max_posts)
    if data:
        df = pd.DataFrame(data)
        st.write("抓取的文章資料:")
        st.dataframe(df[['Title', 'Author', 'Date', 'Link']])
        # 使用 Jieba 進行斷詞
        # Fix: skip posts whose body failed to download — fetch_post leaves
        # Content as None, and jieba.cut(None) would raise.
        sentence_list = [s for s in df['Content'] if s]
        word_sentence_list = [" ".join(jieba.cut(sentence)) for sentence in sentence_list]
        # 使用詞頻進行簡單的實體抽取 (computed but currently not displayed)
        word_freq = pd.Series(" ".join(word_sentence_list).split()).value_counts()
        # 生成並顯示詞雲
        text = " ".join(word_sentence_list)
        try:
            # 使用下載的字體
            wordcloud = WordCloud(width=2000, height=1000, max_font_size=400, max_words=400,
                                  background_color="black", font_path=font_path,
                                  colormap="Dark2").generate(text)
        except Exception as e:
            st.warning(f"生成詞雲時發生錯誤:{e}。將使用默認設置。")
            wordcloud = WordCloud(width=2000, height=1000, max_font_size=400, max_words=400,
                                  background_color="black", colormap="Dark2").generate(text)
        st.write("詞雲:")
        plt.figure(dpi=600)
        plt.imshow(wordcloud)
        plt.axis("off")
        buf = BytesIO()
        plt.savefig(buf, format="png")
        buf.seek(0)
        st.image(buf)
        # Fix: release the figure so repeated Streamlit reruns don't leak memory.
        plt.close()