|
|
|
"""liberal mind beta |
|
|
|
Automatically generated by Colab. |
|
|
|
Original file is located at |
|
https://colab.research.google.com/drive/1WZLrb1Gf63n6vXBvGFEWwIPt1BU_5xc9 |
|
|
|
# ***INSTALL LIBRARIES*** |
|
""" |
|
|
|
pip install stable_baselines3 |
|
|
|
pip install datasets |
|
|
|
pip install torch-geometric |
|
|
|
pip install gym==0.21.0 |
|
|
|
pip install shimmy |
|
|
|
"""# ***DOWNLOAD DATASETS***""" |
|
|
|
from datasets import load_dataset |
|
from huggingface_hub import list_datasets |
|
|
|
def find_and_load_dataset(keyword): |
|
|
|
matching_datasets = [ds.id for ds in list_datasets() if keyword.lower() in ds.id.lower()] |
|
|
|
|
|
if not matching_datasets: |
|
print("Нет датасетов, содержащих указанное ключевое слово.") |
|
return None |
|
|
|
|
|
dataset_name = matching_datasets[0] |
|
print(f"Найден датасет: {dataset_name}. Загружаем его...") |
|
|
|
|
|
dataset = load_dataset(dataset_name) |
|
return dataset |
|
|
|
|
|
keyword = input("Введите ключевое слово для поиска датасета: ") |
|
dataset = find_and_load_dataset(keyword) |
|
|
|
if dataset: |
|
print("Датасет загружен успешно!") |
|
print(dataset) |
|
|
|
import os |
|
import shutil |
|
from datasets import load_dataset |
|
|
|
|
|
dataset_name = "Unified-Language-Model-Alignment/Anthropic_HH_Golden" |
|
save_dir = "/content/dataset" |
|
|
|
|
|
dataset = load_dataset(dataset_name) |
|
|
|
|
|
os.makedirs(save_dir, exist_ok=True) |
|
|
|
|
|
for split_cache_files in dataset.cache_files.values(): |
|
for cache_file in split_cache_files: |
|
shutil.copy2(cache_file['filename'], save_dir) |
|
|
|
print(f"Датасет '{dataset_name}' сохранен в папке {save_dir}") |
|
|
|
"""# ***ENCODER TRANSFORMERS MATH AI***""" |
|
|
|
import numpy as np
import torch
import torch.nn as nn
from stable_baselines3 import PPO
from gym import Env
from gym.spaces import Discrete, Box
|
|
|
|
|
ppo_hyperparameters = { |
|
"n_steps": 1024, |
|
"batch_size": 64, |
|
"n_epochs": 1, |
|
"gamma": 0.99, |
|
"learning_rate": 3e-4, |
|
"clip_range": 0.2, |
|
"gae_lambda": 0.95, |
|
"vf_coef": 0.5, |
|
"ent_coef": 0.01, |
|
"max_grad_norm": 0.5, |
|
"target_kl": 0.03, |
|
"penalty_coef": 0.05, |
|
"epsilon": 0.15, |
|
"adv_norm": True, |
|
"weight_init": "xavier" |
|
} |
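# Note: "penalty_coef", "epsilon", "adv_norm", and "weight_init" are custom keys read
# manually by the code below (ComplexEnv and EpsilonPPO); they are not arguments that
# stable-baselines3's PPO constructor accepts. "adv_norm" is currently unused.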
|
|
|
|
|
class ComplexEnv(Env): |
|
def __init__(self): |
|
super(ComplexEnv, self).__init__() |
|
self.action_space = Discrete(3) |
|
self.observation_space = Box(low=-10, high=10, shape=(5,), dtype=np.float32) |
|
self.state = np.random.uniform(low=-1, high=1, size=(5,)) |
|
self.step_count = 0 |
|
|
|
def reset(self): |
|
self.state = np.random.uniform(low=-1, high=1, size=(5,)) |
|
self.step_count = 0 |
|
return self.state |
|
|
|
def step(self, action): |
|
reward = -0.1 |
|
self.step_count += 1 |
|
|
|
if action == 0: |
|
reward += 1 if self.state[0] > 0 else -1 * ppo_hyperparameters["penalty_coef"] |
|
elif action == 1: |
|
reward += 0.5 if np.sum(self.state) > 0 else -2 * ppo_hyperparameters["penalty_coef"] |
|
else: |
|
reward += -0.5 if self.state[1] < 0 else 1 * ppo_hyperparameters["penalty_coef"] |
|
|
|
self.state += np.random.normal(0, 0.5, size=self.state.shape) |
|
|
|
done = self.step_count >= 100 |
|
return self.state, reward, done, {} |
|
|
|
|
|
env = ComplexEnv() |
|
|
|
|
|
class EpsilonPPO(PPO): |
|
def __init__(self, policy, env, **kwargs): |
|
super(EpsilonPPO, self).__init__(policy, env, **kwargs) |
|
self.epsilon = ppo_hyperparameters["epsilon"] |
|
|
|
|
|
for layer in self.policy.modules(): |
|
if isinstance(layer, (nn.Linear, nn.Conv2d)): |
|
if ppo_hyperparameters["weight_init"] == "xavier": |
|
nn.init.xavier_uniform_(layer.weight) |
|
elif ppo_hyperparameters["weight_init"] == "kaiming": |
|
nn.init.kaiming_uniform_(layer.weight) |
|
|
|
    def _predict(self, observation, deterministic=False):
        # Epsilon-greedy exploration wrapped around PPO's own prediction.
        if np.random.rand() < self.epsilon:
            return self.env.action_space.sample(), None
        return super().predict(observation, deterministic=deterministic)
|
|
|
|
|
model = EpsilonPPO( |
|
policy="MlpPolicy", |
|
env=env, |
|
verbose=1, |
|
n_steps=ppo_hyperparameters["n_steps"], |
|
batch_size=ppo_hyperparameters["batch_size"], |
|
n_epochs=ppo_hyperparameters["n_epochs"], |
|
gamma=ppo_hyperparameters["gamma"], |
|
learning_rate=ppo_hyperparameters["learning_rate"], |
|
clip_range=ppo_hyperparameters["clip_range"], |
|
gae_lambda=ppo_hyperparameters["gae_lambda"], |
|
vf_coef=ppo_hyperparameters["vf_coef"], |
|
ent_coef=ppo_hyperparameters["ent_coef"], |
|
max_grad_norm=ppo_hyperparameters["max_grad_norm"], |
|
target_kl=ppo_hyperparameters["target_kl"] |
|
) |
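# Note: the epsilon-greedy `_predict` override above only takes effect when called
# directly; stable-baselines3's `learn()` collects rollouts through the policy object,
# so training-time exploration still comes from PPO's stochastic policy and ent_coef.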
|
|
|
|
|
model.learn(total_timesteps=50000) |
|
|
|
|
|
obs = env.reset() |
|
for _ in range(100): |
|
action, _ = model.predict(obs) |
|
obs, reward, done, _ = env.step(action) |
|
if done: |
|
obs = env.reset() |
|
|
|
import numpy as np |
|
import torch |
|
import torch.nn as nn |
|
from sklearn.ensemble import RandomForestClassifier |
|
from sklearn.cluster import AgglomerativeClustering |
|
from scipy.spatial.distance import euclidean |
|
from sklearn.preprocessing import StandardScaler |
|
from stable_baselines3 import PPO |
|
from gym import Env |
|
from gym.spaces import Discrete, Box |
|
|
|
|
|
def classify_data(X, y): |
|
rf = RandomForestClassifier(n_estimators=100) |
|
rf.fit(X, y) |
|
feature_importances = rf.feature_importances_ |
|
return feature_importances, rf |
|
|
|
|
|
def cluster_data(X): |
|
    clustering = AgglomerativeClustering(n_clusters=5, metric='euclidean', linkage='ward')
|
clusters = clustering.fit_predict(X) |
|
return clusters |
|
|
|
class LSTMModel(nn.Module): |
|
def __init__(self, input_size, hidden_size, output_size): |
|
super(LSTMModel, self).__init__() |
|
self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True) |
|
self.fc = nn.Linear(hidden_size, output_size) |
|
|
|
def forward(self, x): |
|
_, (hn, _) = self.lstm(x) |
|
out = self.fc(hn[-1]) |
|
return out |
|
|
|
|
|
input_size = 5 |
|
hidden_size = 32 |
|
output_size = 3 |
|
lstm_agent = LSTMModel(input_size, hidden_size, output_size) |
|
|
|
class ComplexEnvWithLSTM(Env): |
|
def __init__(self): |
|
super(ComplexEnvWithLSTM, self).__init__() |
|
self.action_space = Discrete(3) |
|
self.observation_space = Box(low=-10, high=10, shape=(5,), dtype=np.float32) |
|
self.state = np.random.uniform(low=-1, high=1, size=(5,)) |
|
self.step_count = 0 |
|
|
|
def reset(self): |
|
self.state = np.random.uniform(low=-1, high=1, size=(5,)) |
|
self.step_count = 0 |
|
return self.state |
|
|
|
def step(self, action): |
|
reward = -0.1 |
|
self.step_count += 1 |
|
|
|
|
|
if action == 0: |
|
reward += 1 if self.state[0] > 0 else -0.5 |
|
elif action == 1: |
|
reward += 0.5 if np.sum(self.state) > 0 else -1 |
|
else: |
|
reward += -0.5 if self.state[1] < 0 else 1 |
|
|
|
|
|
self.state += np.random.normal(0, 0.5, size=self.state.shape) |
|
|
|
done = self.step_count >= 100 |
|
return self.state, reward, done, {} |
|
|
|
env = ComplexEnvWithLSTM() |
|
|
|
def classify_data(X, y): |
|
rf = RandomForestClassifier(n_estimators=100) |
|
rf.fit(X, y) |
|
feature_importances = rf.feature_importances_ |
|
return feature_importances, rf |
|
|
|
def cluster_data(X): |
|
clustering = AgglomerativeClustering(n_clusters=5, metric='euclidean', linkage='ward') |
|
clusters = clustering.fit_predict(X) |
|
return clusters |
|
class AdvancedLSTMModel(nn.Module): |
|
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3, use_sigmoid=True): |
|
super(AdvancedLSTMModel, self).__init__() |
|
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True) |
|
|
|
|
|
self.fc1 = nn.Linear(hidden_size, 1000) |
|
self.fc2 = nn.Linear(1000, 2000) |
|
self.activation = nn.Sigmoid() if use_sigmoid else nn.ReLU() |
|
|
|
def forward(self, x): |
|
_, (hn, _) = self.lstm(x) |
|
x = self.fc1(hn[-1]) |
|
x = self.activation(x) |
|
x = self.fc2(x) |
|
return x |
|
|
|
|
|
input_size = 5 |
|
hidden_size = 256 |
|
output_size = 2000 |
|
num_layers = 3 |
|
dropout = 0.3 |
|
use_sigmoid = True |
|
lstm_agent = AdvancedLSTMModel(input_size, hidden_size, output_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid) |
|
class ComplexEnvWithLSTM(Env): |
|
def __init__(self): |
|
super(ComplexEnvWithLSTM, self).__init__() |
|
self.action_space = Discrete(3) |
|
self.observation_space = Box(low=-10, high=10, shape=(5,), dtype=np.float32) |
|
self.state = np.random.uniform(low=-1, high=1, size=(5,)) |
|
self.step_count = 0 |
|
|
|
def reset(self): |
|
self.state = np.random.uniform(low=-1, high=1, size=(5,)) |
|
self.step_count = 0 |
|
return self.state |
|
|
|
def step(self, action): |
|
reward = -0.1 |
|
self.step_count += 1 |
|
|
|
if action == 0: |
|
reward += 1 if self.state[0] > 0 else -0.5 |
|
elif action == 1: |
|
reward += 0.5 if np.sum(self.state) > 0 else -1 |
|
else: |
|
reward += -0.5 if self.state[1] < 0 else 1 |
|
|
|
|
|
self.state += np.random.normal(0, 0.5, size=self.state.shape) |
|
|
|
done = self.step_count >= 100 |
|
return self.state, reward, done, {} |
|
|
|
env = ComplexEnvWithLSTM() |
|
|
|
|
|
X = np.random.rand(100, 5) |
|
y = np.random.randint(0, 2, size=100) |
|
feature_importances, rf = classify_data(X, y) |
|
clusters = cluster_data(X) |
|
|
|
|
|
model = PPO( |
|
policy="MlpPolicy", |
|
env=env, |
|
verbose=1, |
|
learning_rate=5e-4, |
|
n_steps=512, |
|
batch_size=32, |
|
n_epochs=4, |
|
gamma=0.99, |
|
clip_range=0.2, |
|
gae_lambda=0.95, |
|
vf_coef=0.5, |
|
ent_coef=0.005, |
|
max_grad_norm=0.5, |
|
target_kl=0.03, |
|
) |
|
|
|
|
|
model.learn(total_timesteps=20000) |
|
|
|
|
|
obs = env.reset() |
|
obs = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0) |
|
for _ in range(100): |
|
action_probs = lstm_agent(obs) |
|
action = action_probs.argmax().item() |
|
obs, reward, done, _ = env.step(action) |
|
obs = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0) |
|
if done: |
|
obs = env.reset() |
|
|
|
"""# ***DECODER TRANSFORMERS MATH AI***""" |
|
|
|
import numpy as np |
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
from sklearn.linear_model import LinearRegression |
|
from stable_baselines3 import PPO |
|
from gym import Env |
|
from gym.spaces import Discrete, Box |
|
|
|
|
|
def distribute_outputs(X, y): |
|
lin_reg = LinearRegression() |
|
lin_reg.fit(X, y) |
|
distributed_outputs = lin_reg.predict(X) |
|
return distributed_outputs |
|
|
|
class LSTMDecoderModel(nn.Module): |
|
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3, use_sigmoid=True): |
|
super(LSTMDecoderModel, self).__init__() |
|
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True) |
|
|
|
|
|
self.fc1 = nn.Linear(hidden_size, 1000) |
|
self.fc2 = nn.Linear(1000, 2000) |
|
self.activation = nn.Sigmoid() if use_sigmoid else nn.ReLU() |
|
|
|
def forward(self, x): |
|
_, (hn, _) = self.lstm(x) |
|
x = self.fc1(hn[-1]) |
|
x = self.activation(x) |
|
x = self.fc2(x) |
|
return x |
|
|
|
|
|
input_size = 5 |
|
hidden_size = 256 |
|
output_size = 2000 |
|
num_layers = 3 |
|
dropout = 0.3 |
|
use_sigmoid = True |
|
lstm_decoder = LSTMDecoderModel(input_size, hidden_size, output_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid) |
|
|
|
class ComplexEnvForDecoder(Env): |
|
def __init__(self): |
|
super(ComplexEnvForDecoder, self).__init__() |
|
self.action_space = Discrete(3) |
|
self.observation_space = Box(low=-10, high=10, shape=(5,), dtype=np.float32) |
|
self.state = np.random.uniform(low=-1, high=1, size=(5,)) |
|
self.step_count = 0 |
|
|
|
def reset(self): |
|
self.state = np.random.uniform(low=-1, high=1, size=(5,)) |
|
self.step_count = 0 |
|
return self.state |
|
|
|
def step(self, action): |
|
reward = -0.1 |
|
self.step_count += 1 |
|
|
|
if action == 0: |
|
reward += 1 if self.state[0] > 0 else -0.5 |
|
elif action == 1: |
|
reward += 0.5 if np.sum(self.state) > 0 else -1 |
|
else: |
|
reward += -0.5 if self.state[1] < 0 else 1 |
|
|
|
self.state += np.random.normal(0, 0.5, size=self.state.shape) |
|
done = self.step_count >= 100 |
|
return self.state, reward, done, {} |
|
|
|
env_decoder = ComplexEnvForDecoder() |
|
|
|
|
|
X = np.random.rand(100, 5) |
|
y = np.random.randint(0, 2, size=100) |
|
distributed_outputs = distribute_outputs(X, y) |
|
|
|
|
|
model_decoder = PPO( |
|
policy="MlpPolicy", |
|
env=env_decoder, |
|
verbose=1, |
|
learning_rate=5e-4, |
|
n_steps=512, |
|
batch_size=32, |
|
n_epochs=4, |
|
gamma=0.99, |
|
clip_range=0.2, |
|
gae_lambda=0.95, |
|
vf_coef=0.5, |
|
ent_coef=0.005, |
|
max_grad_norm=0.5, |
|
target_kl=0.03, |
|
) |
|
|
|
|
|
model_decoder.learn(total_timesteps=20000) |
|
|
|
|
|
obs = env_decoder.reset() |
|
obs = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0) |
|
for _ in range(100): |
|
action_probs = lstm_decoder(obs) |
|
action = action_probs.argmax().item() |
|
obs, reward, done, _ = env_decoder.step(action) |
|
obs = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0) |
|
if done: |
|
obs = env_decoder.reset() |
|
|
|
"""# ***TRANSFORMERS MATH AI***""" |
|
|
|
import numpy as np |
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
from sklearn.ensemble import RandomForestClassifier |
|
from sklearn.cluster import AgglomerativeClustering |
|
from sklearn.linear_model import LinearRegression |
|
from stable_baselines3 import PPO |
|
from gym import Env |
|
from gym.spaces import Discrete, Box |
|
|
|
class EncoderLSTMModel(nn.Module): |
|
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3, use_sigmoid=True): |
|
super(EncoderLSTMModel, self).__init__() |
|
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True) |
|
self.fc1 = nn.Linear(hidden_size, output_size) |
|
self.activation = nn.Sigmoid() if use_sigmoid else nn.ReLU() |
|
|
|
def forward(self, x): |
|
_, (hn, _) = self.lstm(x) |
|
x = self.fc1(hn[-1]) |
|
return self.activation(x) |
|
|
|
class DecoderLSTMModel(nn.Module): |
|
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3, use_sigmoid=True): |
|
super(DecoderLSTMModel, self).__init__() |
|
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True) |
|
self.fc1 = nn.Linear(hidden_size, output_size) |
|
self.activation = nn.Sigmoid() if use_sigmoid else nn.ReLU() |
|
|
|
def forward(self, x): |
|
_, (hn, _) = self.lstm(x) |
|
x = self.fc1(hn[-1]) |
|
return self.activation(x) |
|
|
|
class TransformerModule(nn.Module): |
|
def __init__(self, input_size, hidden_size, num_heads=2, num_layers=2): |
|
super(TransformerModule, self).__init__() |
|
self.transformer_layer = nn.Transformer(d_model=input_size, nhead=num_heads, num_encoder_layers=num_layers, num_decoder_layers=num_layers) |
|
self.fc = nn.Linear(input_size, hidden_size) |
|
|
|
def forward(self, src, tgt): |
|
|
|
output = self.transformer_layer(src, tgt) |
|
|
|
return self.fc(output) |
|
|
|
|
|
input_size = 32 |
|
hidden_size = 256 |
|
output_size = 2000 |
|
num_heads = 4 |
|
num_layers = 2 |
|
dropout = 0.3 |
|
use_sigmoid = True |
|
|
|
|
|
encoder = EncoderLSTMModel(input_size, hidden_size, output_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid) |
|
decoder = DecoderLSTMModel(input_size, hidden_size, output_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid) |
|
transformer = TransformerModule(input_size, hidden_size, num_heads=num_heads, num_layers=num_layers) |
|
|
|
import torch |
|
import torch.nn as nn |
|
|
|
|
|
class EncoderLSTMModel(nn.Module): |
|
def __init__(self, input_size, hidden_size, num_layers=1, dropout=0.0, use_sigmoid=False): |
|
super(EncoderLSTMModel, self).__init__() |
|
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True) |
|
self.fc = nn.Linear(hidden_size, input_size) |
|
self.use_sigmoid = use_sigmoid |
|
|
|
def forward(self, x): |
|
lstm_out, _ = self.lstm(x) |
|
output = self.fc(lstm_out[:, -1, :]) |
|
if self.use_sigmoid: |
|
output = torch.sigmoid(output) |
|
return output |
|
|
|
|
|
class DecoderLSTMModel(nn.Module): |
|
def __init__(self, input_size, hidden_size, num_layers=1, dropout=0.0, use_sigmoid=False): |
|
super(DecoderLSTMModel, self).__init__() |
|
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True) |
|
self.fc = nn.Linear(hidden_size, input_size) |
|
self.use_sigmoid = use_sigmoid |
|
|
|
def forward(self, x): |
|
lstm_out, _ = self.lstm(x) |
|
output = self.fc(lstm_out[:, -1, :]) |
|
if self.use_sigmoid: |
|
output = torch.sigmoid(output) |
|
return output |
|
|
|
|
|
class TransformerModule(nn.Module): |
|
def __init__(self, input_size, num_heads=4, num_layers=2): |
|
super(TransformerModule, self).__init__() |
|
self.transformer_layer = nn.Transformer( |
|
d_model=input_size, |
|
nhead=num_heads, |
|
num_encoder_layers=num_layers, |
|
num_decoder_layers=num_layers |
|
) |
|
self.fc = nn.Linear(input_size, input_size) |
|
|
|
def forward(self, src, tgt): |
|
|
|
print(f"src shape before transformer: {src.shape}") |
|
print(f"tgt shape before transformer: {tgt.shape}") |
|
|
|
|
|
output = self.transformer_layer(src, tgt) |
|
|
|
|
|
return self.fc(output) |
|
|
|
|
|
class CombinedModel(nn.Module): |
|
def __init__(self, encoder, decoder, transformer): |
|
super(CombinedModel, self).__init__() |
|
self.encoder = encoder |
|
self.decoder = decoder |
|
self.transformer = transformer |
|
|
|
def forward(self, x): |
|
|
|
encoded = self.encoder(x) |
|
|
|
|
|
src = encoded.unsqueeze(1) |
|
tgt = torch.zeros_like(src) |
|
|
|
|
|
print(f"CombinedModel: src shape: {src.shape}, tgt shape: {tgt.shape}") |
|
|
|
|
|
transformed = self.transformer(src, tgt) |
|
|
|
|
|
output = self.decoder(transformed) |
|
return output |
|
|
|
|
|
input_size = 32 |
|
hidden_size = 256 |
|
num_heads = 4 |
|
num_layers = 2 |
|
dropout = 0.3 |
|
use_sigmoid = True |
|
|
|
|
|
encoder = EncoderLSTMModel(input_size, hidden_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid) |
|
decoder = DecoderLSTMModel(input_size, hidden_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid) |
|
transformer = TransformerModule(input_size, num_heads=num_heads, num_layers=num_layers) |
|
|
|
|
|
combined_model = CombinedModel(encoder, decoder, transformer) |
|
|
|
|
|
batch_size = 10 |
|
seq_length = 5 |
|
example_input = torch.randn((batch_size, seq_length, input_size)) |
|
output = combined_model(example_input) |
|
|
|
print("Output shape:", output.shape) |
|
|
|
"""# ***CREATION AI***""" |
|
|
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
import numpy as np |
|
from tqdm import tqdm |
|
|
|
|
|
num_steps = 1000 |
|
input_dim = 784 |
|
batch_size = 64 |
|
learning_rate = 1e-4 |
|
|
|
|
|
beta_start = 1e-4 |
|
beta_end = 0.02 |
|
beta = np.linspace(beta_start, beta_end, num_steps) |
|
alpha = 1 - beta |
|
alpha_cumprod = np.cumprod(alpha) |
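# Linear beta schedule, DDPM-style: the forward process used below is
#   x_t = sqrt(alpha_bar_t) * x_0 + sqrt(1 - alpha_bar_t) * eps,  eps ~ N(0, I),
# where alpha_bar_t is the cumulative product of (1 - beta_t) computed above.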
|
|
|
|
|
class DiffusionModel(nn.Module): |
|
    def __init__(
|
self, input_dim, hidden_dim=512, output_dim=784, |
|
num_layers=3, activation_function="ReLU", batch_norm=False, layer_norm=False, |
|
use_skip_connections=True, dropout_rate=0.1, use_time_embedding=True, |
|
time_embedding_dim=16, noise_scaling_factor=0.1, optimizer="adam", |
|
learning_rate=1e-4, weight_decay=1e-5, gradient_clip_value=5.0, |
|
scheduler_step_size=50, scheduler_gamma=0.95, beta_schedule="linear", |
|
noise_type="gaussian", noise_seed=None, min_noise_std=0.1, max_noise_std=1.0, |
|
use_positional_encoding=False, positional_encoding_scale=1.0, |
|
max_training_epochs=100, min_learning_rate=1e-6, warmup_steps=500 |
|
): |
|
        super(DiffusionModel, self).__init__()
|
|
|
|
|
self.use_skip_connections = use_skip_connections |
|
self.use_time_embedding = use_time_embedding |
|
self.noise_scaling_factor = noise_scaling_factor |
|
self.time_embedding_dim = time_embedding_dim |
|
self.noise_type = noise_type |
|
self.min_noise_std = min_noise_std |
|
self.max_noise_std = max_noise_std |
|
self.noise_seed = noise_seed |
|
self.use_positional_encoding = use_positional_encoding |
|
self.positional_encoding_scale = positional_encoding_scale |
|
|
|
|
|
layers = [] |
|
in_dim = input_dim + (self.time_embedding_dim if use_time_embedding else 0) |
|
|
|
for _ in range(num_layers): |
|
layers.append(nn.Linear(in_dim, hidden_dim)) |
|
|
|
if batch_norm: |
|
layers.append(nn.BatchNorm1d(hidden_dim)) |
|
elif layer_norm: |
|
layers.append(nn.LayerNorm(hidden_dim)) |
|
|
|
if activation_function.lower() == "relu": |
|
layers.append(nn.ReLU()) |
|
elif activation_function.lower() == "leakyrelu": |
|
layers.append(nn.LeakyReLU()) |
|
elif activation_function.lower() == "tanh": |
|
layers.append(nn.Tanh()) |
|
|
|
if dropout_rate > 0: |
|
layers.append(nn.Dropout(dropout_rate)) |
|
|
|
in_dim = hidden_dim |
|
|
|
layers.append(nn.Linear(hidden_dim, output_dim)) |
|
self.network = nn.Sequential(*layers) |
|
|
|
def forward(self, x, t): |
|
if self.use_time_embedding: |
|
t_embedding = torch.sin(t.float() * 2 * np.pi / num_steps).unsqueeze(-1) |
|
            t_embedding = t_embedding.repeat(1, self.time_embedding_dim)  # tile to the embedding width expected by the first layer
|
x = torch.cat([x, t_embedding], dim=1) |
|
|
|
return self.network(x) |
|
|
|
|
|
|
|
def add_noise(x, t, noise_type="gaussian", min_noise_std=0.1, max_noise_std=1.0, |
|
noise_seed=None, noise_scaling_factor=0.1, beta_schedule="linear"): |
|
|
|
if noise_seed is not None: |
|
torch.manual_seed(noise_seed) |
|
|
|
    noise_std = (min_noise_std + t.float() * (max_noise_std - min_noise_std) / num_steps).unsqueeze(-1)
    noise = torch.randn_like(x) * noise_std if noise_type == "gaussian" else torch.rand_like(x) * noise_std

    if beta_schedule == "linear":
        alpha_t = torch.tensor(alpha_cumprod, dtype=torch.float32)[t].view(-1, 1)
    elif beta_schedule == "cosine":
        alpha_t = torch.cos(t.float() * np.pi / num_steps).view(-1, 1)
|
|
|
noisy_x = torch.sqrt(alpha_t) * x + torch.sqrt(1 - alpha_t) * noise * noise_scaling_factor |
|
return noisy_x |
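# Minimal usage sketch: one reconstruction-style training step for DiffusionModel,
# using the globals above (num_steps, input_dim, batch_size, learning_rate). The
# random batch `x` is a stand-in; a real run would use actual data.
model = DiffusionModel(input_dim)
opt = optim.Adam(model.parameters(), lr=learning_rate)

x = torch.randn(batch_size, input_dim)          # stand-in data batch
t = torch.randint(0, num_steps, (batch_size,))  # random diffusion timesteps
noisy_x = add_noise(x, t)
pred = model(noisy_x, t)                        # predict the clean input from the noisy one
loss = nn.MSELoss()(pred, x)
opt.zero_grad()
loss.backward()
opt.step()
print(f"single-step loss: {loss.item():.4f}")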
|
|
|
"""# ***DIFFUSION TRANSFORMERS***""" |
|
|
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
import numpy as np |
|
from tqdm import tqdm |
|
|
|
|
|
num_steps = 1000 |
|
input_dim = 784 |
|
batch_size = 64 |
|
learning_rate = 1e-4 |
|
|
|
|
|
beta_start = 1e-4 |
|
beta_end = 0.02 |
|
beta = np.linspace(beta_start, beta_end, num_steps) |
|
alpha = 1 - beta |
|
alpha_cumprod = np.cumprod(alpha) |
|
|
|
|
|
alpha_cumprod_tensor = torch.tensor(alpha_cumprod, dtype=torch.float32) |
|
|
|
|
|
class DiffusionEncoder(nn.Module): |
|
def __init__(self): |
|
super(DiffusionEncoder, self).__init__() |
|
self.network = nn.Sequential( |
|
nn.Linear(input_dim + 1, 512), |
|
nn.ReLU(), |
|
nn.Linear(512, 512), |
|
nn.ReLU(), |
|
nn.Linear(512, input_dim) |
|
) |
|
|
|
def forward(self, x, t): |
|
t_embedding = torch.sin(t.float() * 2 * np.pi / num_steps).unsqueeze(-1) |
|
x = torch.cat([x, t_embedding], dim=1) |
|
return self.network(x) |
|
|
|
|
|
class FullyConnectedDecoder(nn.Module): |
|
def __init__(self): |
|
super(FullyConnectedDecoder, self).__init__() |
|
self.network = nn.Sequential( |
|
nn.Linear(input_dim, 512), |
|
nn.ReLU(), |
|
nn.Linear(512, 512), |
|
nn.ReLU(), |
|
nn.Linear(512, input_dim) |
|
) |
|
|
|
def forward(self, x): |
|
return self.network(x) |
|
|
|
|
|
def add_noise(x, t): |
|
noise = torch.randn_like(x) |
|
alpha_t = alpha_cumprod_tensor[t].view(-1, 1) |
|
noisy_x = torch.sqrt(alpha_t) * x + torch.sqrt(1 - alpha_t) * noise |
|
return noisy_x, noise |
|
|
|
|
|
encoder = DiffusionEncoder() |
|
decoder = FullyConnectedDecoder() |
|
optimizer = optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=learning_rate) |
|
|
|
|
|
for epoch in range(2): |
|
for _ in tqdm(range(1000)): |
|
|
|
x = torch.randn(batch_size, input_dim) |
|
t = torch.randint(0, num_steps, (batch_size,)) |
|
|
|
|
|
noisy_x, noise = add_noise(x, t) |
|
|
|
|
|
encoded = encoder(noisy_x, t) |
|
|
|
|
|
decoded = decoder(encoded) |
|
|
|
|
|
loss = nn.MSELoss()(decoded, x) |
|
|
|
|
|
optimizer.zero_grad() |
|
loss.backward() |
|
optimizer.step() |
|
|
|
print(f'Epoch {epoch + 1}, Loss: {loss.item():.4f}') |
|
|
|
"""# ***SELF-AWARNESS AI***""" |
|
|
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
import torch.nn.functional as F |
|
from tqdm import tqdm |
|
|
|
|
|
input_size = 10 |
|
hidden_size = 20 |
|
num_layers = 2 |
|
seq_len = 5 |
|
batch_size = 1 |
|
num_epochs = 5 |
|
learning_rate = 1e-4 |
|
|
|
|
|
gnn_params = { |
|
'input_dim': input_size, |
|
'hidden_dim': 32, |
|
'hidden_dim_2': 64, |
|
'output_dim': input_size, |
|
'activation_function': 'ReLU', |
|
'dropout_rate': 0.2, |
|
'batch_norm': True, |
|
} |
|
|
|
|
|
lstm_params = { |
|
'input_size': input_size, |
|
'hidden_size': hidden_size, |
|
'num_layers': num_layers, |
|
'dropout': 0.2, |
|
'bidirectional': False, |
|
'activation_function': 'Tanh', |
|
} |
|
|
|
|
|
class GraphNeuralNetwork(nn.Module): |
|
def __init__(self, params): |
|
super(GraphNeuralNetwork, self).__init__() |
|
self.fc1 = nn.Linear(params['input_dim'], params['hidden_dim']) |
|
self.fc2 = nn.Linear(params['hidden_dim'], params['hidden_dim_2']) |
|
self.fc3 = nn.Linear(params['hidden_dim_2'], params['output_dim']) |
|
self.dropout = nn.Dropout(params['dropout_rate']) |
|
|
|
def forward(self, x): |
|
x = F.relu(self.fc1(x)) |
|
x = self.dropout(x) |
|
x = F.relu(self.fc2(x)) |
|
x = self.dropout(x) |
|
return self.fc3(x) |
|
|
|
|
|
class LSTMPredictor(nn.Module): |
|
def __init__(self, params): |
|
super(LSTMPredictor, self).__init__() |
|
self.lstm = nn.LSTM(params['input_size'], params['hidden_size'], params['num_layers'], |
|
batch_first=True, dropout=params['dropout']) |
|
self.fc = nn.Linear(params['hidden_size'], 1) |
|
|
|
def forward(self, x): |
|
lstm_out, _ = self.lstm(x) |
|
last_hidden = lstm_out[:, -1, :] |
|
return self.fc(last_hidden) |
|
|
|
|
|
class EmotionAwarenessModel(nn.Module): |
|
def __init__(self, gnn_params, lstm_params): |
|
super(EmotionAwarenessModel, self).__init__() |
|
self.gnn = GraphNeuralNetwork(gnn_params) |
|
self.lstm = LSTMPredictor(lstm_params) |
|
|
|
def forward(self, x): |
|
gnn_out = self.gnn(x) |
|
gnn_out = gnn_out.unsqueeze(1) |
|
return self.lstm(gnn_out) |
|
|
|
|
|
model = EmotionAwarenessModel(gnn_params, lstm_params) |
|
optimizer = optim.Adam(model.parameters(), lr=learning_rate) |
|
|
|
|
|
x = torch.randn(batch_size, input_size) |
|
target = torch.randn(batch_size, 1) |
|
|
|
|
|
for epoch in range(num_epochs): |
|
model.train() |
|
optimizer.zero_grad() |
|
|
|
|
|
output = model(x) |
|
|
|
|
|
loss = F.mse_loss(output, target) |
|
loss.backward() |
|
|
|
|
|
optimizer.step() |
|
|
|
print(f'Epoch {epoch + 1}, Loss: {loss.item():.4f}') |
|
|
|
"""# ***NEW METHOD MACHINE LEARNING***""" |
|
|
|
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
from sklearn.linear_model import LinearRegression
|
|
|
|
|
class LinearRegressionModel: |
|
    def __init__(self):
|
self.model = LinearRegression() |
|
|
|
def fit(self, X, y): |
|
self.model.fit(X, y) |
|
|
|
def predict(self, X): |
|
return self.model.predict(X) |
|
|
|
|
|
class GraphNeuralNetwork(nn.Module): |
|
    def __init__(self, in_channels, out_channels):
        super(GraphNeuralNetwork, self).__init__()
|
self.conv1 = GCNConv(in_channels, 16) |
|
self.conv2 = GCNConv(16, out_channels) |
|
|
|
def forward(self, data): |
|
x, edge_index = data.x, data.edge_index |
|
x = self.conv1(x, edge_index) |
|
x = F.relu(x) |
|
x = self.conv2(x, edge_index) |
|
return x |
|
|
|
|
|
class QLearningWithPPO: |
|
    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
|
self.q_net = nn.Linear(state_dim, action_dim) |
|
self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr) |
|
self.gamma = gamma |
|
self.eps_clip = 0.2 |
|
|
|
def get_action(self, state): |
|
q_values = self.q_net(state) |
|
action = torch.argmax(q_values).item() |
|
return action |
|
|
|
def update(self, state, action, reward, next_state): |
|
q_values = self.q_net(state) |
|
q_next = self.q_net(next_state).detach() |
|
|
|
target = reward + self.gamma * torch.max(q_next) |
|
loss = F.mse_loss(q_values[action], target) |
|
|
|
self.optimizer.zero_grad() |
|
loss.backward() |
|
self.optimizer.step() |
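# Note: despite the name, this class performs a plain single-step Q-learning update
# on a linear Q-network; `eps_clip` is stored but no PPO-style ratio clipping is applied.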
|
|
|
|
|
def logic_learning_with_q_ppo(dataset, num_alternatives=10): |
|
lin_model = LinearRegressionModel() |
|
X, y = dataset[:, :-1], dataset[:, -1] |
|
lin_model.fit(X, y) |
|
base_solution = lin_model.predict(X) |
|
|
|
gnn_model = GraphNeuralNetwork(in_channels=1, out_channels=1) |
|
optimizer_gnn = optim.Adam(gnn_model.parameters(), lr=0.01) |
|
q_ppo = QLearningWithPPO(state_dim=1, action_dim=num_alternatives) |
|
|
|
edge_index = torch.tensor([[0, 1], [1, 0]], dtype=torch.long) |
|
x = torch.tensor(base_solution, dtype=torch.float).view(-1, 1) |
|
data = Data(x=x, edge_index=edge_index) |
|
|
|
for i in range(num_alternatives): |
|
state = torch.tensor([i], dtype=torch.float) |
|
action = q_ppo.get_action(state) |
|
|
|
|
|
gnn_model.train() |
|
for epoch in range(50): |
|
optimizer_gnn.zero_grad() |
|
out = gnn_model(data) |
|
loss = F.mse_loss(out, x) |
|
loss.backward() |
|
optimizer_gnn.step() |
|
|
|
solution = out.detach().numpy().flatten() |
|
reward = -np.abs(base_solution - solution).sum() |
|
|
|
next_state = torch.tensor([i + 1], dtype=torch.float) |
|
q_ppo.update(state, action, reward, next_state) |
|
|
|
print(f"Alternative solution {i+1}: reward = {reward}") |
|
|
|
return solution |
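# Minimal usage sketch: any 2-D array whose last column is the regression target
# works here; the 6-column toy array below is illustrative.
demo_dataset = np.random.rand(50, 6)
demo_solution = logic_learning_with_q_ppo(demo_dataset, num_alternatives=3)
print("demo solution shape:", demo_solution.shape)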
|
|
|
"""# ***COMBINED MODELS LIBERALMIND***""" |
|
|
|
import numpy as np |
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
from torch_geometric.nn import GCNConv |
|
from torch_geometric.data import Data |
|
import torch.nn.functional as F |
|
from sklearn.linear_model import LinearRegression |
|
|
|
|
|
class LinearRegressionModel: |
|
def __init__(self, params): |
|
self.params = params |
|
self.model = LinearRegression( |
|
fit_intercept=params['fit_intercept'], |
|
copy_X=params['copy_X'], |
|
n_jobs=params['n_jobs'] |
|
) |
|
|
|
def fit(self, X, y): |
|
if self.params.get('normalize', False): |
|
X = (X - np.mean(X, axis=0)) / np.std(X, axis=0) |
|
self.model.fit(X, y) |
|
|
|
def predict(self, X): |
|
return self.model.predict(X) |
|
|
|
|
|
class GraphNeuralNetwork(nn.Module): |
|
def __init__(self, params): |
|
super(GraphNeuralNetwork, self).__init__() |
|
self.conv1 = GCNConv(params['in_channels'], params['hidden_dim1']) |
|
self.conv2 = GCNConv(params['hidden_dim1'], params['hidden_dim2']) |
|
self.conv3 = GCNConv(params['hidden_dim2'], params['out_channels']) |
|
self.dropout = params['dropout'] |
|
|
|
def forward(self, data): |
|
x, edge_index = data.x, data.edge_index |
|
x = self.conv1(x, edge_index) |
|
x = F.relu(x) |
|
x = F.dropout(x, p=self.dropout, training=self.training) |
|
x = self.conv2(x, edge_index) |
|
x = F.relu(x) |
|
x = F.dropout(x, p=self.dropout, training=self.training) |
|
x = self.conv3(x, edge_index) |
|
return x |
|
|
|
|
|
class QLearningWithPPO: |
|
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99): |
|
self.q_net = nn.Linear(state_dim, action_dim) |
|
self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr) |
|
self.gamma = gamma |
|
|
|
def get_action(self, state): |
|
q_values = self.q_net(state) |
|
action = torch.argmax(q_values).item() |
|
return action |
|
|
|
def update(self, state, action, reward, next_state): |
|
q_values = self.q_net(state) |
|
q_next = self.q_net(next_state).detach() |
|
|
|
target = reward + self.gamma * torch.max(q_next) |
|
loss = F.mse_loss(q_values[action], target) |
|
|
|
self.optimizer.zero_grad() |
|
loss.backward() |
|
self.optimizer.step() |
|
|
|
|
|
class CreativeNet(nn.Module): |
|
def __init__(self): |
|
super(CreativeNet, self).__init__() |
|
self.fc = nn.Linear(10, 20) |
|
|
|
def forward(self, x): |
|
return torch.relu(self.fc(x)) |
|
|
|
class LogicNet(nn.Module): |
|
def __init__(self): |
|
super(LogicNet, self).__init__() |
|
self.fc = nn.Linear(20, 30) |
|
|
|
def forward(self, x): |
|
return torch.relu(self.fc(x)) |
|
|
|
class MathNet(nn.Module): |
|
def __init__(self): |
|
super(MathNet, self).__init__() |
|
self.fc = nn.Linear(30, 40) |
|
|
|
def forward(self, x): |
|
return torch.relu(self.fc(x)) |
|
|
|
|
|
class CombinedModel(nn.Module): |
|
def __init__(self, gnn_params): |
|
super(CombinedModel, self).__init__() |
|
self.creative_net = CreativeNet() |
|
self.logic_net = LogicNet() |
|
self.math_net = MathNet() |
|
self.gnn_model = GraphNeuralNetwork(gnn_params) |
|
|
|
def forward(self, x, data): |
|
|
|
x = self.creative_net(x) |
|
x = self.logic_net(x) |
|
x = self.math_net(x) |
|
|
|
|
|
gnn_output = self.gnn_model(data) |
|
|
|
return gnn_output |
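# Note: in this composition the creative/logic/math chain output `x` is computed but
# discarded; only the GNN branch contributes to the returned value.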
|
|
|
|
|
def logic_learning_with_q_ppo(dataset, num_alternatives=10): |
|
|
|
lin_params = { |
|
'fit_intercept': True, |
|
'copy_X': True, |
|
'n_jobs': -1, |
|
'normalize': False, |
|
} |
|
|
|
|
|
gnn_params = { |
|
'in_channels': 1, |
|
'out_channels': 1, |
|
'hidden_dim1': 16, |
|
'hidden_dim2': 32, |
|
'dropout': 0.2, |
|
} |
|
|
|
|
|
lin_model = LinearRegressionModel(lin_params) |
|
X, y = dataset[:, :-1], dataset[:, -1] |
|
lin_model.fit(X, y) |
|
base_solution = lin_model.predict(X) |
|
|
|
q_ppo = QLearningWithPPO(state_dim=1, action_dim=num_alternatives) |
|
|
|
|
|
edge_index = torch.tensor([[0, 1], [1, 0]], dtype=torch.long) |
|
x = torch.tensor(base_solution, dtype=torch.float).view(-1, 1) |
|
data = Data(x=x, edge_index=edge_index) |
|
|
|
combined_model = CombinedModel(gnn_params) |
|
|
|
for i in range(num_alternatives): |
|
state = torch.tensor([i], dtype=torch.float) |
|
action = q_ppo.get_action(state) |
|
|
|
|
|
combined_model.train() |
|
optimizer_gnn = optim.Adam(combined_model.parameters(), lr=0.01) |
|
for epoch in range(50): |
|
optimizer_gnn.zero_grad() |
|
out = combined_model(torch.tensor(X, dtype=torch.float), data) |
|
loss = F.mse_loss(out, x) |
|
loss.backward() |
|
optimizer_gnn.step() |
|
|
|
solution = out.detach().numpy().flatten() |
|
reward = -np.abs(base_solution - solution).sum() |
|
|
|
next_state = torch.tensor([i + 1], dtype=torch.float) |
|
q_ppo.update(state, action, reward, next_state) |
|
|
|
print(f"Alternative solution {i + 1}: reward = {reward}") |
|
|
|
return solution |
|
|
|
|
|
dataset = np.random.rand(100, 11) |
|
solutions = logic_learning_with_q_ppo(dataset) |
|
|
|
|
|
combined_model = CombinedModel({ |
|
'in_channels': 1, |
|
'out_channels': 1, |
|
'hidden_dim1': 16, |
|
'hidden_dim2': 32, |
|
'dropout': 0.2, |
|
}) |
|
|
|
input_data = torch.randn(1, 10) |
|
dummy_data = Data(x=input_data.view(-1, 1), edge_index=torch.tensor([[0, 1], [1, 0]], dtype=torch.long)) |
|
output = combined_model(input_data, dummy_data) |
|
print("Output:", output) |
|
|
|
import random |
|
|
|
class QLearningWithPPO: |
|
    def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99, epsilon=0.1):
|
self.q_net = nn.Linear(state_dim, action_dim) |
|
self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr) |
|
self.gamma = gamma |
|
self.epsilon = epsilon |
|
self.memory = [] |
|
|
|
def get_action(self, state): |
|
if random.random() < self.epsilon: |
|
return random.randint(0, self.q_net.out_features - 1) |
|
q_values = self.q_net(state) |
|
action = torch.argmax(q_values).item() |
|
return action |
|
|
|
def update(self, batch_size): |
|
if len(self.memory) < batch_size: |
|
return |
|
|
|
|
|
experiences = random.sample(self.memory, batch_size) |
|
states, actions, rewards, next_states = zip(*experiences) |
|
|
|
states = torch.stack(states) |
|
actions = torch.tensor(actions) |
|
rewards = torch.tensor(rewards) |
|
next_states = torch.stack(next_states) |
|
|
|
q_values = self.q_net(states) |
|
q_next = self.q_net(next_states).detach() |
|
|
|
target = rewards + self.gamma * torch.max(q_next, dim=1)[0] |
|
loss = F.mse_loss(q_values.gather(1, actions.unsqueeze(1)), target.unsqueeze(1)) |
|
|
|
self.optimizer.zero_grad() |
|
loss.backward() |
|
self.optimizer.step() |
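    # Note: `update` draws a uniform random batch from `self.memory` (experience
    # replay), which decorrelates consecutive transitions before the Q-learning step.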
|
|
|
def store_experience(self, state, action, reward, next_state): |
|
self.memory.append((state, action, reward, next_state)) |
|
|
|
|
|
def logic_learning_with_q_ppo(dataset, num_epochs=100, batch_size=32, num_alternatives=10):

    lin_params = {
        'fit_intercept': True,
        'copy_X': True,
        'n_jobs': -1,
        'normalize': False,
    }

    # Fit the base linear model on the dataset (last column is the target).
    X, y = dataset[:, :-1], dataset[:, -1]
    lin_reg_model = LinearRegressionModel(lin_params)
    lin_reg_model.fit(X, y)
    y_pred = lin_reg_model.predict(X)

    # Wrap the regression output as node features of a toy two-edge graph.
    edge_index = torch.tensor([[0, 1], [1, 0]], dtype=torch.long)
    data = Data(x=torch.tensor(y_pred, dtype=torch.float).view(-1, 1), edge_index=edge_index)

    gnn_params = {
        'in_channels': data.num_node_features,
        'hidden_dim1': 16,
        'hidden_dim2': 8,
        'out_channels': 4,
        'dropout': 0.5,
    }
    gnn_model = GraphNeuralNetwork(gnn_params)
    q_ppo = QLearningWithPPO(state_dim=X.shape[1], action_dim=4)

    for epoch in range(num_epochs):
        state = torch.tensor(X[epoch % len(X)], dtype=torch.float32)
        action = q_ppo.get_action(state)
        reward = random.random()
        next_state = state

        q_ppo.store_experience(state, action, reward, next_state)

        if (epoch + 1) % batch_size == 0:
            q_ppo.update(batch_size)

        if (epoch + 1) % 10 == 0:
            print(f"Epoch {epoch + 1}/{num_epochs}, Reward: {reward:.4f}")

    print("Linear regression predictions:", y_pred)
    print("Graph neural network output:", gnn_model(data))
|
|
|
"""# ***GENERATIVE MODEL***""" |
|
|
|
import numpy as np
import torch
import torch.nn as nn
from stable_baselines3 import PPO
from gym import Env
from gym.spaces import Box, Discrete
from datasets import load_dataset
|
|
|
|
|
class CreativeNet(nn.Module): |
|
    def __init__(self):
        super(CreativeNet, self).__init__()
|
self.fc = nn.Linear(32, 64) |
|
|
|
def forward(self, x): |
|
return torch.relu(self.fc(x)) |
|
|
|
|
|
class LogicNet(nn.Module): |
|
    def __init__(self):
        super(LogicNet, self).__init__()
|
self.fc = nn.Linear(64, 128) |
|
|
|
def forward(self, x): |
|
return torch.relu(self.fc(x)) |
|
|
|
|
|
class MathNet(nn.Module): |
|
    def __init__(self):
        super(MathNet, self).__init__()
|
self.fc = nn.Linear(128, 32) |
|
|
|
def forward(self, x): |
|
return torch.relu(self.fc(x)) |
|
|
|
|
|
class CombinedModel(nn.Module): |
|
    def __init__(self):
        super(CombinedModel, self).__init__()
|
self.creative_net = CreativeNet() |
|
self.logic_net = LogicNet() |
|
self.math_net = MathNet() |
|
|
|
def forward(self, x): |
|
x = self.creative_net(x) |
|
x = self.logic_net(x) |
|
x = self.math_net(x) |
|
return x |
|
|
|
|
|
class CustomEnv(Env): |
|
    def __init__(self, model, dataset):
        super(CustomEnv, self).__init__()
        self.model = model
        self.dataset = dataset
        self.action_space = Discrete(3)
        self.observation_space = Box(low=0, high=1, shape=(32,), dtype=np.float32)
|
self.current_index = 0 |
|
|
|
def reset(self): |
|
self.current_index = 0 |
|
return self._get_observation() |
|
|
|
def step(self, action): |
|
observation = self._get_observation() |
|
input_tensor = torch.tensor(observation, dtype=torch.float32).unsqueeze(0) |
|
model_output = self.model(input_tensor).squeeze(0).detach().numpy() |
|
|
|
|
|
reward = -1 if action != model_output.argmax() else 1 |
|
self.current_index += 1 |
|
done = self.current_index >= len(self.dataset) |
|
return observation, reward, done, {} |
|
|
|
def _get_observation(self): |
|
return torch.tensor(self.dataset[self.current_index], dtype=torch.float32).numpy() |
|
|
|
|
|
|
|
keyword = "/content/dataset/anthropic_hh_golden-test.arrow" |
|
dataset = load_dataset(keyword)["train"][:10] |
|
input_size = len(dataset[0]) |
|
|
|
|
|
model = CombinedModel() |
|
env = CustomEnv(model, dataset) |
|
|
|
|
|
ppo_model = PPO("MlpPolicy", env, verbose=1) |
|
ppo_model.learn(total_timesteps=10000) |
|
|
|
|
|
def generate_response(model, input_data): |
|
input_tensor = torch.tensor(input_data, dtype=torch.float32).unsqueeze(0) |
|
output = model(input_tensor).squeeze(0).detach().numpy() |
|
return output.argmax() |
|
|
|
|
|
example_input = torch.rand(32).numpy() |
|
generated_response = generate_response(model, example_input) |
|
print("Generated response:", generated_response) |
|
|
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
from torch_geometric.data import Data |
|
from datasets import load_dataset |
|
from sklearn.preprocessing import StandardScaler |
|
import numpy as np |
|
from tqdm import tqdm |
|
|
|
|
|
class CombinedModel(nn.Module): |
|
    def __init__(self, input_size, hidden_size, output_size):
        super(CombinedModel, self).__init__()
|
|
|
self.encoder = nn.LSTM(input_size, hidden_size, batch_first=True) |
|
|
|
self.gnn_fc1 = nn.Linear(hidden_size, 64) |
|
self.gnn_fc2 = nn.Linear(64, hidden_size) |
|
|
|
self.decoder = nn.LSTM(hidden_size, output_size, batch_first=True) |
|
|
|
def forward(self, x, edge_index=None): |
|
|
|
x, _ = self.encoder(x) |
|
x = x[:, -1, :] |
|
|
|
x = torch.relu(self.gnn_fc1(x)) |
|
x = torch.relu(self.gnn_fc2(x)) |
|
|
|
x = x.unsqueeze(1).repeat(1, 10, 1) |
|
x, _ = self.decoder(x) |
|
return x |
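# Note: despite the `gnn_fc*` names, this model applies ordinary fully connected
# layers; `edge_index` is accepted but unused, so no graph structure is consumed.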
|
|
|
|
|
def preprocess_dataset(dataset_name): |
|
dataset = load_dataset(dataset_name) |
|
|
|
if 'train' in dataset: |
|
data = np.array(dataset['train']) |
|
else: |
|
data = np.array(dataset['data']) |
|
|
|
scaler = StandardScaler() |
|
scaled_data = scaler.fit_transform(data) |
|
return torch.tensor(scaled_data, dtype=torch.float32) |
|
|
|
|
|
def train_model(model, dataset, epochs=10, batch_size=32, learning_rate=1e-4): |
|
optimizer = optim.Adam(model.parameters(), lr=learning_rate) |
|
loss_fn = nn.MSELoss() |
|
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True) |
|
|
|
for epoch in range(epochs): |
|
total_loss = 0 |
|
for batch in tqdm(dataloader, desc=f"Epoch {epoch + 1}/{epochs}"): |
|
inputs = batch[:, :-1].unsqueeze(1) |
|
targets = batch[:, -1].unsqueeze(1) |
|
|
|
optimizer.zero_grad() |
|
outputs = model(inputs) |
|
            loss = loss_fn(outputs[:, -1, :], targets)  # compare the final decoder step to the target
|
loss.backward() |
|
optimizer.step() |
|
total_loss += loss.item() |
|
print(f"Loss: {total_loss / len(dataloader):.4f}") |
|
|
|
|
|
if name == "main": |
|
|
|
input_size = 10 |
|
hidden_size = 128 |
|
output_size = 1 |
|
|
|
|
|
model = CombinedModel(input_size, hidden_size, output_size) |
|
|
|
|
|
dataset_name = "your_dataset_name" |
|
dataset = preprocess_dataset(dataset_name) |
|
|
|
|
|
train_model(model, dataset) |
|
|
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
import numpy as np |
|
from sklearn.ensemble import RandomForestClassifier |
|
from stable_baselines3 import PPO |
|
from gym import Env |
|
from gym.spaces import Box, Discrete |
|
import pandas as pd |
|
import os |
|
|
|
|
|
|
|
class UnifiedModel(nn.Module): |
|
    def __init__(self, input_size, hidden_size, output_size):
        super(UnifiedModel, self).__init__()
|
|
|
self.encoder = nn.LSTM(input_size, hidden_size, batch_first=True) |
|
|
|
self.logic_fc = nn.Sequential( |
|
nn.Linear(hidden_size, hidden_size * 2), |
|
nn.ReLU(), |
|
nn.Linear(hidden_size * 2, hidden_size) |
|
) |
|
|
|
self.decoder = nn.LSTM(hidden_size, output_size, batch_first=True) |
|
|
|
def forward(self, x): |
|
|
|
x, _ = self.encoder(x) |
|
x = x[:, -1, :] |
|
|
|
x = self.logic_fc(x) |
|
|
|
x = x.unsqueeze(1).repeat(1, 10, 1) |
|
x, _ = self.decoder(x) |
|
return x |
|
|
|
|
|
|
|
def load_dataset(file_path): |
|
|
|
ext = os.path.splitext(file_path)[1].lower() |
|
if ext == '.csv': |
|
data = pd.read_csv(file_path) |
|
elif ext in ['.xls', '.xlsx']: |
|
data = pd.read_excel(file_path) |
|
elif ext == '.json': |
|
data = pd.read_json(file_path) |
|
elif ext == '.txt': |
|
data = pd.read_csv(file_path, delimiter='\t') |
|
else: |
|
raise ValueError("Формат файла не поддерживается") |
|
|
|
|
|
data = data.select_dtypes(include=[np.number]).dropna() |
|
return torch.tensor(data.values, dtype=torch.float32) |
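# Usage sketch (hypothetical path, for illustration only):
#   data = load_dataset("/content/my_table.csv")  # -> float32 tensor of the numeric columns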
|
|
|
|
|
|
|
|
|
class CustomEnv(Env): |
|
    def __init__(self, data):
        super(CustomEnv, self).__init__()
|
self.data = data |
|
self.current_step = 0 |
|
self.action_space = Discrete(3) |
|
self.observation_space = Box(low=-np.inf, high=np.inf, shape=(data.shape[1],), dtype=np.float32) |
|
|
|
def reset(self): |
|
self.current_step = 0 |
|
return self.data[self.current_step] |
|
|
|
def step(self, action): |
|
self.current_step += 1 |
|
reward = np.random.random() |
|
done = self.current_step >= len(self.data) |
|
return self.data[self.current_step % len(self.data)], reward, done, {} |
|
|
|
|
|
def train_with_rlcf(data): |
|
|
|
rf = RandomForestClassifier(n_estimators=100) |
|
    X, y = data[:, :-1].numpy(), data[:, -1].long().numpy()  # the classifier needs discrete labels
|
rf.fit(X, y) |
|
feature_importances = rf.feature_importances_ |
|
return feature_importances |
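# Note: "RLCF" here is implemented as a random-forest feature-importance pass over
# the dataset, not a reinforcement-learning loop; the importances are returned as-is.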
|
|
|
|
|
def train_with_ppo(data): |
|
env = CustomEnv(data) |
|
model = PPO("MlpPolicy", env, verbose=1) |
|
model.learn(total_timesteps=10000) |
|
return model |
|
|
|
|
|
|
|
def main(): |
|
|
|
file_path = input("Введите путь к файлу в директории Google Colab: ") |
|
|
|
|
|
data = load_dataset(file_path) |
|
print(f"Датасет загружен! Размер: {data.shape}") |
|
|
|
|
|
input_size = data.shape[1] - 1 |
|
hidden_size = 128 |
|
output_size = 1 |
|
model = UnifiedModel(input_size, hidden_size, output_size) |
|
|
|
|
|
feature_importances = train_with_rlcf(data) |
|
print("Feature Importances (RLCF):", feature_importances) |
|
|
|
|
|
ppo_model = train_with_ppo(data) |
|
print("PPO обучение завершено!") |
|
|
|
import os |
|
import torch |
|
import torch.nn as nn |
|
import torch.optim as optim |
|
import numpy as np |
|
from sklearn.ensemble import RandomForestClassifier |
|
from stable_baselines3 import PPO |
|
from gym import Env |
|
from gym.spaces import Box, Discrete |
|
import pandas as pd |
|
|
|
|
|
|
|
|
|
|
|
class CreativeNet(nn.Module): |
|
    def __init__(self):
        super(CreativeNet, self).__init__()
|
self.fc = nn.Linear(32, 64) |
|
|
|
def forward(self, x): |
|
return torch.relu(self.fc(x)) |
|
|
|
|
|
class LogicNet(nn.Module): |
|
    def __init__(self):
        super(LogicNet, self).__init__()
|
self.fc = nn.Linear(64, 128) |
|
|
|
def forward(self, x): |
|
return torch.relu(self.fc(x)) |
|
|
|
|
|
class MathNet(nn.Module): |
|
    def __init__(self):
        super(MathNet, self).__init__()
|
self.fc = nn.Linear(128, 32) |
|
|
|
def forward(self, x): |
|
return torch.relu(self.fc(x)) |
|
|
|
|
|
class CombinedModel(nn.Module): |
|
    def __init__(self):
        super(CombinedModel, self).__init__()
|
self.creative_net = CreativeNet() |
|
self.logic_net = LogicNet() |
|
self.math_net = MathNet() |
|
|
|
def forward(self, x): |
|
x = self.creative_net(x) |
|
x = self.logic_net(x) |
|
x = self.math_net(x) |
|
return x |
|
|
|
|
|
|
|
|
|
def load_dataset(file_path): |
|
try: |
|
if file_path.endswith(('.csv', '.txt')): |
|
data = pd.read_csv(file_path) |
|
elif file_path.endswith(('.xls', '.xlsx')): |
|
data = pd.read_excel(file_path) |
|
elif file_path.endswith('.json'): |
|
data = pd.read_json(file_path) |
|
else: |
|
raise ValueError(f"Формат файла {file_path} не поддерживается.") |
|
|
|
|
|
data = data.select_dtypes(include=[np.number]).dropna() |
|
return torch.tensor(data.values, dtype=torch.float32) |
|
except Exception as e: |
|
print(f"Ошибка при загрузке файла: {e}") |
|
return None |
|
|
|
|
|
|
|
|
|
|
|
class CustomEnv(Env): |
|
    def __init__(self, data):
        super(CustomEnv, self).__init__()
|
self.data = data |
|
self.current_step = 0 |
|
self.action_space = Discrete(3) |
|
self.observation_space = Box(low=-np.inf, high=np.inf, shape=(data.shape[1],), dtype=np.float32) |
|
|
|
def reset(self): |
|
self.current_step = 0 |
|
return self.data[self.current_step] |
|
|
|
def step(self, action): |
|
self.current_step += 1 |
|
reward = np.random.random() |
|
done = self.current_step >= len(self.data) |
|
return self.data[self.current_step % len(self.data)], reward, done, {} |
|
|
|
|
|
def train_with_rlcf(data): |
|
rf = RandomForestClassifier(n_estimators=100) |
|
    X, y = data[:, :-1].numpy(), data[:, -1].long().numpy()  # the classifier needs discrete labels
|
rf.fit(X, y) |
|
return rf.feature_importances_ |
|
|
|
|
|
class CustomEnv(Env): |
|
    def __init__(self, data):
        super(CustomEnv, self).__init__()
|
self.data = data.numpy() |
|
self.current_step = 0 |
|
self.action_space = Discrete(3) |
|
self.observation_space = Box( |
|
low=-np.inf, high=np.inf, shape=(self.data.shape[1] - 1,), dtype=np.float32 |
|
) |
|
|
|
def reset(self): |
|
|
|
self.current_step = 0 |
|
|
|
return self.data[self.current_step, :-1].astype(np.float32) |
|
|
|
def step(self, action): |
|
|
|
reward = float(np.random.random()) |
|
|
|
self.current_step += 1 |
|
|
|
done = self.current_step >= len(self.data) |
|
|
|
obs = self.data[self.current_step % len(self.data), :-1].astype(np.float32) |
|
return obs, reward, done, {} |
|
|
|
def main(): |
|
|
|
file_path = input("Введите путь к вашему файлу в Google Colab: ").strip() |
|
data = load_dataset(file_path) |
|
|
|
if data is None: |
|
print("Не удалось загрузить датасет.") |
|
return |
|
|
|
print(f"Датасет успешно загружен! Размер данных: {data.shape}") |
|
|
|
|
|
model = CombinedModel() |
|
print("Объединённая модель создана!") |
|
|
|
|
|
feature_importances = train_with_rlcf(data) |
|
print("Feature Importances (RLCF):", feature_importances) |
|
|
|
|
|
ppo_model = train_with_ppo(data) |
|
print("PPO обучение завершено!") |
|
|
|
|
|
|
|
main() |