File size: 6,431 Bytes
976166f
 
 
 
c8b5a38
 
976166f
c8b5a38
976166f
6eb9bc1
c121d97
 
 
 
 
 
 
c8b5a38
976166f
 
 
 
 
c8b5a38
 
976166f
 
6eb9bc1
c8b5a38
 
6eb9bc1
c8b5a38
 
 
 
6eb9bc1
c8b5a38
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
976166f
6eb9bc1
976166f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
c121d97
976166f
 
 
 
 
 
c121d97
976166f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b9d37d3
 
 
 
 
 
 
 
 
 
 
 
 
c121d97
b9d37d3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
import pytz
import datetime
import io
import pandas as pd
import table_schema as ts
from sqlalchemy import create_engine

db_url = 'sqlite:///instance/local.db'


def clip_df(start, end, df: pd.DataFrame, on='time'):
    '''
    return a copy of df between start and end date inclusive
    '''
    return df[df.time.between(start, end, inclusive='both')].copy()


def time_in_beijing(strip_time_zone=True):
    '''
    return current time in Beijing as datetime object
    '''
    tz = pytz.timezone('Asia/Shanghai')
    dt = datetime.datetime.now(tz)
    if strip_time_zone:
        dt = dt.replace(tzinfo=None)
    return dt


def add_details_to_stock_df(stock_df):
    '''return df adding sector, aggregate sector, display_name, name to it

    Parameters
    ----------
    stock_df: pd.DataFrame
        the dataframe contain ticker columns

    Returns
    -------
    merged_df: pd.DataFrame
        the dataframe with sector, aggregate sector, display_name and name added

    '''
    with create_engine(db_url).connect() as conn:
        detail_df = pd.read_sql(ts.STOCKS_DETAILS_TABLE, con=conn)
        merged_df = pd.merge(stock_df, detail_df[
            ['sector', 'name',
             'aggregate_sector',
             'display_name',
             'ticker']
        ], on='ticker', how='left')
        merged_df['aggregate_sector'].fillna('其他', inplace=True)
        return merged_df


def convert_string_to_datetime(date_string, time_zone="Asia/Shanghai"):
    '''
    Convert a string to a datetime object with the timezone by default,
    Shanghai
    '''
    dt = datetime.datetime.strptime(date_string, '%Y-%m-%d %H:%M:%S.%f')
    tz = pytz.timezone(time_zone)
    dt = tz.localize(dt)
    return dt


def create_stocks_entry_from_excel(byte_string):
    '''create stock entry from excel file
    Parameters
    ----------
    byte_string: bytes
        the byte string of the excel file
    Returns
    -------
    new_stock_entry: list
        [{ticker:str, shares:int, mean_price: float, date:datetime.datetime}]
        the list of stock entry
    '''
    uploaded_df = None
    with io.BytesIO(byte_string) as f:
        uploaded_df = pd.read_excel(f, index_col=None)

    # throw exception if doesn't have required columns
    if not set(['证券代码', '持仓数量', '平均建仓成本', 'time_stamp']).issubset(uploaded_df.columns):
        raise Exception('Missing required columns')
    # print(uploaded_df)
    # uploaded_df = pd.read_excel()
    # uploaded_df.drop(columns='Unnamed: 0', inplace=True)
    # Define the regular expression pattern to match the string endings
    pattern = r'\.(sz|sh)$'
    # Define the replacement strings for each match group
    replacements = {'.sz': '.XSHE', '.sh': '.XSHG'}
    # Use the str.replace method with the pattern and replacements
    uploaded_df['证券代码'] = uploaded_df['证券代码'].str.lower()
    uploaded_df['证券代码'] = uploaded_df['证券代码'].str.replace(
        pattern, lambda m: replacements[m.group()], regex=True)
    new_stock_entry = [
        dict(ticker=ticker, shares=shares, date=time, mean_price=mean_price)
        for ticker, shares, mean_price, time in zip(
            uploaded_df['证券代码'],
            uploaded_df['持仓数量'],
            uploaded_df['平均建仓成本'],
            pd.to_datetime(uploaded_df['time_stamp']))]
    # new_profile, error = api.update_portfolio_profile(new_stock_entry)
    print(new_stock_entry)
    return new_stock_entry


def style_number(vals):
    '''color negative number as red, positive as green
    Parameters
    ----------
    vals: df columns
        the columns to be styled
    Returns
    -------
    list
        the list of style

    '''
    return ['color: red' if v < 0 else 'color: green' for v in vals]


def create_share_changes_report(df):
    '''Create a markdown report of the share changes for certain date
    Parameters
    ----------
    df: pd.DataFrame
        the dataframe of profile for a specific date
    Returns
    -------
    markdown: str
    '''

    date_str = df.date.to_list()[0].strftime('%Y-%m-%d %H:%M:%S')
    markdown = f"### {date_str}\n\n"
    markdown += 'Ticker | Display Name | Share Changes\n'
    markdown += '--- | --- | ---\n'
    for _, row in df.iterrows():
        share_changes = row['share_changes']
        # Apply green color to positive numbers and red color to negative numbers
        if share_changes > 0:
            share_changes_str = f'<span style="color:green">{share_changes}</span>'
        elif share_changes < 0:
            share_changes_str = f'<span style="color:red">{share_changes}</span>'
        else:
            share_changes_str = str(share_changes)
        markdown += '{} | {} | {}\n'.format(row['ticker'],
                                            row['display_name'], share_changes_str)
    return markdown


def create_html_report(result: list[tuple]):
    '''
    a flex box with 2 flex item on each row where justified space-between

    Parameters
    ----------
    result: list of tuple
        (title, value, type)
        title: str, title to display
        value: any, value to display
        type: str, used to format value

    Returns
    -------
    html: str
    '''
    style = '''
<style>
.compact-container {{
    display: flex;
    flex-direction: column;
    gap: 5px;
}}

.compact-container > div {{
    display: flex;
    justify-content: space-between;
    margin-bottom: 2px;
}}

.compact-container > div > h2,
.compact-container > div > h3,
.compact-container > div > p,
.compact-container > div > ul > li {{
    margin: 0;
}}

.compact-container > ul {{
    padding: 0;
    margin: 0;
    list-style-type: none;
}}

.compact-container > ul > li {{
    display: flex;
    margin-bottom: 2px;
}}
</style>
'''

    def _get_color(num):
        return 'green' if num >= 0 else 'red'

    def _format_percentage_number(num):
        return f'{round(num * 100, 2)}%'

    def _create_flex_item(result_entry):
        key, value, value_type = result_entry
        return f"""
            <div>
            <p style="margin: 0;">{key}</p>
            <p style='color: {_get_color(value)}; margin: 0;'>{_format_percentage_number(value)}</p>
            </div>
        """
    html = f"""
        <div class="compact-container">
            {''.join([_create_flex_item(entry) for entry in result])}
        </div>
    """
    return style + html