Spaces:
Running
Running
import requests | |
from bs4 import BeautifulSoup | |
from concurrent.futures import ThreadPoolExecutor | |
# 爬取空氣品質 | |
def get_air_quality(): | |
url = 'https://data.moenv.gov.tw/api/v2/aqx_p_432?api_key=221974dd-667c-4243-b308-61b60bc29986&limit=1000&sort=ImportDate desc&format=JSON' | |
data = requests.get(url) # 使用 get 方法透過空氣品質指標 API 取得內容 | |
data_json = data.json() # 將取得的檔案轉換為 JSON 格式 | |
results = [] | |
for i in data_json['records']: # 依序取出 records 內容的每個項目 | |
results.append(i['county'] + ' ' + i['sitename'] + ',') # 印出城市與地點名稱 | |
results.append('AQI:' + i['aqi'] + ',') # 印出 AQI 數值 | |
results.append('空氣品質' + i['status']) # 印出空氣品質狀態 | |
return "\n".join(results) | |
def get_weather(city_name=None): # 增加 city_name 參數,預設為 None | |
url = 'https://opendata.cwa.gov.tw/fileapi/v1/opendataapi/F-C0032-001?Authorization=CWA-AA0919C6-D524-47EE-8225-5FEB6779289B&downloadType=WEB&format=JSON' | |
data = requests.get(url) | |
data_json = data.json() | |
location = data_json['cwaopendata']['dataset']['location'] | |
results = [] | |
target_city_found = False # 標記是否找到目標縣市 | |
for i in location: | |
city = i['locationName'] | |
wx8 = i['weatherElement'][0]['time'][0]['parameter']['parameterName'] | |
maxt8 = i['weatherElement'][1]['time'][0]['parameter']['parameterName'] | |
mint8 = i['weatherElement'][2]['time'][0]['parameter']['parameterName'] | |
ci8 = i['weatherElement'][3]['time'][0]['parameter']['parameterName'] | |
pop8 = i['weatherElement'][4]['time'][0]['parameter']['parameterName'] | |
weather_info = f'{city}未來 8 小時{wx8},最高溫 {maxt8} 度,最低溫 {mint8} 度,降雨機率 {pop8} %' | |
if city_name: # 如果有指定縣市名稱 | |
if city_name in city: # 檢查縣市名稱是否包含在資料中 (可以更精確的匹配) | |
results.append(weather_info) | |
target_city_found = True # 找到目標縣市 | |
break # 找到目標縣市後就可以停止迴圈,因為 F-C0032-001 每個縣市只有一筆資料 | |
else: # 如果沒有指定縣市名稱 (直接問 "天氣預報"),回傳所有縣市 | |
results.append(weather_info) | |
if city_name and not target_city_found: # 如果指定縣市但找不到 | |
return f"抱歉,柴柴找不到 {city_name} 的天氣資訊汪!請確認縣市名稱是否正確。" | |
return "\n".join(results) | |
# 爬取地震資訊 | |
def get_earthquake(): | |
url = 'https://opendata.cwa.gov.tw/api/v1/rest/datastore/E-A0016-001?Authorization=CWA-AA0919C6-D524-47EE-8225-5FEB6779289B&format=JSON' | |
data = requests.get(url) | |
data_json = data.json() | |
eq = data_json['records']['Earthquake'] | |
results = [] | |
for i in eq: | |
loc = i['EarthquakeInfo']['Epicenter']['Location'] | |
val = i['EarthquakeInfo']['EarthquakeMagnitude']['MagnitudeValue'] | |
dep = i['EarthquakeInfo']['FocalDepth'] | |
eq_time = i['EarthquakeInfo']['OriginTime'] | |
img = i['ReportImageURI'] | |
msg = f'{loc},芮氏規模 {val} 級,深度 {dep} 公里,發生時間 {eq_time}' | |
results.append(msg) | |
return "\n".join(results) | |
# 爬取牌告匯率資訊 | |
def get_exchange_rate(): | |
url = 'https://rate.bot.com.tw/xrt/flcsv/0/day' | |
rate = requests.get(url) | |
rate.encoding = 'utf-8' | |
rt = rate.text | |
rts = rt.split('\n') | |
results = [] # 初始化 results list | |
for i in rts: | |
try: | |
a = i.split(',') | |
currency_name = a[0] # 貨幣名稱 | |
spot_rate_selling = a[12] # 即期賣出匯率 | |
if currency_name and spot_rate_selling: # 確保貨幣名稱和匯率都有值 | |
results.append(f"{currency_name}: 即期賣出匯率 {spot_rate_selling}") # 將匯率資訊加入 results | |
# print(a[0] + ': ' + a[12]) # 移除 print,改用 results 儲存 | |
except: | |
break | |
return "\n".join(results) # 正確回傳 results 字串 |