import pytz import datetime import io import pandas as pd import table_schema as ts from sqlalchemy import create_engine db_url = 'sqlite:///instance/local.db' def time_in_beijing(strip_time_zone=True): ''' return current time in Beijing as datetime object ''' tz = pytz.timezone('Asia/Shanghai') dt = datetime.datetime.now(tz) if strip_time_zone: dt = dt.replace(tzinfo=None) return dt def add_details_to_stock_df(stock_df): '''return df adding sector, aggregate sector, display_name, name to it Parameters ---------- stock_df: pd.DataFrame the dataframe contain ticker columns Returns ------- merged_df: pd.DataFrame the dataframe with sector, aggregate sector, display_name and name added ''' with create_engine(db_url).connect() as conn: detail_df = pd.read_sql(ts.STOCKS_DETAILS_TABLE, con=conn) merged_df = pd.merge(stock_df, detail_df[ ['sector', 'name', 'aggregate_sector', 'display_name', 'ticker'] ], on='ticker', how='left') merged_df['aggregate_sector'].fillna('其他', inplace=True) return merged_df def convert_string_to_datetime(date_string, time_zone="Asia/Shanghai"): ''' Convert a string to a datetime object with the timezone by default, Shanghai ''' dt = datetime.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S.%f') tz = pytz.timezone(time_zone) dt = tz.localize(dt) return dt def create_stocks_entry_from_excel(byte_string): '''create stock entry from excel file Parameters ---------- byte_string: bytes the byte string of the excel file Returns ------- new_stock_entry: list [{ticker:str, shares:int, mean_price: float, date:datetime.datetime}] the list of stock entry ''' uploaded_df = None with io.BytesIO(byte_string) as f: uploaded_df = pd.read_excel(f) # throw exception if doesn't have required columns if not set(['证券代码', '持仓数量', '平均建仓成本', 'time_stamp']).issubset(uploaded_df.columns): raise Exception('Missing required columns') # print(uploaded_df) # uploaded_df = pd.read_excel() uploaded_df.drop(columns='Unnamed: 0', inplace=True) # Define the regular expression pattern to match the string endings pattern = r'\.(sz|sh)$' # Define the replacement strings for each match group replacements = {'.sz': '.XSHE', '.sh': '.XSHG'} # Use the str.replace method with the pattern and replacements uploaded_df['证券代码'] = uploaded_df['证券代码'].str.lower() uploaded_df['证券代码'] = uploaded_df['证券代码'].str.replace( pattern, lambda m: replacements[m.group()], regex=True) new_stock_entry = [ dict(ticker=ticker, shares=shares, date=time, mean_price=mean_price) for ticker, shares, mean_price, time in zip( uploaded_df['证券代码'], uploaded_df['持仓数量'], uploaded_df['平均建仓成本'], pd.to_datetime(uploaded_df['time_stamp']))] # new_profile, error = api.update_portfolio_profile(new_stock_entry) print(new_stock_entry) return new_stock_entry def style_number(vals): '''color negative number as red, positive as green Parameters ---------- vals: df columns the columns to be styled Returns ------- list the list of style ''' return ['color: red' if v < 0 else 'color: green' for v in vals] def create_share_changes_report(df): '''Create a markdown report of the share changes for certain date Parameters ---------- df: pd.DataFrame the dataframe of profile for a specific date Returns ------- markdown: str ''' date_str = df.date.to_list()[0].strftime('%Y-%m-%d %H:%M:%S') markdown = f"### {date_str}\n\n" markdown += 'Ticker | Display Name | Share Changes\n' markdown += '--- | --- | ---\n' for _, row in df.iterrows(): share_changes = row['share_changes'] # Apply green color to positive numbers and red color to negative numbers if share_changes > 0: share_changes_str = f'{share_changes}' elif share_changes < 0: share_changes_str = f'{share_changes}' else: share_changes_str = str(share_changes) markdown += '{} | {} | {}\n'.format(row['ticker'], row['display_name'], share_changes_str) return markdown