import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import deque
import random
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import heapq  # For the A* algorithm
from huggingface_hub import HfApi, HfFolder  # Hugging Face API


# Function to generate a floorplan
def generate_floorplan(size=10, obstacle_density=0.2):
    floorplan = [[0 for _ in range(size)] for _ in range(size)]
    target_x, target_y = size - 1, size - 1
    floorplan[target_x][target_y] = 2  # Mark target position
    num_obstacles = int(size * size * obstacle_density)
    for _ in range(num_obstacles):
        x = random.randint(0, size - 1)
        y = random.randint(0, size - 1)
        if floorplan[x][y] == 0 and (x, y) != (0, 0):
            floorplan[x][y] = 1  # Mark obstacle
    return floorplan, target_x, target_y


def a_star(floorplan, start, goal):
    size = len(floorplan)
    open_set = []
    heapq.heappush(open_set, (0, start))
    came_from = {}
    g_score = {start: 0}
    f_score = {start: heuristic(start, goal)}
    while open_set:
        _, current = heapq.heappop(open_set)
        if current == goal:
            return reconstruct_path(came_from, current)
        neighbors = get_neighbors(current, size)
        for neighbor in neighbors:
            if floorplan[neighbor[0]][neighbor[1]] == 1:
                continue  # Ignore obstacles
            tentative_g_score = g_score[current] + 1
            if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g_score
                f_score[neighbor] = g_score[neighbor] + heuristic(neighbor, goal)
                heapq.heappush(open_set, (f_score[neighbor], neighbor))
    return []


def heuristic(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])


def get_neighbors(pos, size):
    neighbors = []
    x, y = pos
    if x > 0:
        neighbors.append((x - 1, y))
    if x < size - 1:
        neighbors.append((x + 1, y))
    if y > 0:
        neighbors.append((x, y - 1))
    if y < size - 1:
        neighbors.append((x, y + 1))
    return neighbors


def reconstruct_path(came_from, current):
    path = [current]
    while current in came_from:
        current = came_from[current]
        path.append(current)
    return path[::-1]


class Environment:
    def __init__(self, size=10, obstacle_density=0.2):
        self.size = size
        self.floorplan, self.target_x, self.target_y = generate_floorplan(size, obstacle_density)
        self.robot_x = 0
        self.robot_y = 0

    def reset(self):
        while True:
            self.robot_x = random.randint(0, self.size - 1)
            self.robot_y = random.randint(0, self.size - 1)
            if self.floorplan[self.robot_x][self.robot_y] == 0:
                break
        return self.get_cnn_state()

    def step(self, action):
        new_x, new_y = self.robot_x, self.robot_y
        if action == 0:  # Up
            new_x = max(self.robot_x - 1, 0)
        elif action == 1:  # Down
            new_x = min(self.robot_x + 1, self.size - 1)
        elif action == 2:  # Left
            new_y = max(self.robot_y - 1, 0)
        elif action == 3:  # Right
            new_y = min(self.robot_y + 1, self.size - 1)

        # Check if the new position is an obstacle
        if self.floorplan[new_x][new_y] != 1:
            self.robot_x, self.robot_y = new_x, new_y

        done = (self.robot_x == self.target_x and self.robot_y == self.target_y)
        reward = self.get_reward(self.robot_x, self.robot_y)
        next_state = self.get_cnn_state()
        info = {}
        return next_state, reward, done, info

    def get_reward(self, robot_x, robot_y):
        if self.floorplan[robot_x][robot_y] == 1:
            return -5  # Penalty for hitting an obstacle
        elif robot_x == self.target_x and robot_y == self.target_y:
            return 10  # Reward for reaching the target
        else:
            return -0.1  # Penalty for each step

    def get_cnn_state(self):
        grid = [row[:] for row in self.floorplan]
        grid[self.robot_x][self.robot_y] = 3  # Mark the robot's current position
        return np.array(grid).flatten()
    def render(self, path=None):
        grid = np.array(self.floorplan)
        fig, ax = plt.subplots()
        ax.set_xticks(np.arange(-0.5, self.size, 1))
        ax.set_yticks(np.arange(-0.5, self.size, 1))
        ax.grid(which='major', color='k', linestyle='-', linewidth=1)
        ax.tick_params(which='both', bottom=False, left=False, labelbottom=False, labelleft=False)

        def update(i):
            ax.clear()
            ax.imshow(grid, cmap='Greys', interpolation='nearest')
            if path:
                x, y = path[i]
                ax.plot(y, x, 'bo')  # Draw robot's path
            plt.draw()

        ani = animation.FuncAnimation(fig, update, frames=len(path) if path else 1, repeat=False)
        plt.show()


class DQN(nn.Module):
    def __init__(self, input_size, hidden_sizes, output_size):
        super(DQN, self).__init__()
        self.input_size = input_size
        self.hidden_sizes = hidden_sizes
        self.output_size = output_size
        self.fc_layers = nn.ModuleList()
        prev_size = input_size
        for size in hidden_sizes:
            self.fc_layers.append(nn.Linear(prev_size, size))
            prev_size = size
        self.output_layer = nn.Linear(prev_size, output_size)

    def forward(self, x):
        if len(x.shape) > 2:
            x = x.view(x.size(0), -1)  # Flatten any extra dimensions
        for layer in self.fc_layers:
            x = F.relu(layer(x))
        x = self.output_layer(x)
        return x

    def choose_action(self, state):
        with torch.no_grad():
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            q_values = self(state_tensor)
            action = q_values.argmax().item()
        return action


class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)


# Function to save the model checkpoint
def save_checkpoint(state, filename="checkpoint.pth.tar"):
    torch.save(state, filename)


# Function to load the model checkpoint
def load_checkpoint(filename):
    checkpoint = torch.load(filename)
    return checkpoint


# Training the DQN
env = Environment()
input_size = env.size * env.size  # Flattened grid size
hidden_sizes = [64, 64]  # Hidden layer sizes
output_size = 4  # Number of actions (up, down, left, right)

dqn = DQN(input_size, hidden_sizes, output_size)
dqn_target = DQN(input_size, hidden_sizes, output_size)
dqn_target.load_state_dict(dqn.state_dict())

optimizer = optim.Adam(dqn.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
replay_buffer = ReplayBuffer(10000)

num_episodes = 50
batch_size = 64
gamma = 0.99
target_update_freq = 10  # Sync the target network every 10 episodes
checkpoint_freq = 10  # Save checkpoint every 10 episodes

losses = []

for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    done = False

    # Integrate A* guidance for initial exploration
    initial_path = a_star(env.floorplan, (env.robot_x, env.robot_y), (env.target_x, env.target_y))
    path_index = 1  # Index 0 is the start cell itself, so begin with the first move

    while not done:
        epsilon = max(0.01, 0.2 - 0.01 * (episode / 2))  # Decaying exploration rate
        if np.random.rand() < epsilon:
            if initial_path and path_index < len(initial_path):
                # Derive the action from the next A* waypoint
                next_pos = initial_path[path_index]
                if next_pos[0] < env.robot_x:
                    action = 0  # Up
                elif next_pos[0] > env.robot_x:
                    action = 1  # Down
                elif next_pos[1] < env.robot_y:
                    action = 2  # Left
                else:
                    action = 3  # Right
                path_index += 1
            else:
                action = np.random.randint(output_size)
        else:
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = dqn(state_tensor)
            action = q_values.argmax().item()

        next_state, reward, done, _ = env.step(action)
        replay_buffer.push(state, action, reward, next_state, done)

        if len(replay_buffer) > batch_size:
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
            states = torch.tensor(np.array(states), dtype=torch.float32)
            actions = torch.tensor(actions, dtype=torch.int64)
            rewards = torch.tensor(rewards, dtype=torch.float32)
            next_states = torch.tensor(np.array(next_states), dtype=torch.float32)
            dones = torch.tensor(dones, dtype=torch.float32)

            q_values = dqn(states)
            q_values = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)

            with torch.no_grad():
                next_q_values = dqn_target(next_states)  # Bootstrap from the target network
                next_q_values = next_q_values.max(1)[0]

            target_q_values = rewards + (1 - dones) * gamma * next_q_values
            loss = F.smooth_l1_loss(q_values, target_q_values)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            losses.append(loss.item())

        total_reward += reward
        state = next_state

    if episode % target_update_freq == 0:
        dqn_target.load_state_dict(dqn.state_dict())

    scheduler.step()

    # Save checkpoints
    if episode % checkpoint_freq == 0 or episode == num_episodes - 1:
        checkpoint = {
            'episode': episode + 1,
            'state_dict': dqn.state_dict(),
            'optimizer': optimizer.state_dict(),
            'losses': losses
        }
        save_checkpoint(checkpoint, f'checkpoint_{episode + 1}.pth.tar')

    print(f"Episode {episode + 1}: Total Reward = {total_reward}, Loss = {np.mean(losses[-batch_size:]) if losses else None}")

# Save the final model
torch.save(dqn.state_dict(), 'dqn_model.pth')

# Load the trained model
dqn = DQN(input_size, hidden_sizes, output_size)
dqn.load_state_dict(torch.load('dqn_model.pth'))
dqn.eval()

# Simulate the bot's path using the trained DQN agent
state = env.reset()
done = False
path = [(env.robot_x, env.robot_y)]
max_steps = env.size * env.size  # Safety cap so the rollout cannot loop forever
while not done and len(path) <= max_steps:
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        q_values = dqn(state_tensor)
    action = q_values.argmax().item()  # Choose action from the trained DQN
    next_state, reward, done, _ = env.step(action)
    path.append((env.robot_x, env.robot_y))
    state = next_state

# Render the environment and the bot's path
env.render(path)


# Evaluate trained DQN
def evaluate_agent(env, agent, num_episodes=5, max_steps=200):
    total_rewards = 0
    successful_episodes = 0
    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        done = False
        steps = 0
        while not done and steps < max_steps:  # Cap steps so a stuck agent cannot loop forever
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            episode_reward += reward
            state = next_state
            steps += 1
        total_rewards += episode_reward
        if episode_reward > 0:
            successful_episodes += 1
    avg_reward = total_rewards / num_episodes
    success_rate = successful_episodes / num_episodes
    print("Evaluation Results:")
    print(f"Average Reward: {avg_reward}")
    print(f"Success Rate: {success_rate}")
    return avg_reward, success_rate


# Call the evaluation function after rendering
avg_reward, success_rate = evaluate_agent(env, dqn, num_episodes=5)

# Upload the model to Hugging Face
# Authenticate with Hugging Face API
api = HfApi()
api_token = HfFolder.get_token()  # Ensure you have logged in with `huggingface-cli login`

# Create a model repository if it doesn't exist
model_repo = 'cajcodes/dqn-floorplan-finder'
api.create_repo(repo_id=model_repo, exist_ok=True)

# Upload the model
api.upload_file(
    path_or_fileobj='dqn_model.pth',
    path_in_repo='dqn_model.pth',
    repo_id=model_repo
)
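
# Illustrative sketch (not part of the original script): one way to pull the uploaded
# weights back down from the Hub and reload them for inference, assuming the
# 'cajcodes/dqn-floorplan-finder' repo is accessible with your token and the
# architecture hyperparameters match those used during training.
from huggingface_hub import hf_hub_download

weights_path = hf_hub_download(repo_id=model_repo, filename='dqn_model.pth')
restored_dqn = DQN(input_size, hidden_sizes, output_size)
restored_dqn.load_state_dict(torch.load(weights_path, map_location='cpu'))
restored_dqn.eval()  # Inference mode for greedy rollouts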