Spaces:
Runtime error
Runtime error
# -*- coding: utf-8 -*- | |
# File: app.py | |
# Project: 'Homework #3 OTUS.ML.Advanced' | |
# Created by Gennady Matveev (gm@og.ly) on 02-01-2022. | |
# %% | |
# Import libraries | |
import re | |
import pandas as pd | |
import numpy as np | |
import streamlit as st | |
import requests | |
import pickle | |
from sklearn.preprocessing import StandardScaler | |
from sklearn.cluster import KMeans | |
import tsfel | |
from kneed import KneeLocator | |
import cryptocompare as cc | |
import matplotlib.pyplot as plt | |
import plotly.express as px | |
from umap import UMAP | |
import warnings | |
# Shared settings used by the helper functions and the analysis script below.
random_state = 17  # seed handed to the clustering / projection models
n_jobs = -1
scaler = StandardScaler()

# Plot defaults; every warning is silenced for the whole app.
warnings.filterwarnings("ignore")
plt.style.use("ggplot")
plt.rc("figure", figsize=(10, 5))
# pd.options.display.precision = 4
# %% | |
# Page chrome: browser-tab title and icon, layout, and the introduction
# rendered above the analysis results.
st.set_page_config(
    page_title="Cryptocurrencies clustering",
    page_icon='./head.ico',
    layout='centered',  # alternative: 'wide'
    initial_sidebar_state='expanded',
)

# Inject CSS that sets the padding of the main block container.
padding = 0
st.markdown(f""" <style>
.reportview-container .main .block-container{{
padding-top: {padding}rem;
padding-right: {padding}rem;
padding-left: {padding}rem;
padding-bottom: {padding}rem;
}} </style> """, unsafe_allow_html=True)

st.image('./mundus.png')
st.subheader('Clustering analysis of cryptocurrencies')
# Spelling fixed in the two user-facing texts below
# ("statisticial" -> "statistical", "analized" -> "analyzed").
st.markdown(
    '*Explore similarities in statistical, temporal and spectral domains*')
st.markdown('''Top 100 cryptocurrencies' daily closing prices are downloaded.
Their dynamics can be analyzed in search of similarities between coins.
Up to 8 currencies from each cluster are shown below.''')
st.markdown("""---""")
# %% | |
# Read the cryptocompare API key from the Streamlit secrets store.
api_key = st.secrets["api_key"]
# %%
# Browser-like User-Agent sent along with the market-cap request.
headers = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 "
        "(KHTML, like Gecko) Chrome/32.0.1700.107 Safari/537.36"
    ),
}
# Market-cap listing endpoint (limit=100, quoted in USD), with the key appended.
req = (
    "https://min-api.cryptocompare.com/data/top/mktcapfull"
    f"?limit=100&tsym=USD&api_key={api_key}"
)
# Utility functions for data download | |
def get_price(ticker: str, limit: int):
    """Fetch the daily USD price history of ``ticker`` from cryptocompare.

    Thin wrapper around ``cc.get_historical_price_day``: ``limit`` is passed
    through unchanged and the library's return value is handed back as is.
    """
    query = {"currency": "USD", "limit": limit}
    return cc.get_historical_price_day(ticker, **query)
def get_all_cc(limit: int):
    """Collect the "close" price series of every ticker into one DataFrame.

    Iterates over the module-level ``tickers`` and downloads each history
    with ``get_price``. Tickers whose download or parsing fails are skipped,
    so the result may have fewer columns than there are tickers.

    Args:
        limit: Passed to ``get_price`` for every ticker; also the length of
            the initial (empty) frame's integer index.

    Returns:
        DataFrame with one column per successfully downloaded ticker, named
        after the ticker.
    """
    df = pd.DataFrame(index=range(limit))
    for tick in tickers:
        try:
            d = get_price(tick, limit)
            one_cc = pd.DataFrame.from_dict(d)["close"].rename(tick)
            df = pd.concat([df, one_cc], axis=1)
        except Exception:
            # Best effort: a ticker without usable price data is left out.
            # ``Exception`` (not a bare ``except``) so that KeyboardInterrupt,
            # SystemExit and other BaseException-based signals still propagate.
            continue
    return df
# Utility functions for clustering analysis | |
def elbow_study(data, k_max: int = 10, model=KMeans):
    """Choose a cluster count for ``data`` from the knee of the inertia curve.

    ``data`` is standardized with the module-level ``scaler``; ``model`` is
    then fitted for every k in ``range(2, k_max)`` and the square root of
    each fit's ``inertia_`` is recorded. ``KneeLocator`` searches that
    convex, decreasing curve for a knee.

    Returns:
        The knee found by ``KneeLocator``, or 3 when it reports none.
    """
    scaled = scaler.fit_transform(data)
    candidate_ks = range(2, k_max)
    inertia = [
        np.sqrt(
            model(n_clusters=k, random_state=random_state).fit(scaled).inertia_
        )
        for k in candidate_ks
    ]
    kneedle = KneeLocator(
        candidate_ks, inertia, S=2, curve="convex", direction="decreasing"
    )
    # Fall back to 3 clusters in case kneed doesn't find a knee.
    return kneedle.knee or 3
def plot_clusters_2(data, Xt, n_clusters, random_state):
    """Cluster the columns of ``Xt`` with KMeans and plot each cluster.

    For every cluster a 2x4 grid of line plots is rendered in the Streamlit
    page, showing up to 8 of the cluster's series taken from ``data``.

    Args:
        data: DataFrame of series to plot, one column per item.
        Xt: Feature table. One cluster label is produced per column of
            ``Xt`` and written across the columns of a copy of ``data``, so
            the two must have the same number of columns.
        n_clusters: Number of KMeans clusters.
        random_state: Seed for KMeans.

    Returns:
        A copy of ``data`` with an extra row labelled ``"cluster"`` holding
        each column's cluster label.
    """
    clusterer = KMeans(n_clusters=n_clusters, max_iter=100,
                       random_state=random_state)
    # NOTE(review): the scaler is fit on Xt and the result is transposed
    # afterwards, whereas visualize() scales Xt.T before clustering -- confirm
    # which orientation is intended. Behaviour is left unchanged here.
    X = scaler.fit_transform(Xt)
    dd = data.copy()
    dd.loc["cluster"] = clusterer.fit_predict(X.T)
    palette = ["red", "green", "blue", "purple",
               "orange", "magenta", "goldenrod"]
    clusters_no = dd.loc["cluster"].value_counts(sort=False)
    for c in range(n_clusters):
        # Wrap around the palette: n_clusters may exceed the number of colours.
        line_color = palette[c % len(palette)]
        fig, ax = plt.subplots(2, 4, sharex='col', figsize=(15, 5))
        cluster_ticks = dd.T[dd.T.loc[:, "cluster"] == c].index
        for i, tick in enumerate(cluster_ticks[:8]):
            panel = ax[i % 2, i // 2]
            # Skip the last row: it holds the cluster label, not a price.
            panel.plot(dd.iloc[:-1][tick], color=line_color)
            panel.set_title(tick)
        # .get() so a label that never occurs reports 0 instead of raising.
        fig.suptitle(f"Cluster {c}, {clusters_no.get(c, 0)} items\n", y=1.02)
        st.pyplot(fig)
        # Release the figure; otherwise pyplot keeps every figure alive.
        plt.close(fig)
    return dd
def visualize(Xt, n_clusters):
    """Show a 3-D UMAP projection of the clustered items, coloured by cluster.

    KMeans labels are computed on the standardized transpose of ``Xt`` (one
    label per column of ``Xt``). UMAP is fitted on the same orientation so
    that every projected point has exactly one label.

    Args:
        Xt: Feature table with one column per item to cluster.
        n_clusters: Number of KMeans clusters.
    """
    clusterer = KMeans(n_clusters=n_clusters, max_iter=100,
                       random_state=random_state)
    X = scaler.fit_transform(Xt.T)
    X_clust = clusterer.fit_predict(X)
    # String labels make plotly treat the clusters as discrete categories.
    X_color = X_clust.astype(str)
    # Transposed to match the clustering input: one row per labelled item.
    # Using Xt.values directly yields one point per row of Xt, which does not
    # match the length of the label array.
    features = Xt.T.values
    # UMAP
    umap_3d = UMAP(n_components=3, init='random',
                   random_state=random_state)
    proj_3d = umap_3d.fit_transform(features)
    fig_3d = px.scatter_3d(
        proj_3d, x=0, y=1, z=2,
        color=X_color, labels={'color': 'clusters'},
        color_discrete_sequence=["red", "green", "blue",
                                 "purple", "orange", "magenta", "goldenrod"],
        title="UMAP projection from feature space",
        width=800, height=600,
    )
    fig_3d.update_traces(marker_size=5)
    st.write(fig_3d)
# %% | |
# START Sidebar ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
st.sidebar.image('./blau.png')
# Data source: bundled demo data (default) or a fresh download.
demo = st.sidebar.checkbox(
    label="Use demo data?",
    value=True,
    help="Use demo data or fetch actual?",
)
# Length of the analysed price window, in days.
days = st.sidebar.number_input(
    label='Number of days for analysis',
    min_value=30,
    max_value=100,
    value=60,
)
# Feature domain; 'temporal' (index 1) is preselected.
domain = st.sidebar.selectbox(
    label='Domain',
    options=('statistical', 'temporal', 'spectral', 'all'),
    index=1,
    help='Domain to use feature extraction',
)
st.sidebar.markdown("""---""")
analyze = st.sidebar.button(label='Start analysis')
# END Sidebar ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
# Analysis | |
# Runs once per click on the sidebar button: load prices, extract tsfel
# features per coin, choose a cluster count and plot the clusters.
if analyze:
    with st.spinner('Downloading data...'):
        if demo:
            # NOTE: pickle.load must only ever be pointed at trusted files.
            with open("./demo_data.pkl", "rb") as f:
                demo_data = pickle.load(f)
            dl = demo_data.shape[0]
            # Keep the last `days` rows. The start is clamped at 0 because a
            # negative start (days > dl) would silently keep only the last
            # `days - dl` rows instead of everything available.
            data_day = demo_data.iloc[max(dl - days, 0):]
            tickers = demo_data.columns
        else:
            # A timeout keeps the app from hanging forever on a stalled request.
            top100 = requests.get(req, headers=headers, timeout=30)
            # Pull every "Name":"<TICKER>" value out of the raw response text.
            rs = re.compile(r"\"Name\":\"(?P<ticker>[A-Z0-9]+)\"")
            tickers = rs.findall(top100.text)
            data_day = get_all_cc(limit=days).copy()

    with st.spinner(f'Extracting {domain} features...'):
        # 'all' is expressed to tsfel as None.
        dom = domain if domain != 'all' else None
        cfg_file = tsfel.get_features_by_domain(dom)
        # tsfel analysis: one extraction on BTC is used only to obtain the
        # feature names that become the index of the feature table.
        x_temp = tsfel.time_series_features_extractor(
            cfg_file, data_day["BTC"], window_size=days)
        tf_columns = x_temp.columns
        xtf = pd.DataFrame(columns=data_day.columns, index=tf_columns)
        # Fill df with features, one column per coin
        for col in xtf.columns:
            xtf[col] = tsfel.time_series_features_extractor(
                cfg_file, data_day[col], window_size=days
            ).T
        # Drop features that could not be computed for every coin.
        xtf.dropna(inplace=True)
        # Features dataframe (rows: features, columns: coins)
        Xt = pd.DataFrame(scaler.fit_transform(xtf),
                          columns=data_day.columns, index=xtf.index)

    with st.spinner('Calculating optimal number of clusters...'):
        # Get optimal no of clusters
        n_clusters = elbow_study(Xt.T, model=KMeans)  # metric="euclidean",

    # Plot clusters
    plot_clusters_2(data_day, Xt=Xt, n_clusters=n_clusters,
                    random_state=random_state)
    # Plot umap
    # visualize(Xt, n_clusters)