File size: 13,476 Bytes
a162e39
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
from dataclasses import dataclass
import numpy as np
from stable_baselines3 import SAC
from os import path

import param_
from drone import Drone


@dataclass
class SwarmPolicy:
    blues: int
    reds: int
    is_blue: bool
    model: object = None
    count: int = 0

    def __post_init__(self):

        dir_path = "policies/last" + f"/b{self.blues}r{self.reds}/"
        model_path = dir_path + ("blues_last.zip" if self.is_blue else "reds_last.zip")
        if path.exists(model_path):
            print("model loaded:" + model_path)
            self.model = SAC.load(model_path, verbose=1)

    # predicts from the model or from a simple centripete model
    def predict(self, obs):

        self.count += 1

        if self.model:
            action, _ = self.model.predict(obs)
            # verbose = 'prediction from ' + (' blue model' if self.is_blue else ' red model') + ' at ' + str(self.count)
            # print(verbose)
            return action
        else:
            if self.is_blue:
                return self._improved_attack_predict(obs)
            else:
                return self._simple_predict(obs)

    # the default policy
    def _simple_predict(self, obs):
        simple_obs = _decentralise(obs[0:self.blues*6] if self.is_blue else obs[self.blues*6:(self.blues+self.reds)*6])
        drone = Drone(is_blue=self.is_blue)
        action = np.array([])
        nb_drones = self.blues if self.is_blue else self.reds
        for d in range(nb_drones):
            assign_pos_speed(drone, d, simple_obs)
            '''
            pos_n, speed_n = simple_obs[d*6:d*6+3], simple_obs[d*6+3:d*6+6]
            pos = drone.from_norm(pos_n, drone.max_positions, drone.min_positions)
            drone.position = pos
            speed = drone.from_norm(speed_n, drone.max_speeds, drone.min_speeds)
            drone.speed = speed
            '''
            action_d = drone.simple_red()
            action = np.hstack((action, action_d))

        action = _centralise(action)
        return action


    # the default attack policy
    def _attack_predict(self, obs):

        def assign_targets(friends_obs, foes_obs):
            '''
            this current version is simplistic: all friends target the first foe :)
            :param obs:
            :return:
            '''
            friends_nb = len(friends_obs) // 6
            foes_nb = len(foes_obs) // 6

            friends_targets = -np.ones(friends_nb, dtype=int)
            while -1 in friends_targets:
                for foe in range(foes_nb):
                    foe_pos = _denorm(foes_obs[foe*6:foe*6+3])
                    foe_pos_z = foe_pos[0] * np.exp(1j * foe_pos[1])
                    min_distance = np.inf
                    closest_friend = -1
                    for friend in range(friends_nb):
                        if friends_targets[friend] == -1:
                            friend_pos = _denorm(friends_obs[friend*6:friend*6+3])
                            friend_pos_z = friend_pos[0] * np.exp(1j * friend_pos[1])
                            distance = np.abs(foe_pos_z - friend_pos_z) ** 2 + (friend_pos[2] - foe_pos[2]) ** 2
                            if distance < min_distance:
                                min_distance = distance
                                closest_friend = friend
                    friends_targets[closest_friend] = foe

            return friends_targets


        # gets the friends and foes obs
        blue_obs = _decentralise(obs[0:self.blues * 6])
        red_obs = _decentralise(obs[self.blues * 6:(self.blues + self.reds) * 6])
        friends_obs = blue_obs if self.is_blue else red_obs
        foes_obs = red_obs if self.is_blue else blue_obs

        # assign red targets to blues
        friends_targets = assign_targets(friends_obs, foes_obs)

        friend_drone = Drone(is_blue=self.is_blue)
        foe_drone = Drone(is_blue=not self.is_blue)

        action = np.array([])
        nb_drones = self.blues if self.is_blue else self.reds
        for d in range(nb_drones):

            # assign denormalised position and speed (in m and m/s) to foe drone
            friend_drone = assign_pos_speed(friend_drone, d, friends_obs)
            foe_drone_id = friends_targets[d]

            foe_drone = assign_pos_speed(foe_drone, foe_drone_id, foes_obs)
            target, time_to_target = calculate_target(friend_drone, foe_drone)
            action_d = friend_drone.simple_red(target=target, z_margin=0)
            action = np.hstack((action, action_d))

        action = _centralise(action)
        return action

        # the improved manual attack policy
    def _improved_attack_predict(self, obs):
        # TODO: revamp the algo as follows
        #  start from closest reds, find all blues that are compatible with some margin
        #  among those blues, choose the blue whose first target is the latest
        #  until there is no red left
        #  in case there are blues left overs, restart the process, or converge to zero, or..
        #  or we decide in advance how many blues we want on the closest and populate several blues againts reds
        #  at the beginning
        # TODO: check that reds are correctly ordered
        # TODO : add margin in the params
        # TODO : case of the foe is not reachable

        # gets the friends and foes obs
        blue_obs = _decentralise(obs[0:self.blues * 6])
        red_obs = _decentralise(obs[self.blues * 6:(self.blues + self.reds) * 6])
        friends_obs = blue_obs if self.is_blue else red_obs
        foes_obs = red_obs if self.is_blue else blue_obs

        friends_nb = self.blues if self.is_blue else self.reds
        foes_nb = self.reds if self.is_blue else self.blues

        friend_drones = []
        for friend_id in range(friends_nb):
            # assign denormalised position and speed (in m and m/s) to foe drone
            friend_drone = Drone(is_blue=self.is_blue)
            friend_drone = assign_pos_speed(friend_drone, friend_id, friends_obs)
            friend_drones.append(friend_drone)

        foe_drones = []
        for foe_id in range(foes_nb):
            # assign denormalised position and speed (in m and m/s) to foe drone
            foe_drone = Drone(is_blue=not self.is_blue)
            foe_drone = assign_pos_speed(foe_drone, foe_id, foes_obs)
            foe_drones.append(foe_drone)

        targets = np.zeros((friends_nb, foes_nb, 3))
        best_targets = -np.ones((friends_nb, 3))
        times_to_target = -np.ones((friends_nb, foes_nb))
        calculation_done = -np.ones(friends_nb)
        friend_chosen = -np.ones(friends_nb)

        foe_id = 0
        friends_chosen = 0
        while foe_id < foes_nb-1 and friends_chosen < friends_nb:
            best_friend = -1
            best_target = np.zeros(3)
            longest_time = -np.inf
            foe_drone = foe_drones[foe_id]
            for friend_id in range(friends_nb):
                if friend_chosen[friend_id] == -1:  # the friend has no foe target assigned

                    friend_drone = friend_drones[friend_id]

                    if calculation_done[friend_id] == -1:  # it has not already been calculated
                        target_, time_to_target, is_a_catch = calculate_target(friend_drone, foe_drone)
                        times_to_target[friend_id][foe_id] = time_to_target if is_a_catch else np.inf
                        targets[friend_id][foe_id] = target_
                    if times_to_target[friend_id][foe_id] < np.inf:  # it is a catch

                        if calculation_done[friend_id] == -1:  # calculation of time with other drones has not been done
                            for foe_idx in range(foe_id + 1, foes_nb):
                                foex_drone = foe_drones[foe_idx]
                                target_, time_to_target, is_a_catch = calculate_target(friend_drone, foex_drone)
                                times_to_target[friend_id][foe_idx] = time_to_target if is_a_catch else np.inf
                                targets[friend_id][foe_idx] = target_
                                calculation_done[friend_id] = 1
                        closest_target = np.min(times_to_target[friend_id, foe_id+1:])
                        if longest_time < closest_target:
                            longest_time = closest_target
                            best_friend = friend_id
                            best_target = targets[friend_id][foe_id]

            best_targets[best_friend] = best_target
            friend_chosen[best_friend] = foe_id
            friends_chosen += 1
            foe_id += 1

        if friends_chosen < friends_nb:
            last_foe = foes_nb - 1
            for friend_id in range(friends_nb):
                if friend_chosen[friend_id] == -1:
                    if times_to_target[friend_id, last_foe] == -1:
                        friend_drone, foe_drone = friend_drones[friend_id], foe_drones[last_foe]
                        target_, time_to_target, is_a_catch = calculate_target(friend_drone, foe_drone)
                        targets[friend_id][last_foe] = target_
                    closest_target_id = np.argmin(times_to_target[friend_id, :])
                    best_targets[friend_id] = targets[friend_id][closest_target_id]




        action = np.array([])
        for friend_id in range(friends_nb):
            action_d = friend_drones[friend_id].simple_red(target=best_targets[friend_id], z_margin=0)
            action = np.hstack((action, action_d))

        action = _centralise(action)
        return action


def assign_pos_speed(drone: Drone, d: int, obs: np.ndarray) -> Drone:
    # assign denormalised position and speed (in m and m/s) to friend drone
    d = int(d)
    pos_n, speed_n = obs[d*6:d*6+3], obs[d*6+3:d*6+6]
    pos = drone.from_norm(pos_n, drone.max_positions, drone.min_positions)
    drone.position = pos
    speed = drone.from_norm(speed_n, drone.max_speeds, drone.min_speeds)
    drone.speed = speed
    return drone


def _denorm(pos):  # from norm (i.e. already decentralised) to meter
    drone = Drone()
    pos_meter = drone.from_norm(pos, drone.max_positions, drone.min_positions)
    return pos_meter


def _decentralise(obs):  # [-1,1] to [0,1]
    obs = (obs+1)/2
    return obs


def _centralise(act):  # [0,1] to [-1,1]
    act = (act - 1/2) * 2
    return act


def calculate_target(blue_drone: Drone, red_drone: Drone) -> (np.ndarray(3, ), float, bool):
    '''

    :param blue_drone:
    :param red_drone:
    :return:
    '''
    # TODO : be more precise at the end of the discovery process

    def transform(pos, delta_, theta_):
        pos[0] -= delta_
        pos[1] -= theta_
        return pos[0] * np.exp(1j * pos[1])

    def untransform_to_array(pos, delta_, theta_):
        pos[0] += delta_
        pos[1] += theta_
        return pos

    theta = red_drone.position[1]
    delta = param_.GROUNDZONE

    attack_pos = np.copy(blue_drone.position)
    target_pos = np.copy(red_drone.position)

    z_blue = transform(attack_pos, delta, theta)
    z_red = np.real(transform(target_pos, delta, theta))

    v_blue = blue_drone.drone_model.max_speed
    v_red = red_drone.drone_model.max_speed

    blue_shooting_distance = blue_drone.drone_model.distance_to_neutralisation

    blue_time_to_zero = (np.abs(z_blue) - blue_shooting_distance) / v_blue
    red_time_to_zero = z_red / v_red

    if red_time_to_zero <= param_.STEP or red_time_to_zero < blue_time_to_zero + param_.STEP:
        return np.zeros(3), red_time_to_zero, False
    else:
        max_target = z_red
        min_target = 0
        while True:
            target = (max_target + min_target) / 2
            blue_time_to_target = max(0, (np.abs(z_blue - target) - blue_shooting_distance) / v_blue)
            red_time_to_target = np.abs(z_red - target) / v_red
            if red_time_to_target - param_.STEP < blue_time_to_target <= red_time_to_target:
                target = untransform_to_array((target / z_red) * target_pos, delta, theta)
                return target, blue_time_to_target, True
            if red_time_to_target < blue_time_to_target:
                max_target = target
                min_target = min_target
            else:  # blue_  time_to_target  <= red_time_to_target -1:
                max_target = max_target
                min_target = target




def unitary_test(rho_blue: float, theta_blue: float, rho_red: float, theta_red: float):
    '''
    tests for the calculate target function
    :param rho_blue:
    :param theta_blue:
    :param rho_red:
    :param theta_red:
    :return:
    '''
    blue_drone = Drone()
    blue_drone.position = np.array([rho_blue, theta_blue, 100])
    red_drone = Drone(is_blue=False)
    red_drone.position = np.array([rho_red, theta_red, 100])
    tg, time = calculate_target(blue_drone, red_drone)
    print('rho_blue : ', rho_blue, ' theta_blue : ', theta_blue, ' rho_red : ', rho_red, ' theta_red : ', theta_red,
          ' tg : ', tg, ' time : ', time)
    return tg, time


def test():
    '''
    test for the calculate trajectory function
    :return:
    '''
    for rho_blue in [1000]:
        for theta_blue in np.pi * np.array([-1, 0.75, 0.5, 0.25, 0]):
            for rho_red in [1000]:
                for theta_red in np.pi * np.array([0, 1/4]):
                    unitary_test(rho_blue=rho_blue, theta_blue=theta_blue, rho_red=rho_red, theta_red=theta_red)
    print('done')