LiberalMind / liberal_mind_beta (1).py
liberalusa's picture
Upload liberal_mind_beta (1).py
0207e4a verified
# -*- coding: utf-8 -*-
"""liberal mind beta
Automatically generated by Colab.
Original file is located at
https://colab.research.google.com/drive/1WZLrb1Gf63n6vXBvGFEWwIPt1BU_5xc9
# ***INSTALL LIBRARIES***
"""
pip install stable_baselines3
pip install datasets
pip install torch-geometric
pip install gym==0.21.0
pip install shimmy
"""# ***DOWNLOAD DATASETS***"""
from datasets import load_dataset
from huggingface_hub import list_datasets
def find_and_load_dataset(keyword):
# Ищем датасеты по ключевому слову
matching_datasets = [ds.id for ds in list_datasets() if keyword.lower() in ds.id.lower()]
# Проверяем, найдены ли соответствующие датасеты
if not matching_datasets:
print("Нет датасетов, содержащих указанное ключевое слово.")
return None
# Выбираем первый найденный датасет
dataset_name = matching_datasets[0]
print(f"Найден датасет: {dataset_name}. Загружаем его...")
# Загружаем датасет
dataset = load_dataset(dataset_name)
return dataset
# Пример использования
keyword = input("Введите ключевое слово для поиска датасета: ")
dataset = find_and_load_dataset(keyword)
if dataset:
print("Датасет загружен успешно!")
print(dataset)
import os
import shutil
from datasets import load_dataset
# Имя и путь для сохранения
dataset_name = "Unified-Language-Model-Alignment/Anthropic_HH_Golden" # Замените на точное имя датасета
save_dir = "/content/dataset" # Путь для сохранения в директории Google Colab
# Загружаем датасет
dataset = load_dataset(dataset_name)
# Убеждаемся, что папка для сохранения существует
os.makedirs(save_dir, exist_ok=True)
# Копируем файлы датасета из кэша в нужную директорию
for split_cache_files in dataset.cache_files.values():
for cache_file in split_cache_files:
shutil.copy2(cache_file['filename'], save_dir)
print(f"Датасет '{dataset_name}' сохранен в папке {save_dir}")
"""# ***ENCODER TRANSFORMERS MATH AI***"""
import numpy as np
import tensorflow as tf
import torch
from torch import nn, optim
from sklearn.ensemble import RandomForestClassifier
from scipy.cluster.hierarchy import linkage, fcluster
from sklearn.preprocessing import StandardScaler
from stable_baselines3 import PPO
import numpy as np
import torch
import torch.nn as nn
from stable_baselines3 import PPO
from gym import Env
from gym.spaces import Discrete, Box
# Оптимизированные гиперпараметры для многоязыковой LLM
ppo_hyperparameters = {
"n_steps": 1024, # Увеличение шагов для лучшего захвата зависимости данных
"batch_size": 64, # Оптимальный размер для стабилизации обучения
"n_epochs": 1, # Баланс скорости обновления и обучения
"gamma": 0.99, # Стандартное значение дисконтирования
"learning_rate": 3e-4, # Стандартный темп обучения
"clip_range": 0.2, # Оптимальное значение для стабильности
"gae_lambda": 0.95, # Гладкость обобщенного преимущества
"vf_coef": 0.5, # Коэффициент функции ценности
"ent_coef": 0.01, # Коэффициент энтропии для исследования
"max_grad_norm": 0.5, # Ограничение градиента
"target_kl": 0.03, # Целевое значение KL-дивергенции
"penalty_coef": 0.05, # Регуляция для устойчивости
"epsilon": 0.15, # Умеренная случайность
"adv_norm": True, # Нормализация преимущества
"weight_init": "xavier" # Инициализация весов
}
class ComplexEnv(Env):
def __init__(self):
super(ComplexEnv, self).__init__()
self.action_space = Discrete(3)
self.observation_space = Box(low=-10, high=10, shape=(5,), dtype=np.float32)
self.state = np.random.uniform(low=-1, high=1, size=(5,))
self.step_count = 0
def reset(self):
self.state = np.random.uniform(low=-1, high=1, size=(5,))
self.step_count = 0
return self.state
def step(self, action):
reward = -0.1
self.step_count += 1
if action == 0:
reward += 1 if self.state[0] > 0 else -1 * ppo_hyperparameters["penalty_coef"]
elif action == 1:
reward += 0.5 if np.sum(self.state) > 0 else -2 * ppo_hyperparameters["penalty_coef"]
else:
reward += -0.5 if self.state[1] < 0 else 1 * ppo_hyperparameters["penalty_coef"]
self.state += np.random.normal(0, 0.5, size=self.state.shape)
done = self.step_count >= 100
return self.state, reward, done, {}
# Инициализация среды
env = ComplexEnv()
# Определение и настройка модели PPO с новыми гиперпараметрами и инициализацией весов
class EpsilonPPO(PPO):
def __init__(self, policy, env, **kwargs):
super(EpsilonPPO, self).__init__(policy, env, **kwargs)
self.epsilon = ppo_hyperparameters["epsilon"]
# Инициализация весов
for layer in self.policy.modules():
if isinstance(layer, (nn.Linear, nn.Conv2d)):
if ppo_hyperparameters["weight_init"] == "xavier":
nn.init.xavier_uniform_(layer.weight)
elif ppo_hyperparameters["weight_init"] == "kaiming":
nn.init.kaiming_uniform_(layer.weight)
def _predict(self, observation, deterministic=False):
if np.random.rand() < self.epsilon:
return self.env.action_space.sample()
else:
return super().predict(observation, deterministic=deterministic)
# Создание PPO модели с новыми гиперпараметрами и нормализацией
model = EpsilonPPO(
policy="MlpPolicy",
env=env,
verbose=1,
n_steps=ppo_hyperparameters["n_steps"],
batch_size=ppo_hyperparameters["batch_size"],
n_epochs=ppo_hyperparameters["n_epochs"],
gamma=ppo_hyperparameters["gamma"],
learning_rate=ppo_hyperparameters["learning_rate"],
clip_range=ppo_hyperparameters["clip_range"],
gae_lambda=ppo_hyperparameters["gae_lambda"],
vf_coef=ppo_hyperparameters["vf_coef"],
ent_coef=ppo_hyperparameters["ent_coef"],
max_grad_norm=ppo_hyperparameters["max_grad_norm"],
target_kl=ppo_hyperparameters["target_kl"]
)
# Обучение модели
model.learn(total_timesteps=50000)
# Проверка работы агента
obs = env.reset()
for _ in range(100):
action, _ = model.predict(obs)
obs, reward, done, _ = env.step(action)
if done:
obs = env.reset()
import numpy as np
import torch
import torch.nn as nn
from sklearn.ensemble import RandomForestClassifier
from sklearn.cluster import AgglomerativeClustering
from scipy.spatial.distance import euclidean
from sklearn.preprocessing import StandardScaler
from stable_baselines3 import PPO
from gym import Env
from gym.spaces import Discrete, Box
# Настройка Random Forest
def classify_data(X, y):
rf = RandomForestClassifier(n_estimators=100)
rf.fit(X, y)
feature_importances = rf.feature_importances_
return feature_importances, rf
# Иерархическая кластеризация
def cluster_data(X):
clustering = AgglomerativeClustering(n_clusters=5, affinity='euclidean', linkage='ward')
clusters = clustering.fit_predict(X)
return clusters
class LSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size):
super(LSTMModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
self.fc = nn.Linear(hidden_size, output_size)
def forward(self, x):
_, (hn, _) = self.lstm(x) # передаём через LSTM
out = self.fc(hn[-1]) # последний скрытый слой
return out
# Параметры LSTM
input_size = 5
hidden_size = 32
output_size = 3
lstm_agent = LSTMModel(input_size, hidden_size, output_size)
class ComplexEnvWithLSTM(Env):
def __init__(self):
super(ComplexEnvWithLSTM, self).__init__()
self.action_space = Discrete(3)
self.observation_space = Box(low=-10, high=10, shape=(5,), dtype=np.float32)
self.state = np.random.uniform(low=-1, high=1, size=(5,))
self.step_count = 0
def reset(self):
self.state = np.random.uniform(low=-1, high=1, size=(5,))
self.step_count = 0
return self.state
def step(self, action):
reward = -0.1
self.step_count += 1
# Логика наград
if action == 0:
reward += 1 if self.state[0] > 0 else -0.5
elif action == 1:
reward += 0.5 if np.sum(self.state) > 0 else -1
else:
reward += -0.5 if self.state[1] < 0 else 1
# Обновляем состояние с учетом LSTM
self.state += np.random.normal(0, 0.5, size=self.state.shape)
done = self.step_count >= 100
return self.state, reward, done, {}
env = ComplexEnvWithLSTM()
def classify_data(X, y):
rf = RandomForestClassifier(n_estimators=100)
rf.fit(X, y)
feature_importances = rf.feature_importances_
return feature_importances, rf
def cluster_data(X):
clustering = AgglomerativeClustering(n_clusters=5, metric='euclidean', linkage='ward')
clusters = clustering.fit_predict(X)
return clusters
class AdvancedLSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3, use_sigmoid=True):
super(AdvancedLSTMModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True)
# Полносвязные слои
self.fc1 = nn.Linear(hidden_size, 1000)
self.fc2 = nn.Linear(1000, 2000)
self.activation = nn.Sigmoid() if use_sigmoid else nn.ReLU()
def forward(self, x):
_, (hn, _) = self.lstm(x)
x = self.fc1(hn[-1])
x = self.activation(x)
x = self.fc2(x)
return x
# Параметры LSTM
input_size = 5
hidden_size = 256
output_size = 2000
num_layers = 3
dropout = 0.3
use_sigmoid = True
lstm_agent = AdvancedLSTMModel(input_size, hidden_size, output_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid)
class ComplexEnvWithLSTM(Env):
def __init__(self):
super(ComplexEnvWithLSTM, self).__init__()
self.action_space = Discrete(3)
self.observation_space = Box(low=-10, high=10, shape=(5,), dtype=np.float32)
self.state = np.random.uniform(low=-1, high=1, size=(5,))
self.step_count = 0
def reset(self):
self.state = np.random.uniform(low=-1, high=1, size=(5,))
self.step_count = 0
return self.state
def step(self, action):
reward = -0.1
self.step_count += 1
if action == 0:
reward += 1 if self.state[0] > 0 else -0.5
elif action == 1:
reward += 0.5 if np.sum(self.state) > 0 else -1
else:
reward += -0.5 if self.state[1] < 0 else 1
# Обновляем состояние
self.state += np.random.normal(0, 0.5, size=self.state.shape)
done = self.step_count >= 100
return self.state, reward, done, {}
env = ComplexEnvWithLSTM()
# Генерация данных для Random Forest и кластеризации
X = np.random.rand(100, 5) # Замените на реальные данные
y = np.random.randint(0, 2, size=100)
feature_importances, rf = classify_data(X, y)
clusters = cluster_data(X)
# Настройка PPO с LSTM
model = PPO(
policy="MlpPolicy",
env=env,
verbose=1,
learning_rate=5e-4,
n_steps=512,
batch_size=32,
n_epochs=4,
gamma=0.99,
clip_range=0.2,
gae_lambda=0.95,
vf_coef=0.5,
ent_coef=0.005,
max_grad_norm=0.5,
target_kl=0.03,
)
# Обучение PPO с использованием LSTM
model.learn(total_timesteps=20000)
# Проверка работы агента
obs = env.reset()
obs = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
for _ in range(100):
action_probs = lstm_agent(obs)
action = action_probs.argmax().item()
obs, reward, done, _ = env.step(action)
obs = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
if done:
obs = env.reset()
"""# ***DECODER TRANSFORMERS MATH AI***"""
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.linear_model import LinearRegression
from stable_baselines3 import PPO
from gym import Env
from gym.spaces import Discrete, Box
# Функция множественной линейной регрессии
def distribute_outputs(X, y):
lin_reg = LinearRegression()
lin_reg.fit(X, y)
distributed_outputs = lin_reg.predict(X)
return distributed_outputs
class LSTMDecoderModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3, use_sigmoid=True):
super(LSTMDecoderModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True)
# Полносвязные слои
self.fc1 = nn.Linear(hidden_size, 1000)
self.fc2 = nn.Linear(1000, 2000)
self.activation = nn.Sigmoid() if use_sigmoid else nn.ReLU()
def forward(self, x):
_, (hn, _) = self.lstm(x)
x = self.fc1(hn[-1])
x = self.activation(x)
x = self.fc2(x)
return x
# Параметры модели
input_size = 5
hidden_size = 256
output_size = 2000
num_layers = 3
dropout = 0.3
use_sigmoid = True
lstm_decoder = LSTMDecoderModel(input_size, hidden_size, output_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid)
class ComplexEnvForDecoder(Env):
def __init__(self):
super(ComplexEnvForDecoder, self).__init__()
self.action_space = Discrete(3)
self.observation_space = Box(low=-10, high=10, shape=(5,), dtype=np.float32)
self.state = np.random.uniform(low=-1, high=1, size=(5,))
self.step_count = 0
def reset(self):
self.state = np.random.uniform(low=-1, high=1, size=(5,))
self.step_count = 0
return self.state
def step(self, action):
reward = -0.1
self.step_count += 1
if action == 0:
reward += 1 if self.state[0] > 0 else -0.5
elif action == 1:
reward += 0.5 if np.sum(self.state) > 0 else -1
else:
reward += -0.5 if self.state[1] < 0 else 1
self.state += np.random.normal(0, 0.5, size=self.state.shape)
done = self.step_count >= 100
return self.state, reward, done, {}
env_decoder = ComplexEnvForDecoder()
# Генерация данных
X = np.random.rand(100, 5) # Замените на реальные данные
y = np.random.randint(0, 2, size=100)
distributed_outputs = distribute_outputs(X, y)
# Настройка PPO с LSTM-декодером
model_decoder = PPO(
policy="MlpPolicy",
env=env_decoder,
verbose=1,
learning_rate=5e-4,
n_steps=512,
batch_size=32,
n_epochs=4,
gamma=0.99,
clip_range=0.2,
gae_lambda=0.95,
vf_coef=0.5,
ent_coef=0.005,
max_grad_norm=0.5,
target_kl=0.03,
)
# Обучение PPO с использованием LSTM-декодера
model_decoder.learn(total_timesteps=20000)
# Проверка работы декодера
obs = env_decoder.reset()
obs = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
for _ in range(100):
action_probs = lstm_decoder(obs)
action = action_probs.argmax().item()
obs, reward, done, _ = env_decoder.step(action)
obs = torch.tensor(obs, dtype=torch.float32).unsqueeze(0).unsqueeze(0)
if done:
obs = env_decoder.reset()
"""# ***TRANSFORMERS MATH AI***"""
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.ensemble import RandomForestClassifier
from sklearn.cluster import AgglomerativeClustering
from sklearn.linear_model import LinearRegression
from stable_baselines3 import PPO
from gym import Env
from gym.spaces import Discrete, Box
class EncoderLSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3, use_sigmoid=True):
super(EncoderLSTMModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True)
self.fc1 = nn.Linear(hidden_size, output_size)
self.activation = nn.Sigmoid() if use_sigmoid else nn.ReLU()
def forward(self, x):
_, (hn, _) = self.lstm(x)
x = self.fc1(hn[-1])
return self.activation(x)
class DecoderLSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, output_size, num_layers=2, dropout=0.3, use_sigmoid=True):
super(DecoderLSTMModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True)
self.fc1 = nn.Linear(hidden_size, output_size)
self.activation = nn.Sigmoid() if use_sigmoid else nn.ReLU()
def forward(self, x):
_, (hn, _) = self.lstm(x)
x = self.fc1(hn[-1])
return self.activation(x)
class TransformerModule(nn.Module):
def __init__(self, input_size, hidden_size, num_heads=2, num_layers=2):
super(TransformerModule, self).__init__()
self.transformer_layer = nn.Transformer(d_model=input_size, nhead=num_heads, num_encoder_layers=num_layers, num_decoder_layers=num_layers)
self.fc = nn.Linear(input_size, hidden_size)
def forward(self, src, tgt):
# Проходим через трансформер
output = self.transformer_layer(src, tgt)
# Преобразуем выход трансформера для передачи в декодер
return self.fc(output)
# Параметры для модели
input_size = 32 # Сделаем input_size кратным num_heads
hidden_size = 256
output_size = 2000
num_heads = 4 # Убедимся, что input_size % num_heads == 0
num_layers = 2
dropout = 0.3
use_sigmoid = True
# Инициализация компонентов
encoder = EncoderLSTMModel(input_size, hidden_size, output_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid)
decoder = DecoderLSTMModel(input_size, hidden_size, output_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid)
transformer = TransformerModule(input_size, hidden_size, num_heads=num_heads, num_layers=num_layers)
import torch
import torch.nn as nn
# Энкодер на основе LSTM
class EncoderLSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, num_layers=1, dropout=0.0, use_sigmoid=False):
super(EncoderLSTMModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True)
self.fc = nn.Linear(hidden_size, input_size) # Сохраняем выходной размер равным input_size
self.use_sigmoid = use_sigmoid
def forward(self, x):
lstm_out, _ = self.lstm(x)
output = self.fc(lstm_out[:, -1, :]) # Используем только последний выход LSTM
if self.use_sigmoid:
output = torch.sigmoid(output)
return output
# Декодер на основе LSTM
class DecoderLSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, num_layers=1, dropout=0.0, use_sigmoid=False):
super(DecoderLSTMModel, self).__init__()
self.lstm = nn.LSTM(input_size, hidden_size, num_layers=num_layers, dropout=dropout, batch_first=True)
self.fc = nn.Linear(hidden_size, input_size) # Сохраняем выходной размер равным input_size
self.use_sigmoid = use_sigmoid
def forward(self, x):
lstm_out, _ = self.lstm(x)
output = self.fc(lstm_out[:, -1, :]) # Используем только последний выход LSTM
if self.use_sigmoid:
output = torch.sigmoid(output)
return output
# Трансформер
class TransformerModule(nn.Module):
def __init__(self, input_size, num_heads=4, num_layers=2):
super(TransformerModule, self).__init__()
self.transformer_layer = nn.Transformer(
d_model=input_size, # Убедитесь, что это соответствует размерности входа
nhead=num_heads,
num_encoder_layers=num_layers,
num_decoder_layers=num_layers
)
self.fc = nn.Linear(input_size, input_size) # Для преобразования выходных данных
def forward(self, src, tgt):
# Проверка размерности входных данных
print(f"src shape before transformer: {src.shape}")
print(f"tgt shape before transformer: {tgt.shape}")
# Проходим через трансформер
output = self.transformer_layer(src, tgt)
# Преобразуем выход трансформера для передачи в декодер
return self.fc(output)
# Объединённая модель
class CombinedModel(nn.Module):
def __init__(self, encoder, decoder, transformer):
super(CombinedModel, self).__init__()
self.encoder = encoder
self.decoder = decoder
self.transformer = transformer
def forward(self, x):
# Пропускаем через энкодер
encoded = self.encoder(x)
# Подготавливаем входы и выходы для трансформера
src = encoded.unsqueeze(1) # Изменяем размерность: (batch_size, 1, input_size)
tgt = torch.zeros_like(src) # Создаём нулевую целевую последовательность
# Убедимся, что размеры правильные
print(f"CombinedModel: src shape: {src.shape}, tgt shape: {tgt.shape}")
# Пропускаем через трансформер
transformed = self.transformer(src, tgt)
# Передаём в декодер
output = self.decoder(transformed)
return output
# Параметры для модели
input_size = 32 # Размерность входа
hidden_size = 256 # Размерность скрытого слоя
num_heads = 4 # Количество голов
num_layers = 2 # Количество слоёв
dropout = 0.3 # Дропаут
use_sigmoid = True # Использование сигмоиды
# Инициализация компонентов
encoder = EncoderLSTMModel(input_size, hidden_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid)
decoder = DecoderLSTMModel(input_size, hidden_size, num_layers=num_layers, dropout=dropout, use_sigmoid=use_sigmoid)
transformer = TransformerModule(input_size, num_heads=num_heads, num_layers=num_layers)
# Создание объединённой модели
combined_model = CombinedModel(encoder, decoder, transformer)
# Пример данных
batch_size = 10
seq_length = 5
example_input = torch.randn((batch_size, seq_length, input_size)) # Генерация случайного входа
output = combined_model(example_input)
print("Output shape:", output.shape) # Вывод формы результата
"""# ***CREATION AI***"""
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from tqdm import tqdm
# Параметры диффузии и обучения
num_steps = 1000 # Количество шагов диффузии
input_dim = 784 # Например, для изображений 28x28 = 784
batch_size = 64 # Размер батча
learning_rate = 1e-4
# Параметры альфа
beta_start = 1e-4
beta_end = 0.02
beta = np.linspace(beta_start, beta_end, num_steps)
alpha = 1 - beta
alpha_cumprod = np.cumprod(alpha)
# Гиперпараметры модели
class DiffusionModel(nn.Module):
def init(
self, input_dim, hidden_dim=512, output_dim=784,
num_layers=3, activation_function="ReLU", batch_norm=False, layer_norm=False,
use_skip_connections=True, dropout_rate=0.1, use_time_embedding=True,
time_embedding_dim=16, noise_scaling_factor=0.1, optimizer="adam",
learning_rate=1e-4, weight_decay=1e-5, gradient_clip_value=5.0,
scheduler_step_size=50, scheduler_gamma=0.95, beta_schedule="linear",
noise_type="gaussian", noise_seed=None, min_noise_std=0.1, max_noise_std=1.0,
use_positional_encoding=False, positional_encoding_scale=1.0,
max_training_epochs=100, min_learning_rate=1e-6, warmup_steps=500
):
super(DiffusionModel, self).init()
# Основные гиперпараметры
self.use_skip_connections = use_skip_connections
self.use_time_embedding = use_time_embedding
self.noise_scaling_factor = noise_scaling_factor
self.time_embedding_dim = time_embedding_dim
self.noise_type = noise_type
self.min_noise_std = min_noise_std
self.max_noise_std = max_noise_std
self.noise_seed = noise_seed
self.use_positional_encoding = use_positional_encoding
self.positional_encoding_scale = positional_encoding_scale
# Инициализация архитектуры сети
layers = []
in_dim = input_dim + (self.time_embedding_dim if use_time_embedding else 0)
for _ in range(num_layers):
layers.append(nn.Linear(in_dim, hidden_dim))
if batch_norm:
layers.append(nn.BatchNorm1d(hidden_dim))
elif layer_norm:
layers.append(nn.LayerNorm(hidden_dim))
if activation_function.lower() == "relu":
layers.append(nn.ReLU())
elif activation_function.lower() == "leakyrelu":
layers.append(nn.LeakyReLU())
elif activation_function.lower() == "tanh":
layers.append(nn.Tanh())
if dropout_rate > 0:
layers.append(nn.Dropout(dropout_rate))
in_dim = hidden_dim
layers.append(nn.Linear(hidden_dim, output_dim))
self.network = nn.Sequential(*layers)
def forward(self, x, t):
if self.use_time_embedding:
t_embedding = torch.sin(t.float() * 2 * np.pi / num_steps).unsqueeze(-1)
t_embedding = t_embedding * self.time_embedding_dim
x = torch.cat([x, t_embedding], dim=1)
return self.network(x)
# Функция добавления шума
def add_noise(x, t, noise_type="gaussian", min_noise_std=0.1, max_noise_std=1.0,
noise_seed=None, noise_scaling_factor=0.1, beta_schedule="linear"):
if noise_seed is not None:
torch.manual_seed(noise_seed)
noise_std = min_noise_std + t * (max_noise_std - min_noise_std) / num_steps
noise = torch.randn_like(x) * noise_std if noise_type == "gaussian" else torch.rand_like(x) * noise_std
if beta_schedule == "linear":
alpha_t = alpha_cumprod[t].view(-1, 1)
elif beta_schedule == "cosine":
alpha_t = torch.cos(t * np.pi / num_steps).view(-1, 1)
noisy_x = torch.sqrt(alpha_t) * x + torch.sqrt(1 - alpha_t) * noise * noise_scaling_factor
return noisy_x
"""# ***DIFFUSION TRANSFORMERS***"""
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from tqdm import tqdm
# Параметры
num_steps = 1000 # Количество шагов диффузии
input_dim = 784 # Например, для изображений 28x28 = 784
batch_size = 64 # Размер батча
learning_rate = 1e-4
# Параметры альфа
beta_start = 1e-4
beta_end = 0.02
beta = np.linspace(beta_start, beta_end, num_steps)
alpha = 1 - beta
alpha_cumprod = np.cumprod(alpha)
# Преобразование alpha_cumprod в тензор PyTorch
alpha_cumprod_tensor = torch.tensor(alpha_cumprod, dtype=torch.float32)
# Определение диффузионного энкодера
class DiffusionEncoder(nn.Module):
def __init__(self):
super(DiffusionEncoder, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim + 1, 512), # Добавляем 1 для временной переменной
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, input_dim) # Выходной размер должен соответствовать input_dim
)
def forward(self, x, t):
t_embedding = torch.sin(t.float() * 2 * np.pi / num_steps).unsqueeze(-1) # Временная переменная
x = torch.cat([x, t_embedding], dim=1) # Объединяем x и t_embedding
return self.network(x)
# Определение декодера (полносвязная нейросеть)
class FullyConnectedDecoder(nn.Module):
def __init__(self):
super(FullyConnectedDecoder, self).__init__()
self.network = nn.Sequential(
nn.Linear(input_dim, 512),
nn.ReLU(),
nn.Linear(512, 512),
nn.ReLU(),
nn.Linear(512, input_dim)
)
def forward(self, x):
return self.network(x)
# Функция добавления шума
def add_noise(x, t):
noise = torch.randn_like(x)
alpha_t = alpha_cumprod_tensor[t].view(-1, 1) # Индексация тензора alpha_cumprod_tensor
noisy_x = torch.sqrt(alpha_t) * x + torch.sqrt(1 - alpha_t) * noise
return noisy_x, noise
# Инициализация модели и оптимизатора
encoder = DiffusionEncoder()
decoder = FullyConnectedDecoder()
optimizer = optim.Adam(list(encoder.parameters()) + list(decoder.parameters()), lr=learning_rate)
# Обучение модели
for epoch in range(2): # 100 эпох
for _ in tqdm(range(1000)): # 1000 шагов обучения
# Генерация случайных данных
x = torch.randn(batch_size, input_dim)
t = torch.randint(0, num_steps, (batch_size,)) # Случайные временные шаги
# Добавление шума
noisy_x, noise = add_noise(x, t)
# Прямой проход через энкодер
encoded = encoder(noisy_x, t)
# Прямой проход через декодер
decoded = decoder(encoded)
# Расчет потерь (например, MSE)
loss = nn.MSELoss()(decoded, x)
# Обратный проход и обновление весов
optimizer.zero_grad()
loss.backward()
optimizer.step()
print(f'Epoch {epoch + 1}, Loss: {loss.item():.4f}')
"""# ***SELF-AWARNESS AI***"""
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from tqdm import tqdm
# Гиперпараметры
input_size = 10 # Размерность входных данных
hidden_size = 20 # Размер скрытого слоя LSTM
num_layers = 2 # Количество слоев LSTM
seq_len = 5 # Длина последовательности
batch_size = 1 # Размер батча
num_epochs = 5 # Количество эпох
learning_rate = 1e-4 # Скорость обучения
# Гиперпараметры для графовой нейросети
gnn_params = {
'input_dim': input_size, # Входной размер
'hidden_dim': 32, # Скрытый размер первого слоя
'hidden_dim_2': 64, # Скрытый размер второго слоя
'output_dim': input_size, # Выходной размер
'activation_function': 'ReLU', # Функция активации
'dropout_rate': 0.2, # Дроп-аут
'batch_norm': True, # Использовать батч-нормализацию
}
# Гиперпараметры для LSTM
lstm_params = {
'input_size': input_size, # Размерность входа
'hidden_size': hidden_size, # Размер скрытого слоя
'num_layers': num_layers, # Количество слоев
'dropout': 0.2, # Дроп-аут
'bidirectional': False, # Двунаправленный LSTM
'activation_function': 'Tanh', # Функция активации
}
# Определение графовой нейросети
class GraphNeuralNetwork(nn.Module):
def __init__(self, params):
super(GraphNeuralNetwork, self).__init__()
self.fc1 = nn.Linear(params['input_dim'], params['hidden_dim'])
self.fc2 = nn.Linear(params['hidden_dim'], params['hidden_dim_2'])
self.fc3 = nn.Linear(params['hidden_dim_2'], params['output_dim'])
self.dropout = nn.Dropout(params['dropout_rate'])
def forward(self, x):
x = F.relu(self.fc1(x))
x = self.dropout(x)
x = F.relu(self.fc2(x))
x = self.dropout(x)
return self.fc3(x) # Возвращаем выходный размер равный input_size
# Определение модели LSTM
class LSTMPredictor(nn.Module):
def __init__(self, params):
super(LSTMPredictor, self).__init__()
self.lstm = nn.LSTM(params['input_size'], params['hidden_size'], params['num_layers'],
batch_first=True, dropout=params['dropout'])
self.fc = nn.Linear(params['hidden_size'], 1)
def forward(self, x):
lstm_out, _ = self.lstm(x)
last_hidden = lstm_out[:, -1, :]
return self.fc(last_hidden)
# Определение модели EmotionAwareness с GNN и LSTM
class EmotionAwarenessModel(nn.Module):
def __init__(self, gnn_params, lstm_params):
super(EmotionAwarenessModel, self).__init__()
self.gnn = GraphNeuralNetwork(gnn_params)
self.lstm = LSTMPredictor(lstm_params)
def forward(self, x):
gnn_out = self.gnn(x) # GNN output: shape (batch_size, input_size)
gnn_out = gnn_out.unsqueeze(1) # Add sequence length dimension: shape (batch_size, 1, input_size)
return self.lstm(gnn_out) # Pass to LSTM
# Создание модели
model = EmotionAwarenessModel(gnn_params, lstm_params)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# Пример данных (batch_size, input_size)
x = torch.randn(batch_size, input_size) # Should be (batch_size, input_size)
target = torch.randn(batch_size, 1) # Целевое значение
# Обучение модели
for epoch in range(num_epochs):
model.train()
optimizer.zero_grad()
# Прямой проход
output = model(x)
# Расчет потерь
loss = F.mse_loss(output, target)
loss.backward()
# Обновление параметров
optimizer.step()
print(f'Epoch {epoch + 1}, Loss: {loss.item():.4f}')
"""# ***NEW METHOD MACHINE LEARNING***"""
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
import torch.nn.functional as F
# Линейная регрессия для эталонного решения
class LinearRegressionModel:
def init(self):
self.model = LinearRegression()
def fit(self, X, y):
self.model.fit(X, y)
def predict(self, X):
return self.model.predict(X)
# Графовая нейросеть для подбора альтернативных решений
class GraphNeuralNetwork(nn.Module):
def init(self, in_channels, out_channels):
super(GraphNeuralNetwork, self).init()
self.conv1 = GCNConv(in_channels, 16)
self.conv2 = GCNConv(16, out_channels)
def forward(self, data):
x, edge_index = data.x, data.edge_index
x = self.conv1(x, edge_index)
x = F.relu(x)
x = self.conv2(x, edge_index)
return x
# Q-обучение с PPO
class QLearningWithPPO:
def init(self, state_dim, action_dim, lr=0.001, gamma=0.99):
self.q_net = nn.Linear(state_dim, action_dim)
self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
self.gamma = gamma
self.eps_clip = 0.2
def get_action(self, state):
q_values = self.q_net(state)
action = torch.argmax(q_values).item()
return action
def update(self, state, action, reward, next_state):
q_values = self.q_net(state)
q_next = self.q_net(next_state).detach()
target = reward + self.gamma * torch.max(q_next)
loss = F.mse_loss(q_values[action], target)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Метод обучения логики
def logic_learning_with_q_ppo(dataset, num_alternatives=10):
lin_model = LinearRegressionModel()
X, y = dataset[:, :-1], dataset[:, -1]
lin_model.fit(X, y)
base_solution = lin_model.predict(X)
gnn_model = GraphNeuralNetwork(in_channels=1, out_channels=1)
optimizer_gnn = optim.Adam(gnn_model.parameters(), lr=0.01)
q_ppo = QLearningWithPPO(state_dim=1, action_dim=num_alternatives)
edge_index = torch.tensor([[0, 1], [1, 0]], dtype=torch.long)
x = torch.tensor(base_solution, dtype=torch.float).view(-1, 1)
data = Data(x=x, edge_index=edge_index)
for i in range(num_alternatives):
state = torch.tensor([i], dtype=torch.float)
action = q_ppo.get_action(state)
# Тренировка графовой сети
gnn_model.train()
for epoch in range(50):
optimizer_gnn.zero_grad()
out = gnn_model(data)
loss = F.mse_loss(out, x)
loss.backward()
optimizer_gnn.step()
solution = out.detach().numpy().flatten()
reward = -np.abs(base_solution - solution).sum()
next_state = torch.tensor([i + 1], dtype=torch.float)
q_ppo.update(state, action, reward, next_state)
print(f"Alternative solution {i+1}: reward = {reward}")
return solution
"""# ***COMBINED MODELS LIBERALMIND***"""
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.nn import GCNConv
from torch_geometric.data import Data
import torch.nn.functional as F
from sklearn.linear_model import LinearRegression
# Линейная регрессия для эталонного решения
class LinearRegressionModel:
def __init__(self, params):
self.params = params
self.model = LinearRegression(
fit_intercept=params['fit_intercept'],
copy_X=params['copy_X'],
n_jobs=params['n_jobs']
)
def fit(self, X, y):
if self.params.get('normalize', False):
X = (X - np.mean(X, axis=0)) / np.std(X, axis=0)
self.model.fit(X, y)
def predict(self, X):
return self.model.predict(X)
# Графовая нейросеть для подбора альтернативных решений
class GraphNeuralNetwork(nn.Module):
def __init__(self, params):
super(GraphNeuralNetwork, self).__init__()
self.conv1 = GCNConv(params['in_channels'], params['hidden_dim1'])
self.conv2 = GCNConv(params['hidden_dim1'], params['hidden_dim2'])
self.conv3 = GCNConv(params['hidden_dim2'], params['out_channels'])
self.dropout = params['dropout']
def forward(self, data):
x, edge_index = data.x, data.edge_index
x = self.conv1(x, edge_index)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.conv2(x, edge_index)
x = F.relu(x)
x = F.dropout(x, p=self.dropout, training=self.training)
x = self.conv3(x, edge_index)
return x
# Q-обучение с PPO
class QLearningWithPPO:
def __init__(self, state_dim, action_dim, lr=0.001, gamma=0.99):
self.q_net = nn.Linear(state_dim, action_dim)
self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
self.gamma = gamma
def get_action(self, state):
q_values = self.q_net(state)
action = torch.argmax(q_values).item()
return action
def update(self, state, action, reward, next_state):
q_values = self.q_net(state)
q_next = self.q_net(next_state).detach()
target = reward + self.gamma * torch.max(q_next)
loss = F.mse_loss(q_values[action], target)
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
# Определение базовых нейросетей для примера
class CreativeNet(nn.Module):
def __init__(self):
super(CreativeNet, self).__init__()
self.fc = nn.Linear(10, 20)
def forward(self, x):
return torch.relu(self.fc(x))
class LogicNet(nn.Module):
def __init__(self):
super(LogicNet, self).__init__()
self.fc = nn.Linear(20, 30)
def forward(self, x):
return torch.relu(self.fc(x))
class MathNet(nn.Module):
def __init__(self):
super(MathNet, self).__init__()
self.fc = nn.Linear(30, 40)
def forward(self, x):
return torch.relu(self.fc(x))
# Общий класс для объединенной модели
class CombinedModel(nn.Module):
def __init__(self, gnn_params):
super(CombinedModel, self).__init__()
self.creative_net = CreativeNet()
self.logic_net = LogicNet()
self.math_net = MathNet()
self.gnn_model = GraphNeuralNetwork(gnn_params)
def forward(self, x, data):
# Обработка через сети
x = self.creative_net(x) # Первый этап - творчество
x = self.logic_net(x) # Второй этап - логика
x = self.math_net(x) # Третий этап - математика
# Обработка через графовую нейросеть
gnn_output = self.gnn_model(data)
return gnn_output # Возвращаем выход графовой нейросети
# Метод обучения логики с использованием Q-обучения и графовой нейросети
def logic_learning_with_q_ppo(dataset, num_alternatives=10):
# Гиперпараметры для линейной регрессии
lin_params = {
'fit_intercept': True,
'copy_X': True,
'n_jobs': -1,
'normalize': False, # Опция для нормализации данных, обработается в методе fit
}
# Гиперпараметры для графовой нейросети
gnn_params = {
'in_channels': 1,
'out_channels': 1,
'hidden_dim1': 16,
'hidden_dim2': 32,
'dropout': 0.2,
}
# Инициализация линейной регрессии и объединенной модели
lin_model = LinearRegressionModel(lin_params)
X, y = dataset[:, :-1], dataset[:, -1]
lin_model.fit(X, y)
base_solution = lin_model.predict(X)
q_ppo = QLearningWithPPO(state_dim=1, action_dim=num_alternatives)
# Создание графа
edge_index = torch.tensor([[0, 1], [1, 0]], dtype=torch.long)
x = torch.tensor(base_solution, dtype=torch.float).view(-1, 1)
data = Data(x=x, edge_index=edge_index)
combined_model = CombinedModel(gnn_params)
for i in range(num_alternatives):
state = torch.tensor([i], dtype=torch.float)
action = q_ppo.get_action(state)
# Тренировка графовой сети
combined_model.train()
optimizer_gnn = optim.Adam(combined_model.parameters(), lr=0.01)
for epoch in range(50):
optimizer_gnn.zero_grad()
out = combined_model(torch.tensor(X, dtype=torch.float), data) # Передача данных через объединенную модель
loss = F.mse_loss(out, x)
loss.backward()
optimizer_gnn.step()
solution = out.detach().numpy().flatten()
reward = -np.abs(base_solution - solution).sum()
next_state = torch.tensor([i + 1], dtype=torch.float)
q_ppo.update(state, action, reward, next_state)
print(f"Alternative solution {i + 1}: reward = {reward}")
return solution
# Пример данных
dataset = np.random.rand(100, 11) # Пример: 100 примеров с 10 признаками и 1 целевой переменной
solutions = logic_learning_with_q_ppo(dataset)
# Создание объединенной модели и пример данных для входа
combined_model = CombinedModel({
'in_channels': 1,
'out_channels': 1,
'hidden_dim1': 16,
'hidden_dim2': 32,
'dropout': 0.2,
})
input_data = torch.randn(1, 10)
dummy_data = Data(x=input_data.view(-1, 1), edge_index=torch.tensor([[0, 1], [1, 0]], dtype=torch.long))
output = combined_model(input_data, dummy_data) # Передача dummy_data для GNN
print("Output:", output)
import random
class QLearningWithPPO:
def init(self, state_dim, action_dim, lr=0.001, gamma=0.99, epsilon=0.1):
self.q_net = nn.Linear(state_dim, action_dim)
self.optimizer = optim.Adam(self.q_net.parameters(), lr=lr)
self.gamma = gamma
self.epsilon = epsilon # Для epsilon-greedy стратегии
self.memory = [] # Хранение опытов для обучения
def get_action(self, state):
if random.random() < self.epsilon: # Эpsilon-greedy стратегия
return random.randint(0, self.q_net.out_features - 1)
q_values = self.q_net(state)
action = torch.argmax(q_values).item()
return action
def update(self, batch_size):
if len(self.memory) < batch_size:
return
# Случайный выбор опыта из памяти
experiences = random.sample(self.memory, batch_size)
states, actions, rewards, next_states = zip(*experiences)
states = torch.stack(states)
actions = torch.tensor(actions)
rewards = torch.tensor(rewards)
next_states = torch.stack(next_states)
q_values = self.q_net(states)
q_next = self.q_net(next_states).detach()
target = rewards + self.gamma * torch.max(q_next, dim=1)[0]
loss = F.mse_loss(q_values.gather(1, actions.unsqueeze(1)), target.unsqueeze(1))
self.optimizer.zero_grad()
loss.backward()
self.optimizer.step()
def store_experience(self, state, action, reward, next_state):
self.memory.append((state, action, reward, next_state))
# Метод обучения логики с использованием Q-обучения и графовой нейросети
def logic_learning_with_q_ppo(dataset, num_epochs=100, batch_size=32, num_alternatives=10):
# Гиперпараметры для линейной регрессии
lin_params = {
'fit_intercept': True,
'copy_X': True,
'n_jobs': -1,
'normalize': False,
}
# Инициализация моделей
lin_reg_model = LinearRegressionModel()
lin_reg_model.init(lin_params)
# Инициализация графовой нейросети и Q-обучения с PPO
gnn_params = {
'in_channels': data.num_node_features,
'hidden_dim1': 16,
'hidden_dim2': 8,
'out_channels': 4,
'dropout': 0.5,
}
gnn_model = GraphNeuralNetwork(gnn_params)
q_ppo = QLearningWithPPO(state_dim=10, action_dim=4) # Пример размерности состояния и действия
# Основной цикл обучения
for epoch in range(num_epochs):
state = torch.tensor(X, dtype=torch.float32) # Преобразуем X в тензор
action = q_ppo.get_action(state) # Получаем действие
reward = random.random() # Пример получения вознаграждения (это должно быть заменено на реальную логику)
next_state = state # В реальном сценарии next_state должен изменяться
# Сохраняем опыт
q_ppo.store_experience(state, action, reward, next_state)
# Обновляем модель каждые batch_size итераций
if (epoch + 1) % batch_size == 0:
q_ppo.update(batch_size)
# Вывод информации о текущем прогрессе
if (epoch + 1) % 10 == 0:
print(f"Эпоха {epoch + 1}/{num_epochs}, Вознаграждение: {reward:.4f}")
# Вывод предсказаний
print("Предсказания линейной регрессии:", y_pred)
print("Выход графовой нейросети:", gnn_model(data))
"""# ***GENERATIVE MODEL***"""
import torch
import torch.nn as nn
from stable_baselines3 import PPO
from gym import Env
from gym.spaces import Box, Discrete
from datasets import load_dataset
# Модель творчества
class CreativeNet(nn.Module):
def init(self):
super(CreativeNet, self).init()
self.fc = nn.Linear(32, 64)
def forward(self, x):
return torch.relu(self.fc(x))
# Модель логики
class LogicNet(nn.Module):
def init(self):
super(LogicNet, self).init()
self.fc = nn.Linear(64, 128)
def forward(self, x):
return torch.relu(self.fc(x))
# Математическая модель
class MathNet(nn.Module):
def init(self):
super(MathNet, self).init()
self.fc = nn.Linear(128, 32)
def forward(self, x):
return torch.relu(self.fc(x))
# Объединённая модель
class CombinedModel(nn.Module):
def init(self):
super(CombinedModel, self).init()
self.creative_net = CreativeNet()
self.logic_net = LogicNet()
self.math_net = MathNet()
def forward(self, x):
x = self.creative_net(x)
x = self.logic_net(x)
x = self.math_net(x)
return x
# Среда обучения
class CustomEnv(Env):
def init(self, model, dataset):
super(CustomEnv, self).init()
self.model = model
self.dataset = dataset
self.action_space = Discrete(3)
self.observation_space = Box(low=0, high=1, shape=(32,), dtype=torch.float32)
self.current_index = 0
def reset(self):
self.current_index = 0
return self._get_observation()
def step(self, action):
observation = self._get_observation()
input_tensor = torch.tensor(observation, dtype=torch.float32).unsqueeze(0)
model_output = self.model(input_tensor).squeeze(0).detach().numpy()
# Логика вознаграждения на основе действий и вывода модели
reward = -1 if action != model_output.argmax() else 1
self.current_index += 1
done = self.current_index >= len(self.dataset)
return observation, reward, done, {}
def _get_observation(self):
return torch.tensor(self.dataset[self.current_index], dtype=torch.float32).numpy()
# Загрузка датасета
keyword = "/content/dataset/anthropic_hh_golden-test.arrow"
dataset = load_dataset(keyword)["train"][:10]
input_size = len(dataset[0])
# Инициализация модели и среды
model = CombinedModel()
env = CustomEnv(model, dataset)
# Настройка PPO
ppo_model = PPO("MlpPolicy", env, verbose=1)
ppo_model.learn(total_timesteps=10000)
# Генерация ответа
def generate_response(model, input_data):
input_tensor = torch.tensor(input_data, dtype=torch.float32).unsqueeze(0)
output = model(input_tensor).squeeze(0).detach().numpy()
return output.argmax()
# Пример генерации
example_input = torch.rand(32).numpy()
generated_response = generate_response(model, example_input)
print("Generated response:", generated_response)
import torch
import torch.nn as nn
import torch.optim as optim
from torch_geometric.data import Data
from datasets import load_dataset
from sklearn.preprocessing import StandardScaler
import numpy as np
from tqdm import tqdm
# Универсальная объединённая модель
class CombinedModel(nn.Module):
def init(self, input_size, hidden_size, output_size):
super(CombinedModel, self).init()
# Энкодер на основе LSTM
self.encoder = nn.LSTM(input_size, hidden_size, batch_first=True)
# Графовая нейросеть
self.gnn_fc1 = nn.Linear(hidden_size, 64)
self.gnn_fc2 = nn.Linear(64, hidden_size)
# Декодер на основе LSTM
self.decoder = nn.LSTM(hidden_size, output_size, batch_first=True)
def forward(self, x, edge_index=None):
# Энкодинг последовательностей
x, _ = self.encoder(x)
x = x[:, -1, :] # Используем последний выход
# Обработка через GNN
x = torch.relu(self.gnn_fc1(x))
x = torch.relu(self.gnn_fc2(x))
# Декодинг
x = x.unsqueeze(1).repeat(1, 10, 1) # Растягиваем для LSTM-декодера
x, _ = self.decoder(x)
return x
# Функция загрузки и предобработки датасета
def preprocess_dataset(dataset_name):
dataset = load_dataset(dataset_name)
# Преобразуем данные в numpy (или используем ваш подход)
if 'train' in dataset:
data = np.array(dataset['train'])
else:
data = np.array(dataset['data'])
# Масштабирование
scaler = StandardScaler()
scaled_data = scaler.fit_transform(data)
return torch.tensor(scaled_data, dtype=torch.float32)
# Обучение модели
def train_model(model, dataset, epochs=10, batch_size=32, learning_rate=1e-4):
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
loss_fn = nn.MSELoss()
dataloader = torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=True)
for epoch in range(epochs):
total_loss = 0
for batch in tqdm(dataloader, desc=f"Epoch {epoch + 1}/{epochs}"):
inputs = batch[:, :-1].unsqueeze(1) # Последний столбец - целевая переменная
targets = batch[:, -1].unsqueeze(1)
optimizer.zero_grad()
outputs = model(inputs)
loss = loss_fn(outputs.squeeze(), targets)
loss.backward()
optimizer.step()
total_loss += loss.item()
print(f"Loss: {total_loss / len(dataloader):.4f}")
# Пример использования
if name == "main":
# Параметры модели
input_size = 10
hidden_size = 128
output_size = 1
# Инициализация модели
model = CombinedModel(input_size, hidden_size, output_size)
# Загрузка и предобработка датасета
dataset_name = "your_dataset_name" # Укажите название датасета
dataset = preprocess_dataset(dataset_name)
# Обучение модели
train_model(model, dataset)
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from stable_baselines3 import PPO
from gym import Env
from gym.spaces import Box, Discrete
import pandas as pd
import os
# *ШАГ 1: Объединение трех моделей в одну*
class UnifiedModel(nn.Module):
def init(self, input_size, hidden_size, output_size):
super(UnifiedModel, self).init()
# LSTM-энкодер
self.encoder = nn.LSTM(input_size, hidden_size, batch_first=True)
# Логическая модель
self.logic_fc = nn.Sequential(
nn.Linear(hidden_size, hidden_size * 2),
nn.ReLU(),
nn.Linear(hidden_size * 2, hidden_size)
)
# LSTM-декодер
self.decoder = nn.LSTM(hidden_size, output_size, batch_first=True)
def forward(self, x):
# Проходим через LSTM-энкодер
x, _ = self.encoder(x)
x = x[:, -1, :] # Используем последний выход
# Пропускаем через логическую модель
x = self.logic_fc(x)
# Добавляем измерение времени для декодера
x = x.unsqueeze(1).repeat(1, 10, 1)
x, _ = self.decoder(x)
return x
# *ШАГ 2: Работа с файлами любого формата*
def load_dataset(file_path):
# Определяем тип файла
ext = os.path.splitext(file_path)[1].lower()
if ext == '.csv':
data = pd.read_csv(file_path)
elif ext in ['.xls', '.xlsx']:
data = pd.read_excel(file_path)
elif ext == '.json':
data = pd.read_json(file_path)
elif ext == '.txt':
data = pd.read_csv(file_path, delimiter='\t')
else:
raise ValueError("Формат файла не поддерживается")
# Преобразуем в NumPy массив и масштабируем
data = data.select_dtypes(include=[np.number]).dropna() # Оставляем только числовые данные
return torch.tensor(data.values, dtype=torch.float32)
# *ШАГ 3: RLCF и PPO обучение*
# Класс среды для PPO
class CustomEnv(Env):
def init(self, data):
super(CustomEnv, self).init()
self.data = data
self.current_step = 0
self.action_space = Discrete(3) # Пример: 3 действия
self.observation_space = Box(low=-np.inf, high=np.inf, shape=(data.shape[1],), dtype=np.float32)
def reset(self):
self.current_step = 0
return self.data[self.current_step]
def step(self, action):
self.current_step += 1
reward = np.random.random() # Пример: случайная награда
done = self.current_step >= len(self.data)
return self.data[self.current_step % len(self.data)], reward, done, {}
# Функция RLCF
def train_with_rlcf(data):
# Random Forest классификатор
rf = RandomForestClassifier(n_estimators=100)
X, y = data[:, :-1], data[:, -1]
rf.fit(X, y)
feature_importances = rf.feature_importances_
return feature_importances
# Функция PPO обучения
def train_with_ppo(data):
env = CustomEnv(data)
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=10000)
return model
# *ШАГ 4: Интеграция с Google Colab*
def main():
# Ввод пути к файлу
file_path = input("Введите путь к файлу в директории Google Colab: ")
# Загрузка датасета
data = load_dataset(file_path)
print(f"Датасет загружен! Размер: {data.shape}")
# Инициализация модели
input_size = data.shape[1] - 1 # Предполагаем, что последний столбец - целевая переменная
hidden_size = 128
output_size = 1
model = UnifiedModel(input_size, hidden_size, output_size)
# RLCF обучение
feature_importances = train_with_rlcf(data)
print("Feature Importances (RLCF):", feature_importances)
# PPO обучение
ppo_model = train_with_ppo(data)
print("PPO обучение завершено!")
import os
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from stable_baselines3 import PPO
from gym import Env
from gym.spaces import Box, Discrete
import pandas as pd
# *ШАГ 1: Объединение трех моделей в одну*
# Модель творчества
class CreativeNet(nn.Module):
def init(self):
super(CreativeNet, self).init()
self.fc = nn.Linear(32, 64)
def forward(self, x):
return torch.relu(self.fc(x))
# Модель логики
class LogicNet(nn.Module):
def init(self):
super(LogicNet, self).init()
self.fc = nn.Linear(64, 128)
def forward(self, x):
return torch.relu(self.fc(x))
# Математическая модель
class MathNet(nn.Module):
def init(self):
super(MathNet, self).init()
self.fc = nn.Linear(128, 32)
def forward(self, x):
return torch.relu(self.fc(x))
# Объединённая модель
class CombinedModel(nn.Module):
def init(self):
super(CombinedModel, self).init()
self.creative_net = CreativeNet()
self.logic_net = LogicNet()
self.math_net = MathNet()
def forward(self, x):
x = self.creative_net(x)
x = self.logic_net(x)
x = self.math_net(x)
return x
# *ШАГ 2: Загрузка данных из файла любого формата*
def load_dataset(file_path):
try:
if file_path.endswith(('.csv', '.txt')):
data = pd.read_csv(file_path)
elif file_path.endswith(('.xls', '.xlsx')):
data = pd.read_excel(file_path)
elif file_path.endswith('.json'):
data = pd.read_json(file_path)
else:
raise ValueError(f"Формат файла {file_path} не поддерживается.")
# Оставляем только числовые данные и преобразуем их в Tensor
data = data.select_dtypes(include=[np.number]).dropna()
return torch.tensor(data.values, dtype=torch.float32)
except Exception as e:
print(f"Ошибка при загрузке файла: {e}")
return None
# *ШАГ 3: RLCF и PPO обучение*
# Класс среды для PPO
class CustomEnv(Env):
def init(self, data):
super(CustomEnv, self).init()
self.data = data
self.current_step = 0
self.action_space = Discrete(3)
self.observation_space = Box(low=-np.inf, high=np.inf, shape=(data.shape[1],), dtype=np.float32)
def reset(self):
self.current_step = 0
return self.data[self.current_step]
def step(self, action):
self.current_step += 1
reward = np.random.random() # Пример: случайная награда
done = self.current_step >= len(self.data)
return self.data[self.current_step % len(self.data)], reward, done, {}
# Функция RLCF
def train_with_rlcf(data):
rf = RandomForestClassifier(n_estimators=100)
X, y = data[:, :-1], data[:, -1]
rf.fit(X, y)
return rf.feature_importances_
# Класс среды для PPO
class CustomEnv(Env):
def init(self, data):
super(CustomEnv, self).init()
self.data = data.numpy() # Преобразуем данные в numpy
self.current_step = 0
self.action_space = Discrete(3) # Пример: 3 возможных действия
self.observation_space = Box(
low=-np.inf, high=np.inf, shape=(self.data.shape[1] - 1,), dtype=np.float32
)
def reset(self):
# Сбрасываем текущий шаг
self.current_step = 0
# Возвращаем первое наблюдение (все признаки кроме последнего, который мы предполагаем как целевую переменную)
return self.data[self.current_step, :-1].astype(np.float32)
def step(self, action):
# Генерация случайной награды на основе действия (пример)
reward = float(np.random.random())
# Переход к следующему шагу
self.current_step += 1
# Проверяем, завершён ли эпизод
done = self.current_step >= len(self.data)
# Возвращаем следующее наблюдение, награду, статус завершения и пустой словарь информации
obs = self.data[self.current_step % len(self.data), :-1].astype(np.float32)
return obs, reward, done, {}
def main():
# Ввод пути к файлу
file_path = input("Введите путь к вашему файлу в Google Colab: ").strip()
data = load_dataset(file_path)
if data is None:
print("Не удалось загрузить датасет.")
return
print(f"Датасет успешно загружен! Размер данных: {data.shape}")
# Инициализация объединённой модели
model = CombinedModel()
print("Объединённая модель создана!")
# RLCF обучение
feature_importances = train_with_rlcf(data)
print("Feature Importances (RLCF):", feature_importances)
# PPO обучение
ppo_model = train_with_ppo(data)
print("PPO обучение завершено!")
# Запуск
main()