File size: 17,847 Bytes
57c13e3
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
"""
Terminal-based chat interface for DeepDrone with LiteLLM and Ollama support.
"""

import asyncio
from typing import List, Dict, Any, Optional
from rich.console import Console
from rich.panel import Panel
from rich.text import Text
from rich.markdown import Markdown
from rich.live import Live
from rich.spinner import Spinner
from prompt_toolkit import prompt
from prompt_toolkit.shortcuts import message_dialog
from prompt_toolkit.styles import Style
import json
import time

from .config import ModelConfig
from .llm_interface import LLMInterface
from .drone_tools import DroneToolsManager

class TerminalDroneChat:
    """Terminal-based chat interface for drone control."""
    
    def __init__(self, model_config: ModelConfig, connection_string: Optional[str] = None):
        self.console = Console()
        self.model_config = model_config
        self.connection_string = connection_string
        self.chat_history: List[Dict[str, str]] = []
        
        # Initialize LLM interface
        self.llm = LLMInterface(model_config)
        
        # Initialize drone tools
        self.drone_tools = DroneToolsManager(connection_string)
        
        # Chat settings
        self.show_thinking = True
        self.max_history = 50
        
        # Style for prompt
        self.prompt_style = Style.from_dict({
            'prompt': '#00ff00 bold',
            'input': '#ffffff',
        })
    
    def start(self):
        """Start the interactive chat session."""
        self._show_welcome()
        
        try:
            while True:
                user_input = self._get_user_input()
                
                if not user_input.strip():
                    continue
                
                # Handle special commands
                if user_input.startswith('/'):
                    if self._handle_command(user_input):
                        continue
                    else:
                        break
                
                # Process user message
                self._process_message(user_input)
                
        except KeyboardInterrupt:
            self._show_goodbye()
        except EOFError:
            self._show_goodbye()
    
    def _show_welcome(self):
        """Show welcome message and model info."""
        welcome_text = f"""
[bold green]🚁 DEEPDRONE TERMINAL ACTIVE[/bold green]

[bold]Model:[/bold] {self.model_config.name} ({self.model_config.provider})
[bold]Connection:[/bold] {self.connection_string or 'Not connected'}
[bold]Status:[/bold] Ready for commands

[dim]Type your commands or questions. Use /help for available commands.[/dim]
[dim]Type /quit to exit.[/dim]
        """
        
        self.console.print(Panel(
            welcome_text.strip(),
            border_style="bright_green",
            padding=(1, 2)
        ))
    
    def _show_goodbye(self):
        """Show goodbye message."""
        self.console.print("\n[yellow]🚁 DeepDrone session ended. Fly safe![/yellow]")
        
        # Disconnect from drone if connected
        if self.drone_tools.is_connected():
            self.console.print("[dim]Disconnecting from drone...[/dim]")
            self.drone_tools.disconnect()
    
    def _get_user_input(self) -> str:
        """Get user input with custom prompt."""
        try:
            return prompt(
                [('class:prompt', '🚁 DeepDrone> ')],
                style=self.prompt_style
            )
        except (KeyboardInterrupt, EOFError):
            return '/quit'
    
    def _handle_command(self, command: str) -> bool:
        """Handle special commands. Returns True to continue, False to quit."""
        cmd_parts = command[1:].split()
        if not cmd_parts:
            return True
        
        cmd = cmd_parts[0].lower()
        
        if cmd in ['quit', 'exit', 'q']:
            return False
        
        elif cmd == 'help':
            self._show_help()
        
        elif cmd == 'clear':
            self.console.clear()
            self._show_welcome()
        
        elif cmd == 'history':
            self._show_history()
        
        elif cmd == 'status':
            self._show_status()
        
        elif cmd == 'connect':
            if len(cmd_parts) > 1:
                self._connect_drone(cmd_parts[1])
            else:
                self.console.print("[red]Usage: /connect <connection_string>[/red]")
        
        elif cmd == 'disconnect':
            self._disconnect_drone()
        
        elif cmd == 'models':
            self._show_model_info()
        
        else:
            self.console.print(f"[red]Unknown command: {command}[/red]")
            self.console.print("Type /help for available commands")
        
        return True
    
    def _process_message(self, user_message: str):
        """Process user message and generate response."""
        # Add user message to history
        self.chat_history.append({"role": "user", "content": user_message})
        
        # Show user message
        self.console.print(Panel(
            user_message,
            title="[bold blue]You[/bold blue]",
            border_style="blue",
            padding=(0, 1)
        ))
        
        # Check if this requires drone tools
        requires_tools = self._message_requires_tools(user_message)
        
        # Generate response
        with Live(
            Spinner("dots", text="[green]DeepDrone is thinking...[/green]"),
            console=self.console,
            transient=True
        ) as live:
            
            try:
                if requires_tools:
                    response = self._process_with_tools(user_message, live)
                else:
                    response = self._process_simple_chat(user_message)
                
                live.stop()
                
                # Show response
                self._show_response(response)
                
                # Add to history
                self.chat_history.append({"role": "assistant", "content": response})
                
                # Trim history if too long
                if len(self.chat_history) > self.max_history:
                    self.chat_history = self.chat_history[-self.max_history:]
                
            except Exception as e:
                live.stop()
                self.console.print(f"[red]Error: {e}[/red]")
    
    def _message_requires_tools(self, message: str) -> bool:
        """Check if message requires drone tools."""
        tool_keywords = [
            'connect', 'takeoff', 'land', 'fly', 'goto', 'mission',
            'battery', 'location', 'status', 'arm', 'disarm', 'rtl',
            'return', 'home', 'altitude', 'waypoint', 'navigate'
        ]
        
        message_lower = message.lower()
        return any(keyword in message_lower for keyword in tool_keywords)
    
    def _process_with_tools(self, message: str, live: Live) -> str:
        """Process message that may require drone tools."""
        # Update status
        live.update(Spinner("dots", text="[green]Analyzing command and planning actions...[/green]"))
        
        # Create system prompt with tool information
        system_prompt = self._create_system_prompt_with_tools()
        
        # Prepare messages
        messages = [{"role": "system", "content": system_prompt}]
        messages.extend(self.chat_history[-10:])  # Last 10 messages for context
        messages.append({"role": "user", "content": message})
        
        # Get LLM response
        response = self.llm.chat(messages)
        
        # Check if response contains tool calls
        if self._response_has_tool_calls(response):
            live.update(Spinner("dots", text="[yellow]Executing drone operations...[/yellow]"))
            response = self._execute_tool_calls(response)
        
        return response
    
    def _process_simple_chat(self, message: str) -> str:
        """Process simple chat message without tools."""
        system_prompt = """You are DeepDrone, an AI assistant specialized in drone operations and flight control.
        
You help users with:
- Drone flight planning and mission design
- Understanding drone systems and components
- Troubleshooting flight issues
- Safety protocols and regulations
- Data analysis from drone flights

Be concise, helpful, and focus on drone-related topics. If asked about your identity, 
clearly state that you are DeepDrone, a specialized drone AI assistant."""
        
        messages = [{"role": "system", "content": system_prompt}]
        messages.extend(self.chat_history[-10:])
        messages.append({"role": "user", "content": message})
        
        return self.llm.chat(messages)
    
    def _create_system_prompt_with_tools(self) -> str:
        """Create system prompt with tool information."""
        return """You are DeepDrone, an AI assistant that can control real drones through Python code.

Available drone control functions:
- connect_drone(connection_string): Connect to drone
- disconnect_drone(): Disconnect from drone
- takeoff(altitude): Take off to specified altitude in meters
- land(): Land the drone
- return_home(): Return to launch point
- fly_to(lat, lon, alt): Fly to GPS coordinates
- get_location(): Get current GPS position
- get_battery(): Get battery status
- execute_mission(waypoints): Execute mission with list of waypoints

When user requests drone operations, write Python code using these functions.
Always explain what you're doing and provide status updates.

Example:
```python
# Connect to drone simulator
connect_drone('udp:127.0.0.1:14550')

# Take off to 30 meters
takeoff(30)

# Fly to a specific location
fly_to(37.7749, -122.4194, 30)

# Return home
return_home()

# Disconnect
disconnect_drone()
```

Be safety-conscious and explain each operation."""
    
    def _response_has_tool_calls(self, response: str) -> bool:
        """Check if response contains Python code blocks."""
        return "```python" in response or "```" in response
    
    def _execute_tool_calls(self, response: str) -> str:
        """Execute tool calls found in response."""
        # Extract Python code blocks
        code_blocks = self._extract_code_blocks(response)
        
        results = []
        for code in code_blocks:
            try:
                result = self._execute_code_block(code)
                results.append(f"βœ… Executed: {result}")
            except Exception as e:
                results.append(f"❌ Error: {e}")
        
        # Append execution results to response
        if results:
            response += "\n\n**Execution Results:**\n" + "\n".join(results)
        
        return response
    
    def _extract_code_blocks(self, text: str) -> List[str]:
        """Extract Python code blocks from markdown text."""
        code_blocks = []
        lines = text.split('\n')
        in_code_block = False
        current_block = []
        
        for line in lines:
            if line.strip().startswith('```python') or line.strip().startswith('```'):
                if in_code_block:
                    # End of code block
                    if current_block:
                        code_blocks.append('\n'.join(current_block))
                        current_block = []
                    in_code_block = False
                else:
                    # Start of code block
                    in_code_block = True
            elif in_code_block:
                current_block.append(line)
        
        return code_blocks
    
    def _execute_code_block(self, code: str) -> str:
        """Execute a code block using drone tools."""
        # Create a safe execution environment
        safe_globals = {
            'connect_drone': self.drone_tools.connect_drone,
            'disconnect_drone': self.drone_tools.disconnect_drone,
            'takeoff': self.drone_tools.takeoff,
            'land': self.drone_tools.land,
            'return_home': self.drone_tools.return_home,
            'fly_to': self.drone_tools.fly_to,
            'get_location': self.drone_tools.get_location,
            'get_battery': self.drone_tools.get_battery,
            'execute_mission': self.drone_tools.execute_mission,
        }
        
        # Capture output
        output = []
        
        def capture_print(*args, **kwargs):
            output.append(' '.join(str(arg) for arg in args))
        
        safe_globals['print'] = capture_print
        
        # Execute code
        exec(code, safe_globals)
        
        return '\n'.join(output) if output else "Command executed successfully"
    
    def _show_response(self, response: str):
        """Show AI response with formatting."""
        # Parse as markdown if it contains markdown elements
        if any(marker in response for marker in ['**', '*', '```', '#', '-', '1.']):
            content = Markdown(response)
        else:
            content = Text(response)
        
        self.console.print(Panel(
            content,
            title="[bold green]🚁 DeepDrone[/bold green]",
            border_style="green",
            padding=(0, 1)
        ))
    
    def _show_help(self):
        """Show help message."""
        help_text = """
[bold]Available Commands:[/bold]

[bold cyan]/help[/bold cyan] - Show this help message
[bold cyan]/quit[/bold cyan] - Exit the application
[bold cyan]/clear[/bold cyan] - Clear the screen
[bold cyan]/history[/bold cyan] - Show chat history
[bold cyan]/status[/bold cyan] - Show system status
[bold cyan]/connect <connection>[/bold cyan] - Connect to drone
[bold cyan]/disconnect[/bold cyan] - Disconnect from drone
[bold cyan]/models[/bold cyan] - Show current model info

[bold]Drone Commands (natural language):[/bold]
- "Connect to simulator at udp:127.0.0.1:14550"
- "Take off to 30 meters"
- "Fly to coordinates 37.7749, -122.4194 at 50 meters"
- "Show current location and battery status"
- "Execute a square flight pattern"
- "Return home and land"

[bold]Example Conversation:[/bold]
[dim]You: Connect to the drone simulator
DeepDrone: I'll connect to the simulator for you...
You: Take off to 20 meters and fly in a circle
DeepDrone: Taking off to 20 meters and executing circular pattern...[/dim]
        """
        
        self.console.print(Panel(
            help_text.strip(),
            title="[bold]DeepDrone Help[/bold]",
            border_style="cyan"
        ))
    
    def _show_history(self):
        """Show chat history."""
        if not self.chat_history:
            self.console.print("[yellow]No chat history available[/yellow]")
            return
        
        self.console.print("[bold]Chat History:[/bold]\n")
        
        for i, msg in enumerate(self.chat_history[-10:], 1):  # Last 10 messages
            role_color = "blue" if msg["role"] == "user" else "green"
            role_name = "You" if msg["role"] == "user" else "DeepDrone"
            
            self.console.print(f"[{role_color}]{i}. {role_name}:[/{role_color}] {msg['content'][:100]}...")
    
    def _show_status(self):
        """Show system status."""
        drone_status = "Connected" if self.drone_tools.is_connected() else "Disconnected"
        drone_color = "green" if self.drone_tools.is_connected() else "red"
        
        status_text = f"""
[bold]Model:[/bold] {self.model_config.name} ({self.model_config.provider})
[bold]Drone Status:[/bold] [{drone_color}]{drone_status}[/{drone_color}]
[bold]Connection:[/bold] {self.connection_string or 'None'}
[bold]Chat History:[/bold] {len(self.chat_history)} messages
        """
        
        if self.drone_tools.is_connected():
            try:
                location = self.drone_tools.get_location()
                battery = self.drone_tools.get_battery()
                status_text += f"""
[bold]Location:[/bold] {location}
[bold]Battery:[/bold] {battery}
                """
            except Exception as e:
                status_text += f"\n[yellow]Could not get drone telemetry: {e}[/yellow]"
        
        self.console.print(Panel(
            status_text.strip(),
            title="[bold]System Status[/bold]",
            border_style="yellow"
        ))
    
    def _connect_drone(self, connection_string: str):
        """Connect to drone."""
        self.console.print(f"[yellow]Connecting to drone at {connection_string}...[/yellow]")
        
        try:
            if self.drone_tools.connect_drone(connection_string):
                self.connection_string = connection_string
                self.console.print("[green]βœ… Connected to drone successfully[/green]")
            else:
                self.console.print("[red]❌ Failed to connect to drone[/red]")
        except Exception as e:
            self.console.print(f"[red]❌ Connection error: {e}[/red]")
    
    def _disconnect_drone(self):
        """Disconnect from drone."""
        if self.drone_tools.is_connected():
            self.drone_tools.disconnect_drone()
            self.console.print("[yellow]Disconnected from drone[/yellow]")
        else:
            self.console.print("[yellow]No drone connection to disconnect[/yellow]")
    
    def _show_model_info(self):
        """Show current model information."""
        info_text = f"""
[bold]Name:[/bold] {self.model_config.name}
[bold]Provider:[/bold] {self.model_config.provider}
[bold]Model ID:[/bold] {self.model_config.model_id}
[bold]Max Tokens:[/bold] {self.model_config.max_tokens}
[bold]Temperature:[/bold] {self.model_config.temperature}
        """
        
        if self.model_config.base_url:
            info_text += f"\n[bold]Base URL:[/bold] {self.model_config.base_url}"
        
        self.console.print(Panel(
            info_text.strip(),
            title="[bold]Current Model[/bold]",
            border_style="magenta"
        ))