|
|
""" |
Database Query Functions for Cached Market Data

Provides REAL data access from the cached_market_data and cached_ohlc tables.

CRITICAL RULES:
- ONLY read from database - NEVER generate fake data
- Return empty list if no data found
- All queries must be REAL database operations
""" |
|
|
|
|
|
import logging |
|
|
from datetime import datetime, timedelta |
|
|
from typing import Optional, List, Dict, Any |
|
|
from sqlalchemy import desc, and_, func |
|
|
from sqlalchemy.orm import Session |
|
|
|
|
|
from database.models import CachedMarketData, CachedOHLC |
|
|
from database.db_manager import DatabaseManager |
|
|
from utils.logger import setup_logger |
|
|
|
|
|
# Module-level logger used by the CacheQueries methods below.
logger = setup_logger("cache_queries")
|
|
|
|
|
|
|
|
class CacheQueries:
    """
    Database query operations for cached market data.

    Wraps a database manager and exposes read, write and cleanup helpers for
    the CachedMarketData and CachedOHLC models.

    CRITICAL: All methods return REAL data from database ONLY. Read methods
    return an empty list when nothing matches or when the query raises.
    """

    def __init__(self, db_manager: "DatabaseManager"):
        """
        Store the database manager used to open sessions.

        Args:
            db_manager: Object whose ``get_session()`` returns a context
                manager yielding a session
        """
        self.db = db_manager

    @staticmethod
    def _optional_float(value: Any) -> Optional[float]:
        """Convert ``value`` to float, mapping only ``None`` to ``None``."""
        # An explicit None test keeps legitimate zeros (e.g. a flat 24h
        # change) instead of collapsing every falsy value to None.
        return None if value is None else float(value)

    @staticmethod
    def _market_row_to_dict(row: Any) -> Dict[str, Any]:
        """Build the public dictionary for one CachedMarketData row."""
        to_float = CacheQueries._optional_float
        return {
            "symbol": row.symbol,
            # price is converted unconditionally: a missing price raises
            # here and is handled by the caller's except block.
            "price": float(row.price),
            "market_cap": to_float(row.market_cap),
            "volume_24h": to_float(row.volume_24h),
            "change_24h": to_float(row.change_24h),
            "high_24h": to_float(row.high_24h),
            "low_24h": to_float(row.low_24h),
            "provider": row.provider,
            "fetched_at": row.fetched_at,
        }

    @staticmethod
    def _ohlc_row_to_dict(row: Any) -> Dict[str, Any]:
        """Build the public dictionary for one CachedOHLC row."""
        return {
            "timestamp": row.timestamp,
            "open": float(row.open),
            "high": float(row.high),
            "low": float(row.low),
            "close": float(row.close),
            "volume": float(row.volume),
            "provider": row.provider,
            "fetched_at": row.fetched_at,
        }

    def get_cached_market_data(
        self,
        symbols: Optional[List[str]] = None,
        limit: int = 100
    ) -> List[Dict[str, Any]]:
        """
        Get the most recent cached market data row(s) per symbol.

        CRITICAL RULES:
        - ONLY read from cached_market_data table
        - NEVER generate or fake data
        - Return empty list if no data found
        - Latest row per symbol is selected with a grouped max(fetched_at)
          subquery joined back onto the table

        Args:
            symbols: List of symbols to filter (e.g., ['BTC', 'ETH']).
                None or an empty list means no symbol filter.
            limit: Maximum number of records

        Returns:
            List of dictionaries with REAL market data from database, ordered
            by fetched_at descending. Optional numeric fields are None only
            when the stored value is None; a stored zero is returned as 0.0.
            An empty list is returned when nothing matches or on any error.
        """
        try:
            with self.db.get_session() as session:
                # Newest fetch time for each symbol.
                latest = session.query(
                    CachedMarketData.symbol,
                    func.max(CachedMarketData.fetched_at).label('max_fetched_at')
                ).group_by(CachedMarketData.symbol)

                if symbols:
                    latest = latest.filter(CachedMarketData.symbol.in_(symbols))

                latest = latest.subquery()

                # Join back to fetch the full rows. Rows of one symbol that
                # share the same newest fetched_at all match this join.
                query = session.query(CachedMarketData).join(
                    latest,
                    and_(
                        CachedMarketData.symbol == latest.c.symbol,
                        CachedMarketData.fetched_at == latest.c.max_fetched_at
                    )
                ).order_by(desc(CachedMarketData.fetched_at)).limit(limit)

                results = query.all()

                if not results:
                    logger.info(f"No cached market data found for symbols={symbols}")
                    return []

                data = [self._market_row_to_dict(row) for row in results]

                logger.info(f"Retrieved {len(data)} cached market records")
                return data

        except Exception as e:
            logger.error(f"Database error in get_cached_market_data: {e}", exc_info=True)
            return []

    def get_cached_ohlc(
        self,
        symbol: str,
        interval: str = "1h",
        limit: int = 1000
    ) -> List[Dict[str, Any]]:
        """
        Get cached OHLC data from database.

        CRITICAL RULES:
        - ONLY read from cached_ohlc table
        - NEVER generate fake candles
        - Return empty list if no data found
        - Result is ordered by timestamp ASC for chart display

        Args:
            symbol: Trading pair symbol (e.g., 'BTCUSDT')
            interval: Candle interval (e.g., '1h', '4h', '1d')
            limit: Maximum number of candles

        Returns:
            List of dictionaries with REAL OHLC data from database: the
            newest ``limit`` candles, oldest first. An empty list is returned
            when nothing matches or on any error.
        """
        try:
            with self.db.get_session() as session:
                # Query newest-first so LIMIT keeps the most recent candles.
                query = session.query(CachedOHLC).filter(
                    and_(
                        CachedOHLC.symbol == symbol,
                        CachedOHLC.interval == interval
                    )
                ).order_by(desc(CachedOHLC.timestamp)).limit(limit)

                results = query.all()

                if not results:
                    logger.info(f"No cached OHLC data found for {symbol} {interval}")
                    return []

                # Flip to ascending timestamp order for the caller.
                data = [self._ohlc_row_to_dict(row) for row in reversed(results)]

                logger.info(f"Retrieved {len(data)} OHLC candles for {symbol} {interval}")
                return data

        except Exception as e:
            logger.error(f"Database error in get_cached_ohlc: {e}", exc_info=True)
            return []

    def save_market_data(
        self,
        symbol: str,
        price: float,
        market_cap: Optional[float] = None,
        volume_24h: Optional[float] = None,
        change_24h: Optional[float] = None,
        high_24h: Optional[float] = None,
        low_24h: Optional[float] = None,
        provider: str = "unknown"
    ) -> bool:
        """
        Save market data to cache.

        A new CachedMarketData row is added on every call; existing rows are
        not updated.

        CRITICAL: Only used by background workers to store REAL API data

        Args:
            symbol: Crypto symbol
            price: Current price (REAL from API)
            market_cap: Market cap (REAL from API)
            volume_24h: 24h volume (REAL from API)
            change_24h: 24h change (REAL from API)
            high_24h: 24h high (REAL from API)
            low_24h: 24h low (REAL from API)
            provider: Data provider name

        Returns:
            bool: True if saved successfully, False if any exception occurred
        """
        try:
            with self.db.get_session() as session:
                # NOTE(review): datetime.utcnow() yields a naive datetime and
                # is deprecated since Python 3.12; kept as-is because the
                # column presumably stores naive UTC - confirm before changing.
                record = CachedMarketData(
                    symbol=symbol,
                    price=price,
                    market_cap=market_cap,
                    volume_24h=volume_24h,
                    change_24h=change_24h,
                    high_24h=high_24h,
                    low_24h=low_24h,
                    provider=provider,
                    fetched_at=datetime.utcnow()
                )

                session.add(record)
                session.commit()

                logger.info(f"Saved market data for {symbol} from {provider}")
                return True

        except Exception as e:
            logger.error(f"Error saving market data for {symbol}: {e}", exc_info=True)
            return False

    def save_ohlc_candle(
        self,
        symbol: str,
        interval: str,
        timestamp: datetime,
        open_price: float,
        high: float,
        low: float,
        close: float,
        volume: float,
        provider: str = "unknown"
    ) -> bool:
        """
        Save OHLC candle to cache.

        If a candle with the same symbol, interval and timestamp already
        exists it is updated in place; otherwise a new row is added.

        CRITICAL: Only used by background workers to store REAL candle data

        Args:
            symbol: Trading pair symbol
            interval: Candle interval
            timestamp: Candle open time (REAL from API)
            open_price: Open price (REAL from API)
            high: High price (REAL from API)
            low: Low price (REAL from API)
            close: Close price (REAL from API)
            volume: Volume (REAL from API)
            provider: Data provider name

        Returns:
            bool: True if saved successfully, False if any exception occurred
        """
        try:
            with self.db.get_session() as session:
                existing = session.query(CachedOHLC).filter(
                    and_(
                        CachedOHLC.symbol == symbol,
                        CachedOHLC.interval == interval,
                        CachedOHLC.timestamp == timestamp
                    )
                ).first()

                if existing:
                    # Refresh the stored candle with the latest values.
                    existing.open = open_price
                    existing.high = high
                    existing.low = low
                    existing.close = close
                    existing.volume = volume
                    existing.provider = provider
                    existing.fetched_at = datetime.utcnow()
                else:
                    record = CachedOHLC(
                        symbol=symbol,
                        interval=interval,
                        timestamp=timestamp,
                        open=open_price,
                        high=high,
                        low=low,
                        close=close,
                        volume=volume,
                        provider=provider,
                        fetched_at=datetime.utcnow()
                    )
                    session.add(record)

                session.commit()

                logger.debug(f"Saved OHLC candle for {symbol} {interval} at {timestamp}")
                return True

        except Exception as e:
            logger.error(f"Error saving OHLC candle for {symbol}: {e}", exc_info=True)
            return False

    def cleanup_old_data(self, days: int = 7) -> Dict[str, int]:
        """
        Remove old cached data to manage storage.

        Deletes rows from both cache tables whose fetched_at is earlier than
        ``datetime.utcnow() - days``.

        Args:
            days: Remove data older than N days

        Returns:
            Dictionary with counts of deleted records under the keys
            'market_data' and 'ohlc'; an empty dictionary if an exception
            occurred
        """
        try:
            with self.db.get_session() as session:
                cutoff_time = datetime.utcnow() - timedelta(days=days)
                deleted_counts = {}

                # Market snapshots older than the cutoff.
                deleted_counts['market_data'] = session.query(CachedMarketData).filter(
                    CachedMarketData.fetched_at < cutoff_time
                ).delete()

                # Candles are aged by fetch time, not by candle timestamp.
                deleted_counts['ohlc'] = session.query(CachedOHLC).filter(
                    CachedOHLC.fetched_at < cutoff_time
                ).delete()

                session.commit()

                total_deleted = sum(deleted_counts.values())
                logger.info(f"Cleaned up {total_deleted} old cache records (older than {days} days)")

                return deleted_counts

        except Exception as e:
            logger.error(f"Error cleaning up old data: {e}", exc_info=True)
            return {}
|
|
|
|
|
|
|
|
|
|
|
# Lazily created shared CacheQueries instance; populated by get_cache_queries().
_cache_queries = None


def get_cache_queries(db_manager: Optional[DatabaseManager] = None) -> CacheQueries:
    """
    Return the shared module-level CacheQueries instance, creating it on first use.

    Args:
        db_manager: DatabaseManager used to build the instance the first time
            this function runs. When omitted, ``db_manager`` is imported from
            ``database.db_manager`` and used instead. The argument is ignored
            once an instance already exists.

    Returns:
        The shared CacheQueries instance
    """
    global _cache_queries

    # Already built by an earlier call: hand it back unchanged.
    if _cache_queries is not None:
        return _cache_queries

    manager = db_manager
    if manager is None:
        # Imported only when the caller did not supply a manager.
        from database.db_manager import db_manager as shared_manager
        manager = shared_manager

    _cache_queries = CacheQueries(manager)
    return _cache_queries
|
|
|