swarms / drone.py
YvesP's picture
initial load
a162e39
from dataclasses import dataclass
import numpy as np
from dronemodel import DroneModel
import param_ as param_
from settings import Settings
@dataclass
class Drone:
"""
Creates a drone (it is either red or blue / foe or friend
"""
is_blue: bool = True
position: np.ndarray((3,)) = np.zeros((3,))
position_noise: np.ndarray((3,)) = np.zeros((3,))
drone_model: DroneModel = None
max_speeds: np.ndarray((3,)) = None
min_speeds: np.ndarray((3,)) = None
init_position: np.ndarray((3,)) = None
init_speed: np.ndarray((3,)) = None
color = np.ndarray((3,))
is_alive: bool = True
is_fired: int = 0
fires = 0
step_ = 0
id_: int = -1
ttl: float = param_.DURATION # ttl = Time To Live expressed in seconds
speed: np.ndarray((3,)) = np.zeros((3,))
min_positions = np.zeros((3,))
max_positions = np.array([Settings.perimeter, 2*np.pi, Settings.perimeter_z])
def __post_init__(self):
self.drone_model = DroneModel(self.is_blue)
self.max_speeds = np.array([self.drone_model.max_speed,
2*np.pi,
self.drone_model.max_up_speed])
self.min_speeds = np.array([0,
0,
-self.drone_model.max_down_speed])
self.init_position = np.copy(self.position)
self.init_position_noise = np.copy(self.position_noise)
self.init_speed = np.copy(self.speed)
self.color = param_.BLUE_COLOR if self.is_blue else param_.RED_COLOR
self.ttl = (self.position[0] / self.max_speeds[0]) * param_.TTL_RATIO + param_.TTL_MIN
def reset(self):
self.is_alive = True
self.speed = self.init_speed
self.color = param_.BLUE_COLOR if self.is_blue else param_.RED_COLOR
distance_factor = Settings.blue_distance_factor if self.is_blue else Settings.red_distance_factor
self.position[0] = self.init_position[0]*distance_factor
self.position[2] = self.init_position[2]*distance_factor
self.position_noise *= distance_factor
self.position[0] = np.clip(self.position[0] + np.random.rand() * self.position_noise[0],
param_.GROUNDZONE * 1.1,
param_.PERIMETER * 0.9)
self.position[1] = self.position[1] + np.random.rand() * self.position_noise[1]
self.position[2] = np.clip(self.position[2] + np.random.rand() * self.position_noise[2],
param_.GROUNDZONE * 1.1,
param_.PERIMETER_Z * 0.9)
self.ttl = (self.position[0] / self.max_speeds[0]) * param_.TTL_RATIO + param_.TTL_MIN
def step(self, action):
self.step_ = self.step_ + 1 # for debug purposes
reward = 0
info = {'ttl': param_.DURATION}
if self.is_alive: # if the drone is dead, it no longer moves :)
pos_xyz, speed_xyz = self.to_xyz(self.position), self.to_xyz(self.speed)
pos_s, speed_s = \
self.drone_model.get_trajectory(pos_xyz, speed_xyz, action, np.linspace(0, param_.STEP, 10))
pos, speed = pos_s.T[-1], speed_s.T[-1]
self.position, self.speed = self.from_xyz(pos), self.from_xyz(speed)
self.ttl -= param_.STEP
info['ttl'] = self.ttl
# evaluate the distance compared to the greedy action
if self.is_blue:
'''
for further usage
straight_action, time_to_catch = self.simple_blue()
tolerance = 0.05 if 4 < time_to_catch else 1 if 2 < time_to_catch else 3
distance = 1 if tolerance < np.linalg.norm(straight_action - action) else 0
'''
distance = 1
else:
straight_action = self.simple_red()
distance = 1 if 0.1 < np.linalg.norm(straight_action - action) else 0
info['distance_to_straight_action'] = distance
if self._hits_target():
info['hits_target'] = 1
reward = -param_.TARGET_HIT_COST
self.color = param_.RED_SUCCESS_COLOR
self.is_alive = False # the red has done its job ...
if self._out_of_bounds():
coef = -1 if self.is_blue else 1
reward = coef * param_.OOB_COST
self.is_alive = False
info['oob'] = 1
obs = self.get_observation()
done = not self.is_alive
return obs, reward, done, info
def _out_of_bounds(self):
return not (0 < self.position[2] < Settings.perimeter_z and self.position[0] < Settings.perimeter)
def _hits_target(self):
if self.is_blue:
return False
else:
distance_to_zero = np.sqrt(self.position[0]**2 + self.position[2]**2)
return distance_to_zero < Settings.groundzone
def fires_(self, foe) -> bool:
"""
checks if the foe drone is hit by self
:param foe: a foe drone
:return: True= yes, got you
"""
# deads don't kill nor die
if not (self.is_alive and foe.is_alive):
return False
# lets see if foe is in the "fire cone"
pos_xyz = - self.to_xyz(self.position) + self.to_xyz(foe.position)
distance = np.linalg.norm(pos_xyz)
pos_xyz /= distance
if distance < self.drone_model.distance_to_neutralisation:
return self.is_in_the_cone(foe)
return False
def is_in_the_cone(self, foe) -> bool:
'''
verifies if foe is in the cone (without any regard to distance)
:param foe:
:return:
'''
pos_xyz = - self.to_xyz(self.position) + self.to_xyz(foe.position)
pos_xyz /= np.linalg.norm(pos_xyz)
vit_xyz = self.to_xyz(self.speed)
vit_xyz /= np.linalg.norm(vit_xyz)
cos_theta = np.dot(pos_xyz, vit_xyz)
in_the_cone = False
if 0 < cos_theta:
theta = np.arccos(cos_theta)
in_the_cone = theta < self.drone_model.angle_to_neutralisation
return in_the_cone
# tell the drones that they are dead
def is_killed(self, is_blue=True):
self.is_alive = False
self.position[2] = 0
self.color = param_.BLUE_DEAD_COLOR if is_blue else param_.RED_DEAD_COLOR
def to_xyz(self, rho_theta_z: np.ndarray(shape=(3,))) -> np.ndarray(shape=(3,)):
"""
allows to get the 3D xyz coordinates from a polar representation
:param rho_theta_z: array (3,) with rho in meter, theta in rad, zed in meter for positions, /s for speeds, etc.
:return: float array (3,) with x, y, z in meter, /s for speeds, etc.
"""
xy_ = rho_theta_z[0] * np.exp(1j * rho_theta_z[1])
return np.array([np.real(xy_), np.imag(xy_), rho_theta_z[2]])
def from_xyz(self, xyz: np.ndarray(shape=(3,))) -> np.ndarray(shape=(3,)):
"""
"""
z_complex = xyz[0] + 1j*xyz[1]
rho = np.abs(z_complex)
theta = np.angle(z_complex)
return np.array([rho, theta, xyz[2]], dtype='float32')
def to_norm(self,
rho_theta_z: np.ndarray(shape=(3,)),
max_vector: np.ndarray(shape=(3,)),
min_vector: np.ndarray(shape=(3,)) = np.array([0, 0, 0]))\
-> np.ndarray(shape=(3,), dtype='float32'):
"""
normalises the position/speed in order to have all space in a [0;1]**3 space
:return: rho, theta, zed in a [0;1]**3 space
"""
rho = rho_theta_z[0] / max_vector[0]
theta = (rho_theta_z[1] / (2 * np.pi)) % 1
zed = (rho_theta_z[2] - min_vector[2]) / (max_vector[2] - min_vector[2])
return np.array([rho, theta, zed], dtype='float32')
def from_norm(self,
norm: np.ndarray(shape=(3,)),
max_vector: np.ndarray(shape=(3,)),
min_vector: np.ndarray(shape=(3,)) = np.array([0, 0, 0]))\
-> np.ndarray(shape=(3,), dtype='float32'):
"""
denormalises and renders into cylindric coordinates
:param norm:
:param max_vector:
:param min_vector:
:return:
"""
rho = norm[0] * max_vector[0]
theta = norm[1] * 2*np.pi
zed = norm[2] * (max_vector[2] - min_vector[2]) + min_vector[2]
return np.array([rho, theta, zed], dtype='float32')
def to_lat_lon_zed(self, lat, lon):
z = self.position[0] * np.exp(1j * self.position[1])
lat = np.imag(z) * 360 / (40075 * 1000) + lat
lon = np.real(z) * 360 / (40075 * 1000 * np.cos(np.pi / 180 * lat)) + lon
return lat, lon, self.position[2]
def distance(self, other_drone=None):
if other_drone:
distance = np.sqrt(np.abs(self.position[0] * np.exp(1j * self.position[1]) -
other_drone.position[0] * np.exp(1j * other_drone.position[1])) ** 2 +
(self.position[2] - other_drone.position[2]) ** 2)
else:
distance = np.sqrt((self.position[0] ** 2) + self.position[2] ** 2)
return distance
def get_observation(self): # -> np.array(shape=(6,), dtype='float32'):
"""
get normalised and transformed position and speed
:return:
"""
# calculates transformed normalised position
normalised_position = self.to_norm(self.position, self.max_positions, self.min_positions)
# calculates transformed normalised speed
normalised_speed = self.to_norm(self.speed, self.max_speeds, self.min_speeds)
return np.append(normalised_position, normalised_speed)
def simple_red(self, target: np.ndarray(3,)=np.zeros(3), z_margin: float=50) -> np.ndarray(shape=(3,)):
'''
defines the actions for a trajectory targeting zero
:return:
'''
self_z = self.position[0] * np.exp(1j * self.position[1])
target_z = target[0] * np.exp(1j * target[1])
direction = np.zeros(3)
direction[0] = np.abs(self_z - target_z)
direction[1] = np.angle(self_z - target_z)
direction[2] = self.position[2] - target[2] - z_margin
theta = (direction[1] + np.pi) / (2*np.pi) % 1
# slope of drone given its position
tan_phi = np.sign(direction[2]) * np.inf if direction[0] == 0 else direction[2]/direction[0]
# slope of drone speed
tan_phi_point = np.sign(self.speed[2]) * np.inf if self.speed[0] == 0 else self.speed[2]/self.speed[0]
# slope of forces
f_ratio = self.drone_model.Fxy / self.drone_model.Fz_minus
# go up if speed slope is too steep and vertical speed < 0 else take the position angle for forces angle
psy = -np.arctan(tan_phi * f_ratio) / np.pi + 0.5
action = np.array([1, theta, psy])
return action
def simple_target(self, target: (np.ndarray(shape=(3,)))) -> np.ndarray(shape=(3,)):
'''
defines the actions for a trajectory targeting ... the given target
:return:
'''
self_z = self.position[0] * np.exp(1j * self.position[1])
target_z = target[0] * np.exp(1j * target[1])
direction = np.zeros(3)
direction[0] = np.abs(self_z - target_z)
direction[1] = np.angle(self_z - target_z)
direction[2] = self.position[2] - target[2]
theta = (direction[1] + np.pi) / (2*np.pi) % 1
# slope of drone given its position
tan_phi = np.sign(direction[2]) * np.inf if direction[0] == 0 else direction[2]/direction[0]
# slope of drone speed
tan_phi_point = np.sign(self.speed[2]) * np.inf if self.speed[0] == 0 else self.speed[2]/self.speed[0]
# slope of forces
f_ratio = self.drone_model.Fxy / self.drone_model.Fz_minus
# go up if speed slope is too steep and vertical speed < 0 else take the position angle for forces angle
psy = 0.5 if tan_phi_point < -tan_phi else -np.arctan(tan_phi * f_ratio) / np.pi + 0.5
if Settings.perimeter_z / 2 < direction[2]:
psy = min(0.2, psy)
if self.position[0] < 1.5 * Settings.groundzone:
psy = min(0.2, psy)
action = np.array([1, theta, psy])
return action
def next_simple_pos(self):
next_pos = np.zeros(3)
simple_next = self.pos[0] * np.exp(1j * self.pos[1]) + self.speed[0] * np.exp(1j * self.speed[1]) * param_.step
next_pos[0] = np.abs(simple_next)
next_pos[1] = np.arg(simple_next)
next_pos[2] = self.pos[2] + self.speed[2] * param_.step
return next_pos
def copy_pos_speed(self, drone_to_copy):
self.position = drone_to_copy.position
self.speed = drone_to_copy.speed