File size: 11,127 Bytes
86f4dfa
 
d9f8113
86f4dfa
6578daf
 
 
d9f8113
3abff25
 
 
 
 
86f4dfa
 
4944958
 
 
 
 
 
3abff25
4944958
3abff25
86f4dfa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d9f8113
86f4dfa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
d9f8113
86f4dfa
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4944958
86f4dfa
4944958
 
 
 
 
 
 
d9f8113
3abff25
 
 
4944958
 
 
6578daf
4944958
 
6578daf
3abff25
 
6578daf
4944958
 
 
86f4dfa
3abff25
86f4dfa
 
 
 
 
 
 
 
 
d9f8113
86f4dfa
d9f8113
 
3abff25
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
import pandas as pd
import numpy as np
import yfinance as yf
import streamlit as st
from datetime import datetime, timedelta
import pytz




# Function to fetch stock data
def fetch_stock_data(ticker, start_datetime, end_datetime):
    stock_data = yf.download(ticker, start=start_datetime, end=end_datetime)
    return stock_data

def fetch_stock_data(ticker, start_datetime, end_datetime):
    # Fetch live data using yfinance
    stock_data = yf.download(ticker, start=start_datetime, end=end_datetime)

    # Convert DatetimeIndex to a column
    df.reset_index(inplace=True)

    return stock_data

# Function to detect head and shoulder patterns
def detect_head_shoulder(df, window=3):
    roll_window = window
    df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
    df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
    mask_head_shoulder = (
        (df['high_roll_max'] > df['High'].shift(1)) &
        (df['high_roll_max'] > df['High'].shift(-1)) &
        (df['High'] < df['High'].shift(1)) &
        (df['High'] < df['High'].shift(-1))
    )
    mask_inv_head_shoulder = (
        (df['low_roll_min'] < df['Low'].shift(1)) &
        (df['low_roll_min'] < df['Low'].shift(-1)) &
        (df['Low'] > df['Low'].shift(1)) &
        (df['Low'] > df['Low'].shift(-1))
    )
    df['head_shoulder_pattern'] = np.nan
    df.loc[mask_head_shoulder, 'head_shoulder_pattern'] = 'Head and Shoulder'
    df.loc[mask_inv_head_shoulder, 'head_shoulder_pattern'] = 'Inverse Head and Shoulder'
    return df

# Function to detect multiple tops and bottoms
def detect_multiple_tops_bottoms(df, window=3):
    roll_window = window
    df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
    df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
    df['close_roll_max'] = df['Close'].rolling(window=roll_window).max()
    df['close_roll_min'] = df['Close'].rolling(window=roll_window).min()
    mask_top = (df['high_roll_max'] >= df['High'].shift(1)) & (df['close_roll_max'] < df['Close'].shift(1))
    mask_bottom = (df['low_roll_min'] <= df['Low'].shift(1)) & (df['close_roll_min'] > df['Close'].shift(1))
    df['multiple_top_bottom_pattern'] = np.nan
    df.loc[mask_top, 'multiple_top_bottom_pattern'] = 'Multiple Top'
    df.loc[mask_bottom, 'multiple_top_bottom_pattern'] = 'Multiple Bottom'
    return df

# Function to calculate support and resistance levels
def calculate_support_resistance(df, window=3):
    roll_window = window
    std_dev = 2
    df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
    df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
    mean_high = df['High'].rolling(window=roll_window).mean()
    std_high = df['High'].rolling(window=roll_window).std()
    mean_low = df['Low'].rolling(window=roll_window).mean()
    std_low = df['Low'].rolling(window=roll_window).std()
    df['support'] = mean_low - std_dev * std_low
    df['resistance'] = mean_high + std_dev * std_high
    return df

# Function to detect triangle patterns
def detect_triangle_pattern(df, window=3):
    roll_window = window
    df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
    df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
    mask_asc = (
        (df['high_roll_max'] >= df['High'].shift(1)) &
        (df['low_roll_min'] <= df['Low'].shift(1)) &
        (df['Close'] > df['Close'].shift(1))
    )
    mask_desc = (
        (df['high_roll_max'] <= df['High'].shift(1)) &
        (df['low_roll_min'] >= df['Low'].shift(1)) &
        (df['Close'] < df['Close'].shift(1))
    )
    df['triangle_pattern'] = np.nan
    df.loc[mask_asc, 'triangle_pattern'] = 'Ascending Triangle'
    df.loc[mask_desc, 'triangle_pattern'] = 'Descending Triangle'
    return df

# Function to detect wedge patterns
def detect_wedge(df, window=3):
    roll_window = window
    df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
    df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
    df['trend_high'] = df['High'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0)
    df['trend_low'] = df['Low'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0)
    mask_wedge_up = (
        (df['high_roll_max'] >= df['High'].shift(1)) &
        (df['low_roll_min'] <= df['Low'].shift(1)) &
        (df['trend_high'] == 1) &
        (df['trend_low'] == 1)
    )
    mask_wedge_down = (
        (df['high_roll_max'] <= df['High'].shift(1)) &
        (df['low_roll_min'] >= df['Low'].shift(1)) &
        (df['trend_high'] == -1) &
        (df['trend_low'] == -1)
    )
    df['wedge_pattern'] = np.nan
    df.loc[mask_wedge_up, 'wedge_pattern'] = 'Wedge Up'
    df.loc[mask_wedge_down, 'wedge_pattern'] = 'Wedge Down'
    return df

# Function to detect channel patterns
def detect_channel(df, window=3):
    roll_window = window
    channel_range = 0.1
    df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
    df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
    df['trend_high'] = df['High'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) < 0 else 0)
    df['trend_low'] = df['Low'].rolling(window=roll_window).apply(lambda x: 1 if (x[-1]-x[0]) > 0 else -1 if (x[-1]-x[0]) <0 else 0)
    mask_channel_up = (
        (df['high_roll_max'] >= df['High'].shift(1)) &
        (df['low_roll_min'] <= df['Low'].shift(1)) &
        (df['high_roll_max'] - df['low_roll_min'] <= channel_range * (df['high_roll_max'] + df['low_roll_min'])/2) &
        (df['trend_high'] == 1) &
        (df['trend_low'] == 1)
    )
    mask_channel_down = (
        (df['high_roll_max'] <= df['High'].shift(1)) &
        (df['low_roll_min'] >= df['Low'].shift(1)) &
        (df['high_roll_max'] - df['low_roll_min'] <= channel_range * (df['high_roll_max'] + df['low_roll_min'])/2) &
        (df['trend_high'] == -1) &
        (df['trend_low'] == -1)
    )
    df['channel_pattern'] = np.nan
    df.loc[mask_channel_up, 'channel_pattern'] = 'Channel Up'
    df.loc[mask_channel_down, 'channel_pattern'] = 'Channel Down'
    return df

# Function to detect double top and bottom patterns
def detect_double_top_bottom(df, window=3, threshold=0.05):
    roll_window = window
    range_threshold = threshold
    df['high_roll_max'] = df['High'].rolling(window=roll_window).max()
    df['low_roll_min'] = df['Low'].rolling(window=roll_window).min()
    mask_double_top = (
        (df['high_roll_max'] >= df['High'].shift(1)) &
        (df['high_roll_max'] >= df['High'].shift(-1)) &
        (df['High'] < df['High'].shift(1)) &
        (df['High'] < df['High'].shift(-1)) &
        ((df['High'].shift(1) - df['Low'].shift(1)) <= range_threshold * (df['High'].shift(1) + df['Low'].shift(1))/2) &
        ((df['High'].shift(-1) - df['Low'].shift(-1)) <= range_threshold * (df['High'].shift(-1) + df['Low'].shift(-1))/2)
    )
    mask_double_bottom = (
        (df['low_roll_min'] <= df['Low'].shift(1)) &
        (df['low_roll_min'] <= df['Low'].shift(-1)) &
        (df['Low'] > df['Low'].shift(1)) &
        (df['Low'] > df['Low'].shift(-1)) &
        ((df['High'].shift(1) - df['Low'].shift(1)) <= range_threshold * (df['High'].shift(1) + df['Low'].shift(1))/2) &
        ((df['High'].shift(-1) - df['Low'].shift(-1)) <= range_threshold * (df['High'].shift(-1) + df['Low'].shift(-1))/2)
    )
    df['double_pattern'] = np.nan
    df.loc[mask_double_top, 'double_pattern'] = 'Double Top'
    df.loc[mask_double_bottom, 'double_pattern'] = 'Double Bottom'
    return df

# Function to detect trendlines
def detect_trendline(df, window=2):
    roll_window = window
    df['slope'] = np.nan
    df['intercept'] = np.nan

    for i in range(window, len(df)):
        x = np.array(range(i-window, i))
        y = df['Close'][i-window:i]
        A = np.vstack([x, np.ones(len(x))]).T
        m, c = np.linalg.lstsq(A, y, rcond=None)[0]
        df.at[df.index[i], 'slope'] = m
        df.at[df.index[i], 'intercept'] = c

    mask_support = df['slope'] > 0
    mask_resistance = df['slope'] < 0
    df['support'] = np.nan
    df['resistance'] = np.nan
    df.loc[mask_support, 'support'] = df['Close'] * df['slope'] + df['intercept']
    df.loc[mask_resistance, 'resistance'] = df['Close'] * df['slope'] + df['intercept']

    return df

# Function to find pivots
def find_pivots(df):
    high_diffs = df['High'].diff()
    low_diffs = df['Low'].diff()
    higher_high_mask = (high_diffs > 0) & (high_diffs.shift(-1) < 0)
    lower_low_mask = (low_diffs < 0) & (low_diffs.shift(-1) > 0)
    lower_high_mask = (high_diffs < 0) & (high_diffs.shift(-1) > 0)
    higher_low_mask = (low_diffs > 0) & (low_diffs.shift(-1) < 0)
    df['signal'] = ''
    df.loc[higher_high_mask, 'signal'] = 'HH'
    df.loc[lower_low_mask, 'signal'] = 'LL'
    df.loc[lower_high_mask, 'signal'] = 'LH'
    df.loc[higher_low_mask, 'signal'] = 'HL'
    return df

# Streamlit App
def main():
    st.title('Live Stock Pattern Detection App')
    ticker = st.text_input('Enter Stock Ticker:', 'AAPL')
    
    # Placeholder for start and end date
    start_date_placeholder = st.empty()
    end_date_placeholder = st.empty()

    start_date = start_date_placeholder.date_input('Start Date', pd.to_datetime('2022-01-01'))
    end_date = end_date_placeholder.date_input('End Date', pd.to_datetime('2022-02-01'))

    st.info("Select Preferred Timezone:")
    preferred_timezone = st.selectbox('Timezone', list(pytz.all_timezones))

    # Time selection components
    start_hour, start_minute, start_second = st.slider('Select Start Time', 0, 23, 0), st.slider('', 0, 59, 0), st.slider('', 0, 59, 0)
    end_hour, end_minute, end_second = st.slider('Select End Time', 0, 23, 23), st.slider('', 0, 59, 59), st.slider('', 0, 59, 59)

    start_datetime = datetime(start_date.year, start_date.month, start_date.day, start_hour, start_minute, start_second)
    end_datetime = datetime(end_date.year, end_date.month, end_date.day, end_hour, end_minute, end_second)

    start_datetime = pytz.timezone(preferred_timezone).localize(start_datetime)
    end_datetime = pytz.timezone(preferred_timezone).localize(end_datetime)

    start_date_placeholder.info(f"Start Date (UTC): {start_datetime.strftime('%Y-%m-%d %H:%M:%S %Z')}")
    end_date_placeholder.info(f"End Date (UTC): {end_datetime.strftime('%Y-%m-%d %H:%M:%S %Z')}")

    if st.button('Detect Patterns'):
        stock_data = fetch_stock_data(ticker, start_datetime, end_datetime)
        stock_data = detect_head_shoulder(stock_data)
        stock_data = detect_multiple_tops_bottoms(stock_data)
        stock_data = calculate_support_resistance(stock_data)
        stock_data = detect_triangle_pattern(stock_data)
        stock_data = detect_wedge(stock_data)
        stock_data = detect_channel(stock_data)
        stock_data = detect_double_top_bottom(stock_data)
        stock_data = detect_trendline(stock_data)
        stock_data = find_pivots(stock_data)

        st.write(stock_data)

if __name__ == "__main__":
    main()