finance /
Jumper-Clown's picture
add paper trading back testing
from collections import defaultdict
import numpy as np
import scipy as sp
import calculator
import data_retriever
import streamlit as st
# plot the trend of the market as a candlestick graph.
def sector_trends(symbol, weeks_back):
# 1. get all peers
peers = data_retriever.get_peers(symbol)
# 2. get data for each peer
peers_stock_data = defaultdict(list)
dates = []
for peer in peers:
peer_data = data_retriever.get_current_stock_data(peer, weeks_back)
if len(peer_data) == 0:
if not any(dates):
dates = peer_data.index.format()
peers_stock_data[peer] = peer_data
# 3. normalize all data (get min->max value, then set each value to (X-min)/(max-min)
peers_stock_data_normalized = defaultdict(dict)
indicator_stock_data_normalized_peer_sum = defaultdict(list)
for peer, peer_stock_data in peers_stock_data.items():
peer_stock_data_normalized = defaultdict(list)
for indicator in peer_stock_data:
indicator_normalized = peer_stock_data_normalized[indicator]
indicator_stock_data_normalized_sum = indicator_stock_data_normalized_peer_sum[indicator]
indicator_values = peer_stock_data[indicator]
min_val = np.min(indicator_values)
max_val = np.max(indicator_values)
delta = max_val - min_val
value_idx = 0
for value in indicator_values:
normalized = (value - min_val)/delta
while len(indicator_stock_data_normalized_sum) <= value_idx:
indicator_stock_data_normalized_sum[value_idx] += normalized
value_idx += 1
peers_stock_data_normalized[peer] = peer_stock_data_normalized
# 4. get the average value for each indicator [open, close, high, low] for each time-step.
peer_count = len(peers)
indicator_stock_data_normalized_peer_avg = defaultdict(list)
for indicator, indicator_sum_values in indicator_stock_data_normalized_peer_sum.items():
indicator_avg_values = indicator_stock_data_normalized_peer_avg[indicator]
for sum_value in indicator_sum_values:
# 5. plot the resulting normalized-averaged-indicators in a candlestick chart.
close_data = defaultdict(list)
for peer, peer_data_normalized in peers_stock_data_normalized.items():
close_data[peer] = peer_data_normalized['Close']
relative_close_data = defaultdict(list)
for peer in peers_stock_data_normalized:
relative_close_data[peer] = [a - b for a, b in zip(close_data[peer], indicator_stock_data_normalized_peer_avg['Close'])]
return dates, close_data, relative_close_data, indicator_stock_data_normalized_peer_avg
def trend_line(date_indices, data_list, min_trend_size=7, step_size=1, sigma_multiplier=1):
trend_dates, trend_values = [], []
if not any(date_indices) or not any(data_list):
return trend_dates, trend_values
np_dates = np.array([ts.timestamp() for ts in date_indices])
dates = date_indices.format()
start_index = 0
index = min_trend_size
while index < len(data_list):
np_dates_subset = np_dates[start_index:index]
np_values_subset = data_list[start_index:index]
# for the value range, calculate linear_regression and standard deviation
m, c = calculator.linear_regression_line(np_dates_subset, np_values_subset)
predicted_points = m * np_dates_subset + c
relative_mean = np.mean(np.abs(predicted_points - np_values_subset)) # TODO: use Welford's algorithm
# for the next datapoint(s), calculate the next value in that trend,
# and check if it is n*trend_sigma away from the current trend line
next_index = index + step_size
if next_index >= len(data_list):
x = np_dates[next_index:next_index+1]
y = data_list[next_index:next_index+1]
expected_y = m*x + c
dev = np.mean(np.abs(y - expected_y))
if dev > relative_mean * sigma_multiplier:
# store the current date and value as a trend changer
# reset the calculation for the next data
start_index = next_index - min_trend_size
index = next_index
return trend_dates, trend_values
def vwap(symbol_price_data):
typical_price = (symbol_price_data['High'] + symbol_price_data['Low'] + symbol_price_data['Close']) / 3.0
volume = symbol_price_data['Volume']
vwap_values = (typical_price * volume).cumsum() / volume.cumsum()
return vwap_values
def sma(prices, period=20):
values = []
for index in range(len(prices) - period):
values.append(np.mean(prices[index:index + period]))
return values
def bollinger_bands(dates, symbol_price_data, n_days_deviation=20, deviation_multiplier=2):
bollinger_dates, bollinger_high, bollinger_low = [], [], []
closes = symbol_price_data['Close']
for index in range(len(closes) - n_days_deviation):
bollinger_dates.append(dates[index + n_days_deviation])
closes_subset = closes[index:index + n_days_deviation]
sma = np.sum(closes_subset)/len(closes_subset)
sigma = deviation_multiplier * np.sqrt(np.sum(np.power(closes_subset - sma, 2))/len(closes_subset))
bollinger_high.append(sma + sigma)
bollinger_low.append(sma - sigma)
return bollinger_dates, bollinger_low, bollinger_high
def support_lines():
a = 1
# a linear regression over a period
def linear_regression_trend(prices, period=32):
np_dates = np.array([ts.timestamp() for ts in prices.index])
np_values = np.array(prices)
value_count = len(prices)
trend = []
for index in range(value_count-period):
np_dates_subset = np_dates[index:index+period+1]
np_values_subset = np_values[index:index+period+1]
low, high = calculator.linear_regression(np_dates_subset, np_values_subset)
return trend
def bids(dates, symbol_prices, period=14):
close_values = symbol_prices['Close']
trend = linear_regression_trend(close_values, period=period)
sma_values = sma(close_values, period=period)
# get intercepts
dates_subset = dates[period:]
close_subset = close_values[period:]
sma_intercept_indices = calculator.intercepts(sma_values, close_subset)
trend_intercept_indices = calculator.intercepts(trend, close_subset)
sma_intercept_dates = [dates_subset[index] for index in sma_intercept_indices]
bids = []
for index in sma_intercept_indices:
is_up = sma_values[index] > close_subset[index]
if is_up:
return sma_intercept_dates, bids