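# NOTE: the three lines originally here ("Spaces:" / "No application file" /
# "No application file") appear to be web-page residue from where this file was
# copied; they are not part of the program.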
# %% | |
from dotenv import load_dotenv | |
import os | |
from alpha_vantage.timeseries import TimeSeries | |
import pandas as pd | |
import hopsworks | |
import re | |
import modal | |
#prepocessing | |
import requests | |
import pandas as pd | |
import json | |
#import pandas_market_calendars as mcal | |
import datetime | |
import numpy as np | |
from datetime import datetime, timedelta | |
# %% | |
# Load variables from a local .env file into the process environment.
load_dotenv()
# The Alpha Vantage API key is read from the `stocks_api` environment variable.
api_key = os.environ.get('stocks_api')
ts = TimeSeries(key=api_key, output_format='pandas')
# Fetch the full daily price history for TSLA; adjust the symbol as needed.
# NOTE: this calls get_daily, not get_daily_adjusted, so despite the earlier
# wording the prices requested here are the plain daily series.
data, meta_data = ts.get_daily(symbol='TSLA', outputsize='full')
print(data.head())
# %%
data
# %%
data.info()
# %%
meta_data
# %% | |
#Stock market: | |
def today_is_a_business_day(today):
    """Return whether the NYSE has a trading session on ``today``.

    Args:
        today: The day to check, passed unchanged as both ``start_date`` and
            ``end_date`` of the NYSE calendar's ``schedule`` call.

    Returns:
        bool: True if the one-day schedule contains a session, False otherwise
        (a message is printed in that case).

    NOTE: relies on ``mcal`` (pandas_market_calendars) being available at
    module level; its import is commented out at the top of this file.
    """
    cal = mcal.get_calendar('NYSE')
    # One-day window: the schedule has a row only if the market opens that day.
    schedule = cal.schedule(start_date=today, end_date=today)
    # Test emptiness explicitly instead of waiting for an exception: formatting
    # an empty ``market_open`` column does not reliably raise, which would make
    # every day look like a business day.
    if schedule.empty:
        print('Today {} is not a business day'.format(today))
        return False
    return True
# %% | |
def next_business_day(today):
    """Return the first NYSE trading day strictly after ``today``.

    Args:
        today: A ``date``/``datetime`` object; it must support
            ``+ timedelta`` and ``strftime``.

    Returns:
        str: The next trading day formatted as ``YYYY-MM-DD``.

    NOTE: relies on ``mcal`` (pandas_market_calendars) being available at
    module level; its import is commented out at the top of this file.
    """
    cal = mcal.get_calendar('NYSE')
    one_day = timedelta(days=1)
    # Start from the real calendar tomorrow and walk forward one day at a time.
    candidate = today + one_day
    while True:
        day = candidate.strftime("%Y-%m-%d")
        # One-day window: non-empty only when the market opens on `day`.
        schedule = cal.schedule(start_date=day, end_date=day)
        if not schedule.empty:
            # Only the date of the session is needed, not its open/close times.
            return schedule.market_open.dt.strftime('%Y-%m-%d').to_numpy()[0]
        print('The date {} is not a business day'.format(day))
        candidate += one_day
# %% | |
def extract_business_day(start_date, end_date):
    """List NYSE trading days in a date range and flag each calendar day.

    Args:
        start_date: First day of the range, as a ``YYYY-MM-DD`` string.
        end_date: Last day of the range (inclusive), as a ``YYYY-MM-DD`` string.

    Returns:
        tuple:
            isBusinessDay: NumPy array of ``YYYY-MM-DD`` strings, one per
                trading day in the range (empty if there is none).
            is_open: NumPy float array with one entry per calendar day from
                ``start_date`` to ``end_date``; 1.0 where the market is open
                and 0.0 where it is closed. E.g. ``[1, 0, ..., 1]`` means
                start_date open, the following day closed, end_date open.

    NOTE: relies on ``mcal`` (pandas_market_calendars) being available at
    module level; its import is commented out at the top of this file.
    """
    cal = mcal.get_calendar('NYSE')
    # Open and close times for every trading session in the period.
    schedule = cal.schedule(start_date=start_date, end_date=end_date)
    if schedule.empty:
        # No session in the range: return an empty list rather than failing
        # when trying to read its first element.
        isBusinessDay = np.array([], dtype=object)
    else:
        # Only the dates of the sessions are needed, not the open/close times.
        isBusinessDay = np.array(schedule.market_open.dt.strftime('%Y-%m-%d'))

    # `datetime` is the class here (from datetime import datetime, timedelta).
    first_day = datetime.strptime(start_date, "%Y-%m-%d")
    last_day = datetime.strptime(end_date, "%Y-%m-%d")
    num_days = (last_day - first_day).days + 1

    # Set membership replaces the manual "current/next business day" cursor.
    open_days = set(isBusinessDay)
    is_open = np.array(
        [
            1.0
            if (first_day + timedelta(days=offset)).strftime('%Y-%m-%d') in open_days
            else 0.0
            for offset in range(num_days)
        ],
        dtype=float,
    )
    print(np.shape(is_open))
    return isBusinessDay, is_open
# %% | |
def clean_column_name(name):
    """Return ``name`` with every character outside A-Z / a-z removed."""
    # Split on each non-letter character and glue the remaining pieces together.
    letter_runs = re.split(r'[^a-zA-Z]', name)
    return ''.join(letter_runs)
# %% | |
# Reduce every column label to its letters only (see clean_column_name).
data.columns = [clean_column_name(col) for col in data.columns]
# %%
data.head()
# %%
# Turn the index into a regular column so it can be filtered on below.
# NOTE(review): the filter assumes the resulting column is named 'date' — confirm.
data.reset_index(inplace=True)
# %%
data.head()
# %%
data
# %%
# Define the date range you're interested in
yesterday = datetime.now() - timedelta(days=1)
# NOTE(review): 684 days is a little under two calendar years (730 days);
# confirm the window length is intentional.
two_years_back = yesterday - timedelta(days=684)
# %%
# Filter the DataFrame to this range. Take an explicit copy so that later
# column assignments on filtered_df do not act on a view of `data`.
in_range = (data['date'] >= two_years_back) & (data['date'] <= yesterday)
filtered_df = data[in_range].copy()
# %%
filtered_df.head()
# %%
print(filtered_df['date'].min())
print(filtered_df['date'].max())
# %%
filtered_df.shape
# %% | |