0525_APP_INDEX / src /streamlit_app.py
CJRobert's picture
Update src/streamlit_app.py
7dd9f44 verified
import streamlit as st
import yfinance as yf
import pandas as pd
import ta
import plotly.graph_objects as go
from plotly.subplots import make_subplots
from datetime import datetime
import io
# 設定頁面配置
st.set_page_config(
page_title="股票技術分析系統",
page_icon="📈",
layout="wide"
)
def analyze_stock(ticker_input):
"""分析單一股票的技術指標"""
try:
ticker = yf.Ticker(ticker_input)
df = ticker.history(period="200d")
if df.empty:
return None, f"無法取得 {ticker_input} 的資料,請確認股票代碼是否正確。"
except Exception as e:
return None, f"取得 {ticker_input} 資料時發生錯誤: {e}"
# 處理時區和數據
df.index = df.index.tz_localize(None)
df = df.tail(200)
# 計算技術指標
df['MA5'] = df['Close'].rolling(window=5).mean()
df['MA10'] = df['Close'].rolling(window=10).mean()
df['MA20'] = df['Close'].rolling(window=20).mean()
df['RSI'] = ta.momentum.RSIIndicator(df['Close']).rsi()
df['OBV'] = ta.volume.OnBalanceVolumeIndicator(df['Close'], df['Volume']).on_balance_volume()
# 布林帶
bb = ta.volatility.BollingerBands(df['Close'])
df['BB_High'] = bb.bollinger_hband()
df['BB_Low'] = bb.bollinger_lband()
df['%B'] = (df['Close'] - df['BB_Low']) / (df['BB_High'] - df['BB_Low'])
df['BBW'] = (df['BB_High'] - df['BB_Low']) / df['MA20']
# KD指標
kd = ta.momentum.StochasticOscillator(df['High'], df['Low'], df['Close'])
df['K'] = kd.stoch()
df['D'] = kd.stoch_signal()
# MACD
macd = ta.trend.MACD(df['Close'])
df['MACD'] = macd.macd()
df['MACD_Signal'] = macd.macd_signal()
df['MACD_Diff'] = macd.macd_diff()
# CCI和黃金交叉
df['CCI'] = ta.trend.CCIIndicator(df['High'], df['Low'], df['Close']).cci()
df['Golden_Cross'] = df['MA5'] > df['MA20']
return df, None
def create_stock_chart(df, ticker):
"""創建股票圖表"""
fig = make_subplots(
rows=4, cols=1,
subplot_titles=('價格與移動平均線', 'RSI', 'MACD', 'KD指標'),
vertical_spacing=0.08,
row_heights=[0.5, 0.2, 0.2, 0.2]
)
# 價格與移動平均線
fig.add_trace(go.Candlestick(
x=df.index,
open=df['Open'],
high=df['High'],
low=df['Low'],
close=df['Close'],
name='價格'
), row=1, col=1)
fig.add_trace(go.Scatter(x=df.index, y=df['MA5'], name='MA5', line=dict(color='orange')), row=1, col=1)
fig.add_trace(go.Scatter(x=df.index, y=df['MA10'], name='MA10', line=dict(color='blue')), row=1, col=1)
fig.add_trace(go.Scatter(x=df.index, y=df['MA20'], name='MA20', line=dict(color='red')), row=1, col=1)
fig.add_trace(go.Scatter(x=df.index, y=df['BB_High'], name='布林帶上軌', line=dict(color='gray', dash='dash')), row=1, col=1)
fig.add_trace(go.Scatter(x=df.index, y=df['BB_Low'], name='布林帶下軌', line=dict(color='gray', dash='dash')), row=1, col=1)
# RSI
fig.add_trace(go.Scatter(x=df.index, y=df['RSI'], name='RSI', line=dict(color='purple')), row=2, col=1)
fig.add_hline(y=70, line_dash="dash", line_color="red", row=2, col=1)
fig.add_hline(y=30, line_dash="dash", line_color="green", row=2, col=1)
# MACD
fig.add_trace(go.Scatter(x=df.index, y=df['MACD'], name='MACD', line=dict(color='blue')), row=3, col=1)
fig.add_trace(go.Scatter(x=df.index, y=df['MACD_Signal'], name='MACD Signal', line=dict(color='red')), row=3, col=1)
fig.add_trace(go.Bar(x=df.index, y=df['MACD_Diff'], name='MACD Histogram', marker_color='gray'), row=3, col=1)
# KD指標
fig.add_trace(go.Scatter(x=df.index, y=df['K'], name='K', line=dict(color='blue')), row=4, col=1)
fig.add_trace(go.Scatter(x=df.index, y=df['D'], name='D', line=dict(color='red')), row=4, col=1)
fig.add_hline(y=80, line_dash="dash", line_color="red", row=4, col=1)
fig.add_hline(y=20, line_dash="dash", line_color="green", row=4, col=1)
fig.update_layout(
title=f'{ticker} 技術分析圖表',
height=800,
showlegend=True,
xaxis_rangeslider_visible=False
)
return fig
def get_investment_advice(df, ticker):
"""獲取投資建議"""
advice_list = []
latest_data = df.iloc[-1]
# RSI建議
if latest_data['RSI'] < 30:
advice_list.append({
'type': '買進建議',
'reason': f'RSI = {latest_data["RSI"]:.2f} < 30 (超賣)',
'color': 'success'
})
elif latest_data['RSI'] > 70:
advice_list.append({
'type': '賣出建議',
'reason': f'RSI = {latest_data["RSI"]:.2f} > 70 (超買)',
'color': 'error'
})
# %B建議
if latest_data['%B'] < 0:
advice_list.append({
'type': '買進建議',
'reason': f'%B = {latest_data["%B"]:.2f} < 0 (價格低於布林帶下軌)',
'color': 'success'
})
elif latest_data['%B'] > 1:
advice_list.append({
'type': '賣出建議',
'reason': f'%B = {latest_data["%B"]:.2f} > 1 (價格高於布林帶上軌)',
'color': 'error'
})
# 黃金交叉
if latest_data['Golden_Cross']:
advice_list.append({
'type': '看多信號',
'reason': '短期均線突破長期均線 (黃金交叉)',
'color': 'info'
})
return advice_list
def create_excel_download(df_dict):
"""創建Excel下載檔案"""
from openpyxl import Workbook
from openpyxl.styles import PatternFill
from openpyxl.utils.dataframe import dataframe_to_rows
wb = Workbook()
wb.remove(wb.active)
yellow_fill = PatternFill(start_color="FFFF00", end_color="FFFF00", fill_type="solid")
red_fill = PatternFill(start_color="FF0000", end_color="FF0000", fill_type="solid")
green_fill = PatternFill(start_color="00FF00", end_color="00FF00", fill_type="solid")
purple_fill = PatternFill(start_color="800080", end_color="800080", fill_type="solid")
advice_rows = []
for ticker_input, df in df_dict.items():
ws = wb.create_sheet(title=ticker_input)
ws.append(['股票代碼:', ticker_input])
ws.append(['分析日期:', datetime.now().strftime('%Y-%m-%d %H:%M:%S')])
ws.append([])
for r in dataframe_to_rows(df, index=True, header=True):
ws.append(r)
# 格式化和標色邏輯(保持原有邏輯)
header_row = 4
headers = {cell.value: cell.column for cell in ws[header_row]}
rsi_col = headers.get('RSI')
percentage_b_col = headers.get('%B')
if rsi_col and percentage_b_col:
for row in range(header_row + 1, ws.max_row + 1):
rsi_val = ws.cell(row=row, column=rsi_col).value
pct_b_val = ws.cell(row=row, column=percentage_b_col).value
if rsi_val is not None:
rsi_cell = ws.cell(row=row, column=rsi_col)
if rsi_val < 20:
rsi_cell.fill = yellow_fill
elif rsi_val > 70:
rsi_cell.fill = red_fill
if pct_b_val is not None:
b_cell = ws.cell(row=row, column=percentage_b_col)
if pct_b_val < 0:
b_cell.fill = green_fill
elif pct_b_val > 1:
b_cell.fill = purple_fill
# 儲存到記憶體
output = io.BytesIO()
wb.save(output)
output.seek(0)
return output.getvalue()
# Streamlit 應用程式主體
def main():
st.title("📈 股票技術分析系統")
st.markdown("---")
# 側邊欄設定
st.sidebar.header("分析設定")
# 選擇分析模式
analysis_mode = st.sidebar.radio(
"選擇分析模式",
["單一股票分析", "批量股票分析", "從Google Sheets匯入"]
)
if analysis_mode == "單一股票分析":
st.header("🎯 單一股票分析")
ticker_input = st.text_input("請輸入股票代碼 (例如: AAPL, 2330.TW):", value="AAPL")
if st.button("開始分析", type="primary"):
if ticker_input:
with st.spinner(f"正在分析 {ticker_input} ..."):
df, error = analyze_stock(ticker_input)
if error:
st.error(error)
else:
st.success(f"成功分析 {ticker_input}")
# 顯示基本資訊
col1, col2, col3, col4 = st.columns(4)
latest_data = df.iloc[-1]
with col1:
st.metric("最新收盤價", f"${latest_data['Close']:.2f}")
with col2:
st.metric("RSI", f"{latest_data['RSI']:.2f}")
with col3:
st.metric("%B", f"{latest_data['%B']:.2f}")
with col4:
st.metric("成交量", f"{latest_data['Volume']:,}")
# 顯示圖表
fig = create_stock_chart(df, ticker_input)
st.plotly_chart(fig, use_container_width=True)
# 投資建議
st.subheader("💡 投資建議")
advice_list = get_investment_advice(df, ticker_input)
if advice_list:
for advice in advice_list:
if advice['color'] == 'success':
st.success(f"**{advice['type']}**: {advice['reason']}")
elif advice['color'] == 'error':
st.error(f"**{advice['type']}**: {advice['reason']}")
else:
st.info(f"**{advice['type']}**: {advice['reason']}")
else:
st.info("目前沒有明確的買賣建議")
# 顯示詳細數據
with st.expander("查看詳細數據"):
st.dataframe(df.tail(20))
elif analysis_mode == "批量股票分析":
st.header("📊 批量股票分析")
ticker_text = st.text_area(
"請輸入股票代碼 (每行一個):",
value="AAPL\nMSFT\nGOOGL\n2330.TW\n2317.TW",
height=150
)
if st.button("開始批量分析", type="primary"):
tickers = [ticker.strip() for ticker in ticker_text.split('\n') if ticker.strip()]
if tickers:
progress_bar = st.progress(0)
status_text = st.empty()
all_data = {}
for i, ticker in enumerate(tickers):
status_text.text(f"正在分析 {ticker} ({i+1}/{len(tickers)})")
progress_bar.progress((i + 1) / len(tickers))
df, error = analyze_stock(ticker)
if error:
st.warning(f"{ticker}: {error}")
else:
all_data[ticker] = df
status_text.text("分析完成!")
if all_data:
st.success(f"成功分析 {len(all_data)} 支股票")
# 顯示摘要表格
summary_data = []
for ticker, df in all_data.items():
latest = df.iloc[-1]
summary_data.append({
'股票代碼': ticker,
'最新價格': f"${latest['Close']:.2f}",
'RSI': f"{latest['RSI']:.2f}",
'%B': f"{latest['%B']:.2f}",
'黃金交叉': '是' if latest['Golden_Cross'] else '否'
})
summary_df = pd.DataFrame(summary_data)
st.subheader("📋 分析摘要")
st.dataframe(summary_df, use_container_width=True)
# 提供Excel下載
excel_data = create_excel_download(all_data)
st.download_button(
label="📥 下載完整分析報告 (Excel)",
data=excel_data,
file_name=f"技術分析總表_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
else: # Google Sheets 匯入
st.header("📑 從Google Sheets匯入")
sheet_id = st.text_input(
"請輸入Google Sheets ID:",
value="1CZlw53z9Fns3XCQmlY_PXF2qOv2Wyqyn7mRrIzDRQMg"
)
if st.button("從Google Sheets匯入並分析", type="primary"):
try:
sheet_url = f"https://docs.google.com/spreadsheets/d/{sheet_id}/export?format=csv"
df_tickers = pd.read_csv(sheet_url)
tickers = df_tickers["股號"].dropna().astype(str).tolist()
st.info(f"從Google Sheets讀取到 {len(tickers)} 個股票代碼")
progress_bar = st.progress(0)
status_text = st.empty()
all_data = {}
for i, ticker in enumerate(tickers):
status_text.text(f"正在分析 {ticker} ({i+1}/{len(tickers)})")
progress_bar.progress((i + 1) / len(tickers))
df, error = analyze_stock(ticker)
if error:
st.warning(f"{ticker}: {error}")
else:
all_data[ticker] = df
status_text.text("分析完成!")
if all_data:
st.success(f"成功分析 {len(all_data)} 支股票")
# Excel下載
excel_data = create_excel_download(all_data)
st.download_button(
label="📥 下載完整分析報告 (Excel)",
data=excel_data,
file_name=f"技術分析總表_{datetime.now().strftime('%Y%m%d_%H%M%S')}.xlsx",
mime="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet"
)
except Exception as e:
st.error(f"讀取Google Sheets時發生錯誤: {e}")
# 說明資訊
with st.sidebar.expander("📖 使用說明"):
st.markdown("""
**技術指標說明:**
- **RSI < 30**: 超賣,考慮買進
- **RSI > 70**: 超買,考慮賣出
- **%B < 0**: 價格低於布林帶下軌
- **%B > 1**: 價格高於布林帶上軌
- **黃金交叉**: 短期均線突破長期均線
**股票代碼格式:**
- 美股: AAPL, MSFT, GOOGL
- 台股: 2330.TW, 2317.TW
""")
if __name__ == "__main__":
main()