Spaces:
Sleeping
Sleeping
| # models/pattern_recognition.py | |
| import pandas as pd | |
| import numpy as np | |
| from scipy.signal import find_peaks | |
class PatternRecognition:
    """Flag simple chart patterns on a price DataFrame.

    The frame passed in must provide ``Close``, ``High`` and ``Low`` columns.
    Results are collected in ``self.patterns``, a DataFrame sharing the input
    index with one 0/1 integer column per pattern: ``Double_Top``,
    ``Double_Bottom``, ``Head_And_Shoulders``, ``Triangle`` and ``Channel``.

    Args:
        data: Price frame to analyse.
        peak_distance: Minimum number of rows between neighbouring peaks (or
            troughs), forwarded to ``scipy.signal.find_peaks``.
        price_tolerance: Two extrema count as "the same level" when their
            absolute difference is below this fraction of the earlier one.
        window: Number of rows used for each rolling straight-line fit.
        slope_threshold: Absolute slope (price units per row) separating a
            "flat" trend line from a "rising"/"falling" one in triangles.
        channel_tolerance: Maximum absolute difference between the High and
            Low slopes for the two lines to be treated as parallel.

    Raises:
        ValueError: If ``peak_distance`` is below 1 or ``window`` is below 2.
    """

    def __init__(
        self,
        data: pd.DataFrame,
        *,
        peak_distance: int = 20,
        price_tolerance: float = 0.02,
        window: int = 20,
        slope_threshold: float = 0.1,
        channel_tolerance: float = 0.05,
    ) -> None:
        if peak_distance < 1:
            raise ValueError("peak_distance must be at least 1")
        if window < 2:
            # A straight-line fit needs at least two points.
            raise ValueError("window must be at least 2")
        self.data = data
        self.patterns = pd.DataFrame(index=data.index)
        self.peak_distance = peak_distance
        self.price_tolerance = price_tolerance
        self.window = window
        self.slope_threshold = slope_threshold
        self.channel_tolerance = channel_tolerance

    def detect_patterns(self) -> pd.DataFrame:
        """Run every detector and return the accumulated ``patterns`` frame."""
        self._detect_double_top_bottom()
        self._detect_head_and_shoulders()
        self._detect_triangles()
        self._detect_channels()
        return self.patterns

    # ------------------------------------------------------------------
    # Shared helpers
    # ------------------------------------------------------------------
    def _find_extrema(self, series: np.ndarray) -> np.ndarray:
        """Return positions of local maxima of ``series`` (negate for minima)."""
        positions, _ = find_peaks(series, distance=self.peak_distance)
        return positions

    def _flags_at(self, positions: np.ndarray) -> np.ndarray:
        """Return a 0/1 int64 array, one entry per row, set to 1 at ``positions``."""
        flags = np.zeros(len(self.data), dtype=np.int64)
        flags[positions] = 1
        return flags

    def _window_flags(self, hits: np.ndarray) -> np.ndarray:
        """Align per-window booleans with the row that follows each window."""
        flags = np.zeros(len(self.data), dtype=np.int64)
        # The first ``window`` rows have no complete preceding window.
        flags[self.window:] = hits
        return flags

    def _similar_level(self, earlier: np.ndarray, later: np.ndarray) -> np.ndarray:
        """Element-wise test: ``later`` within ``price_tolerance`` of ``earlier``."""
        return np.abs(earlier - later) < earlier * self.price_tolerance

    def _rolling_slope(self, column: str) -> np.ndarray:
        """Return least-squares slopes of ``column`` over each trailing window.

        Entry ``k`` is the slope fitted to rows ``k .. k + window - 1`` and
        belongs to row ``k + window`` (the row right after the window), so the
        result has ``max(len(data) - window, 0)`` entries.
        """
        values = self.data[column].to_numpy(dtype=float)
        span = self.window
        if len(values) <= span:
            return np.empty(0)
        # Drop the final window: it ends on the last row, so there is no
        # following row left to flag.
        windows = np.lib.stride_tricks.sliding_window_view(values, span)[:-1]
        # polyfit accepts a 2-D ``y`` (one column per data set), so all
        # windows are fitted in a single least-squares call.
        return np.polyfit(np.arange(span), windows.T, 1)[0]

    # ------------------------------------------------------------------
    # Detectors
    # ------------------------------------------------------------------
    def _detect_double_top_bottom(self) -> None:
        """Flag the second of two consecutive peaks/troughs at a similar level."""
        prices = self.data['Close'].to_numpy()
        for column, extrema in (
            ('Double_Top', self._find_extrema(prices)),
            ('Double_Bottom', self._find_extrema(-prices)),
        ):
            first, second = extrema[:-1], extrema[1:]
            hits = self._similar_level(prices[first], prices[second])
            self.patterns[column] = self._flags_at(second[hits])

    def _detect_head_and_shoulders(self) -> None:
        """Flag the right shoulder of three consecutive peaks.

        The middle peak must be strictly higher than both neighbours and the
        two outer peaks must sit at a similar level.
        """
        prices = self.data['Close'].to_numpy()
        peaks = self._find_extrema(prices)
        left, head, right = peaks[:-2], peaks[1:-1], peaks[2:]
        hits = (
            (prices[head] > prices[left])
            & (prices[head] > prices[right])
            & self._similar_level(prices[left], prices[right])
        )
        self.patterns['Head_And_Shoulders'] = self._flags_at(right[hits])

    def _detect_triangles(self) -> None:
        """Flag rows preceded by an ascending or descending triangle.

        Ascending: flat highs with rising lows. Descending: falling highs
        with flat lows. Both variants are written to the same ``Triangle``
        column.
        """
        high_slope = self._rolling_slope('High')
        low_slope = self._rolling_slope('Low')
        limit = self.slope_threshold
        ascending = (np.abs(high_slope) < limit) & (low_slope > limit)
        descending = (high_slope < -limit) & (np.abs(low_slope) < limit)
        self.patterns['Triangle'] = self._window_flags(ascending | descending)

    def _detect_channels(self) -> None:
        """Flag rows preceded by a window whose High and Low trends are parallel."""
        high_slope = self._rolling_slope('High')
        low_slope = self._rolling_slope('Low')
        parallel = np.abs(high_slope - low_slope) < self.channel_tolerance
        self.patterns['Channel'] = self._window_flags(parallel)