import torch
import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
from collections import deque
import random
import matplotlib.pyplot as plt
import matplotlib.animation as animation
import heapq  # For the A* algorithm
from huggingface_hub import HfApi, HfFolder  # Hugging Face API
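
# DQN path-finding on a randomly generated floorplan.
# The script builds a grid world (free cells, obstacles, one target), trains a DQN
# whose epsilon-exploration is partially guided by an A* path, checkpoints the model,
# renders and evaluates the learned policy, and uploads the weights to the Hugging Face Hub.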

# Function to generate a floorplan
def generate_floorplan(size=10, obstacle_density=0.2):
    floorplan = [[0 for _ in range(size)] for _ in range(size)]
    target_x, target_y = size - 1, size - 1
    floorplan[target_x][target_y] = 2  # Mark target position
    num_obstacles = int(size * size * obstacle_density)
    for _ in range(num_obstacles):
        x = random.randint(0, size - 1)
        y = random.randint(0, size - 1)
        if floorplan[x][y] == 0 and (x, y) != (0, 0):
            floorplan[x][y] = 1  # Mark obstacle
    return floorplan, target_x, target_y

def a_star(floorplan, start, goal):
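    """A* search over the grid using a Manhattan-distance heuristic.

    Returns the list of cells from start to goal (inclusive), or an empty
    list if the goal is unreachable.
    """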
    size = len(floorplan)
    open_set = []
    heapq.heappush(open_set, (0, start))
    came_from = {}
    g_score = {start: 0}
    f_score = {start: heuristic(start, goal)}

    while open_set:
        _, current = heapq.heappop(open_set)

        if current == goal:
            return reconstruct_path(came_from, current)

        neighbors = get_neighbors(current, size)
        for neighbor in neighbors:
            if floorplan[neighbor[0]][neighbor[1]] == 1:
                continue  # Ignore obstacles

            tentative_g_score = g_score[current] + 1

            if neighbor not in g_score or tentative_g_score < g_score[neighbor]:
                came_from[neighbor] = current
                g_score[neighbor] = tentative_g_score
                f_score[neighbor] = g_score[neighbor] + heuristic(neighbor, goal)
                heapq.heappush(open_set, (f_score[neighbor], neighbor))

    return []

def heuristic(a, b):
    return abs(a[0] - b[0]) + abs(a[1] - b[1])

def get_neighbors(pos, size):
    neighbors = []
    x, y = pos
    if x > 0:
        neighbors.append((x - 1, y))
    if x < size - 1:
        neighbors.append((x + 1, y))
    if y > 0:
        neighbors.append((x, y - 1))
    if y < size - 1:
        neighbors.append((x, y + 1))
    return neighbors

def reconstruct_path(came_from, current):
    path = [current]
    while current in came_from:
        current = came_from[current]
        path.append(current)
    return path[::-1]

class Environment:
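    """Grid-world navigation environment.

    Cell encoding: 0 = free, 1 = obstacle, 2 = target; the robot's current
    position is marked with 3 in the flattened observation returned by
    get_cnn_state().
    """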
    def __init__(self, size=10, obstacle_density=0.2):
        self.size = size
        self.floorplan, self.target_x, self.target_y = generate_floorplan(size, obstacle_density)
        self.robot_x = 0
        self.robot_y = 0

    def reset(self):
        while True:
            self.robot_x = random.randint(0, self.size - 1)
            self.robot_y = random.randint(0, self.size - 1)
            if self.floorplan[self.robot_x][self.robot_y] == 0:
                break
        return self.get_cnn_state()

    def step(self, action):
        new_x, new_y = self.robot_x, self.robot_y
        
        if action == 0:  # Up
            new_x = max(self.robot_x - 1, 0)
        elif action == 1:  # Down
            new_x = min(self.robot_x + 1, self.size - 1)
        elif action == 2:  # Left
            new_y = max(self.robot_y - 1, 0)
        elif action == 3:  # Right
            new_y = min(self.robot_y + 1, self.size - 1)

        # Reward is based on the attempted position so that moving into an
        # obstacle actually incurs the -5 penalty from get_reward
        reward = self.get_reward(new_x, new_y)

        # Block the move if the new position is an obstacle; the robot stays put
        if self.floorplan[new_x][new_y] != 1:
            self.robot_x, self.robot_y = new_x, new_y

        done = (self.robot_x == self.target_x and self.robot_y == self.target_y)
        next_state = self.get_cnn_state()
        info = {}
        return next_state, reward, done, info

    def get_reward(self, robot_x, robot_y):
        if self.floorplan[robot_x][robot_y] == 1:
            return -5  # Penalty for hitting an obstacle
        elif robot_x == self.target_x and robot_y == self.target_y:
            return 10  # Reward for reaching the target
        else:
            return -0.1  # Penalty for each step

    def get_cnn_state(self):
        grid = [row[:] for row in self.floorplan]
        grid[self.robot_x][self.robot_y] = 3  # Mark the robot's current position
        return np.array(grid).flatten()

    def render(self, path=None):
        grid = np.array(self.floorplan)
        fig, ax = plt.subplots()
        ax.set_xticks(np.arange(-0.5, self.size, 1))
        ax.set_yticks(np.arange(-0.5, self.size, 1))
        ax.grid(which='major', color='k', linestyle='-', linewidth=1)
        ax.tick_params(which='both', bottom=False, left=False, labelbottom=False, labelleft=False)
        
        def update(i):
            ax.clear()
            ax.imshow(grid, cmap='Greys', interpolation='nearest')
            if path:
                x, y = path[i]
                ax.plot(y, x, 'bo')  # Draw robot's path
            plt.draw()

        ani = animation.FuncAnimation(fig, update, frames=len(path), repeat=False)
        plt.show()

class DQN(nn.Module):
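    """Simple fully connected Q-network over the flattened grid observation."""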
    def __init__(self, input_size, hidden_sizes, output_size):
        super(DQN, self).__init__()
        self.input_size = input_size
        self.hidden_sizes = hidden_sizes
        self.output_size = output_size

        self.fc_layers = nn.ModuleList()
        prev_size = input_size
        for size in hidden_sizes:
            self.fc_layers.append(nn.Linear(prev_size, size))
            prev_size = size
        self.output_layer = nn.Linear(prev_size, output_size)

    def forward(self, x):
        if len(x.shape) > 2:
            x = x.view(x.size(0), -1)
        for layer in self.fc_layers:
            x = F.relu(layer(x))
        x = self.output_layer(x)
        return x

    def choose_action(self, state):
        with torch.no_grad():
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            q_values = self(state_tensor)
            action = q_values.argmax().item()
        return action

class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*batch)
        return states, actions, rewards, next_states, dones

    def __len__(self):
        return len(self.buffer)

# Function to save the model checkpoint
def save_checkpoint(state, filename="checkpoint.pth.tar"):
    torch.save(state, filename)

# Function to load the model checkpoint
def load_checkpoint(filename):
    checkpoint = torch.load(filename)
    return checkpoint

# Training the DQN
env = Environment()
input_size = env.size * env.size  # Flattened grid size
hidden_sizes = [64, 64]  # Hidden layer sizes
output_size = 4  # Number of actions (up, down, left, right)

dqn = DQN(input_size, hidden_sizes, output_size)
dqn_target = DQN(input_size, hidden_sizes, output_size)
dqn_target.load_state_dict(dqn.state_dict())

optimizer = optim.Adam(dqn.parameters(), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
replay_buffer = ReplayBuffer(10000)
num_episodes = 50
batch_size = 64
gamma = 0.99
target_update_freq = 10  # Sync the target network every 10 episodes (the run is only 50 episodes long)
checkpoint_freq = 10  # Save checkpoint every 10 episodes

losses = []
for episode in range(num_episodes):
    state = env.reset()
    total_reward = 0
    done = False
    
    # Integrate A* guidance for initial exploration
    initial_path = a_star(env.floorplan, (env.robot_x, env.robot_y), (env.target_x, env.target_y))
    path_index = 1  # Skip the first cell of the path, which is the robot's current position

    while not done:
        epsilon = max(0.01, 0.2 - 0.01 * (episode / 2))
        if np.random.rand() < epsilon:
            if initial_path and path_index < len(initial_path):
                next_pos = initial_path[path_index]
                if next_pos[0] < env.robot_x:
                    action = 0  # Up
                elif next_pos[0] > env.robot_x:
                    action = 1  # Down
                elif next_pos[1] < env.robot_y:
                    action = 2  # Left
                else:
                    action = 3  # Right
                path_index += 1
            else:
                action = np.random.randint(output_size)
        else:
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
            with torch.no_grad():
                q_values = dqn(state_tensor)
            action = q_values.argmax().item()
        
        next_state, reward, done, _ = env.step(action)
        replay_buffer.push(state, action, reward, next_state, done)
        
        if len(replay_buffer) > batch_size:
            states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)
            # Convert to a single numpy array first to avoid the slow list-of-arrays path
            states = torch.tensor(np.array(states), dtype=torch.float32)
            actions = torch.tensor(actions, dtype=torch.int64)
            rewards = torch.tensor(rewards, dtype=torch.float32)
            next_states = torch.tensor(np.array(next_states), dtype=torch.float32)
            dones = torch.tensor(dones, dtype=torch.float32)
            
            q_values = dqn(states)
            q_values = q_values.gather(1, actions.unsqueeze(1)).squeeze(1)
            
            with torch.no_grad():
                # Bootstrap from the target network for a more stable TD target
                next_q_values = dqn_target(next_states)
                next_q_values = next_q_values.max(1)[0]
                target_q_values = rewards + (1 - dones) * gamma * next_q_values
            
            loss = F.smooth_l1_loss(q_values, target_q_values)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            
            losses.append(loss.item())
        
        total_reward += reward
        state = next_state
        
    if episode % target_update_freq == 0:
        dqn_target.load_state_dict(dqn.state_dict())
    scheduler.step()
    
    # Save checkpoints
    if episode % checkpoint_freq == 0 or episode == num_episodes - 1:
        checkpoint = {
            'episode': episode + 1,
            'state_dict': dqn.state_dict(),
            'optimizer': optimizer.state_dict(),
            'losses': losses
        }
        save_checkpoint(checkpoint, f'checkpoint_{episode + 1}.pth.tar')
    
    print(f"Episode {episode + 1}: Total Reward = {total_reward}, Loss = {np.mean(losses[-batch_size:]) if losses else None}")

# Save the final model
torch.save(dqn.state_dict(), 'dqn_model.pth')

# Load the trained model
dqn = DQN(input_size, hidden_sizes, output_size)
dqn.load_state_dict(torch.load('dqn_model.pth'))
dqn.eval()

# Simulate the bot's path using the trained DQN agent
state = env.reset()
done = False
path = [(env.robot_x, env.robot_y)]
max_steps = env.size * env.size  # Safety cap in case the greedy policy gets stuck in a loop

while not done and len(path) <= max_steps:
    state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0)
    with torch.no_grad():
        q_values = dqn(state_tensor)
    action = q_values.argmax().item()  # Choose action from the trained DQN
    next_state, reward, done, _ = env.step(action)
    path.append((env.robot_x, env.robot_y))
    state = next_state

# Render the environment and the bot's path
env.render(path)

# Evaluate trained DQN
def evaluate_agent(env, agent, num_episodes=5):
    total_rewards = 0
    successful_episodes = 0

    for episode in range(num_episodes):
        state = env.reset()
        episode_reward = 0
        done = False

        steps = 0
        while not done and steps < env.size * env.size:  # Cap episode length to avoid infinite loops
            action = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            episode_reward += reward
            state = next_state
            steps += 1

        total_rewards += episode_reward
        if episode_reward > 0:
            successful_episodes += 1

    avg_reward = total_rewards / num_episodes
    success_rate = successful_episodes / num_episodes

    print("Evaluation Results:")
    print(f"Average Reward: {avg_reward}")
    print(f"Success Rate: {success_rate}")

    return avg_reward, success_rate

# Call the evaluation function after rendering
avg_reward, success_rate = evaluate_agent(env, dqn, num_episodes=5)

# Upload the model to Hugging Face
# Authenticate with Hugging Face API
api = HfApi()
api_token = HfFolder.get_token()  # Ensure you have logged in with `huggingface-cli login`

# Create a model repository if it doesn't exist
model_repo = 'cajcodes/dqn-floorplan-finder'
api.create_repo(repo_id=model_repo, token=api_token, exist_ok=True)

# Upload the model
api.upload_file(
    path_or_fileobj='dqn_model.pth',
    path_in_repo='dqn_model.pth',
    repo_id=model_repo,
    token=api_token
)
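
# A minimal sketch of how the uploaded weights could be pulled back down later,
# assuming the same repo id and filename used above (hf_hub_download ships with huggingface_hub):
#
#   from huggingface_hub import hf_hub_download
#   weights_path = hf_hub_download(repo_id=model_repo, filename='dqn_model.pth')
#   dqn.load_state_dict(torch.load(weights_path))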