Spaces:
Runtime error
Runtime error
from collections import defaultdict | |
import numpy as np | |
import scipy as sp | |
import calculator | |
import data_retriever | |
import streamlit as st | |
# Build the normalized sector-trend data that is later plotted as a candlestick graph.
def _min_max_normalize(values):
    """Scale *values* linearly onto [0, 1]; a constant series maps to all zeros."""
    array = np.asarray(values, dtype=float)
    if array.size == 0:
        return []
    low = np.min(array)
    span = np.max(array) - low
    if span == 0:
        # A flat series has no range to scale by; avoid dividing by zero.
        return [0.0] * array.size
    return ((array - low) / span).tolist()


def sector_trends(symbol, weeks_back):
    """Compute min-max normalized price trends for the peers of *symbol*.

    Args:
        symbol: Ticker passed to ``data_retriever.get_peers``.
        weeks_back: Look-back window passed to
            ``data_retriever.get_current_stock_data`` for every peer.

    Returns:
        A 4-tuple ``(dates, close_data, relative_close_data, averages)``:

        * ``dates`` - formatted index labels of the first peer that has data
          (empty list when no peer has data).
        * ``close_data`` - peer -> normalized ``'Close'`` values.
        * ``relative_close_data`` - peer -> normalized close minus the peer
          average at the same time step.
        * ``averages`` - indicator (data column) -> per-time-step average of
          the normalized values over the peers that contributed a value.
    """
    # 1. Collect the price history of every peer that actually has data.
    peers = data_retriever.get_peers(symbol)
    peers_stock_data = {}
    dates = []
    for peer in peers:
        peer_data = data_retriever.get_current_stock_data(peer, weeks_back)
        if len(peer_data) == 0:
            continue
        if not dates:
            # NOTE(review): Index.format() is deprecated in recent pandas
            # releases - confirm against the pinned pandas version.
            dates = peer_data.index.format()
        peers_stock_data[peer] = peer_data

    # 2. Normalize every column per peer and accumulate, per time step, both
    #    the running sum and the number of peers that contributed to it.
    peers_stock_data_normalized = {}
    indicator_sums = defaultdict(list)
    indicator_counts = defaultdict(list)
    for peer, peer_stock_data in peers_stock_data.items():
        peer_normalized = defaultdict(list)
        for indicator in peer_stock_data:
            normalized_values = _min_max_normalize(peer_stock_data[indicator])
            peer_normalized[indicator] = normalized_values
            sums = indicator_sums[indicator]
            counts = indicator_counts[indicator]
            for step, normalized in enumerate(normalized_values):
                if step == len(sums):
                    sums.append(0.0)
                    counts.append(0)
                sums[step] += normalized
                counts[step] += 1
        peers_stock_data_normalized[peer] = peer_normalized

    # 3. Average each indicator per time step. Dividing by the number of
    #    contributing peers (rather than by len(peers)) keeps peers without
    #    data, or with shorter histories, from dragging the average down.
    indicator_averages = defaultdict(list)
    for indicator, sums in indicator_sums.items():
        counts = indicator_counts[indicator]
        indicator_averages[indicator] = [
            total / count for total, count in zip(sums, counts)
        ]

    # 4. Extract each peer's normalized close and its offset from the average.
    close_data = defaultdict(list)
    relative_close_data = defaultdict(list)
    for peer, peer_normalized in peers_stock_data_normalized.items():
        peer_close = peer_normalized['Close']
        close_data[peer] = peer_close
        relative_close_data[peer] = [
            close - average
            for close, average in zip(peer_close, indicator_averages['Close'])
        ]

    return dates, close_data, relative_close_data, indicator_averages
def trend_line(date_indices, data_list, min_trend_size=7, step_size=1, sigma_multiplier=1):
    """Find the points at which the data departs from its running linear trend.

    A regression line is fitted over a window of the data. The value lying
    ``step_size`` positions past the window end is then compared with the
    line's prediction; when its absolute deviation exceeds the window's mean
    absolute residual times ``sigma_multiplier``, the window-end point is
    recorded as a trend change and the window restarts.

    Args:
        date_indices: Index of the data; its items must provide
            ``.timestamp()`` and the index itself ``.format()``
            (presumably a pandas ``DatetimeIndex`` - verify against callers).
        data_list: Values parallel to ``date_indices``.
        min_trend_size: Size of the initial (and restarted) fitting window.
        step_size: Distance of the probed point past the window end, and the
            amount by which the window end advances each iteration.
        sigma_multiplier: Multiplier applied to the mean absolute residual to
            form the change threshold.

    Returns:
        ``(trend_dates, trend_values)`` - formatted dates and data values of
        the detected trend changes; both empty when either input has no
        truthy element.
    """
    change_dates = []
    change_values = []
    if not (any(date_indices) and any(data_list)):
        return change_dates, change_values

    timestamps = np.array([stamp.timestamp() for stamp in date_indices])
    # NOTE(review): Index.format() is deprecated in recent pandas releases -
    # confirm against the pinned pandas version.
    date_labels = date_indices.format()

    window_start = 0
    window_stop = min_trend_size
    while window_stop < len(data_list):
        xs = timestamps[window_start:window_stop]
        ys = data_list[window_start:window_stop]

        # Fit the current window and measure how well the line describes it.
        slope, intercept = calculator.linear_regression_line(xs, ys)
        fitted = slope * xs + intercept
        mean_abs_residual = np.mean(np.abs(fitted - ys))  # TODO: use Welford's algorithm

        probe = window_stop + step_size
        if probe >= len(data_list):
            break

        # Compare the probed point with what the fitted line predicts there.
        probe_x = timestamps[probe:probe + 1]
        probe_y = data_list[probe:probe + 1]
        predicted_y = slope * probe_x + intercept
        deviation = np.mean(np.abs(probe_y - predicted_y))

        if deviation > mean_abs_residual * sigma_multiplier:
            # The window-end point is stored as the trend changer.
            change_dates.append(date_labels[window_stop])
            change_values.append(data_list[window_stop])
            # Restart with a window of min_trend_size points ending at the probe.
            window_start = probe - min_trend_size
        window_stop = probe

    return change_dates, change_values
def vwap(symbol_price_data):
    """Return the cumulative volume-weighted average price.

    Args:
        symbol_price_data: Mapping/frame with ``'High'``, ``'Low'``,
            ``'Close'`` and ``'Volume'`` columns supporting element-wise
            arithmetic and ``.cumsum()``.

    Returns:
        Running sum of (typical price * volume) divided by the running sum of
        volume, where typical price is the mean of high, low and close.
    """
    highs = symbol_price_data['High']
    lows = symbol_price_data['Low']
    closes = symbol_price_data['Close']
    traded_volume = symbol_price_data['Volume']

    typical = (highs + lows + closes) / 3.0
    cumulative_turnover = (typical * traded_volume).cumsum()
    cumulative_volume = traded_volume.cumsum()
    return cumulative_turnover / cumulative_volume
def sma(prices, period=20):
    """Return the simple moving averages of *prices*.

    Entry ``i`` is the mean of ``prices[i:i + period]``. Only
    ``len(prices) - period`` windows are produced, so the final full window
    is not included and the result is empty when there are too few prices.
    """
    window_count = len(prices) - period
    return [
        np.mean(prices[start:start + period])
        for start in range(window_count)
    ]
def bollinger_bands(dates, symbol_price_data, n_days_deviation=20, deviation_multiplier=2):
    """Compute Bollinger bands over the ``'Close'`` column.

    For every window of ``n_days_deviation`` closes, the band centre is the
    window mean and the half-width is ``deviation_multiplier`` times the
    population standard deviation of the window. Each band value is paired
    with the date immediately following its window.

    Returns:
        ``(band_dates, lower_band, upper_band)`` as three parallel lists.
    """
    closes = symbol_price_data['Close']
    band_dates, lower_band, upper_band = [], [], []

    for start in range(len(closes) - n_days_deviation):
        stop = start + n_days_deviation
        band_dates.append(dates[stop])

        window = closes[start:stop]
        window_size = len(window)
        mean_close = np.sum(window) / window_size
        squared_spread = np.power(window - mean_close, 2)
        half_width = deviation_multiplier * np.sqrt(np.sum(squared_spread) / window_size)

        upper_band.append(mean_close + half_width)
        lower_band.append(mean_close - half_width)

    return band_dates, lower_band, upper_band
def support_lines():
    """Placeholder for support-line detection; not implemented, returns None."""
    return None
# a linear regression over a period | |
def linear_regression_trend(prices, period=32):
    """Run a rolling linear regression over *prices*.

    Args:
        prices: Series-like whose ``.index`` items provide ``.timestamp()``.
        period: Each regression window spans ``period + 1`` consecutive points.

    Returns:
        A list with one entry per window (``len(prices) - period`` entries):
        the second value returned by ``calculator.linear_regression`` for
        that window (presumably the fitted value at the window's end -
        verify against ``calculator``).
    """
    timestamps = np.array([stamp.timestamp() for stamp in prices.index])
    price_values = np.array(prices)
    window_size = period + 1

    trend = []
    for start in range(len(prices) - period):
        stop = start + window_size
        _, high = calculator.linear_regression(
            timestamps[start:stop], price_values[start:stop]
        )
        trend.append(high)
    return trend
def bids(dates, symbol_prices, period=14):
    """Derive bid signals from crossings of the close price and its SMA.

    Args:
        dates: Date labels parallel to the rows of ``symbol_prices``.
        symbol_prices: Price data providing a ``'Close'`` column.
        period: Window used for both the moving average and the regression
            trend; the first ``period`` dates/closes are skipped so they line
            up with the indicator values.

    Returns:
        ``(intercept_dates, signals)`` - the dates at the indices reported by
        ``calculator.intercepts`` for the SMA, and for each of them ``10``
        when the SMA is above the close, otherwise ``-10``.
    """
    closes = symbol_prices['Close']
    regression_trend = linear_regression_trend(closes, period=period)
    moving_average = sma(closes, period=period)

    # Drop the warm-up span so dates and closes align with the indicators.
    trimmed_dates = dates[period:]
    trimmed_closes = closes[period:]

    sma_crossings = calculator.intercepts(moving_average, trimmed_closes)
    # Trend crossings are computed but do not contribute to the result yet.
    _trend_crossings = calculator.intercepts(regression_trend, trimmed_closes)

    crossing_dates = [trimmed_dates[position] for position in sma_crossings]
    # NOTE(review): integer lookup on trimmed_closes is positional only for
    # some index types - confirm what callers pass in.
    signals = [
        10 if moving_average[position] > trimmed_closes[position] else -10
        for position in sma_crossings
    ]
    return crossing_dates, signals