Dmitry Beresnev committed on
Commit
e803494
·
1 Parent(s): 2f9dfbe

add core module of the ticker scanner

Browse files
pyproject.toml CHANGED
@@ -69,6 +69,7 @@ dependencies = [
69
  "hmmlearn>=0.3.3",
70
  "xgboost>=3.1.1",
71
  "optuna>=4.5.0",
 
72
  ]
73
 
74
  [build-system]
 
69
  "hmmlearn>=0.3.3",
70
  "xgboost>=3.1.1",
71
  "optuna>=4.5.0",
72
+ "schedule>=1.2.2",
73
  ]
74
 
75
  [build-system]
src/core/ticker_scanner/__init__.py ADDED
File without changes
src/core/ticker_scanner/core_enums.py ADDED
@@ -0,0 +1,24 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from enum import Enum
2
+
3
+
4
class StockExchange(Enum):
    """Supported stock exchanges, keyed by their common market abbreviation.

    The member value doubles as the human-readable exchange code.
    """

    NYSE = "NYSE"        # New York Stock Exchange (US)
    NASDAQ = "NASDAQ"    # NASDAQ (US)
    LSE = "LSE"          # London Stock Exchange (UK)
    TSE = "TSE"          # Tokyo Stock Exchange (Japan)
    SSE = "SSE"          # Shanghai Stock Exchange (China)
    HKEX = "HKEX"        # Hong Kong Stock Exchange
    BSE = "BSE"          # Bombay Stock Exchange (India)
    NSE = "NSE"          # National Stock Exchange of India
    ASX = "ASX"          # Australian Securities Exchange
    TSX = "TSX"          # Toronto Stock Exchange (Canada)
    SIX = "SIX"          # Swiss Exchange
    FWB = "FWB"          # Frankfurt Stock Exchange (Germany)
17
+
18
+
19
class GrowthCategory(Enum):
    """Growth-speed buckets assigned by the scanner, from fastest to slowest.

    Percentile boundaries (top 10% / 25% / 50%) are relative to the
    scanned universe.
    """

    EXPLOSIVE = "explosive"  # top 10% of growth rates
    STRONG = "strong"        # top 25%
    MODERATE = "moderate"    # top 50%
    SLOW = "slow"            # bottom 50%
src/core/ticker_scanner/parallel_data_downloader.py ADDED
@@ -0,0 +1,110 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """
2
+ parallel_yf_downloader.py
3
+ Parallel downloading of ticker historical prices using multiprocessing,
4
+ with retry and rate-limit handling and batching.
5
+ """
6
+
7
+ import time
8
+ import random
9
+ from itertools import islice
10
+ from typing import Any
11
+ from concurrent.futures import ProcessPoolExecutor, as_completed
12
+
13
+ import yfinance as yf
14
+
15
+ from src.core.ticker_scanner.core_enums import StockExchange
16
+ from src.core.ticker_scanner.tickers_provider import TickersProvider
17
+
18
+
19
# Tuning knobs for the parallel downloader.
MAX_WORKERS = 8  # Number of parallel processes per batch
MAX_RETRIES = 3  # Retry count on failure (rate-limit retries only)
SLEEP_BETWEEN_RETRIES = 1.0  # Seconds between retries (jitter is added)
BATCH_SIZE = 50  # Number of tickers per batch
MIN_DATA_POINTS = 50  # Minimum number of price points required to keep a ticker
24
+
25
+
26
def fetch_prices(ticker: str, max_retries: int = MAX_RETRIES) -> dict[str, Any] | None:
    """Download all-time closing prices for a single ticker safely.

    Retries up to ``max_retries`` times when Yahoo rate-limits the request,
    backing off a little longer (plus jitter) on each attempt.

    Args:
        ticker: Symbol to download.
        max_retries: Attempts before giving up on a rate-limited request.

    Returns:
        ``{'ticker': ticker, 'prices': ndarray}`` on success, or ``None``
        when the download failed or yielded fewer than MIN_DATA_POINTS
        closing prices.
    """
    # Public, stable home of the rate-limit exception; `yf.shared` does not
    # reliably expose it across yfinance versions, and a bad attribute
    # lookup inside an `except` clause would itself raise.
    from yfinance.exceptions import YFRateLimitError

    for attempt in range(max_retries):
        try:
            df = yf.download(ticker, period="max", progress=False, auto_adjust=True)
            closes = df["Close"].dropna().values
            if len(closes) < MIN_DATA_POINTS:
                return None  # too little history to be useful downstream
            return {"ticker": ticker, "prices": closes}
        except YFRateLimitError:
            # Linear backoff plus jitter so parallel workers don't retry in lockstep.
            wait = SLEEP_BETWEEN_RETRIES * (attempt + 1) + random.random()
            print(f"⚠️ Rate limited for {ticker}. Waiting {wait:.1f}s and retrying...")
            time.sleep(wait)
        except Exception:
            # Any other failure (unknown ticker, network error) is not retried.
            return None
    return None
45
+
46
def batch(iterable: list[str], n: int = BATCH_SIZE):
    """Yield consecutive chunks of at most *n* items from *iterable*.

    The final chunk may be shorter than *n*; an empty input yields nothing.
    """
    it = iter(iterable)
    # islice consumes up to n items per pass; an empty chunk means exhaustion.
    while chunk := list(islice(it, n)):
        yield chunk
56
+
57
def download_tickers_parallel(
    tickers: list[str],
    max_workers: int = MAX_WORKERS,
    batch_size: int = BATCH_SIZE,
) -> list[dict[str, Any]]:
    """Download a large list of tickers in parallel batches.

    Args:
        tickers: Symbols to download.
        max_workers: Processes used per batch.
        batch_size: Tickers per batch (previously fixed to BATCH_SIZE;
            exposed as a backward-compatible parameter).

    Returns:
        A list of ``{'ticker': ..., 'prices': ...}`` dicts for the tickers
        that downloaded successfully; failures are only reported via stdout.
    """
    all_results: list[dict[str, Any]] = []
    all_failed: list[Any] = []

    for batch_num, ticker_batch in enumerate(batch(tickers, batch_size), start=1):
        print(f"🔹 Processing batch {batch_num}: {len(ticker_batch)} tickers")
        results, failed = process_batch(ticker_batch, max_workers)
        all_results.extend(results)
        all_failed.extend(failed)
        # Small randomized pause between batches to reduce rate-limit chance.
        time.sleep(1 + random.random())

    print(f"\n✅ Total downloaded: {len(all_results)}")
    if all_failed:
        print(f"❌ Total failed: {len(all_failed)}")
        print("Failed tickers:", all_failed)
    return all_results
78
+
79
def process_batch(ticker_batch: list[str], max_workers: int) -> tuple[list[dict[str, Any]], list[Any]]:
    """Fetch one batch of tickers concurrently via a process pool.

    Returns:
        ``(successful_results, failed_tickers)`` — a symbol lands in the
        failed list when ``fetch_prices`` returned ``None`` or raised.
    """
    successes: list[dict[str, Any]] = []
    failures: list[Any] = []
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the symbol it was submitted for.
        pending = {pool.submit(fetch_prices, symbol): symbol for symbol in ticker_batch}
        for done in as_completed(pending):
            symbol = pending[done]
            try:
                payload = done.result()
            except Exception:
                failures.append(symbol)
            else:
                if payload:
                    successes.append(payload)
                else:
                    failures.append(symbol)
    return successes, failures
99
+
100
def run_parallel_data_downloader(limit: int | None = 200):
    """Entry point: download NASDAQ ticker histories in parallel and report.

    Args:
        limit: Cap on how many tickers to process; ``None`` processes all.
            The previous implementation hard-coded the first 200 "for
            testing" — the default preserves that behavior.
    """
    all_tickers = TickersProvider().get_tickers(StockExchange.NASDAQ)
    tickers = all_tickers if limit is None else all_tickers[:limit]
    print("🚀 Starting parallel download...")
    data = download_tickers_parallel(tickers)
    for d in data:
        print(f"{d['ticker']}: {len(d['prices'])} price points")
107
+
108
+
109
# Allow running this module directly as a script.
if __name__ == "__main__":
    run_parallel_data_downloader()
src/core/ticker_scanner/scheduler.py ADDED
@@ -0,0 +1,52 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import time
2
+ import signal
3
+
4
+ import schedule
5
+
6
+ from src.telegram_bot.logger import main_logger as logger
7
+
8
+
9
class Scheduler:
    """Run the trend analysis once at startup and then daily at a fixed time.

    NOTE(review): this module never imports ``Exchange``, ``AsyncTrendAnalyzer``
    or ``asyncio`` — the first two must be imported at the top of the file for
    this class to work at all; ``asyncio`` is imported locally below as a
    stopgap. Confirm where ``Exchange`` / ``AsyncTrendAnalyzer`` live.
    """

    def __init__(self, exchange: Exchange = Exchange.SP500,
                 schedule_time: str = "18:00"):
        """
        Args:
            exchange: Exchange whose tickers should be analyzed.
            schedule_time: Daily run time, "HH:MM" 24h format (local time,
                per the `schedule` library's default).
        """
        self.exchange = exchange
        self.schedule_time = schedule_time
        self.running = True
        # Install handlers so Ctrl+C / SIGTERM stop the main loop gracefully.
        signal.signal(signal.SIGINT, self._signal_handler)
        signal.signal(signal.SIGTERM, self._signal_handler)

    def _signal_handler(self, signum, frame):
        """Flag the main loop in start() to exit on SIGINT/SIGTERM."""
        logger.info("Received shutdown signal. Cleaning up...")
        self.running = False

    def run_scheduled_job(self):
        """Execute one analysis run, logging (never raising) any failure."""
        import asyncio  # BUG FIX: asyncio was used below but never imported

        try:
            analyzer = AsyncTrendAnalyzer(self.exchange)
            asyncio.run(analyzer.run_analysis())
            analyzer.cleanup()
        except Exception as e:
            logger.error(f"Scheduled job failed: {e}", exc_info=True)

    def start(self):
        """Start the scheduler: run once immediately, then daily."""
        logger.info(f"Starting scheduled analyzer for {self.exchange.value}")
        logger.info(f"Schedule: Daily at {self.schedule_time}")

        # Schedule the daily job.
        schedule.every().day.at(self.schedule_time).do(self.run_scheduled_job)

        # Run immediately on startup.
        logger.info("Running initial analysis...")
        self.run_scheduled_job()

        # Main scheduler loop.
        logger.info("Scheduler active. Press Ctrl+C to stop.")
        while self.running:
            schedule.run_pending()
            # Sleep in 1-second slices instead of a single sleep(60):
            # PEP 475 makes time.sleep resume after a signal handler
            # returns, so one long sleep would delay shutdown by up to 60s.
            for _ in range(60):
                if not self.running:
                    break
                time.sleep(1)

        logger.info("Scheduler stopped gracefully")
src/core/ticker_scanner/tickers_provider.py ADDED
@@ -0,0 +1,28 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import pandas as pd
2
+
3
+ from src.core.ticker_scanner.core_enums import StockExchange
4
+
5
+
6
class TickersProvider:
    """Fetch lists of active ticker symbols for supported exchanges."""

    def load_active_nasdaq_tickers(self) -> list[str]:
        """Return active (non-test) NASDAQ-traded symbols from Nasdaq Trader.

        Returns:
            Values of the 'NASDAQ Symbol' column where 'Test Issue' == 'N'.
        """
        url = "ftp://ftp.nasdaqtrader.com/SymbolDirectory/nasdaqtraded.txt"
        df = pd.read_csv(url, sep="|")
        # Keep only active tickers; the feed flags test issues with 'Y'.
        df_active = df[df["Test Issue"] == "N"]
        return df_active["NASDAQ Symbol"].tolist()

    def load_active_nyse_tickers(self) -> list[str]:
        """Return active NYSE common-stock symbols.

        NOTE(review): this EODHD endpoint normally requires an API token —
        confirm the unauthenticated request actually succeeds.
        """
        url = "https://eodhistoricaldata.com/api/exchange-symbol-list/NYSE.csv"
        df = pd.read_csv(url)
        # Keep only active, common stocks.
        df_active = df[(df['Type'] == 'Common Stock') & (df['Delisted'] != 1)]
        return df_active['Code'].tolist()

    def get_tickers(self, exchange: StockExchange) -> list[str]:
        """Return the active ticker list for *exchange*, or [] if unsupported.

        BUG FIX: the original compared against ``exchange.NASDAQ`` —
        member-through-member enum access, which was removed in Python 3.12
        and raises AttributeError there. Compare against the enum class,
        using identity as is idiomatic for enum members.
        """
        if exchange is StockExchange.NASDAQ:
            return self.load_active_nasdaq_tickers()
        if exchange is StockExchange.NYSE:
            return self.load_active_nyse_tickers()
        return []