import tensorflow as tf
from tensorflow.keras.layers import (  # type: ignore
    Input, Dense, GRU, LSTM, Bidirectional, MultiHeadAttention, BatchNormalization,
    Dropout, Concatenate, TimeDistributed, RepeatVector, Add, Lambda, LayerNormalization, GaussianNoise, Reshape
)
from tensorflow.keras.models import Model  # type: ignore
from tensorflow.keras.regularizers import l2  # type: ignore
# Custom Transformer encoder layer.
# Custom layers are used in place of Lambda layers.
class ExpandDimension(tf.keras.layers.Layer):
    """Insert a timestep axis: (batch, features) -> (batch, 1, features)."""
    def call(self, inputs):
        return tf.expand_dims(inputs, axis=1)

class ConcatenateTimesteps(tf.keras.layers.Layer):
    """Concatenate tensors along the timestep axis (axis=1)."""
    def call(self, inputs):
        return tf.concat(inputs, axis=1)
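# A minimal shape check for the helper layers above (illustrative values only;
# the batch size and feature width are assumptions, not model requirements):
#   x = tf.zeros((8, 128))                 # (batch, features)
#   y = ExpandDimension()(x)               # -> (8, 1, 128)
#   z = ConcatenateTimesteps()([y, y, y])  # -> (8, 3, 128)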
class TransformerEncoder(tf.keras.layers.Layer):
    def __init__(self, num_heads, embed_dim, ff_dim, rate=0.1, **kwargs):
        super().__init__(**kwargs)
        # Store constructor arguments so get_config() can serialize them.
        self.num_heads = num_heads
        self.embed_dim = embed_dim
        self.ff_dim = ff_dim
        self.rate = rate
        self.attention = MultiHeadAttention(num_heads=num_heads, key_dim=embed_dim)  # key_dim is set to embed_dim
        self.ffn = tf.keras.Sequential(
            [Dense(ff_dim, activation="relu"), Dense(embed_dim)]
        )
        self.layernorm1 = LayerNormalization(epsilon=1e-6)
        self.layernorm2 = LayerNormalization(epsilon=1e-6)
        self.dropout1 = Dropout(rate)
        self.dropout2 = Dropout(rate)
    def build(self, input_shape):
        # The attention, FFN, and normalization sublayers are built lazily by
        # Keras on the first call, so no explicit sublayer build is required.
        super().build(input_shape)
    def call(self, inputs, training=None):
        attn_output, attn_weights = self.attention(
            inputs, inputs, return_attention_scores=True, training=training
        )
        attn_output = self.dropout1(attn_output, training=training)
        if training:
            # Inject small Gaussian noise as extra regularization (training only)
            attn_output += tf.random.normal(tf.shape(attn_output), mean=0.0, stddev=0.01)
        out1 = self.layernorm1(inputs + attn_output)
        ffn_output = self.ffn(out1, training=training)
        ffn_output = self.dropout2(ffn_output, training=training)
        return self.layernorm2(out1 + ffn_output), attn_weights
    def get_config(self):
        config = super().get_config()
        config.update({
            "num_heads": self.num_heads,
            "embed_dim": self.embed_dim,
            "ff_dim": self.ff_dim,
            "rate": self.rate
        })
        return config
    @classmethod
    def from_config(cls, config):
        return cls(**config)
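# A standalone smoke test for TransformerEncoder (a sketch under assumed sizes;
# batch size 4 and sequence length 30 are illustrative):
#   enc = TransformerEncoder(num_heads=4, embed_dim=256, ff_dim=512)
#   out, weights = enc(tf.zeros((4, 30, 256)), training=False)
#   # out.shape     == (4, 30, 256)   -- same shape as the input
#   # weights.shape == (4, 4, 30, 30) -- (batch, heads, query_len, key_len)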
def build_model_1118(word2vec_embedding_dim, pos_tag_dim, entity_dim, time_series_input_shape):
    # 1. Text feature processing
    text_input = Input(shape=(word2vec_embedding_dim,), name='text_input')
    text_dense = Dense(256, activation='relu', kernel_regularizer=l2(0.01), name='text_dense')(text_input)
    text_batch_norm = BatchNormalization(name='text_batch_norm')(text_dense)
    text_output = Dropout(0.3, name='text_dropout')(text_batch_norm)
    # 2. POS-tag feature processing
    pos_input = Input(shape=(pos_tag_dim,), name='pos_input')
    pos_dense = Dense(64, activation='relu', kernel_regularizer=l2(0.01), name='pos_dense')(pos_input)
    pos_batch_norm = BatchNormalization(name='pos_batch_norm')(pos_dense)
    pos_output = Dropout(0.3, name='pos_dropout')(pos_batch_norm)
    # 3. Named-entity (NER) feature processing
    entity_input = Input(shape=(entity_dim,), name='entity_input')
    entity_dense = Dense(64, activation='relu', kernel_regularizer=l2(0.01), name='entity_dense')(entity_input)
    entity_batch_norm = BatchNormalization(name='entity_batch_norm')(entity_dense)
    entity_output = Dropout(0.3, name='entity_dropout')(entity_batch_norm)
    # 4. Sentiment feature processing
    sentiment_input = Input(shape=(1,), name='sentiment_input')
    sentiment_dense = Dense(256, activation='relu', kernel_regularizer=l2(0.01), name='sentiment_dense')(sentiment_input)
    sentiment_batch_norm = BatchNormalization(name='sentiment_batch_norm')(sentiment_dense)
    sentiment_output = Dropout(0.3, name='sentiment_dropout')(sentiment_batch_norm)
    # 5. Time-series feature processing (market index data)
    def process_index(index_input, index_name):
        # First bidirectional LSTM layer: initial temporal feature extraction
        x = Bidirectional(LSTM(256, return_sequences=True), name=f'{index_name}_bidirectional_lstm_1')(index_input)
        # Second bidirectional LSTM layer: deeper temporal features
        x = Bidirectional(LSTM(128, return_sequences=True), name=f'{index_name}_bidirectional_lstm_2')(x)
        # Transformer encoder: capture global relationships across timesteps.
        # The training flag is left to Keras so dropout/noise are off at inference.
        x, attn_weights = TransformerEncoder(num_heads=4, embed_dim=256, ff_dim=512)(x)
        # Project to a fixed 128-dim representation
        x = Dense(128, activation='relu', name=f'{index_name}_project')(x)
        # Batch normalization to guard against vanishing/exploding gradients
        x = BatchNormalization(name=f'{index_name}_batch_norm')(x)
        # Dropout against overfitting
        x = Dropout(0.3, name=f'{index_name}_dropout')(x)
        return x, attn_weights  # (batch, 30, 128) plus attention weights
    index_inx_input = Input(shape=(30, time_series_input_shape[1]), name='index_us_stock_index_INX')
    index_dj_input = Input(shape=(30, time_series_input_shape[1]), name='index_us_stock_index_DJ')
    index_ixic_input = Input(shape=(30, time_series_input_shape[1]), name='index_us_stock_index_IXIC')
    index_ndx_input = Input(shape=(30, time_series_input_shape[1]), name='index_us_stock_index_NDX')
    index_inx_processed, _ = process_index(index_inx_input, 'index_inx')
    index_dj_processed, _ = process_index(index_dj_input, 'index_dj')
    index_ixic_processed, _ = process_index(index_ixic_input, 'index_ixic')
    index_ndx_processed, _ = process_index(index_ndx_input, 'index_ndx')
    # 6. Time-series feature processing (individual stock data)
    stock_input = Input(shape=(30, time_series_input_shape[1]), name='stock_input')
    stock_gru = Bidirectional(GRU(256, return_sequences=True), name='stock_bidirectional_gru')(stock_input)
    stock_attention = MultiHeadAttention(num_heads=4, key_dim=64, name='stock_attention')(stock_gru, stock_gru)
    stock_dense = Dense(128, activation='relu', name='stock_dense')(stock_attention)
    stock_batch_norm = BatchNormalization(name='stock_batch_norm')(stock_dense)
    stock_processed = Dropout(0.3, name='stock_dropout')(stock_batch_norm)
    # 7. Static feature fusion (text and sentiment branches are weighted x2)
    static_features = Concatenate(name='static_features_concatenate')([
        text_output * 2,
        pos_output,
        entity_output,
        sentiment_output * 2
    ])
    # 8. Merge all time-series branches
    combined_features = Concatenate(name='combined_features')([
        index_inx_processed,
        index_dj_processed,
        index_ixic_processed,
        index_ndx_processed,
        stock_processed
    ])
    # 9. Expand static features across timesteps and join with the time series
    static_features_expanded = RepeatVector(30, name='static_features_expanded')(static_features)
    combined_with_static = Concatenate(name='combined_with_static')([
        combined_features,
        static_features_expanded
    ])
    # 10. Decoder
    combined_dense = TimeDistributed(Dense(256, activation='relu', kernel_regularizer=l2(0.01)), name='combined_dense')(combined_with_static)
    combined_dropout = Dropout(0.3, name='combined_dropout')(combined_dense)
    decoder_gru = GRU(128, return_sequences=False, name='decoder_gru')(combined_dropout)
    decoder_gru = Dropout(0.2)(decoder_gru)  # dropout
    decoder_gru = GaussianNoise(0.02)(decoder_gru)  # Gaussian noise (active only during training)
    # Predict the next 3 timesteps independently
    future_day_1 = Dense(128, activation='relu', name='future_day_1')(decoder_gru)
    future_day_2 = Dense(128, activation='relu', name='future_day_2')(decoder_gru)
    future_day_3 = Dense(128, activation='relu', name='future_day_3')(decoder_gru)
    future_day_1_expanded = ExpandDimension(name='future_day_1_expanded')(future_day_1)
    future_day_2_expanded = ExpandDimension(name='future_day_2_expanded')(future_day_2)
    future_day_3_expanded = ExpandDimension(name='future_day_3_expanded')(future_day_3)
    # Stack into (batch, 3, 128): one 128-dim vector per forecast day
    future_reshaped = ConcatenateTimesteps(name='future_reshaped')(
        [future_day_1_expanded, future_day_2_expanded, future_day_3_expanded]
    )
    # Independent output head for each index
    def create_output_layer(input_tensor, name):
        x = TimeDistributed(Dense(64, activation='relu'), name=f'{name}_dense1')(input_tensor)
        x = TimeDistributed(Dense(32, activation='relu'), name=f'{name}_dense2')(x)
        x = Dense(6, activation='linear', name=f'{name}_final_output')(x)
        return x
    index_inx_output_final = create_output_layer(future_reshaped, 'index_inx')
    index_dj_output_final = create_output_layer(future_reshaped, 'index_dj')
    index_ixic_output_final = create_output_layer(future_reshaped, 'index_ixic')
    index_ndx_output_final = create_output_layer(future_reshaped, 'index_ndx')
    stock_output_final = create_output_layer(future_reshaped, 'stock')
    # Auxiliary sentiment head; defined here but not wired into the model outputs below
    news_sentiment_loss = Dense(1, activation='linear', name='news_sentiment_output')(text_output)
    # Assemble the model
    model = Model(
        inputs=[
            text_input, pos_input, entity_input, sentiment_input,
            index_inx_input, index_dj_input, index_ixic_input, index_ndx_input,
            stock_input
        ],
        outputs=[
            index_inx_output_final, index_dj_output_final, index_ixic_output_final,
            index_ndx_output_final, stock_output_final
        ]
    )
    # Optimizer and learning-rate schedule
    lr_schedule = tf.keras.optimizers.schedules.CosineDecay(
        initial_learning_rate=0.0005,  # lowered initial learning rate
        decay_steps=10000,
        alpha=0.1
    )
    optimizer = tf.keras.optimizers.AdamW(learning_rate=lr_schedule, weight_decay=0.01)
    # Huber loss is robust to outliers; one ['mae', 'mse'] metric list per output head
    model.compile(optimizer=optimizer, loss=tf.keras.losses.Huber(), metrics=[['mae', 'mse']] * 5)
    return model
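
if __name__ == "__main__":
    # Minimal usage sketch. The feature dimensions below are placeholder
    # assumptions for illustration; the real values come from the upstream
    # preprocessing pipeline.
    model = build_model_1118(
        word2vec_embedding_dim=300,       # assumed Word2Vec vector size
        pos_tag_dim=45,                   # assumed POS-tag feature width
        entity_dim=18,                    # assumed NER feature width
        time_series_input_shape=(30, 6),  # 30 days x 6 OHLCV-style features (assumed)
    )
    model.summary()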