huggingface112's picture
refactor add_details_to_stock_df to unitls.py
c8b5a38
raw
history blame
11.9 kB
'''
contain method for api call to jqdatasdk
'''
from dotenv import load_dotenv
from datetime import datetime, timedelta
import jqdatasdk as jq
import pandas as pd
from typing import List, Optional
from sqlalchemy import create_engine
import table_schema as ts
import os
import utils
db_url = 'sqlite:///instance/local.db'
load_dotenv()
user_name = os.environ.get('JQDATA_USER')
password = os.environ.get('JQDATA_PASSWORD')
def auth_api(func):
"""
decorator for function require jqdatasdk api
"""
def wrapper(*args, **kwargs):
if (not jq.is_auth()):
jq.auth(user_name, password)
result = func(*args, **kwargs)
return result
return wrapper
def aggregate_sector(input: str) -> Optional[str]:
'''
mapping from sector to aggregated sector retur None if not found
this handling is for spotting undefined sector in current mapping
later
Return: str -- aggregated sector
None if no mapping
'''
mapping = {
'电气设备I': '工业',
'建筑装饰I': '工业',
'交通运输I': '工业',
'机械设备I': '工业',
'国防军工I': '工业',
'综合I': '工业',
'电子I': '信息与通信',
'计算机I': '信息与通信',
'通信I': '信息与通信',
'传媒I': '信息与通信',
'纺织服装I': '消费',
'家用电器I': '消费',
'汽车I': '消费',
'休闲服务I': '消费',
'商业贸易I': '消费',
'食品饮料I': '消费',
'美容护理I': '消费',
'农林牧渔I': '消费',
'钢铁I': '原料与能源',
'建筑材料I': '原料与能源',
'有色金属I': '原料与能源',
'化工I': '原料与能源',
'轻工制造I': '原料与能源',
'煤炭I': '原料与能源',
'石油石化I': '原料与能源',
'采掘I': '原料与能源',
'医药生物I': '医药卫生',
'公用事业I': '公用事业',
'环保I': '公用事业',
'房地产I': '金融与地产',
'银行I': '金融与地产',
'非银金融I': '金融与地产'
}
# return the first mapping found
sectors = input.split(" ")
maped_name = "其他"
for sector in sectors:
maped_name = mapping.get(sector, None)
if maped_name is not None:
return maped_name
return maped_name
@auth_api
def get_all_stock_info() -> tuple[pd.DataFrame, List[str]]:
'''
return all stock information
Return
------
tuple: tuple(pd.DataFrame, List[str])
DataFrame -- display_name | name | start_date | end_date | type
'''
error = []
try:
df = jq.get_all_securities()
df['ticker'] = df.index
df.reset_index(drop=True, inplace=True)
# df.reset_index(inplace=True)
return df, error
except Exception as e:
error.append(f'get_all_stock_info\n{e}')
return None, error
@auth_api
def add_detail_to_stocks(df: pd.DataFrame) -> List[str]:
"""
add display_name, name, sector, and aggregate sector to each stock if not exist already
return a list of error message
Args: pd.DataFrame
ticker | date | weight | sector | aggregate_sector | display_name | name
Returns: List[str], error messages
"""
error = []
df[['sector', 'aggregate_sector']] = df.groupby(
'ticker')[['sector', 'aggregate_sector']].ffill()
df[['display_name', 'name']] = df.groupby(
'ticker')[['display_name', 'name']].ffill()
not_have_sector = list(
df[df['aggregate_sector'].isnull()]['ticker'].unique())
not_have_name = list(df[df['name'].isnull()]['ticker'].unique())
# sector and aggregate sector
if len(not_have_sector) != 0:
try:
sectors = jq.get_industry(security=not_have_sector)
df['sector'] = df.apply(lambda x: x.sector if not pd.isna(x.sector)
else " ".join(value['industry_name']
for value in sectors[x.ticker].values()), axis=1)
df['aggregate_sector'] = df.apply(
lambda x: x.aggregate_sector if not pd.isna(x.aggregate_sector)
else aggregate_sector(x.sector), axis=1
)
except Exception as e:
error.append(f'Error on creaet_sector_information\n{ticker}\n{e}')
# display_name and name
if len(not_have_name) != 0:
try:
for ticker in not_have_name:
detail = jq.get_security_info(ticker)
df.loc[df.ticker.isin(not_have_name)
]['display_name'] = detail.display_name
df.loc[df.ticker.isin(not_have_name)]['name'] = detail.name
except Exception as e:
error.append(f'Error on get display_name and name\n{ticker}\n{e}')
return error
@auth_api
def update_portfolio_profile(stocks: List[dict], current_p: pd.DataFrame = None) -> tuple[pd.DataFrame, List[str]]:
"""create or update a portfolio profile,
return a time series of profile
Parameters
----------
stocks : List[{ticker: Str, shares: float, date:datetime}]
update profile with a list of stock information
current_p : pd.DataFrame, optional
current portfolio profile, default is None
Returns
-------
updated_profile : pd.DataFrame
ticker | date | weight | sector | aggregate_sector | display_name | name
error : List[str]
a list of error message
"""
error = []
profile_df = pd.DataFrame(stocks)
profile_df['sector'] = None
profile_df['aggregate_sector'] = None
# add display_name
try:
with create_engine(db_url).connect() as conn:
info_df = pd.read_sql_table(ts.STOCKS_DETAILS_TABLE, conn)
profile_df = pd.merge(
profile_df, info_df[['display_name', 'ticker', 'name', 'aggregate_sector', ]], on='ticker', how='left')
except Exception as e:
error.append(f'create_portfolio \n{e}')
# get sector information
incoming_error = add_detail_to_stocks(profile_df)
error.extend(incoming_error)
# concate to existing profile if exist
if current_p is not None:
profile_df = pd.concat([profile_df, current_p], ignore_index=True)
profile_df.drop_duplicates(
subset=['ticker', 'date'], keep='last', inplace=True)
profile_df.reset_index(drop=True, inplace=True)
return profile_df, error
@auth_api
def get_all_stocks_detail():
'''get df contain all stock display_name, name, sector, aggregate_sector'''
detail_df = jq.get_all_securities()
detail_df['ticker'] = detail_df.index
detail_df.reset_index(drop=True, inplace=True)
industry_info = jq.get_industry(detail_df.ticker.to_list())
detail_df['sector'] = detail_df.apply(lambda x: " ".join(
value['industry_name']for value in industry_info[x.ticker].values()), axis=1)
detail_df['aggregate_sector'] = detail_df.apply(
lambda x: aggregate_sector(x.sector), axis=1)
return detail_df
@auth_api
def get_api_usage():
return jq.get_query_count()
@auth_api
def get_stocks_price(profile: pd.DataFrame, start_date: datetime, end_date: datetime, frequency='daily') -> tuple[pd.DataFrame, List[str]]:
"""
Return a dataframe contain stock price between period of time for price in a portfolio profile
Arguments:
profile {pd.DataFrame} -- ticker | date | weight | sector | aggregate_sector | display_name | name
start_date {datetime} -- start date of the period include start date
end_date {datetime} -- end date of the period include end date
frequency {str} -- resolution of the price, default is daily
Returns: Tuple(pd.DataFrame, List[str])
pd.DataFrame -- ticker date open close high low volumn money
error_message {list} -- a list of error message
"""
error_message = []
start_str = start_date.strftime('%Y-%m-%d')
end_str = end_date.strftime('%Y-%m-%d')
if profile.date.min() < start_date:
# hanlde benchmark doesn't have weight on the exact date
start_str = profile.date.min().strftime('%Y-%m-%d')
ticker = profile['ticker'].to_list()
try:
data = jq.get_price(ticker, start_date=start_str,
end_date=end_str, frequency=frequency)
data.rename(columns={'time': 'date', 'code': "ticker"}, inplace=True)
return data, error_message
except Exception as e:
error_message.append(f'Error when fetching {ticker} \n {e}')
return None, error_message
@auth_api
def fetch_stocks_price(**params):
'''request list of stock price from start_date to end_date with frequency or count'''
stocks_df = jq.get_price(**params)
stocks_df.rename(columns={'code': 'ticker'}, inplace=True)
return stocks_df
@auth_api
def update_benchmark_profile(start_date: datetime,
end_date: datetime,
benchmark="000905.XSHG",
delta_time=timedelta(days=7),
profile: pd.DataFrame = None
) -> tuple[pd.DataFrame, List[str]]:
"""
update benchmark profile with available new update between start_date and end_date,
if no profile is given, create a new profile for that duration
the minimum period is 1 day
return an updated dataframe if new update exist
Returns:
pd.DateFrame -- ticker | date | weight | sector | aggregate_sector | display_name | name
date| weight | display_name | actual_data(the date in the api database) | ticker
"""
error_message = []
results = []
while start_date < end_date:
try:
date_str = start_date.strftime('%Y-%m-%d')
result = jq.get_index_weights(benchmark, date=date_str)
results.append(result)
except Exception as e:
error_message.append(f'Error when fetching {benchmark}\n\
update on {date_str} is missing\n\
{e}')
start_date += delta_time
# inlcude end date
try:
date_str = end_date.strftime('%Y-%m-%d')
result = jq.get_index_weights(benchmark, date=date_str)
results.append(result)
except Exception as e:
error_message.append(f'Error when fetching {benchmark}\n\
update on {date_str} is missing\n\
{e}')
# no update
if len(results) == 0:
return profile, error_message
# concate all result
update_df = pd.concat(results)
update_df['ticker'] = update_df.index
update_df['sector'] = None
update_df['aggregate_sector'] = None
update_df.reset_index(drop=True, inplace=True)
update_df['date'] = pd.to_datetime(update_df['date'])
# add display_name
try:
with create_engine(db_url).connect() as conn:
info_df = pd.read_sql('all_stock_info', conn)
update_df = pd.merge(
update_df, info_df[['ticker', 'name']], on='ticker', how='left')
except Exception as e:
# if all_stock_info not exist then create a name column manually
update_df['name'] = None
error_message.append(f'create_portfolio \n{e}')
# combine with existing profile if given
if profile is not None:
update_df = pd.concat([profile, update_df])
# remove duplicate result
update_df.drop_duplicates(
subset=['ticker', 'date'], keep='last', inplace=True)
# update deail
merged_df = utils.add_details_to_stock_df(update_df)
# error_message.extend(incoming_error)
return merged_df, error_message
# get_all_stocks_detail()