Pt / app.py
Geek7's picture
Update app.py
4944958 verified
import pandas as pd
import numpy as np
import yfinance as yf
import streamlit as st
from datetime import datetime, timedelta
import pytz
# Function to fetch stock data
def fetch_stock_data(ticker, start_datetime, end_datetime):
stock_data = yf.download(ticker, start=start_datetime, end=end_datetime)
return stock_data
def fetch_stock_data(ticker, start_datetime, end_datetime):
# Fetch live data using yfinance
stock_data = yf.download(ticker, start=start_datetime, end=end_datetime)
# Convert DatetimeIndex to a column
df.reset_index(inplace=True)
return stock_data
# Function to detect head and shoulder patterns
def detect_head_shoulder(df, window=3):
roll_window = window
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
mask_head_shoulder = (
(df['high_roll_max'] > df['High'].shift(1)) &
(df['high_roll_max'] > df['High'].shift(-1)) &
(df['High'] < df['High'].shift(1)) &
(df['High'] < df['High'].shift(-1))
)
mask_inv_head_shoulder = (
(df['low_roll_min'] < df['Low'].shift(1)) &
(df['low_roll_min'] < df['Low'].shift(-1)) &
(df['Low'] > df['Low'].shift(1)) &
(df['Low'] > df['Low'].shift(-1))
)
df['head_shoulder_pattern'] = np.nan
df.loc[mask_head_shoulder, 'head_shoulder_pattern'] = 'Head and Shoulder'
df.loc[mask_inv_head_shoulder, 'head_shoulder_pattern'] = 'Inverse Head and Shoulder'
return df
# Function to detect multiple tops and bottoms
def detect_multiple_tops_bottoms(df, window=3):
roll_window = window
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
df['close_roll_max'] = df['Close'].rolling(window=roll_window).max()
df['close_roll_min'] = df['Close'].rolling(window=roll_window).min()
mask_top = (df['high_roll_max'] >= df['High'].shift(1)) & (df['close_roll_max'] < df['Close'].shift(1))
mask_bottom = (df['low_roll_min'] <= df['Low'].shift(1)) & (df['close_roll_min'] > df['Close'].shift(1))
df['multiple_top_bottom_pattern'] = np.nan
df.loc[mask_top, 'multiple_top_bottom_pattern'] = 'Multiple Top'
df.loc[mask_bottom, 'multiple_top_bottom_pattern'] = 'Multiple Bottom'
return df
# Function to calculate support and resistance levels
def calculate_support_resistance(df, window=3):
roll_window = window
std_dev = 2
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
mean_high = df['High'].rolling(window=roll_window).mean()
std_high = df['High'].rolling(window=roll_window).std()
mean_low = df['Low'].rolling(window=roll_window).mean()
std_low = df['Low'].rolling(window=roll_window).std()
df['support'] = mean_low - std_dev * std_low
df['resistance'] = mean_high + std_dev * std_high
return df
# Function to detect triangle patterns
def detect_triangle_pattern(df, window=3):
roll_window = window
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
mask_asc = (
(df['high_roll_max'] >= df['High'].shift(1)) &
(df['low_roll_min'] <= df['Low'].shift(1)) &
(df['Close'] > df['Close'].shift(1))
)
mask_desc = (
(df['high_roll_max'] <= df['High'].shift(1)) &
(df['low_roll_min'] >= df['Low'].shift(1)) &
(df['Close'] < df['Close'].shift(1))
)
df['triangle_pattern'] = np.nan
df.loc[mask_asc, 'triangle_pattern'] = 'Ascending Triangle'
df.loc[mask_desc, 'triangle_pattern'] = 'Descending Triangle'
return df
# Function to detect wedge patterns
def detect_wedge(df, window=3):
roll_window = window
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
df['trend_high'] = df['High'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0)
df['trend_low'] = df['Low'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0)
mask_wedge_up = (
(df['high_roll_max'] >= df['High'].shift(1)) &
(df['low_roll_min'] <= df['Low'].shift(1)) &
(df['trend_high'] == 1) &
(df['trend_low'] == 1)
)
mask_wedge_down = (
(df['high_roll_max'] <= df['High'].shift(1)) &
(df['low_roll_min'] >= df['Low'].shift(1)) &
(df['trend_high'] == -1) &
(df['trend_low'] == -1)
)
df['wedge_pattern'] = np.nan
df.loc[mask_wedge_up, 'wedge_pattern'] = 'Wedge Up'
df.loc[mask_wedge_down, 'wedge_pattern'] = 'Wedge Down'
return df
# Function to detect channel patterns
def detect_channel(df, window=3):
roll_window = window
channel_range = 0.1
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
df['trend_high'] = df['High'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0)
df['trend_low'] = df['Low'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) <0 else 0)
mask_channel_up = (
(df['high_roll_max'] >= df['High'].shift(1)) &
(df['low_roll_min'] <= df['Low'].shift(1)) &
(df['high_roll_max'] - df['low_roll_min'] <= channel_range * (df['high_roll_max'] + df['low_roll_min'])/2) &
(df['trend_high'] == 1) &
(df['trend_low'] == 1)
)
mask_channel_down = (
(df['high_roll_max'] <= df['High'].shift(1)) &
(df['low_roll_min'] >= df['Low'].shift(1)) &
(df['high_roll_max'] - df['low_roll_min'] <= channel_range * (df['high_roll_max'] + df['low_roll_min'])/2) &
(df['trend_high'] == -1) &
(df['trend_low'] == -1)
)
df['channel_pattern'] = np.nan
df.loc[mask_channel_up, 'channel_pattern'] = 'Channel Up'
df.loc[mask_channel_down, 'channel_pattern'] = 'Channel Down'
return df
# Function to detect double top and bottom patterns
def detect_double_top_bottom(df, window=3, threshold=0.05):
roll_window = window
range_threshold = threshold
df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
mask_double_top = (
(df['high_roll_max'] >= df['High'].shift(1)) &
(df['high_roll_max'] >= df['High'].shift(-1)) &
(df['High'] < df['High'].shift(1)) &
(df['High'] < df['High'].shift(-1)) &
((df['High'].shift(1) - df['Low'].shift(1)) <= range_threshold * (df['High'].shift(1) + df['Low'].shift(1))/2) &
((df['High'].shift(-1) - df['Low'].shift(-1)) <= range_threshold * (df['High'].shift(-1) + df['Low'].shift(-1))/2)
)
mask_double_bottom = (
(df['low_roll_min'] <= df['Low'].shift(1)) &
(df['low_roll_min'] <= df['Low'].shift(-1)) &
(df['Low'] > df['Low'].shift(1)) &
(df['Low'] > df['Low'].shift(-1)) &
((df['High'].shift(1) - df['Low'].shift(1)) <= range_threshold * (df['High'].shift(1) + df['Low'].shift(1))/2) &
((df['High'].shift(-1) - df['Low'].shift(-1)) <= range_threshold * (df['High'].shift(-1) + df['Low'].shift(-1))/2)
)
df['double_pattern'] = np.nan
df.loc[mask_double_top, 'double_pattern'] = 'Double Top'
df.loc[mask_double_bottom, 'double_pattern'] = 'Double Bottom'
return df
# Function to detect trendlines
def detect_trendline(df, window=2):
roll_window = window
df['slope'] = np.nan
df['intercept'] = np.nan
for i in range(window, len(df)):
x = np.array(range(i-window, i))
y = df['Close'][i-window:i]
A = np.vstack([x, np.ones(len(x))]).T
m, c = np.linalg.lstsq(A, y, rcond=None)[0]
df.at[df.index[i], 'slope'] = m
df.at[df.index[i], 'intercept'] = c
mask_support = df['slope'] > 0
mask_resistance = df['slope'] < 0
df['support'] = np.nan
df['resistance'] = np.nan
df.loc[mask_support, 'support'] = df['Close'] * df['slope'] + df['intercept']
df.loc[mask_resistance, 'resistance'] = df['Close'] * df['slope'] + df['intercept']
return df
# Function to find pivots
def find_pivots(df):
high_diffs = df['High'].diff()
low_diffs = df['Low'].diff()
higher_high_mask = (high_diffs > 0) & (high_diffs.shift(-1) < 0)
lower_low_mask = (low_diffs < 0) & (low_diffs.shift(-1) > 0)
lower_high_mask = (high_diffs < 0) & (high_diffs.shift(-1) > 0)
higher_low_mask = (low_diffs > 0) & (low_diffs.shift(-1) < 0)
df['signal'] = ''
df.loc[higher_high_mask, 'signal'] = 'HH'
df.loc[lower_low_mask, 'signal'] = 'LL'
df.loc[lower_high_mask, 'signal'] = 'LH'
df.loc[higher_low_mask, 'signal'] = 'HL'
return df
# Streamlit App
def main():
st.title('Live Stock Pattern Detection App')
ticker = st.text_input('Enter Stock Ticker:', 'AAPL')
# Placeholder for start and end date
start_date_placeholder = st.empty()
end_date_placeholder = st.empty()
start_date = start_date_placeholder.date_input('Start Date', pd.to_datetime('2022-01-01'))
end_date = end_date_placeholder.date_input('End Date', pd.to_datetime('2022-02-01'))
st.info("Select Preferred Timezone:")
preferred_timezone = st.selectbox('Timezone', list(pytz.all_timezones))
# Time selection components
start_hour, start_minute, start_second = st.slider('Select Start Time', 0, 23, 0), st.slider('', 0, 59, 0), st.slider('', 0, 59, 0)
end_hour, end_minute, end_second = st.slider('Select End Time', 0, 23, 23), st.slider('', 0, 59, 59), st.slider('', 0, 59, 59)
start_datetime = datetime(start_date.year, start_date.month, start_date.day, start_hour, start_minute, start_second)
end_datetime = datetime(end_date.year, end_date.month, end_date.day, end_hour, end_minute, end_second)
start_datetime = pytz.timezone(preferred_timezone).localize(start_datetime)
end_datetime = pytz.timezone(preferred_timezone).localize(end_datetime)
start_date_placeholder.info(f"Start Date (UTC): {start_datetime.strftime('%Y-%m-%d %H:%M:%S %Z')}")
end_date_placeholder.info(f"End Date (UTC): {end_datetime.strftime('%Y-%m-%d %H:%M:%S %Z')}")
if st.button('Detect Patterns'):
stock_data = fetch_stock_data(ticker, start_datetime, end_datetime)
stock_data = detect_head_shoulder(stock_data)
stock_data = detect_multiple_tops_bottoms(stock_data)
stock_data = calculate_support_resistance(stock_data)
stock_data = detect_triangle_pattern(stock_data)
stock_data = detect_wedge(stock_data)
stock_data = detect_channel(stock_data)
stock_data = detect_double_top_bottom(stock_data)
stock_data = detect_trendline(stock_data)
stock_data = find_pivots(stock_data)
st.write(stock_data)
if __name__ == "__main__":
main()