# ETH with Vector Autoregressive (VAR) model


## Importing/Downloading all the libraries required


In [None]:
import re
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
from statsmodels.tsa.api import VAR
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.stattools import grangercausalitytests
from statsmodels.tsa.vector_ar.vecm import coint_johansen
from statsmodels.stats.stattools import durbin_watson

## Data Preprocessing


### Importing and summarizing the datasets


In [None]:
# Tweet-level sentiment source: drop the 'id' column and index the rows by 'date'.
sentimentdf = (
    pd.read_parquet("hf://datasets/tmotagam/Cryptocurrencies-sentiment-from-X/ETH-sentiment-dataset.parquet")
    .drop(columns='id')
    .set_index('date')
)
# Price history: the first spreadsheet column becomes the index.
ethdf = pd.read_excel('ETH-USD.xlsx', parse_dates=['timestamp'], index_col=0)

# Print a summary and the last rows of each dataset, framed by separator rules.
rule = '===================================================================================='
for title, payload in (
    ('ETH Sentiment Summary:', sentimentdf.describe()),
    ('ETH Sentiment Data:', sentimentdf.tail()),
    ('ETH Price Summary:', ethdf.describe()),
    ('ETH Price Data:', ethdf.tail()),
):
    print(rule)
    print(title)
    print(payload)
print(rule)

### Removing duplicate rows and unwanted data points and columns from the datasets

In [None]:
# Remove exact duplicate rows from the sentiment data.
# drop_duplicates() compares column values only, so the index is copied into a
# temporary column to make the row's date part of the comparison. A single
# pass gives the same rows (first occurrence kept, original order) as looping
# over every unique date and re-concatenating the whole frame each time.
sentimentdf['tmpdate'] = sentimentdf.index
sentimentdf = sentimentdf.drop_duplicates().drop('tmpdate', axis=1)

# Drop the listed price columns and keep rows from 2021-12-29 onwards.
ethdf.drop(['low', 'open', 'volume', 'close', 'high'], axis=1, inplace=True)
ethdf = ethdf.loc['2021-12-29':]

print('====================================================================================')
print('ETH Sentiment Summary:')
print(sentimentdf.describe())
print('====================================================================================')
print('ETH Sentiment Data:')
print(sentimentdf.head())
print('====================================================================================')
print('ETH Price Summary:')
print(ethdf.describe())
print('====================================================================================')
print('ETH Price Data:')
print(ethdf.head())
print('====================================================================================')

### Getting sentiment scores and their daily average using VADER

In [None]:
analyzer = SentimentIntensityAnalyzer()

# Text removed before scoring: @mentions, any character that is neither a word
# character nor whitespace, '#', and URLs. The digit range uses an ASCII hyphen
# ("0-9"); with an en dash the class only matched the literal characters
# '0', '–' and '9', leaving most digits of a handle in the text.
tweet_noise = re.compile(r"(@[A-Za-z0-9_]+)|[^\w\s]|#|http\S+")

# Clean and score every tweet once, then fan the three scores out to columns.
tweet_scores = [
    analyzer.polarity_scores(tweet_noise.sub("", text.replace("\n", " ")))
    for text in sentimentdf['content']
]
for score_name in ('neg', 'pos', 'neu'):
    sentimentdf[score_name] = [scores[score_name] for scores in tweet_scores]
sentimentdf.drop(['content'], axis=1, inplace=True)

# Average the scores per calendar day (the index is reduced to its date part).
df_grouped = sentimentdf.groupby(sentimentdf.index.date)
df_averages = df_grouped.mean()

print('====================================================================================')
print('ETH Sentiment Summary:')
print(df_averages.describe())
print('====================================================================================')
print('ETH Sentiment Data:')
print(df_averages.head())
print('====================================================================================')

### Combining the two datasets

In [None]:
# Attach the daily sentiment averages to the price frame as new columns.
# NOTE(review): the Series are aligned on ethdf's index by pandas; confirm the
# two indexes actually describe the same dates, otherwise the columns are NaN.
sentiment_columns = {label: df_averages[label] for label in ('neg', 'pos', 'neu')}
df = ethdf.assign(**sentiment_columns)

rule = '===================================================================================='
for title, payload in (('Summary:', df.describe()), ('Data:', df.head())):
    print(rule)
    print(title)
    print(payload)
print(rule)

### Plotting the dataset


In [None]:
# One stacked panel per column of df. The panel count follows the data instead
# of being fixed at 4, and squeeze=False keeps `axes` a 2-D array even when
# there is a single column, so .flatten() always works.
series_names = list(df.columns)
fig, axes = plt.subplots(nrows=len(series_names), ncols=1, dpi=120, figsize=(10,6), squeeze=False)
for name, ax in zip(series_names, axes.flatten()):
    ax.plot(df[name], color='red', linewidth=1)
    ax.set_title(name)
    # Decorations: hide tick marks and the top spine, use small tick labels.
    ax.xaxis.set_ticks_position('none')
    ax.yaxis.set_ticks_position('none')
    ax.spines["top"].set_alpha(0)
    ax.tick_params(labelsize=6)

plt.tight_layout()
plt.show()

### Granger Causality Test

The Granger causality test is run on every pair of time series in the dataset.
In the resulting table the rows are the response variables (Y) and the columns are
the predictors (X); each cell holds a p-value. A p-value below the significance
level (0.05) means we can reject the null hypothesis that the coefficients of the
past values of X are zero, i.e. the hypothesis that X does not Granger-cause Y.


In [None]:
maxlag=12
test = 'ssr_chi2test'
def grangers_causation_matrix(data, variables, test='ssr_chi2test', verbose=False, maxlag=12):
    """Build a matrix of pairwise Granger-causality p-values.

    Parameters
    ----------
    data : DataFrame holding the series to test, one column per variable.
    variables : column labels of `data` to include in the matrix.
    test : key of the statistic to read from each lag's result
        (default 'ssr_chi2test').
    verbose : accepted for backward compatibility; currently unused.
    maxlag : highest lag tested for every pair. It used to be read from a
        module-level global; it is now an explicit argument.

    Returns
    -------
    DataFrame with rows labelled '<var>_y' (response) and columns labelled
    '<var>_x' (predictor); each cell is the smallest p-value, rounded to four
    decimals, found over lags 1..maxlag.
    """
    matrix = pd.DataFrame(np.zeros((len(variables), len(variables))), columns=variables, index=variables)
    for predictor in matrix.columns:
        for response in matrix.index:
            # grangercausalitytests checks whether the second column helps to
            # predict the first, hence the [response, predictor] order.
            test_result = grangercausalitytests(data[[response, predictor]], maxlag=maxlag)
            p_values = [round(test_result[lag][0][test][1], 4) for lag in range(1, maxlag + 1)]
            matrix.loc[response, predictor] = np.min(p_values)
    matrix.columns = [var + '_x' for var in variables]
    matrix.index = [var + '_y' for var in variables]
    return matrix

grangers_causation_matrix(df, variables=df.columns, test=test, maxlag=maxlag)

### Johansen's Cointegration Test

The Johansen test, named after Søren Johansen, is a procedure for testing cointegration of several, say k, I(1) time series.
This test permits more than one cointegrating relationship so is more generally applicable than the Engle–Granger test which is based on the Dickey–Fuller (or the augmented) test for unit roots in the residuals from a single (estimated) cointegrating relationship.


In [None]:
def cointegration_test(df, alpha=0.05):
    """Run Johansen's cointegration test on `df` and print a report.

    Parameters
    ----------
    df : DataFrame with one column per time series.
    alpha : significance level; must be 0.10, 0.05 or 0.01, the three levels
        for which critical values are available.

    Raises
    ------
    KeyError if `alpha` is not one of the supported levels.
    """
    # det_order=-1 (no deterministic terms), 5 lagged differences.
    out = coint_johansen(df,-1,5)
    # Column of the critical-value table per significance level. Looking the
    # level up numerically avoids the old str(1-alpha) key, which produced
    # '0.9' (not '0.90') for alpha=0.10 and therefore always failed.
    crit_column = {0.10: 0, 0.05: 1, 0.01: 2}
    level = round(float(alpha), 2)
    if level not in crit_column:
        raise KeyError(f'alpha must be one of 0.10, 0.05 or 0.01, got {alpha!r}')
    confidence = int(round((1 - level) * 100))
    traces = out.lr1
    cvts = out.cvt[:, crit_column[level]]
    def adjust(val, length= 6): return str(val).ljust(length)

    # Summary
    # NOTE(review): the i-th trace statistic belongs to the i-th rank
    # hypothesis of the Johansen procedure; pairing it with the i-th column
    # name is only a row label — confirm this reading is intended.
    print(f'Name   ::  Test Stat > C({confidence}%)    =>   Signif  \n', '--'*20)
    for col, trace, cvt in zip(df.columns, traces, cvts):
        print(adjust(col), ':: ', adjust(round(trace,2), 9), ">", adjust(cvt, 8), ' =>  ' , trace > cvt)

cointegration_test(df)

### Train and Test Split


In [None]:
# Hold out the last `nobs` rows for evaluating the forecast; train on the rest.
nobs = 10 # number of observations to be forecasted
df_train = df.iloc[:-nobs]
df_test = df.iloc[-nobs:]

for part in (df_train, df_test):
    print(part.shape)

### ADFuller to test for Stationarity of given series

An augmented Dickey–Fuller test (ADF) tests the null hypothesis that a unit root is present in a time series sample.
The alternative hypothesis is different depending on which version of the test is used, but is usually stationarity or trend-stationarity.
It is an augmented version of the Dickey–Fuller test for a larger and more complicated set of time series models.

The augmented Dickey–Fuller (ADF) statistic, used in the test, is a negative number.
The more negative it is, the stronger the rejection of the hypothesis that there is a unit root at some level of confidence.


In [None]:
def adfuller_test(series,name, signif=0.05, verbose=False):
    """Run an ADF test on `series` and print a formatted report.

    `name` is the label shown in the report header and `signif` the
    significance level the rounded p-value is compared against. `verbose` is
    accepted but not used. Nothing is returned.
    """
    # First five fields of the result, in order: statistic, p-value,
    # lags used, number of observations, critical values.
    stat, raw_p, used_lags, _n_obs, crit_values = adfuller(series, autolag='AIC')[:5]
    p_value = round(raw_p, 4)

    print(f'    Augmented Dickey-Fuller Test on "{name}"', "\n   ", '-'*47)
    print(' Null Hypothesis: Data has unit root. Non-Stationary.')
    print(f' Significance Level    = {signif}')
    print(f' Test Statistic        = {round(stat, 4)}')
    print(f' No. Lags Chosen       = {round(used_lags, 4)}')

    for level, threshold in crit_values.items():
        print(f' Critical value {str(level).ljust(6)} = {round(threshold, 3)}')

    null_rejected = p_value <= signif
    if null_rejected:
        print(f" => P-Value = {p_value}. Rejecting Null Hypothesis.")
        print(" => Series is Stationary.")
    else:
        print(f" => P-Value = {p_value}. Weak evidence to reject the Null Hypothesis.")
        print(" => Series is Non-Stationary.")

# Report on every column of the training data, with blank lines in between.
for position, name in enumerate(df_train.columns):
    adfuller_test(df_train.iloc[:, position], name=name)
    print('\n')

### Since the series are non-stationary, we difference them and run the ADF test again


In [None]:
# First-order difference of the training data; the first row (and any other
# row containing a NaN) is discarded.
df_differenced = df_train.diff(periods=1).dropna(how='any')

In [None]:
# Repeat the ADF report for every differenced series.
for position, name in enumerate(df_differenced.columns):
    adfuller_test(df_differenced.iloc[:, position], name=name)
    print('\n')

### Selecting Lag Order (p) for VAR model


In [None]:
model = VAR(df_differenced)

# Fit lag orders 1..9 and print the information criteria of each fit.
for lag in range(1, 10):
    fitted = model.fit(lag)
    print('Lag Order =', lag)
    for label, value in (('AIC : ', fitted.aic), ('BIC : ', fitted.bic), ('FPE : ', fitted.fpe)):
        print(label, value)
    print('HQIC: ', fitted.hqic, '\n')

# Order-selection table for lags up to 12.
order_selection = model.select_order(maxlags=12)
order_selection.summary()

## Model Training


In [None]:
# Fit the VAR with the lag order used for the rest of the notebook.
selected_lag = 5
model_fitted = model.fit(maxlags=selected_lag)
model_fitted.summary()

## Durbin Watson Test

The Durbin–Watson statistic is a test statistic used to detect the presence of autocorrelation at lag 1 in the residuals (prediction errors) from a regression analysis.
It is named after James Durbin and Geoffrey Watson.
The small sample distribution of this ratio was derived by John von Neumann (von Neumann, 1941).
Durbin and Watson (1950, 1951) applied this statistic to the residuals from least squares regressions, and developed bounds tests for the null hypothesis that the errors are serially uncorrelated against the alternative that they follow a first order autoregressive process.
Note that the distribution of this test statistic does not depend on the estimated regression coefficients and the variance of the errors.


In [None]:
# One Durbin-Watson statistic per residual series, printed next to the
# matching column name, rounded to two decimals.
dw_stats = durbin_watson(model_fitted.resid)

for series_name, statistic in zip(df.columns, dw_stats):
    print(series_name, ':', round(statistic, 2))

### Forecasting


In [None]:
# Lag order of the fitted model
lag_order = model_fitted.k_ar
print(lag_order)

# The forecast is seeded with the last `lag_order` rows of the differenced data
forecast_input = df_differenced.to_numpy()[-lag_order:]
print(forecast_input)

# Forecast `nobs` steps and label the result with the dates of the last
# `nobs` rows of df; the '_1d' suffix marks the values as first differences.
fc = model_fitted.forecast(y=forecast_input, steps=nobs)
differenced_names = df.columns + '_1d'
df_forecast = pd.DataFrame(fc, index=df.index[-nobs:], columns=differenced_names)
df_forecast

## Inversion of differencing


In [None]:
def invert_transformation(df_train, df_forecast, second_diff=False):
    """Bring differenced forecasts back to the original scale.

    For every column `c` of `df_train` a `c_forecast` column is added to a copy
    of `df_forecast`, computed as the last training value plus the running sum
    of `c_1d`. With `second_diff=True`, `c_1d` is first rebuilt from `c_2d`
    using the last training difference. `df_forecast` itself is not modified.
    """
    restored = df_forecast.copy()
    for name in df_train.columns:
        key = str(name)
        history = df_train[name]
        last_level = history.iloc[-1]
        if second_diff:
            # Undo the second difference: last observed step + cumulative 2nd diffs
            last_step = last_level - history.iloc[-2]
            restored[key + '_1d'] = last_step + restored[key + '_2d'].cumsum()
        # Undo the first difference: last observed level + cumulative 1st diffs
        restored[key + '_forecast'] = last_level + restored[key + '_1d'].cumsum()
    return restored

# Undo the first differencing, then show only the level forecasts.
df_results = invert_transformation(df_train, df_forecast, second_diff=False)
forecast_columns = ['adjclose_forecast', 'neg_forecast', 'pos_forecast', 'neu_forecast']
df_results.loc[:, forecast_columns]

## Plot Forecast


In [None]:
# One panel per series: the forecast and the held-out actual values together.
panel_count = len(df.columns)
fig, axes = plt.subplots(nrows=panel_count, ncols=1, dpi=150, figsize=(10,10))
for col, ax in zip(df.columns, axes.flatten()):
    forecast_axis = df_results[col+'_forecast'].plot(legend=True, ax=ax)
    forecast_axis.autoscale(axis='x',tight=True)
    actual_values = df_test[col][-nobs:]
    actual_values.plot(legend=True, ax=ax)
    ax.set_title(col + ": Forecast vs Actuals")
    # Decorations: hide tick marks and the top spine, use small tick labels.
    ax.xaxis.set_ticks_position('none')
    ax.yaxis.set_ticks_position('none')
    ax.spines["top"].set_alpha(0)
    ax.tick_params(labelsize=6)

plt.tight_layout()
plt.show()

## Error of Forecast


In [None]:
def forecast_accuracy(forecast, actual):
    """Compute common accuracy metrics for a forecast.

    Parameters
    ----------
    forecast, actual : 1-D array-likes of equal length (lists, NumPy arrays or
        pandas Series). They are converted to float arrays and compared
        position by position.

    Returns
    -------
    dict with the keys 'mape', 'me', 'mae', 'mpe', 'rmse', 'corr' and
    'minmax'.
    """
    # Coerce up front so plain lists and Series work, not just ndarrays.
    forecast = np.asarray(forecast, dtype=float)
    actual = np.asarray(actual, dtype=float)
    errors = forecast - actual

    mape = np.mean(np.abs(errors)/np.abs(actual))  # MAPE
    me = np.mean(errors)                           # ME
    mae = np.mean(np.abs(errors))                  # MAE
    mpe = np.mean(errors/actual)                   # MPE
    rmse = np.mean(errors**2)**.5                  # RMSE
    corr = np.corrcoef(forecast, actual)[0,1]      # corr
    # Element-wise smaller / larger of the two values at each position.
    mins = np.minimum(forecast, actual)
    maxs = np.maximum(forecast, actual)
    minmax = 1 - np.mean(mins/maxs)                # minmax
    return({'mape':mape, 'me':me, 'mae': mae,
            'mpe': mpe, 'rmse':rmse, 'corr':corr, 'minmax':minmax})

# Print the accuracy metrics of each forecast series, rounded to four
# decimals. Every heading except the first is preceded by a blank line.
for position, series_name in enumerate(('adjclose', 'pos', 'neg', 'neu')):
    heading = 'Forecast Accuracy of: ' + series_name
    print(heading if position == 0 else '\n' + heading)
    accuracy_prod = forecast_accuracy(df_results[series_name + '_forecast'].values, df_test[series_name].values)
    for metric, score in accuracy_prod.items():
        print(metric, ': ', round(score,4))