File size: 6,617 Bytes
4a51346
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import asyncio
import os
import sys

from IPython.core.debugger import Pdb
from IPython.core.completer import IPCompleter
from .ptutils import IPythonPTCompleter
from .shortcuts import create_ipython_shortcuts
from . import embed

from pathlib import Path
from pygments.token import Token
from prompt_toolkit.shortcuts.prompt import PromptSession
from prompt_toolkit.enums import EditingMode
from prompt_toolkit.formatted_text import PygmentsTokens
from prompt_toolkit.history import InMemoryHistory, FileHistory
from concurrent.futures import ThreadPoolExecutor

from prompt_toolkit import __version__ as ptk_version
PTK3 = ptk_version.startswith('3.')


# we want to avoid ptk as much as possible when using subprocesses
# as it uses cursor positioning requests, deletes color ....
_use_simple_prompt = "IPY_TEST_SIMPLE_PROMPT" in os.environ


class TerminalPdb(Pdb):
    """Standalone IPython debugger."""

    def __init__(self, *args, pt_session_options=None, **kwargs):
        Pdb.__init__(self, *args, **kwargs)
        self._ptcomp = None
        self.pt_init(pt_session_options)
        self.thread_executor = ThreadPoolExecutor(1)

    def pt_init(self, pt_session_options=None):
        """Initialize the prompt session and the prompt loop
        and store them in self.pt_app and self.pt_loop.

        Additional keyword arguments for the PromptSession class
        can be specified in pt_session_options.
        """
        if pt_session_options is None:
            pt_session_options = {}

        def get_prompt_tokens():
            return [(Token.Prompt, self.prompt)]

        if self._ptcomp is None:
            compl = IPCompleter(
                shell=self.shell, namespace={}, global_namespace={}, parent=self.shell
            )
            # add a completer for all the do_ methods
            methods_names = [m[3:] for m in dir(self) if m.startswith("do_")]

            def gen_comp(self, text):
                return [m for m in methods_names if m.startswith(text)]
            import types
            newcomp = types.MethodType(gen_comp, compl)
            compl.custom_matchers.insert(0, newcomp)
            # end add completer.

            self._ptcomp = IPythonPTCompleter(compl)

        # setup history only when we start pdb
        if self.shell.debugger_history is None:
            if self.shell.debugger_history_file is not None:
                p = Path(self.shell.debugger_history_file).expanduser()
                if not p.exists():
                    p.touch()
                self.debugger_history = FileHistory(os.path.expanduser(str(p)))
            else:
                self.debugger_history = InMemoryHistory()
        else:
            self.debugger_history = self.shell.debugger_history

        options = dict(
            message=(lambda: PygmentsTokens(get_prompt_tokens())),
            editing_mode=getattr(EditingMode, self.shell.editing_mode.upper()),
            key_bindings=create_ipython_shortcuts(self.shell),
            history=self.debugger_history,
            completer=self._ptcomp,
            enable_history_search=True,
            mouse_support=self.shell.mouse_support,
            complete_style=self.shell.pt_complete_style,
            style=getattr(self.shell, "style", None),
            color_depth=self.shell.color_depth,
        )

        if not PTK3:
            options['inputhook'] = self.shell.inputhook
        options.update(pt_session_options)
        if not _use_simple_prompt:
            self.pt_loop = asyncio.new_event_loop()
            self.pt_app = PromptSession(**options)

    def cmdloop(self, intro=None):
        """Repeatedly issue a prompt, accept input, parse an initial prefix
        off the received input, and dispatch to action methods, passing them
        the remainder of the line as argument.

        override the same methods from cmd.Cmd to provide prompt toolkit replacement.
        """
        if not self.use_rawinput:
            raise ValueError('Sorry ipdb does not support use_rawinput=False')

        # In order to make sure that prompt, which uses asyncio doesn't
        # interfere with applications in which it's used, we always run the
        # prompt itself in a different thread (we can't start an event loop
        # within an event loop). This new thread won't have any event loop
        # running, and here we run our prompt-loop.
        self.preloop()

        try:
            if intro is not None:
                self.intro = intro
            if self.intro:
                print(self.intro, file=self.stdout)
            stop = None
            while not stop:
                if self.cmdqueue:
                    line = self.cmdqueue.pop(0)
                else:
                    self._ptcomp.ipy_completer.namespace = self.curframe_locals
                    self._ptcomp.ipy_completer.global_namespace = self.curframe.f_globals

                    # Run the prompt in a different thread.
                    if not _use_simple_prompt:
                        try:
                            line = self.thread_executor.submit(
                                self.pt_app.prompt
                            ).result()
                        except EOFError:
                            line = "EOF"
                    else:
                        line = input("ipdb> ")

                line = self.precmd(line)
                stop = self.onecmd(line)
                stop = self.postcmd(stop, line)
            self.postloop()
        except Exception:
            raise

    def do_interact(self, arg):
        ipshell = embed.InteractiveShellEmbed(
            config=self.shell.config,
            banner1="*interactive*",
            exit_msg="*exiting interactive console...*",
        )
        global_ns = self.curframe.f_globals
        ipshell(
            module=sys.modules.get(global_ns["__name__"], None),
            local_ns=self.curframe_locals,
        )


def set_trace(frame=None):
    """
    Start debugging from `frame`.

    If frame is not specified, debugging starts from caller's frame.
    """
    TerminalPdb().set_trace(frame or sys._getframe().f_back)


if __name__ == '__main__':
    import pdb
    # IPython.core.debugger.Pdb.trace_dispatch shall not catch
    # bdb.BdbQuit. When started through __main__ and an exception
    # happened after hitting "c", this is needed in order to
    # be able to quit the debugging session (see #9950).
    old_trace_dispatch = pdb.Pdb.trace_dispatch
    pdb.Pdb = TerminalPdb  # type: ignore
    pdb.Pdb.trace_dispatch = old_trace_dispatch  # type: ignore
    pdb.main()