| |
|
| | |
| | import numpy as np |
| | from tensorflow.keras import regularizers |
| | import tensorflow as tf |
| | from tensorflow.keras.layers import ( |
| | Input, Conv1D, LSTM, Dense, Dropout, |
| | MultiHeadAttention, Reshape, Concatenate, |
| | TimeDistributed, GlobalAveragePooling1D, LayerNormalization |
| | ) |
| | from tensorflow.keras.models import Model |
| | from transformers import AutoTokenizer, TFAutoModelForSequenceClassification |
| | |
| | |
| |
|
| | |
| | |
# Model configuration constants (defaults for DeepPortfolioAgentNetwork).
WINDOW_SIZE_CONF = 60  # lookback window length in timesteps per training sample
NUM_ASSETS_CONF = 4  # number of assets whose features are concatenated on the last axis
NUM_FEATURES_PER_ASSET_CONF = 26  # engineered features per asset per timestep

L2_REG = 0.0001  # L2 weight-decay coefficient applied to the final dense layers
| |
|
| | |
| | |
| |
|
| | |
| | |
| | |
| | |
| | |
| |
|
class AssetProcessor(tf.keras.Model):
    """CNN + LSTM encoder for a single asset's feature window.

    Consumes a (batch, sequence_length, num_features) window and returns a
    fixed-size (batch, lstm_units2) representation: two Conv1D stages extract
    local temporal patterns, then two stacked LSTMs summarize the sequence,
    with dropout after every stage.
    """

    def __init__(self, sequence_length, num_features, cnn_filters1=32, cnn_filters2=64, lstm_units1=64, lstm_units2=32, dropout_rate=0.2, name="single_asset_processor_module", **kwargs):
        super().__init__(name=name, **kwargs)
        # Hyperparameters are retained so get_config() can round-trip the module.
        self.sequence_length = sequence_length
        self.num_features = num_features
        self.cnn_filters1 = cnn_filters1
        self.cnn_filters2 = cnn_filters2
        self.lstm_units1 = lstm_units1
        self.lstm_units2 = lstm_units2
        self.dropout_rate = dropout_rate

        # Convolutional front-end ('same' padding keeps the time axis length).
        self.conv1 = Conv1D(filters=cnn_filters1, kernel_size=3, activation='relu', padding='same', name="asset_cnn1")
        self.dropout_cnn1 = Dropout(dropout_rate, name="asset_cnn1_dropout")
        self.conv2 = Conv1D(filters=cnn_filters2, kernel_size=3, activation='relu', padding='same', name="asset_cnn2")
        self.dropout_cnn2 = Dropout(dropout_rate, name="asset_cnn2_dropout")
        # Recurrent back-end; the second LSTM collapses the sequence to a vector.
        self.lstm1 = LSTM(lstm_units1, return_sequences=True, name="asset_lstm1")
        self.dropout_lstm1 = Dropout(dropout_rate, name="asset_lstm1_dropout")
        self.lstm2 = LSTM(lstm_units2, return_sequences=False, name="asset_lstm2_final")
        self.dropout_lstm2 = Dropout(dropout_rate, name="asset_lstm2_dropout")

    def call(self, inputs, training=False):
        """Encode one asset window; returns a (batch, lstm_units2) tensor."""
        hidden = self.dropout_cnn1(self.conv1(inputs), training=training)
        hidden = self.dropout_cnn2(self.conv2(hidden), training=training)
        hidden = self.dropout_lstm1(self.lstm1(hidden, training=training), training=training)
        hidden = self.lstm2(hidden, training=training)
        return self.dropout_lstm2(hidden, training=training)

    def get_config(self):
        """Serialize constructor hyperparameters for Keras (de)serialization."""
        base_config = super().get_config()
        hyperparams = {
            "sequence_length": self.sequence_length,
            "num_features": self.num_features,
            "cnn_filters1": self.cnn_filters1,
            "cnn_filters2": self.cnn_filters2,
            "lstm_units1": self.lstm_units1,
            "lstm_units2": self.lstm_units2,
            "dropout_rate": self.dropout_rate,
        }
        return {**base_config, **hyperparams}
| |
|
class DeepPortfolioAgentNetwork(tf.keras.Model):
    """Multi-asset portfolio-allocation network.

    Each asset's slice of the flat input tensor
    (batch, sequence_length, num_assets * num_features_per_asset) is encoded by
    a single shared AssetProcessor; the per-asset vectors then attend to each
    other via multi-head self-attention (residual + LayerNorm), are pooled, and
    a dense head emits softmax portfolio weights of shape (batch, num_assets).

    NOTE(review): when use_sentiment_analysis is True a FinBERT tokenizer/model
    pair is loaded, but call() never uses them — sentiment features are not yet
    wired into the dense head.
    """

    def __init__(self,
                 num_assets=int(NUM_ASSETS_CONF),
                 sequence_length=int(WINDOW_SIZE_CONF),
                 num_features_per_asset=int(NUM_FEATURES_PER_ASSET_CONF),
                 asset_cnn_filters1=32, asset_cnn_filters2=64,
                 asset_lstm_units1=64, asset_lstm_units2=32, asset_dropout=0.2,
                 mha_num_heads=4, mha_key_dim_divisor=2,
                 final_dense_units1=128, final_dense_units2=64, final_dropout=0.3,
                 use_sentiment_analysis=True,
                 output_latent_features=False, **kwargs):
        super().__init__(name="deep_portfolio_agent_network", **kwargs)

        print(f"DPN __init__ > num_assets ENTRADA: {num_assets}, tipo: {type(num_assets)}")

        def get_int_value(param_name, val):
            # Coerce a structural hyperparameter to a plain int. Values may
            # arrive as scalar Tensors/Variables or tracked dict-like wrappers
            # (e.g. after a checkpoint restore) — presumably why this guard
            # exists; TODO confirm against the restore path.
            if isinstance(val, (tf.Tensor, tf.Variable)):
                if val.shape == tf.TensorShape([]):
                    print(f"DPN __init__: Convertendo {param_name} (Tensor/Variable escalar) para int.")
                    return int(val.numpy())
                else:
                    raise ValueError(f"{param_name} é um Tensor/Variable mas não é escalar. Shape: {val.shape}")
            elif isinstance(val, dict):
                print(f"DPN __init__: Tentando converter {param_name} (dict-like) para int.")
                try:
                    return int(val)
                except TypeError:
                    print(f"DPN __init__: Conversão direta de {param_name} (dict-like) para int falhou. Investigar TrackedDict.")
                    raise TypeError(f"{param_name} é {type(val)} e não pôde ser convertido para int diretamente. Valor: {val}")
            else:
                return int(val)

        try:
            self.num_assets = get_int_value("num_assets", num_assets)
            self.sequence_length = get_int_value("sequence_length", sequence_length)
            self.num_features_per_asset = get_int_value("num_features_per_asset", num_features_per_asset)
            self.asset_lstm_output_dim = get_int_value("asset_lstm_units2", asset_lstm_units2)
        except Exception as e_conv:
            print(f"ERRO CRÍTICO DE CONVERSÃO DE TIPO no __init__ da DeepPortfolioAgentNetwork: {e_conv}")
            raise

        print(f"DPN __init__ > self.num_assets APÓS conversão: {self.num_assets}, tipo: {type(self.num_assets)}")

        # BUG FIX: the original re-assigned self.num_assets / self.sequence_length /
        # self.num_features_per_asset / self.asset_lstm_output_dim from the raw
        # constructor arguments at this point, silently discarding the int
        # coercion performed above. Those re-assignments have been removed.

        # Remaining hyperparameters, stored so get_config() can round-trip.
        self.asset_cnn_filters1 = asset_cnn_filters1
        self.asset_cnn_filters2 = asset_cnn_filters2
        self.asset_lstm_units1 = asset_lstm_units1
        self.asset_lstm_units2 = asset_lstm_units2
        self.asset_dropout = asset_dropout
        self.mha_num_heads = mha_num_heads
        self.mha_key_dim_divisor = mha_key_dim_divisor
        self.final_dense_units1 = final_dense_units1
        self.final_dense_units2 = final_dense_units2
        self.final_dropout = final_dropout
        self.output_latent_features = output_latent_features

        # One AssetProcessor shared across all assets (weights are reused for
        # every per-asset slice in call()).
        self.asset_processor = AssetProcessor(
            sequence_length=self.sequence_length, num_features=self.num_features_per_asset,
            cnn_filters1=asset_cnn_filters1, cnn_filters2=asset_cnn_filters2,
            lstm_units1=asset_lstm_units1, lstm_units2=asset_lstm_units2,
            dropout_rate=asset_dropout
        )

        # Per-head key dimension; fall back to the full dim when the divisor
        # would produce 0 (integer division with a divisor larger than the dim).
        calculated_key_dim = self.asset_lstm_output_dim // mha_key_dim_divisor
        if calculated_key_dim == 0:
            calculated_key_dim = self.asset_lstm_output_dim
            print(f"AVISO: asset_lstm_output_dim ({self.asset_lstm_output_dim}) muito pequeno para mha_key_dim_divisor ({mha_key_dim_divisor}). Usando key_dim = {calculated_key_dim}")

        # Cross-asset self-attention + residual normalization + pooling.
        self.attention = MultiHeadAttention(num_heads=mha_num_heads, key_dim=calculated_key_dim, dropout=0.1, name="multi_asset_attention")
        self.attention_norm = LayerNormalization(epsilon=1e-6, name="attention_layernorm")
        self.global_avg_pool_attention = GlobalAveragePooling1D(name="gap_after_attention")

        self.use_sentiment = use_sentiment_analysis
        self.sentiment_embedding_size = 3  # presumably FinBERT's 3 sentiment classes — TODO confirm
        if self.use_sentiment:
            try:
                self.tokenizer = AutoTokenizer.from_pretrained('ProsusAI/finbert')
                self.sentiment_model = TFAutoModelForSequenceClassification.from_pretrained('ProsusAI/finbert', from_pt=True)
                print("Modelo FinBERT carregado para análise de sentimento.")
            except Exception as e:
                # Best-effort: degrade gracefully to a no-sentiment model.
                print(f"AVISO: Falha ao carregar FinBERT: {e}. Análise de sentimento será desabilitada.")
                self.use_sentiment = False

        # Dense head producing the softmax allocation over assets.
        # (BUG FIX: removed the meaningless unused `dense_input_dim = self.use_sentiment`.)
        self.dense1 = Dense(final_dense_units1, activation='relu', kernel_regularizer=regularizers.l2(L2_REG), name="final_dense1")
        self.dropout1 = Dropout(final_dropout, name="final_dropout1")
        self.dense2 = Dense(final_dense_units2, activation='relu', kernel_regularizer=regularizers.l2(L2_REG), name="final_dense2")
        self.dropout2 = Dropout(final_dropout, name="final_dropout2")
        self.output_allocation = Dense(self.num_assets, activation='softmax', name="portfolio_allocation_output")

    def call(self, inputs, training=False):
        """Forward pass.

        Args:
            inputs: (batch, sequence_length, num_assets * num_features_per_asset)
                tensor with per-asset features concatenated on the last axis.
            training: forwarded to dropout/attention layers.

        Returns:
            (batch, num_assets) softmax portfolio weights.
        """
        market_data_flat = inputs

        # Slice the flat feature axis into contiguous per-asset windows and
        # encode each with the shared AssetProcessor.
        # (BUG FIX: removed leftover per-call debug `print(type(self.num_assets))`.)
        asset_representations_list = []
        for i in range(self.num_assets):
            start_idx = i * self.num_features_per_asset
            end_idx = (i + 1) * self.num_features_per_asset
            current_asset_data = market_data_flat[:, :, start_idx:end_idx]
            processed_asset_representation = self.asset_processor(current_asset_data, training=training)
            asset_representations_list.append(processed_asset_representation)

        # (batch, num_assets, asset_lstm_output_dim)
        stacked_asset_features = tf.stack(asset_representations_list, axis=1)

        # Self-attention across assets with a residual connection + LayerNorm.
        attention_output = self.attention(
            query=stacked_asset_features, value=stacked_asset_features, key=stacked_asset_features,
            training=training
        )
        attention_output = self.attention_norm(stacked_asset_features + attention_output)

        # Pool over the asset axis into a single context vector per sample.
        context_vector_from_attention = self.global_avg_pool_attention(attention_output)

        x = self.dense1(context_vector_from_attention)
        x = self.dropout1(x, training=training)
        x = self.dense2(x)
        x = self.dropout2(x, training=training)

        portfolio_weights = self.output_allocation(x)
        return portfolio_weights

    def get_config(self):
        """Serialize constructor arguments for Keras (de)serialization.

        BUG FIX: the original emitted "asset_lstm_output_dim", which is not a
        constructor parameter (breaking from_config), and omitted every real
        hyperparameter; the full constructor argument set is emitted instead.
        """
        config = super().get_config()
        config.update({
            "num_assets": self.num_assets,
            "sequence_length": self.sequence_length,
            "num_features_per_asset": self.num_features_per_asset,
            "asset_cnn_filters1": self.asset_cnn_filters1,
            "asset_cnn_filters2": self.asset_cnn_filters2,
            "asset_lstm_units1": self.asset_lstm_units1,
            "asset_lstm_units2": self.asset_lstm_units2,
            "asset_dropout": self.asset_dropout,
            "mha_num_heads": self.mha_num_heads,
            "mha_key_dim_divisor": self.mha_key_dim_divisor,
            "final_dense_units1": self.final_dense_units1,
            "final_dense_units2": self.final_dense_units2,
            "final_dropout": self.final_dropout,
            "use_sentiment_analysis": self.use_sentiment,
            "output_latent_features": self.output_latent_features,
        })
        return config
| | |
| | |
| | |
| | |
| | |
| |
|
| |
|
if __name__ == '__main__':
    # Smoke test: build the network with mock data and verify the forward pass
    # produces a correctly-shaped softmax output.
    import traceback

    print("Testando o Forward Pass do DeepPortfolioAgentNetwork...")

    # Test configuration mirrors the module-level constants.
    batch_size_test = 2
    seq_len_test = WINDOW_SIZE_CONF
    num_assets_test = NUM_ASSETS_CONF
    num_features_per_asset_test = NUM_FEATURES_PER_ASSET_CONF
    total_features_flat = num_assets_test * num_features_per_asset_test

    print(f"Configuração do Teste:")
    print(f"  Batch Size: {batch_size_test}")
    print(f"  Sequence Length (Window): {seq_len_test}")
    print(f"  Number of Assets: {num_assets_test}")
    print(f"  Features per Asset: {num_features_per_asset_test}")
    print(f"  Total Flat Features per Timestep: {total_features_flat}")

    # Random input standing in for real market data.
    mock_market_data_flat = tf.random.normal(
        shape=(batch_size_test, seq_len_test, total_features_flat)
    )
    print(f"Shape do Input Mockado (market_data_flat): {mock_market_data_flat.shape}")

    # Sentiment analysis disabled so the test does not download FinBERT.
    print("\nInstanciando DeepPortfolioAgentNetwork...")
    agent_network = DeepPortfolioAgentNetwork(
        num_assets=num_assets_test,
        sequence_length=seq_len_test,
        num_features_per_asset=num_features_per_asset_test,
        asset_cnn_filters1=32, asset_cnn_filters2=64,
        asset_lstm_units1=64, asset_lstm_units2=32,
        asset_dropout=0.1,
        mha_num_heads=4, mha_key_dim_divisor=4,
        final_dense_units1=64, final_dense_units2=32, final_dropout=0.2,
        use_sentiment_analysis=False
    )

    # First call builds all layer weights so summary() can run.
    print("\nConstruindo o modelo com input mockado (primeira chamada)...")
    try:
        _ = agent_network(mock_market_data_flat)
        print("\n--- Summary da Rede Principal (DeepPortfolioAgentNetwork) ---")
        agent_network.summary()
        print("\n--- Summary do AssetProcessor (Sub-Modelo) ---")
        agent_network.asset_processor.summary()
    except Exception as e:
        # BUG FIX: the original passed exc_info=True to print(), which raises
        # TypeError (exc_info is a logging kwarg, not a print kwarg) exactly
        # when an error should be reported.
        print(f"Erro ao construir a rede principal: {e}")
        traceback.print_exc()
        raise SystemExit(1)

    print("\nExecutando Forward Pass...")
    try:
        predictions = agent_network(mock_market_data_flat, training=False)
        print("Forward Pass concluído com sucesso!")
    except Exception as e:
        print(f"Erro durante o Forward Pass: {e}")
        traceback.print_exc()
        raise SystemExit(1)

    # Shape check: one weight per asset, per batch sample.
    print(f"\nShape da Saída (predictions): {predictions.shape}")
    expected_output_shape = (batch_size_test, num_assets_test)
    if predictions.shape == expected_output_shape:
        print(f"Shape da Saída está CORRETO! Esperado: {expected_output_shape}")
    else:
        print(f"ERRO: Shape da Saída INCORRETO. Esperado: {expected_output_shape}, Obtido: {predictions.shape}")

    # Softmax check: weights for each sample should sum to ~1.
    if hasattr(predictions, 'numpy'):
        output_sum = tf.reduce_sum(predictions, axis=-1).numpy()
        print(f"Soma das probabilidades de saída por amostra no batch (deve ser próximo de 1): {output_sum}")
        if np.allclose(output_sum, 1.0):
            print("Saída Softmax parece CORRETA (soma 1).")
        else:
            print("AVISO: Saída Softmax pode NÃO estar correta (soma diferente de 1).")

        print("\nExemplo das primeiras predições (pesos do portfólio):")
        print(predictions.numpy()[:min(5, batch_size_test)])

    print("\nTeste do Forward Pass Concluído!")
| |
|
| |
|
| |
|
| |
|
| |
|
| |
|