Spaces:
Sleeping
Sleeping
import ccxt | |
from models.market_data import MarketData, SymbolInfo | |
class KuCoinClient: | |
def __init__(self, session): | |
self.client = ccxt.kucoinfutures( | |
{ | |
"apiKey": session.kucoin_key, | |
"secret": session.kucoin_secret, | |
"password": session.kucoin_passphrase, | |
} | |
) | |
def get_kline_data(self, symbol, granularity, from_time, to_time): | |
klines = self.client.fetch_ohlcv( | |
symbol, timeframe=f"{granularity}m", since=from_time, limit=1000 | |
) | |
response = { | |
"code": "200000", | |
"data": [ | |
[k[0], str(k[1]), str(k[2]), str(k[3]), str(k[4]), str(k[5])] | |
for k in klines | |
], | |
} | |
return MarketData.from_kucoin_response(response, symbol, granularity) | |
def get_current_price(self, symbol: str) -> float: | |
ticker = self.client.fetch_ticker(symbol) | |
return float(ticker["last"]) | |