File size: 15,025 Bytes
bd61f34
 
 
 
 
 
 
 
8227b96
bd61f34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
"""
DroneKit-Python interface for DeepDrone - Real Drone Control Module
This module provides functions for controlling real drones using DroneKit-Python.
"""

import time
import math
# Import compatibility fix for collections.MutableMapping
from . import compatibility_fix
from dronekit import connect, VehicleMode, LocationGlobalRelative, Command
from pymavlink import mavutil
from typing import Dict, List, Optional, Tuple, Union
import logging

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('drone_control')

class DroneController:
    """Class to handle real drone control operations using DroneKit."""
    
    def __init__(self, connection_string: str = None):
        """
        Initialize the drone controller.
        
        Args:
            connection_string: Connection string for the drone (e.g., 'udp:127.0.0.1:14550' for SITL,
                              '/dev/ttyACM0' for serial, or 'tcp:192.168.1.1:5760' for remote connection)
        """
        self.vehicle = None
        self.connection_string = connection_string
        self.connected = False
    
    def connect_to_drone(self, connection_string: str = None, timeout: int = 30) -> bool:
        """
        Connect to the drone using DroneKit.
        
        Args:
            connection_string: Connection string for the drone (overrides the one provided in __init__)
            timeout: Connection timeout in seconds
            
        Returns:
            bool: True if connection successful, False otherwise
        """
        if connection_string:
            self.connection_string = connection_string
            
        if not self.connection_string:
            logger.error("No connection string provided")
            return False
            
        try:
            logger.info(f"Connecting to drone on {self.connection_string}...")
            self.vehicle = connect(self.connection_string, wait_ready=True, timeout=timeout)
            self.connected = True
            logger.info("Connected to drone successfully")
            
            # Log basic vehicle info
            logger.info(f"Vehicle Version: {self.vehicle.version}")
            logger.info(f"Vehicle Status: {self.vehicle.system_status.state}")
            logger.info(f"GPS: {self.vehicle.gps_0}")
            logger.info(f"Battery: {self.vehicle.battery}")
            
            return True
        except Exception as e:
            logger.error(f"Error connecting to drone: {str(e)}")
            self.connected = False
            return False
    
    def disconnect(self) -> None:
        """Disconnect from the drone."""
        if self.vehicle and self.connected:
            logger.info("Disconnecting from drone...")
            self.vehicle.close()
            self.connected = False
            logger.info("Disconnected from drone")
    
    def arm_and_takeoff(self, target_altitude: float) -> bool:
        """
        Arms the drone and takes off to the specified altitude.
        
        Args:
            target_altitude: Target altitude in meters
            
        Returns:
            bool: True if takeoff successful, False otherwise
        """
        if not self._ensure_connected():
            return False
            
        logger.info("Arming motors...")
        # Switch to GUIDED mode
        self.vehicle.mode = VehicleMode("GUIDED")
        
        # Wait until mode change is verified
        timeout = 10  # seconds
        start = time.time()
        while self.vehicle.mode.name != "GUIDED":
            if time.time() - start > timeout:
                logger.error("Failed to enter GUIDED mode")
                return False
            time.sleep(0.5)
        
        # Arm the drone
        self.vehicle.armed = True
        
        # Wait for arming
        timeout = 10  # seconds
        start = time.time()
        while not self.vehicle.armed:
            if time.time() - start > timeout:
                logger.error("Failed to arm")
                return False
            time.sleep(0.5)
        
        logger.info("Taking off!")
        # Take off to target altitude
        self.vehicle.simple_takeoff(target_altitude)
        
        # Wait until target altitude reached
        while True:
            current_altitude = self.vehicle.location.global_relative_frame.alt
            logger.info(f"Altitude: {current_altitude}")
            
            # Break and return when we're close enough to target altitude
            if current_altitude >= target_altitude * 0.95:
                logger.info("Reached target altitude")
                break
            time.sleep(1)
        
        return True
    
    def land(self) -> bool:
        """
        Land the drone.
        
        Returns:
            bool: True if land command sent successfully, False otherwise
        """
        if not self._ensure_connected():
            return False
            
        logger.info("Landing...")
        self.vehicle.mode = VehicleMode("LAND")
        return True
    
    def return_to_launch(self) -> bool:
        """
        Return to launch location.
        
        Returns:
            bool: True if RTL command sent successfully, False otherwise
        """
        if not self._ensure_connected():
            return False
            
        logger.info("Returning to launch location...")
        self.vehicle.mode = VehicleMode("RTL")
        return True
    
    def goto_location(self, latitude: float, longitude: float, altitude: float) -> bool:
        """
        Go to the specified GPS location.
        
        Args:
            latitude: Target latitude in degrees
            longitude: Target longitude in degrees
            altitude: Target altitude in meters (relative to home position)
            
        Returns:
            bool: True if goto command sent successfully, False otherwise
        """
        if not self._ensure_connected():
            return False
            
        logger.info(f"Going to location: Lat: {latitude}, Lon: {longitude}, Alt: {altitude}")
        
        # Make sure vehicle is in GUIDED mode
        if self.vehicle.mode.name != "GUIDED":
            self.vehicle.mode = VehicleMode("GUIDED")
            # Wait for mode change
            timeout = 5
            start = time.time()
            while self.vehicle.mode.name != "GUIDED":
                if time.time() - start > timeout:
                    logger.error("Failed to enter GUIDED mode")
                    return False
                time.sleep(0.5)
        
        # Create LocationGlobalRelative object and send command
        target_location = LocationGlobalRelative(latitude, longitude, altitude)
        self.vehicle.simple_goto(target_location)
        
        logger.info(f"Going to location: Lat: {latitude}, Lon: {longitude}, Alt: {altitude}")
        return True
    
    def get_current_location(self) -> Dict[str, float]:
        """
        Get the current GPS location of the drone.
        
        Returns:
            Dict containing latitude, longitude, and altitude
        """
        if not self._ensure_connected():
            return {"error": "Not connected to drone"}
            
        location = self.vehicle.location.global_relative_frame
        return {
            "latitude": location.lat,
            "longitude": location.lon, 
            "altitude": location.alt
        }
    
    def get_battery_status(self) -> Dict[str, float]:
        """
        Get the current battery status.
        
        Returns:
            Dict containing battery voltage and remaining percentage
        """
        if not self._ensure_connected():
            return {"error": "Not connected to drone"}
            
        return {
            "voltage": self.vehicle.battery.voltage,
            "level": self.vehicle.battery.level,
            "current": self.vehicle.battery.current
        }
        
    def get_airspeed(self) -> float:
        """
        Get the current airspeed.
        
        Returns:
            Current airspeed in m/s
        """
        if not self._ensure_connected():
            return -1.0
            
        return self.vehicle.airspeed
        
    def get_groundspeed(self) -> float:
        """
        Get the current ground speed.
        
        Returns:
            Current ground speed in m/s
        """
        if not self._ensure_connected():
            return -1.0
            
        return self.vehicle.groundspeed
    
    def upload_mission(self, waypoints: List[Dict[str, float]]) -> bool:
        """
        Upload a mission with multiple waypoints to the drone.
        
        Args:
            waypoints: List of dictionaries with lat, lon, alt for each waypoint
            
        Returns:
            bool: True if mission upload successful, False otherwise
        """
        if not self._ensure_connected():
            return False
            
        logger.info(f"Uploading mission with {len(waypoints)} waypoints...")
        
        # Create list of commands
        cmds = self.vehicle.commands
        cmds.clear()
        
        # Add home location as first waypoint
        cmds.add(Command(0, 0, 0, mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, 
                         mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, 0, 0, 0, 0, 0, 
                         self.vehicle.home_location.lat, 
                         self.vehicle.home_location.lon, 
                         0))
        
        # Add mission waypoints
        for idx, wp in enumerate(waypoints):
            # Add delay at waypoint (0 = no delay)
            delay = wp.get("delay", 0)
            
            # Add waypoint command
            cmds.add(Command(0, 0, 0, mavutil.mavlink.MAV_FRAME_GLOBAL_RELATIVE_ALT, 
                             mavutil.mavlink.MAV_CMD_NAV_WAYPOINT, 0, 0, delay, 0, 0, 0, 
                             wp["lat"], wp["lon"], wp["alt"]))
        
        # Upload the commands to the vehicle
        cmds.upload()
        logger.info("Mission uploaded successfully")
        return True
    
    def execute_mission(self) -> bool:
        """
        Execute the uploaded mission.
        
        Returns:
            bool: True if mission started successfully, False otherwise
        """
        if not self._ensure_connected():
            return False
            
        logger.info("Executing mission...")
        self.vehicle.mode = VehicleMode("AUTO")
        
        # Wait for mode change
        timeout = 5
        start = time.time()
        while self.vehicle.mode.name != "AUTO":
            if time.time() - start > timeout:
                logger.error("Failed to enter AUTO mode")
                return False
            time.sleep(0.5)
        
        logger.info("Mission execution started")
        return True
    
    def set_airspeed(self, speed: float) -> bool:
        """
        Set the target airspeed.
        
        Args:
            speed: Target airspeed in m/s
            
        Returns:
            bool: True if command sent successfully, False otherwise
        """
        if not self._ensure_connected():
            return False
            
        logger.info(f"Setting airspeed to {speed} m/s")
        self.vehicle.airspeed = speed
        return True
    
    def _ensure_connected(self) -> bool:
        """
        Ensure drone is connected before executing a command.
        
        Returns:
            bool: True if connected, False otherwise
        """
        if not self.vehicle or not self.connected:
            logger.error("Not connected to a drone. Call connect_to_drone() first.")
            return False
        return True


# Convenience functions for using the controller without creating an instance

_controller = None

def connect_drone(connection_string: str, timeout: int = 30) -> bool:
    """
    Connect to a drone using the specified connection string.
    
    Args:
        connection_string: Connection string for the drone
        timeout: Connection timeout in seconds
        
    Returns:
        bool: True if connection successful, False otherwise
    """
    global _controller
    if _controller is None:
        _controller = DroneController()
    
    return _controller.connect_to_drone(connection_string, timeout)

def disconnect_drone() -> None:
    """Disconnect from the drone."""
    global _controller
    if _controller:
        _controller.disconnect()

def takeoff(altitude: float) -> bool:
    """
    Arm and take off to the specified altitude.
    
    Args:
        altitude: Target altitude in meters
        
    Returns:
        bool: True if takeoff successful, False otherwise
    """
    global _controller
    if _controller:
        return _controller.arm_and_takeoff(altitude)
    return False

def land() -> bool:
    """
    Land the drone.
    
    Returns:
        bool: True if land command sent successfully, False otherwise
    """
    global _controller
    if _controller:
        return _controller.land()
    return False

def return_home() -> bool:
    """
    Return to launch/home location.
    
    Returns:
        bool: True if RTL command sent successfully, False otherwise
    """
    global _controller
    if _controller:
        return _controller.return_to_launch()
    return False

def fly_to(lat: float, lon: float, alt: float) -> bool:
    """
    Go to the specified GPS location.
    
    Args:
        lat: Target latitude in degrees
        lon: Target longitude in degrees
        alt: Target altitude in meters (relative to home position)
        
    Returns:
        bool: True if goto command sent successfully, False otherwise
    """
    global _controller
    if _controller:
        return _controller.goto_location(lat, lon, alt)
    return False

def get_location() -> Dict[str, float]:
    """
    Get the current GPS location of the drone.
    
    Returns:
        Dict containing latitude, longitude, and altitude
    """
    global _controller
    if _controller:
        return _controller.get_current_location()
    return {"error": "Not connected to drone"}

def get_battery() -> Dict[str, float]:
    """
    Get the current battery status.
    
    Returns:
        Dict containing battery voltage and remaining percentage
    """
    global _controller
    if _controller:
        return _controller.get_battery_status()
    return {"error": "Not connected to drone"}

def execute_mission_plan(waypoints: List[Dict[str, float]]) -> bool:
    """
    Upload and execute a mission with multiple waypoints.
    
    Args:
        waypoints: List of dictionaries with lat, lon, alt for each waypoint
        
    Returns:
        bool: True if mission started successfully, False otherwise
    """
    global _controller
    if _controller:
        if _controller.upload_mission(waypoints):
            return _controller.execute_mission()
    return False