"""
Preprocess distribution statistics for OHLC normalization and token history coverage.

This script:
1. Computes global mean/std figures for price/volume so downstream code can normalize.
2. Prints descriptive stats about how much price history (in seconds) each token has,
   helping decide which horizons are realistic.

All configuration is done via environment variables (see below).
"""
|
|
| import os |
| import pathlib |
| import sys |
| from typing import List |
|
|
| import numpy as np |
| import clickhouse_connect |
|
|
|
|
| |
# --- ClickHouse connection settings (all overridable via environment) -----
CLICKHOUSE_HOST = os.getenv("CLICKHOUSE_HOST", "localhost")
CLICKHOUSE_PORT = int(os.getenv("CLICKHOUSE_PORT", "8123"))
CLICKHOUSE_USERNAME = os.getenv("CLICKHOUSE_USERNAME", "default")
CLICKHOUSE_PASSWORD = os.getenv("CLICKHOUSE_PASSWORD", "")
CLICKHOUSE_DATABASE = os.getenv("CLICKHOUSE_DATABASE", "default")

# Destination for the computed normalization statistics (numpy .npz archive).
OUTPUT_PATH = pathlib.Path(os.getenv("OHLC_STATS_PATH", "ohlc_stats.npz"))
# Trades at or below these thresholds are excluded by the queries
# (the SQL predicates use a strict ">" comparison).
MIN_PRICE_USD = float(os.getenv("OHLC_MIN_PRICE_USD", "0.0"))
MIN_VOLUME_USD = float(os.getenv("OHLC_MIN_VOLUME_USD", "0.0"))

# Optional comma-separated allow-list of token (base) addresses.
# Unset / empty collapses to None, meaning "no address filter".
TOKEN_ADDRESSES_ENV = os.getenv("OHLC_TOKEN_ADDRESSES", "")
TOKEN_ADDRESSES = tuple(addr.strip() for addr in TOKEN_ADDRESSES_ENV.split(",") if addr.strip()) or None
|
|
|
|
def build_where_clause() -> List[str]:
    """Return the SQL predicate fragments shared by every query.

    Always filters on minimum price and minimum trade value; additionally
    restricts to the configured token addresses when TOKEN_ADDRESSES is set.
    Placeholders use clickhouse_connect's %(name)s parameter style.
    """
    predicates = [
        "t.price_usd > %(min_price)s",
        "t.total_usd > %(min_vol)s",
    ]
    if TOKEN_ADDRESSES is not None:
        predicates.append("t.base_address IN %(token_addresses)s")
    return predicates
|
|
|
|
def build_stats_query(where_sql: str) -> str:
    """Render the aggregate query for global price/volume statistics.

    Produces population mean and standard deviation for USD price, native
    price, and per-trade USD value over all trades matching *where_sql*.
    The INNER JOIN against ``mints`` restricts results to known tokens
    (no columns from ``m`` are selected).
    """
    template = """
    SELECT
        AVG(t.price_usd) AS mean_price_usd,
        stddevPop(t.price_usd) AS std_price_usd,
        AVG(t.price) AS mean_price_native,
        stddevPop(t.price) AS std_price_native,
        AVG(t.total_usd) AS mean_trade_value_usd,
        stddevPop(t.total_usd) AS std_trade_value_usd
    FROM trades AS t
    INNER JOIN mints AS m
        ON m.mint_address = t.base_address
    WHERE {where}
    """
    return template.format(where=where_sql)
|
|
|
|
def build_history_query(where_sql: str) -> str:
    """Render the per-token history-coverage query.

    For each token matching *where_sql*, yields the Unix timestamps of its
    first and last trade plus the span between them (``history_seconds``).
    Joins against ``mints`` the same way as the stats query so both report
    on an identical token population.
    """
    template = """
    SELECT
        t.base_address AS token_address,
        toUnixTimestamp(min(t.timestamp)) AS first_ts,
        toUnixTimestamp(max(t.timestamp)) AS last_ts,
        toUnixTimestamp(max(t.timestamp)) - toUnixTimestamp(min(t.timestamp)) AS history_seconds
    FROM trades AS t
    INNER JOIN mints AS m
        ON m.mint_address = t.base_address
    WHERE {where}
    GROUP BY token_address
    """
    return template.format(where=where_sql)
|
|
|
|
def summarize_histories(histories: np.ndarray) -> None:
    """Print descriptive statistics for per-token history lengths.

    *histories* is a 1-D float array of history spans in seconds (one entry
    per token). Prints a short report to stdout; emits a notice and returns
    early when the array is empty.
    """
    if histories.size == 0:
        print("No token history stats available (no qualifying trades).")
        return

    def _human(seconds: float) -> str:
        # Render a duration as seconds plus hour/day equivalents.
        hrs = seconds / 3600.0
        return f"{seconds:.0f}s ({hrs:.2f}h / {hrs / 24.0:.2f}d)"

    print("\nToken history coverage (seconds):")
    print(f" Tokens analyzed: {int(histories.size)}")
    print(f" Min history: {_human(histories.min())}")
    print(f" Median history: {_human(float(np.median(histories)))}")
    print(f" Mean history: {_human(histories.mean())}")
    print(f" 90th percentile: {_human(float(np.percentile(histories, 90)))}")
    print(f" Max history: {_human(histories.max())}")
|
|
|
|
def main() -> int:
    """Compute and persist normalization stats, then print history coverage.

    Returns a process exit code: 0 on success, 1 when the stats query
    yields no rows (bad filters or connectivity).
    """
    where_clauses = build_where_clause()
    # Fall back to the tautology "1" so the WHERE clause stays syntactically
    # valid even if no predicates were produced.
    where_sql = " AND ".join(where_clauses) if where_clauses else "1"
    # Clamp thresholds to be non-negative before binding them as parameters.
    params: dict[str, object] = {
        "min_price": max(MIN_PRICE_USD, 0.0),
        "min_vol": max(MIN_VOLUME_USD, 0.0),
    }
    if TOKEN_ADDRESSES:
        params["token_addresses"] = TOKEN_ADDRESSES

    client = clickhouse_connect.get_client(
        host=CLICKHOUSE_HOST,
        port=CLICKHOUSE_PORT,
        username=CLICKHOUSE_USERNAME,
        password=CLICKHOUSE_PASSWORD,
        database=CLICKHOUSE_DATABASE,
    )

    # --- Global mean/std over all qualifying trades -----------------------
    stats_query = build_stats_query(where_sql)
    stats_result = client.query(stats_query, parameters=params)
    if not stats_result.result_rows:
        print("ERROR: Stats query returned no rows. Check filters / connectivity.", file=sys.stderr)
        return 1
    # Column order matches the SELECT list in build_stats_query.
    (
        mean_price_usd,
        std_price_usd,
        mean_price_native,
        std_price_native,
        mean_trade_value_usd,
        std_trade_value_usd,
    ) = stats_result.result_rows[0]

    # `or` maps both NULL and 0 to the fallback: means default to 0.0 and
    # stds to 1.0, so downstream normalization never divides by zero.
    stats = {
        "mean_price_usd": float(mean_price_usd or 0.0),
        "std_price_usd": float(std_price_usd or 1.0),
        "mean_price_native": float(mean_price_native or 0.0),
        "std_price_native": float(std_price_native or 1.0),
        "mean_trade_value_usd": float(mean_trade_value_usd or 0.0),
        "std_trade_value_usd": float(std_trade_value_usd or 1.0),
    }

    OUTPUT_PATH.parent.mkdir(parents=True, exist_ok=True)
    np.savez(OUTPUT_PATH, **stats)

    print(f"Saved stats to {OUTPUT_PATH.resolve()}:")
    for key, value in stats.items():
        print(f" {key}: {value:.6f}")

    # --- Per-token history coverage ---------------------------------------
    history_query = build_history_query(where_sql)
    history_result = client.query(history_query, parameters=params)
    # row[3] is history_seconds (see build_history_query); skip NULLs.
    history_seconds = np.array(
        [float(row[3]) for row in history_result.result_rows if row[3] is not None],
        dtype=np.float64
    )
    summarize_histories(history_seconds)
    return 0
|
|
|
|
if __name__ == "__main__":
    # sys.exit raises SystemExit with main()'s return value, identical to
    # `raise SystemExit(main())`.
    sys.exit(main())
|
|