# portfolio_management/script/stream_processing.py
# Uploaded by huggingface112, commit 976166f ("move files to normal tracking except .db"), 948 bytes.
import pandas as pd
import math
from datetime import datetime
import hvplot.pandas
import math
import numpy as np
import time
from streamz import Stream
# Load the pickled snapshots (presumably b_ = benchmark, p_ = portfolio -- TODO confirm).
# Paths are relative to the current working directory, so the script must be
# run from a directory whose parent contains `data/` (e.g. from `script/`).
# NOTE(review): read_pickle can execute arbitrary code -- only load trusted files.
b_stocks = pd.read_pickle('../data/b_stocks.pkl')
p_stocks = pd.read_pickle('../data/p_stocks.pkl')
p_profile = pd.read_pickle('../data/p_profile.pkl')
b_profile = pd.read_pickle('../data/b_profile.pkl')

# start stream here
# `Series.unique()` keeps order of first appearance, not chronological order.
# Sort so that dates[0], dates[1], ... replay the trading days oldest-first,
# which the per-ticker pct_change in the stream relies on.
dates = b_stocks.date.drop_duplicates().sort_values().to_numpy()
def add(prev_df, new_df):
    """Return ``new_df``'s rows annotated with a per-ticker ``pct`` change.

    Intended as a ``streamz`` ``accumulate`` reducer: ``prev_df`` is the
    previously returned frame and ``new_df`` the newly emitted one. The two
    are concatenated and ordered by ``date``. ``pct`` is then computed as
    ``close.pct_change()`` within each ``ticker``, so a row's ``pct`` is
    relative to the same ticker's nearest earlier row.

    Only rows that came from ``new_df`` are returned, with their original
    index. That result becomes the next accumulator state.

    Parameters
    ----------
    prev_df : pandas.DataFrame
        Previous state; must have ``date``, ``ticker`` and ``close`` columns.
    new_df : pandas.DataFrame
        Newly emitted rows with the same columns.

    Returns
    -------
    pandas.DataFrame
        ``new_df``'s rows ordered by ``date`` plus a ``pct`` column. ``pct``
        is NaN for a ticker with no earlier row; such rows are kept.
    """
    # Tag rows by origin so new rows are selected by provenance, not by value
    # comparison. This works even when the frames share index labels and does
    # not discard rows that merely contain NaN.
    merged_df = pd.concat([prev_df, new_df], keys=['prev', 'new'])
    # Stable sort keeps the original relative order of rows on the same date.
    merged_df = merged_df.sort_values(by=['date'], kind='stable')
    merged_df['pct'] = merged_df.groupby('ticker')['close'].pct_change()
    # Remove prev_df rows, then drop the provenance level added by concat.
    is_new = merged_df.index.get_level_values(0) == 'new'
    return merged_df[is_new].droplevel(0)
# Pipeline: every emitted frame is folded into the running state by `add`,
# and each value the accumulator produces is printed.
# NOTE(review): no `start=` is given, so the first frame presumably seeds the
# state without going through `add` -- confirm against streamz semantics.
source = Stream()
accumulated = source.accumulate(add)
accumulated.sink(print)

# Replay the first three entries of `dates`, one day's rows per emit.
for day_idx in range(3):
    day_rows = b_stocks[b_stocks.date == dates[day_idx]]
    source.emit(day_rows)