# Model-card metadata (YAML front matter), kept as comments so that this file
# parses as Python:
#
# ---
# license: gpl-3.0
# pipeline_tag: graph-ml
# tags:
# - code
# ---
#
# ---
import contextlib
import os
import subprocess
import time

import numexpr as ne
import numpy as np
import onnxruntime as ort
import psutil
import requests
import torch
import torch.nn as nn
import torch.optim as optim
from matplotlib import pyplot as plt
from torchvision import datasets, transforms
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

# NOTE(review): the name `model` is rebound further down this file to the
# small MNIST classifier, so this pretrained checkpoint is only downloaded,
# never used. Kept so the import-time behaviour is unchanged.
tokenizer = AutoTokenizer.from_pretrained("janpase97/codeformer-pretrained")
model = AutoModelForSeq2SeqLM.from_pretrained("janpase97/codeformer-pretrained")

# (module-name fragment, reported API), checked in this order; first hit wins.
_GRAPHICS_API_MARKERS = (
    ("opengl32.dll", "OpenGL"),
    ("d3d11.dll", "DirectX11"),
    ("d3d12.dll", "DirectX12"),
    ("vulkan", "VULKAN"),
)


def check_graphics_api(target_app_name):
    """Guess the graphics API of a running process from its loaded modules.

    Runs the Windows ``tasklist`` command with the ``/M`` switch, filtered on
    the image name, and searches the output for known graphics DLL names.

    Args:
        target_app_name: Executable image name, e.g. ``"GTA5.exe"``.

    Returns:
        ``"OpenGL"``, ``"DirectX11"``, ``"DirectX12"`` or ``"VULKAN"`` for the
        first marker found, or ``None`` when nothing matched or ``tasklist``
        could not be run.
    """
    command = ['tasklist', '/FI', f'imagename eq {target_app_name}', '/M']
    try:
        raw_output = subprocess.check_output(command)
    except (subprocess.CalledProcessError, OSError):
        # OSError covers a missing `tasklist` binary (non-Windows hosts);
        # detection is best effort, so report "unknown" instead of crashing.
        return None

    # The console output is not guaranteed to be UTF-8; replace undecodable
    # bytes rather than raising. Module names are compared case-insensitively
    # because Windows file names are case-insensitive.
    module_listing = raw_output.decode('utf-8', errors='replace').lower()
    for marker, api_name in _GRAPHICS_API_MARKERS:
        if marker in module_listing:
            return api_name
    return None


# Get the target application's process object
def get_target_app_process(target_app_name):
    """Return the first process whose name equals ``target_app_name``, else ``None``."""
    for process in psutil.process_iter(['name']):
        if process.info['name'] == target_app_name:
            return process
    return None


# Attach the AI to the application's process by PID
def attach_ai_to_app_pid(target_app_process):
    """Report whether a target process is available to attach to.

    Args:
        target_app_process: Object exposing a ``pid`` attribute, or ``None``.

    Returns:
        ``True`` when a process was supplied, ``False`` otherwise. Only a
        message is printed; no actual attachment is performed here.
    """
    if target_app_process is None:
        print("Could not find the target application's process to attach the AI.")
        return False
    print(f"AI is attached to the application's process with PID: {target_app_process.pid}")
    return True


# Check if the targeted application is running
def is_target_app_running(target_app_name):
    """Return ``True`` when a process named ``target_app_name`` is running."""
    # Same lookup as get_target_app_process, so the two cannot drift apart.
    return get_target_app_process(target_app_name) is not None


# Create the directory if it doesn't exist
directory = r"G:\Epic Games\GTAV\GTA5_AI\trained_models"
# exist_ok avoids the check-then-create race of a separate os.path.exists test.
os.makedirs(directory, exist_ok=True)

# Define the
# neural network model
class NanoCircuit(nn.Module):
    """Two-layer fully connected classifier: 784 inputs -> 128 hidden -> 10 outputs."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        """Flatten ``x`` to rows of 784 values and return the 10 raw output scores."""
        x = x.view(-1, 784)  # e.g. (batch_size, 28, 28) -> (batch_size, 784)
        x = torch.relu(self.fc1(x))
        return self.fc2(x)


# Set the device to GPU if available
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Load the MNIST dataset
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64, shuffle=True)

# Initialize the model and move it to the selected device
model = NanoCircuit().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.9)


# Train the model with a data cap
def train_with_data_cap(model, data_loader, criterion, optimizer, device, data_cap_gb):
    """Train ``model`` until roughly ``data_cap_gb`` GiB of input has been consumed.

    The loader is iterated repeatedly ("epochs"). Each batch's input size in
    bytes is added to a running total, and training stops as soon as that
    total reaches the cap; the batch that crosses the cap is not trained on.

    Args:
        model: Module to optimise in place.
        data_loader: Iterable yielding ``(inputs, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer bound to ``model``'s parameters.
        device: Device the batches are moved to.
        data_cap_gb: Cap in GiB; may be fractional.

    Returns:
        The same ``model`` object that was passed in.

    Raises:
        ValueError: If ``data_loader`` yields no batches, which would
            otherwise never advance towards the cap.
    """
    data_cap_bytes = data_cap_gb * (1024 ** 3)
    data_processed = 0
    epoch = 0
    model.train()

    while data_processed < data_cap_bytes:
        running_loss = 0.0
        batches_seen = 0
        batches_trained = 0

        for inputs, labels in data_loader:
            batches_seen += 1
            inputs, labels = inputs.to(device), labels.to(device)

            # Update the amount of data processed
            data_processed += inputs.nelement() * inputs.element_size()
            if data_processed >= data_cap_bytes:
                break

            optimizer.zero_grad()
            outputs = model(inputs.view(-1, 28 * 28))
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            batches_trained += 1

        if batches_seen == 0:
            raise ValueError("data_loader yielded no batches; cannot train towards the data cap")

        epoch += 1
        # Average over the batches that actually contributed a loss; the batch
        # that hit the cap was skipped and must not dilute the mean.
        mean_loss = running_loss / max(batches_trained, 1)
        print(f"Epoch {epoch}, Loss: {mean_loss}")
        print(f"Data processed: {data_processed / (1024 ** 3):.2f} GB")

    return model


# Save the updated model as a .onnx file
def save_model(model, filepath):
    """Export ``model`` to ONNX at ``filepath`` using a ``(1, 1, 28, 28)`` dummy input."""
    # Build the dummy input on the model's own device, so the export works
    # even when the model does not live on the module-level `device`.
    first_param = next(model.parameters(), None)
    model_device = first_param.device if first_param is not None else torch.device("cpu")
    dummy_input = torch.randn(1, 1, 28, 28, device=model_device)
    torch.onnx.export(
        model,
        dummy_input,
        filepath,
        input_names=['input'],
        output_names=['output'],
        opset_version=11,
    )


# Train the
# model with a 50 GB data cap
trained_model = train_with_data_cap(model, train_loader, criterion, optimizer, device, data_cap_gb=50)
save_model(trained_model, os.path.join(directory, 'GTA5_TRAINED.onnx'))

# NOTE: this script used to re-declare train_with_data_cap twice below with a
# numexpr-based loss. Those copies could not run: numexpr expressions support
# neither fancy indexing nor `arange`, `loss_grad` was never defined, and
# `.backward()` was called on a NumPy value. They have been removed, so every
# stage uses the autograd-based train_with_data_cap defined earlier in this file.

_MODEL_EXPORT_PATH = os.path.join(directory, 'GTA5_TRAINED.onnx')
_BOXPLOT_PATH = r"G:\Epic Games\GTAV\GTA5_AI\Plot Box Comparison\boxplot_comparison.png"
_DETECTION_TIMEOUT_SECONDS = 5  # the shutdown message below hard-codes "5 seconds"


def _train_and_export(data_cap_gb):
    """Run one capped training pass on the global model and re-export it to ONNX."""
    trained = train_with_data_cap(
        model, train_loader, criterion, optimizer, device, data_cap_gb=data_cap_gb
    )
    save_model(trained, _MODEL_EXPORT_PATH)
    return trained


def _report_graphics_api(app_name):
    """Print the graphics API detected for ``app_name``; return True if one was found."""
    graphics_api = check_graphics_api(app_name)
    if graphics_api:
        print(f"Detected {graphics_api} in the target application.")
        return True
    print("Could not detect the graphics API used in the target application.")
    return False


def _wait_for_app(app_name, timeout_seconds=_DETECTION_TIMEOUT_SECONDS):
    """Poll once per second for ``app_name`` and its graphics API.

    Stops early once the application is running and its API was detected.
    Returns whether the application is running when the wait ends.
    """
    deadline = time.monotonic() + timeout_seconds
    while time.monotonic() < deadline:
        if is_target_app_running(app_name) and _report_graphics_api(app_name):
            break
        time.sleep(1)
    return is_target_app_running(app_name)


def _save_boxplot_comparison(first_sample, second_sample):
    """Draw the two samples as box plots, save the image, then display it."""
    os.makedirs(os.path.dirname(_BOXPLOT_PATH), exist_ok=True)
    plt.figure()
    plt.boxplot([first_sample, second_sample], labels=["Original Data", "Trained Data"])
    plt.title("Boxplot of Original and Trained Data")
    plt.ylabel("Values")
    # Save before show(): once the window is closed the figure is gone and
    # savefig would write an empty image.
    plt.savefig(_BOXPLOT_PATH)
    plt.show()
    plt.close()


def _run_training_session(app_name, data_cap_gb, was_running=False, *,
                          idle_sleep_seconds=None, after_training=None):
    """Train while ``app_name`` is running; stop once it has exited.

    Args:
        app_name: Process image name to watch.
        data_cap_gb: Data cap passed to each training pass.
        was_running: Whether the application was already seen running
            (carried over from a previous session).
        idle_sleep_seconds: When given, sleep this long after every poll.
            When ``None``, an idle poll instead waits up to
            ``_DETECTION_TIMEOUT_SECONDS`` for the application and ends the
            session if it does not appear.
        after_training: Optional zero-argument callable run after each pass.

    Returns:
        The updated ``was_running`` flag.
    """
    global trained_model
    while True:
        if is_target_app_running(app_name):
            print("Target application is running. Training and updating the model...")
            trained_model = _train_and_export(data_cap_gb)
            if after_training is not None:
                after_training()
            was_running = True
        elif was_running:
            print("Target application has exited. Saving the model...")
            save_model(trained_model, _MODEL_EXPORT_PATH)
            print("Finished training and saved the model.")
            return was_running
        elif idle_sleep_seconds is None:
            print("Target application is not running. Waiting to detect the graphics API...")
            if not _wait_for_app(app_name):
                print("Target application not detected in 5 seconds. Shutting down the AI.")
                return was_running
            continue
        else:
            print("Target application is not running. Waiting to start training and updating the model...")

        if idle_sleep_seconds is not None:
            time.sleep(idle_sleep_seconds)


def _monitor_graphics_api(app_name):
    """Report the graphics API while ``app_name`` runs; return once it stays gone."""
    while True:
        if is_target_app_running(app_name):
            _report_graphics_api(app_name)
            # Rate-limit: each report spawns a `tasklist` process.
            time.sleep(1)
            continue
        print("Target application is not running. Waiting to start training and updating the model...")
        if not _wait_for_app(app_name):
            print("Target application not detected in 5 seconds. Shutting down the AI.")
            return


# Stage 1: short training passes while the first target is running.
# NOTE(review): "GTA5_TRAINED.exe" looks like a mix-up with the ONNX file
# name (later stages watch "GTA5.exe") - confirm the intended process name.
target_app_name = "GTA5_TRAINED.exe"
save_interval_seconds = 5 * 60
application_was_running = _run_training_session(
    target_app_name, data_cap_gb=.1, idle_sleep_seconds=save_interval_seconds
)

# Stage 2: train the model with a 10 GB data cap
# (the device argument used to be `os.device_encoding`, a function, not a device).
trained_model = _train_and_export(10)

# Stage 3: keep training while the game itself is running.
target_app_name = "GTA5.exe"
save_interval_seconds = 5 * 60
application_was_running = _run_training_session(
    target_app_name, data_cap_gb=10, idle_sleep_seconds=save_interval_seconds
)

# Stage 4: same, but an idle poll waits briefly for the game and its graphics
# API instead of sleeping for the save interval.
target_app_name = "GTA5.exe"
save_interval_seconds = 1 * 60
application_was_running = _run_training_session(target_app_name, data_cap_gb=10)

# Stage 5: report the graphics API for as long as the game keeps running.
_monitor_graphics_api(target_app_name)

# Generate some random data for the boxplots
# (synthetic placeholders; they are not derived from the model)
np.random.seed(0)
original_data = np.random.normal(0, 1, 100)
trained_data = np.random.normal(0.5, 1, 100)

# Stage 6: train again, saving a box-plot comparison after every pass.
application_was_running = _run_training_session(
    target_app_name,
    data_cap_gb=10,
    was_running=application_was_running,
    after_training=lambda: _save_boxplot_comparison(original_data, trained_data),
)