gemiline / requests_handler.py
motaer0206's picture
Update requests_handler.py
c5a455e verified
import requests
from bs4 import BeautifulSoup
from concurrent.futures import ThreadPoolExecutor
# 爬取空氣品質
def get_air_quality():
url = 'https://data.moenv.gov.tw/api/v2/aqx_p_432?api_key=221974dd-667c-4243-b308-61b60bc29986&limit=1000&sort=ImportDate desc&format=JSON'
data = requests.get(url) # 使用 get 方法透過空氣品質指標 API 取得內容
data_json = data.json() # 將取得的檔案轉換為 JSON 格式
results = []
for i in data_json['records']: # 依序取出 records 內容的每個項目
results.append(i['county'] + ' ' + i['sitename'] + ',') # 印出城市與地點名稱
results.append('AQI:' + i['aqi'] + ',') # 印出 AQI 數值
results.append('空氣品質' + i['status']) # 印出空氣品質狀態
return "\n".join(results)
def get_weather(city_name=None): # 增加 city_name 參數,預設為 None
url = 'https://opendata.cwa.gov.tw/fileapi/v1/opendataapi/F-C0032-001?Authorization=CWA-AA0919C6-D524-47EE-8225-5FEB6779289B&downloadType=WEB&format=JSON'
data = requests.get(url)
data_json = data.json()
location = data_json['cwaopendata']['dataset']['location']
results = []
target_city_found = False # 標記是否找到目標縣市
for i in location:
city = i['locationName']
wx8 = i['weatherElement'][0]['time'][0]['parameter']['parameterName']
maxt8 = i['weatherElement'][1]['time'][0]['parameter']['parameterName']
mint8 = i['weatherElement'][2]['time'][0]['parameter']['parameterName']
ci8 = i['weatherElement'][3]['time'][0]['parameter']['parameterName']
pop8 = i['weatherElement'][4]['time'][0]['parameter']['parameterName']
weather_info = f'{city}未來 8 小時{wx8},最高溫 {maxt8} 度,最低溫 {mint8} 度,降雨機率 {pop8} %'
if city_name: # 如果有指定縣市名稱
if city_name in city: # 檢查縣市名稱是否包含在資料中 (可以更精確的匹配)
results.append(weather_info)
target_city_found = True # 找到目標縣市
break # 找到目標縣市後就可以停止迴圈,因為 F-C0032-001 每個縣市只有一筆資料
else: # 如果沒有指定縣市名稱 (直接問 "天氣預報"),回傳所有縣市
results.append(weather_info)
if city_name and not target_city_found: # 如果指定縣市但找不到
return f"抱歉,柴柴找不到 {city_name} 的天氣資訊汪!請確認縣市名稱是否正確。"
return "\n".join(results)
# 爬取地震資訊
def get_earthquake():
url = 'https://opendata.cwa.gov.tw/api/v1/rest/datastore/E-A0016-001?Authorization=CWA-AA0919C6-D524-47EE-8225-5FEB6779289B&format=JSON'
data = requests.get(url)
data_json = data.json()
eq = data_json['records']['Earthquake']
results = []
for i in eq:
loc = i['EarthquakeInfo']['Epicenter']['Location']
val = i['EarthquakeInfo']['EarthquakeMagnitude']['MagnitudeValue']
dep = i['EarthquakeInfo']['FocalDepth']
eq_time = i['EarthquakeInfo']['OriginTime']
img = i['ReportImageURI']
msg = f'{loc},芮氏規模 {val} 級,深度 {dep} 公里,發生時間 {eq_time}'
results.append(msg)
return "\n".join(results)
# 爬取牌告匯率資訊
def get_exchange_rate():
url = 'https://rate.bot.com.tw/xrt/flcsv/0/day'
rate = requests.get(url)
rate.encoding = 'utf-8'
rt = rate.text
rts = rt.split('\n')
results = [] # 初始化 results list
for i in rts:
try:
a = i.split(',')
currency_name = a[0] # 貨幣名稱
spot_rate_selling = a[12] # 即期賣出匯率
if currency_name and spot_rate_selling: # 確保貨幣名稱和匯率都有值
results.append(f"{currency_name}: 即期賣出匯率 {spot_rate_selling}") # 將匯率資訊加入 results
# print(a[0] + ': ' + a[12]) # 移除 print,改用 results 儲存
except:
break
return "\n".join(results) # 正確回傳 results 字串