Spaces:
Runtime error
Runtime error
File size: 5,476 Bytes
de6e775 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 |
import calendar
from datetime import datetime
from typing import List
import ccxt
import numpy as np
import pandas as pd
from meta.data_processors._base import _Base
# from basic_processor import _Base
class Ccxt(_Base):
    """Data processor that downloads OHLCV candles from Binance through ccxt.

    ``download_data`` stores its result in ``self.dataframe`` as a DataFrame
    whose columns are a ``(ticker, field)`` MultiIndex, with the fields
    ``open``, ``high``, ``low``, ``close`` and ``volume``.
    """

    # Price/volume fields kept for every ticker, in column order.
    _CCXT_OHLCV_FIELDS = ["open", "high", "low", "close", "volume"]

    # Interval name used by this processor -> (ccxt timeframe, candles per
    # request). The candle counts match the chunk sizes used in
    # ``download_data``: 720 minutes for "1Min", one day for the others.
    _CCXT_PERIOD_SPECS = {
        "1Min": ("1m", 720),
        "5Min": ("5m", 288),
        "1H": ("1h", 24),
        "1D": ("1d", 1),
    }

    # Candle count requested when the period is not listed above.
    _CCXT_DEFAULT_LIMIT = 1000

    def __init__(
        self,
        data_source: str,
        start_date: str,
        end_date: str,
        time_interval: str,
        **kwargs,
    ):
        """Initialise the base processor and create the Binance client.

        All arguments are forwarded unchanged to ``_Base.__init__``.
        """
        super().__init__(data_source, start_date, end_date, time_interval, **kwargs)
        self.binance = ccxt.binance()

    def download_data(
        self, ticker_list: List[str], save_path: str = "./data/dataset.csv"
    ):
        """Download candles for every ticker and save the combined table.

        ``self.start_date`` and ``self.end_date`` are parsed with the format
        ``"%Y%m%d %H:%M:%S"`` and the range is split into chunks (720 minutes
        for the ``"1Min"`` interval, one day otherwise) that are fetched one
        request at a time.

        Args:
            ticker_list: Trading pairs to download; must not be empty.
            save_path: Path handed to ``self.save_data``.

        Raises:
            ValueError: If ``ticker_list`` is empty, or (from pandas) if a
                ticker returns a different number of candles than the first
                one, since columns are filled positionally.
        """
        if not ticker_list:
            raise ValueError("ticker_list must contain at least one ticker")

        fields = self._CCXT_OHLCV_FIELDS

        # The request windows are the same for every ticker, so build them once.
        start_dt = datetime.strptime(self.start_date, "%Y%m%d %H:%M:%S")
        end_dt = datetime.strptime(self.end_date, "%Y%m%d %H:%M:%S")
        start_timestamp = calendar.timegm(start_dt.utctimetuple())
        end_timestamp = calendar.timegm(end_dt.utctimetuple())
        chunk_seconds = 60 * 720 if self.time_interval == "1Min" else 60 * 1440
        date_list = [
            datetime.utcfromtimestamp(float(chunk_start))
            for chunk_start in range(start_timestamp, end_timestamp, chunk_seconds)
        ]

        crypto_column = pd.MultiIndex.from_product([ticker_list, fields])
        dataset = None
        for ticker in ticker_list:
            df = self.ohlcv(date_list, ticker, self.time_interval)
            if dataset is None:
                # The first ticker's candle times define the index of the table.
                dataset = pd.DataFrame(columns=crypto_column, index=df["time"].values)
            temp_col = pd.MultiIndex.from_product([[ticker], fields])
            dataset[temp_col] = df[fields].values
        print("Actual end time: " + str(df["time"].values[-1]))

        self.dataframe = dataset
        self.save_data(save_path)
        print(
            f"Download complete! Dataset saved to {save_path}. \nShape of DataFrame: {self.dataframe.shape}"
        )

    # NOTE: disabled implementation retained from the original file.
    # def add_technical_indicators(self, df, pair_list, tech_indicator_list = [
    #     'macd', 'boll_ub', 'boll_lb', 'rsi_30', 'dx_30',
    #     'close_30_sma', 'close_60_sma']):
    #     df = df.dropna()
    #     df = df.copy()
    #     column_list = [pair_list, ['open','high','low','close','volume']+(tech_indicator_list)]
    #     column = pd.MultiIndex.from_product(column_list)
    #     index_list = df.index
    #     dataset = pd.DataFrame(columns=column,index=index_list)
    #     for pair in pair_list:
    #         pair_column = pd.MultiIndex.from_product([[pair],['open','high','low','close','volume']])
    #         dataset[pair_column] = df[pair]
    #         temp_df = df[pair].reset_index().sort_values(by=['index'])
    #         temp_df = temp_df.rename(columns={'index':'date'})
    #         crypto_df = Sdf.retype(temp_df.copy())
    #         for indicator in tech_indicator_list:
    #             temp_indicator = crypto_df[indicator].values.tolist()
    #             dataset[(pair,indicator)] = temp_indicator
    #     print('Succesfully add technical indicators')
    #     return dataset

    def df_to_ary(self, pair_list, tech_indicator_list=None):
        """Split ``self.dataframe`` into price, indicator and date arrays.

        Rows containing any NaN are dropped first.

        Args:
            pair_list: Tickers (first column level) to extract.
            tech_indicator_list: Indicator column names (second column level).
                Defaults to macd, boll_ub, boll_lb, rsi_30, dx_30,
                close_30_sma and close_60_sma. These columns must already be
                present in ``self.dataframe``; ``download_data`` in this class
                only creates the OHLCV columns.

        Returns:
            Tuple ``(price_array, tech_array, date_ary)``: the ``close``
            values per pair, the indicator values per pair, and the index
            values of the remaining rows.
        """
        if tech_indicator_list is None:
            tech_indicator_list = [
                "macd",
                "boll_ub",
                "boll_lb",
                "rsi_30",
                "dx_30",
                "close_30_sma",
                "close_60_sma",
            ]
        df = self.dataframe.dropna()
        date_ary = df.index.values
        price_array = df[pd.MultiIndex.from_product([pair_list, ["close"]])].values
        tech_array = df[
            pd.MultiIndex.from_product([pair_list, tech_indicator_list])
        ].values
        return price_array, tech_array, date_ary

    def min_ohlcv(self, dt, pair, limit):
        """Fetch up to ``limit`` one-minute candles for ``pair`` starting at ``dt``.

        ``dt`` is interpreted as UTC and converted to epoch milliseconds.
        Returns whatever ``self.binance.fetch_ohlcv`` returns.
        """
        since = calendar.timegm(dt.utctimetuple()) * 1000
        return self.binance.fetch_ohlcv(
            symbol=pair, timeframe="1m", since=since, limit=limit
        )

    def ohlcv(self, dt, pair, period="1d"):
        """Fetch candles for ``pair``, one request per start time in ``dt``.

        Args:
            dt: Iterable of datetimes (interpreted as UTC), each the start of
                one request window.
            pair: Trading pair symbol passed to ccxt.
            period: One of ``"1Min"``, ``"5Min"``, ``"1H"``, ``"1D"``, which
                are translated to the matching ccxt timeframe; any other
                value is passed to ccxt unchanged with a limit of 1000.

        Returns:
            DataFrame with columns ``time``, ``open``, ``high``, ``low``,
            ``close``, ``volume``. ``time`` holds naive UTC timestamps and the
            other columns are float64.
        """
        # Translate the processor's interval name to what ccxt expects; names
        # such as "1D" or "5Min" are not valid ccxt timeframes.
        timeframe, limit = self._CCXT_PERIOD_SPECS.get(
            period, (period, self._CCXT_DEFAULT_LIMIT)
        )

        candles = []
        for start_dt in dt:
            if period == "1Min":
                candles.extend(self.min_ohlcv(start_dt, pair, limit))
            else:
                since = calendar.timegm(start_dt.utctimetuple()) * 1000
                candles.extend(
                    self.binance.fetch_ohlcv(
                        symbol=pair, timeframe=timeframe, since=since, limit=limit
                    )
                )

        fields = self._CCXT_OHLCV_FIELDS
        df = pd.DataFrame(candles, columns=["time"] + fields)
        # Candle times are epoch milliseconds; keep them in UTC so they agree
        # with the UTC request windows instead of shifting to local time.
        df["time"] = pd.to_datetime(df["time"], unit="ms")
        df[fields] = df[fields].astype(np.float64)
        return df
|