from dataclasses import dataclass import numpy as np from stable_baselines3 import SAC from os import path import param_ from drone import Drone @dataclass class SwarmPolicy: blues: int reds: int is_blue: bool model: object = None count: int = 0 def __post_init__(self): dir_path = "policies/last" + f"/b{self.blues}r{self.reds}/" model_path = dir_path + ("blues_last.zip" if self.is_blue else "reds_last.zip") if path.exists(model_path): print("model loaded:" + model_path) self.model = SAC.load(model_path, verbose=1) # predicts from the model or from a simple centripete model def predict(self, obs): self.count += 1 if self.model: action, _ = self.model.predict(obs) # verbose = 'prediction from ' + (' blue model' if self.is_blue else ' red model') + ' at ' + str(self.count) # print(verbose) return action else: if self.is_blue: return self._improved_attack_predict(obs) else: return self._simple_predict(obs) # the default policy def _simple_predict(self, obs): simple_obs = _decentralise(obs[0:self.blues*6] if self.is_blue else obs[self.blues*6:(self.blues+self.reds)*6]) drone = Drone(is_blue=self.is_blue) action = np.array([]) nb_drones = self.blues if self.is_blue else self.reds for d in range(nb_drones): assign_pos_speed(drone, d, simple_obs) ''' pos_n, speed_n = simple_obs[d*6:d*6+3], simple_obs[d*6+3:d*6+6] pos = drone.from_norm(pos_n, drone.max_positions, drone.min_positions) drone.position = pos speed = drone.from_norm(speed_n, drone.max_speeds, drone.min_speeds) drone.speed = speed ''' action_d = drone.simple_red() action = np.hstack((action, action_d)) action = _centralise(action) return action # the default attack policy def _attack_predict(self, obs): def assign_targets(friends_obs, foes_obs): ''' this current version is simplistic: all friends target the first foe :) :param obs: :return: ''' friends_nb = len(friends_obs) // 6 foes_nb = len(foes_obs) // 6 friends_targets = -np.ones(friends_nb, dtype=int) while -1 in friends_targets: for foe in range(foes_nb): foe_pos = _denorm(foes_obs[foe*6:foe*6+3]) foe_pos_z = foe_pos[0] * np.exp(1j * foe_pos[1]) min_distance = np.inf closest_friend = -1 for friend in range(friends_nb): if friends_targets[friend] == -1: friend_pos = _denorm(friends_obs[friend*6:friend*6+3]) friend_pos_z = friend_pos[0] * np.exp(1j * friend_pos[1]) distance = np.abs(foe_pos_z - friend_pos_z) ** 2 + (friend_pos[2] - foe_pos[2]) ** 2 if distance < min_distance: min_distance = distance closest_friend = friend friends_targets[closest_friend] = foe return friends_targets # gets the friends and foes obs blue_obs = _decentralise(obs[0:self.blues * 6]) red_obs = _decentralise(obs[self.blues * 6:(self.blues + self.reds) * 6]) friends_obs = blue_obs if self.is_blue else red_obs foes_obs = red_obs if self.is_blue else blue_obs # assign red targets to blues friends_targets = assign_targets(friends_obs, foes_obs) friend_drone = Drone(is_blue=self.is_blue) foe_drone = Drone(is_blue=not self.is_blue) action = np.array([]) nb_drones = self.blues if self.is_blue else self.reds for d in range(nb_drones): # assign denormalised position and speed (in m and m/s) to foe drone friend_drone = assign_pos_speed(friend_drone, d, friends_obs) foe_drone_id = friends_targets[d] foe_drone = assign_pos_speed(foe_drone, foe_drone_id, foes_obs) target, time_to_target = calculate_target(friend_drone, foe_drone) action_d = friend_drone.simple_red(target=target, z_margin=0) action = np.hstack((action, action_d)) action = _centralise(action) return action # the improved manual attack policy def _improved_attack_predict(self, obs): # TODO: revamp the algo as follows # start from closest reds, find all blues that are compatible with some margin # among those blues, choose the blue whose first target is the latest # until there is no red left # in case there are blues left overs, restart the process, or converge to zero, or.. # or we decide in advance how many blues we want on the closest and populate several blues againts reds # at the beginning # TODO: check that reds are correctly ordered # TODO : add margin in the params # TODO : case of the foe is not reachable # gets the friends and foes obs blue_obs = _decentralise(obs[0:self.blues * 6]) red_obs = _decentralise(obs[self.blues * 6:(self.blues + self.reds) * 6]) friends_obs = blue_obs if self.is_blue else red_obs foes_obs = red_obs if self.is_blue else blue_obs friends_nb = self.blues if self.is_blue else self.reds foes_nb = self.reds if self.is_blue else self.blues friend_drones = [] for friend_id in range(friends_nb): # assign denormalised position and speed (in m and m/s) to foe drone friend_drone = Drone(is_blue=self.is_blue) friend_drone = assign_pos_speed(friend_drone, friend_id, friends_obs) friend_drones.append(friend_drone) foe_drones = [] for foe_id in range(foes_nb): # assign denormalised position and speed (in m and m/s) to foe drone foe_drone = Drone(is_blue=not self.is_blue) foe_drone = assign_pos_speed(foe_drone, foe_id, foes_obs) foe_drones.append(foe_drone) targets = np.zeros((friends_nb, foes_nb, 3)) best_targets = -np.ones((friends_nb, 3)) times_to_target = -np.ones((friends_nb, foes_nb)) calculation_done = -np.ones(friends_nb) friend_chosen = -np.ones(friends_nb) foe_id = 0 friends_chosen = 0 while foe_id < foes_nb-1 and friends_chosen < friends_nb: best_friend = -1 best_target = np.zeros(3) longest_time = -np.inf foe_drone = foe_drones[foe_id] for friend_id in range(friends_nb): if friend_chosen[friend_id] == -1: # the friend has no foe target assigned friend_drone = friend_drones[friend_id] if calculation_done[friend_id] == -1: # it has not already been calculated target_, time_to_target, is_a_catch = calculate_target(friend_drone, foe_drone) times_to_target[friend_id][foe_id] = time_to_target if is_a_catch else np.inf targets[friend_id][foe_id] = target_ if times_to_target[friend_id][foe_id] < np.inf: # it is a catch if calculation_done[friend_id] == -1: # calculation of time with other drones has not been done for foe_idx in range(foe_id + 1, foes_nb): foex_drone = foe_drones[foe_idx] target_, time_to_target, is_a_catch = calculate_target(friend_drone, foex_drone) times_to_target[friend_id][foe_idx] = time_to_target if is_a_catch else np.inf targets[friend_id][foe_idx] = target_ calculation_done[friend_id] = 1 closest_target = np.min(times_to_target[friend_id, foe_id+1:]) if longest_time < closest_target: longest_time = closest_target best_friend = friend_id best_target = targets[friend_id][foe_id] best_targets[best_friend] = best_target friend_chosen[best_friend] = foe_id friends_chosen += 1 foe_id += 1 if friends_chosen < friends_nb: last_foe = foes_nb - 1 for friend_id in range(friends_nb): if friend_chosen[friend_id] == -1: if times_to_target[friend_id, last_foe] == -1: friend_drone, foe_drone = friend_drones[friend_id], foe_drones[last_foe] target_, time_to_target, is_a_catch = calculate_target(friend_drone, foe_drone) targets[friend_id][last_foe] = target_ closest_target_id = np.argmin(times_to_target[friend_id, :]) best_targets[friend_id] = targets[friend_id][closest_target_id] action = np.array([]) for friend_id in range(friends_nb): action_d = friend_drones[friend_id].simple_red(target=best_targets[friend_id], z_margin=0) action = np.hstack((action, action_d)) action = _centralise(action) return action def assign_pos_speed(drone: Drone, d: int, obs: np.ndarray) -> Drone: # assign denormalised position and speed (in m and m/s) to friend drone d = int(d) pos_n, speed_n = obs[d*6:d*6+3], obs[d*6+3:d*6+6] pos = drone.from_norm(pos_n, drone.max_positions, drone.min_positions) drone.position = pos speed = drone.from_norm(speed_n, drone.max_speeds, drone.min_speeds) drone.speed = speed return drone def _denorm(pos): # from norm (i.e. already decentralised) to meter drone = Drone() pos_meter = drone.from_norm(pos, drone.max_positions, drone.min_positions) return pos_meter def _decentralise(obs): # [-1,1] to [0,1] obs = (obs+1)/2 return obs def _centralise(act): # [0,1] to [-1,1] act = (act - 1/2) * 2 return act def calculate_target(blue_drone: Drone, red_drone: Drone) -> (np.ndarray(3, ), float, bool): ''' :param blue_drone: :param red_drone: :return: ''' # TODO : be more precise at the end of the discovery process def transform(pos, delta_, theta_): pos[0] -= delta_ pos[1] -= theta_ return pos[0] * np.exp(1j * pos[1]) def untransform_to_array(pos, delta_, theta_): pos[0] += delta_ pos[1] += theta_ return pos theta = red_drone.position[1] delta = param_.GROUNDZONE attack_pos = np.copy(blue_drone.position) target_pos = np.copy(red_drone.position) z_blue = transform(attack_pos, delta, theta) z_red = np.real(transform(target_pos, delta, theta)) v_blue = blue_drone.drone_model.max_speed v_red = red_drone.drone_model.max_speed blue_shooting_distance = blue_drone.drone_model.distance_to_neutralisation blue_time_to_zero = (np.abs(z_blue) - blue_shooting_distance) / v_blue red_time_to_zero = z_red / v_red if red_time_to_zero <= param_.STEP or red_time_to_zero < blue_time_to_zero + param_.STEP: return np.zeros(3), red_time_to_zero, False else: max_target = z_red min_target = 0 while True: target = (max_target + min_target) / 2 blue_time_to_target = max(0, (np.abs(z_blue - target) - blue_shooting_distance) / v_blue) red_time_to_target = np.abs(z_red - target) / v_red if red_time_to_target - param_.STEP < blue_time_to_target <= red_time_to_target: target = untransform_to_array((target / z_red) * target_pos, delta, theta) return target, blue_time_to_target, True if red_time_to_target < blue_time_to_target: max_target = target min_target = min_target else: # blue_ time_to_target <= red_time_to_target -1: max_target = max_target min_target = target def unitary_test(rho_blue: float, theta_blue: float, rho_red: float, theta_red: float): ''' tests for the calculate target function :param rho_blue: :param theta_blue: :param rho_red: :param theta_red: :return: ''' blue_drone = Drone() blue_drone.position = np.array([rho_blue, theta_blue, 100]) red_drone = Drone(is_blue=False) red_drone.position = np.array([rho_red, theta_red, 100]) tg, time = calculate_target(blue_drone, red_drone) print('rho_blue : ', rho_blue, ' theta_blue : ', theta_blue, ' rho_red : ', rho_red, ' theta_red : ', theta_red, ' tg : ', tg, ' time : ', time) return tg, time def test(): ''' test for the calculate trajectory function :return: ''' for rho_blue in [1000]: for theta_blue in np.pi * np.array([-1, 0.75, 0.5, 0.25, 0]): for rho_red in [1000]: for theta_red in np.pi * np.array([0, 1/4]): unitary_test(rho_blue=rho_blue, theta_blue=theta_blue, rho_red=rho_red, theta_red=theta_red) print('done')