khulnasoft commited on
Commit
5958f7e
1 Parent(s): 91a9352

Upload 12 files

Browse files
codeql_kernel/__init__.py ADDED
@@ -0,0 +1,3 @@
 
 
 
 
1
+ from .kernel import CodeQLKernel
2
+ from .codeql import CLIClient, QueryClient
3
+ from ._version import __version__
codeql_kernel/__main__.py ADDED
@@ -0,0 +1,4 @@
 
 
 
 
 
1
+ from ipykernel.kernelapp import IPKernelApp
2
+ from . import CodeQLKernel
3
+
4
+ IPKernelApp.launch_instance(kernel_class=CodeQLKernel)
codeql_kernel/_version.py ADDED
@@ -0,0 +1 @@
 
 
1
+ __version__ = '0.0.1'
codeql_kernel/codeql.py ADDED
@@ -0,0 +1,220 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import logging
2
+ import os
3
+ import tempfile
4
+ import time
5
+ from subprocess import PIPE, Popen
6
+ from typing import Optional, Tuple
7
+
8
+ from .jsonrpc import RPC as JSONRPC
9
+ from .rawrpc import RPC as RawRPC
10
+
11
+
12
+ class CLIClient:
13
+ """
14
+ Represents a JSONRPC client to connect to CodeQL CLI Server
15
+ """
16
+
17
+ def __init__(self):
18
+ self.cache = {"ram": []}
19
+ self.conn = RawRPC(
20
+ [
21
+ "codeql",
22
+ "execute",
23
+ "cli-server",
24
+ "--logdir",
25
+ "/tmp/codeql_kernel_cliserver",
26
+ ]
27
+ )
28
+
29
+ def stop(self):
30
+ self.conn.stop()
31
+
32
+ def resolve_ram(self) -> Tuple[Optional[str], Optional[list]]:
33
+ if self.cache.get("ram"):
34
+ return (None, self.cache.get("ram"))
35
+ else:
36
+ cmd = ["resolve", "ram", "--format=json"]
37
+ (err, result) = self.conn.request(cmd)
38
+ if err:
39
+ return (err, None)
40
+ self.cache["ram"] = [x for x in result if x.startswith("-J")]
41
+ return (None, self.cache.get("ram"))
42
+
43
+ def resolve_metadata(self, query) -> Tuple[Optional[str], dict]:
44
+ cmd = ["resolve", "metadata", "--format=json", query]
45
+ return self.conn.request(cmd)
46
+
47
+ def resolve_database(self, db_path) -> Tuple[Optional[str], dict]:
48
+ cmd = ["resolve", "database", "--format=json", db_path]
49
+ return self.conn.request(cmd)
50
+
51
+ def resolve_library_path(self, query) -> Tuple[Optional[str], Optional[dict]]:
52
+ cmd = ["resolve", "library-path", "--format=json", "--query", query]
53
+ return self.conn.request(cmd)
54
+
55
+ def bqrs_info(self, bqrs_path) -> Tuple[Optional[str], dict]:
56
+ cmd = ["bqrs", "info", "--format=json", bqrs_path]
57
+ return self.conn.request(cmd)
58
+
59
+ def bqrs_decode(self, bqrs_path) -> Tuple[Optional[str], Optional[str]]:
60
+ (err, ram_opts) = self.resolve_ram()
61
+ if err or not ram_opts:
62
+ return (f"Error resolving ram options {err}", None)
63
+ results_path = tempfile.NamedTemporaryFile(delete=False)
64
+ cmd = [
65
+ "bqrs",
66
+ "decode",
67
+ "--format=csv",
68
+ f"-o={results_path.name}",
69
+ "--entities=string,url",
70
+ bqrs_path,
71
+ ]
72
+ cmd.extend(ram_opts)
73
+ (err, _) = self.conn.request(cmd)
74
+ if err:
75
+ return (f"Error decoding bqrs file {err}", None)
76
+ if os.path.exists(results_path.name):
77
+ with open(results_path.name, "r") as f:
78
+ data = f.read()
79
+ # return json.loads(data)
80
+ return (None, data)
81
+ else:
82
+ return ("Error decoding results", None)
83
+
84
+
85
+ class QueryClient:
86
+ """
87
+ Represents a JSONRPC client to connect to CodeQL Query Server
88
+ """
89
+
90
+ def __init__(self, on_progress=None, on_result=None):
91
+ self._cli_client: CLIClient = CLIClient()
92
+ cmd = ["codeql", "execute", "query-server2", "--threads=0", "--evaluator-log-level", "5"]
93
+ # debug
94
+ # cmd.extend(["--debug", "--tuple-counting", "-v", "--log-to-stderr"])
95
+ # --save-cache --max-disk-cache XX
96
+ (err, ram_opts) = self._cli_client.resolve_ram()
97
+ if err or not ram_opts:
98
+ return (f"Error resolving ram options {err}", None)
99
+ cmd.extend(ram_opts)
100
+ self._proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
101
+ handlers = {}
102
+ if on_progress:
103
+ handlers["ql/progressUpdated"] = on_progress
104
+ self._conn = JSONRPC(
105
+ handlers=handlers, stdout=self._proc.stdin, stdin=self._proc.stdout
106
+ )
107
+ self._progress_id = 0
108
+ self._evaluate_id = 0
109
+ self._db_metadata = {}
110
+ # TODO: wait for query server to be ready
111
+ time.sleep(2)
112
+
113
+ def stop(self):
114
+ if self._proc.stdin:
115
+ self._proc.stdin.close()
116
+ if self._proc.stdout:
117
+ self._proc.stdout.close()
118
+ self._proc.terminate()
119
+ self._proc.wait()
120
+ if self._cli_client:
121
+ self._cli_client.stop()
122
+
123
+ def next_progress_id(self) -> int:
124
+ self._progress_id += 1
125
+ return self._progress_id
126
+
127
+ def next_evaluate_id(self) -> int:
128
+ self._evaluate_id += 1
129
+ return self._evaluate_id
130
+
131
+ def register_database(self, db_path) -> Optional[str]:
132
+ """
133
+ Register a database with the query server
134
+ """
135
+ if not db_path.endswith("/"):
136
+ db_path = db_path + "/"
137
+ if not os.path.isdir(db_path):
138
+ return f"Database path {db_path} is not a directory"
139
+
140
+ (err, db_metadata) = self._cli_client.resolve_database(db_path)
141
+ if err:
142
+ return "Failed to resolve database metadata"
143
+
144
+ # TODO: implement on-the-fly query patching
145
+
146
+ params = {
147
+ "body": {
148
+ "databases": [db_path],
149
+ "progressId": self.next_progress_id(),
150
+ }
151
+ }
152
+ (err, _) = self._conn.request("evaluation/registerDatabases", args=params)
153
+
154
+ if err:
155
+ return err
156
+
157
+ self._db_metadata = db_metadata
158
+ self._db_metadata["path"] = db_path
159
+
160
+ return None
161
+
162
+ def run_query(
163
+ self, query_path, quick_eval={}
164
+ ) -> Tuple[Optional[str], Optional[str]]:
165
+ logging.info(f"Running query {query_path}")
166
+ bqrs_path = tempfile.NamedTemporaryFile(suffix=".bqrs").name
167
+ target = {"query": {"xx": ""}}
168
+ if bool(quick_eval):
169
+ target = {
170
+ "quickEval": {
171
+ "quickEvalPos": {
172
+ "fileName": query_path,
173
+ "line": quick_eval.get("startLine"),
174
+ "column": quick_eval.get("startColumn"),
175
+ "endLine": quick_eval.get("endLine"),
176
+ "endColumn": quick_eval.get("endColumn"),
177
+ }
178
+ }
179
+ }
180
+
181
+ run_queries_params = {
182
+ "body": {
183
+ "db": self._db_metadata["path"],
184
+ # TODO: get additional packs from ENV, command, config, etc.
185
+ "additionalPacks": ["/Users/pwntester/src/github.com/github/codeql"],
186
+ "externalInputs": [],
187
+ "singletonExternalInputs": [], # opts.templateValues or {},
188
+ "outputPath": bqrs_path,
189
+ "queryPath": query_path,
190
+ "target": target,
191
+ },
192
+ "progressId": self.next_progress_id(),
193
+ }
194
+
195
+ (err, resp) = self._conn.request(
196
+ "evaluation/runQuery", args=run_queries_params
197
+ )
198
+
199
+ if resp and resp["resultType"] != 0:
200
+ return (resp["message"], None)
201
+
202
+ if err:
203
+ return (str(err), None)
204
+
205
+ if os.path.exists(bqrs_path):
206
+ (err, bqrs_info) = self._cli_client.bqrs_info(bqrs_path)
207
+ if err:
208
+ return (err, "")
209
+ if not bqrs_info or not bqrs_info["result-sets"]:
210
+ return ("Failed to get bqrs info", "")
211
+ count = bqrs_info["result-sets"][0]["rows"]
212
+ for result_set in bqrs_info["result-sets"]:
213
+ if result_set["name"] == "#select":
214
+ count = result_set["rows"]
215
+ if count > 0:
216
+ return self._cli_client.bqrs_decode(bqrs_path)
217
+ else:
218
+ return (None, "No results")
219
+ else:
220
+ return (f"Failed to find results file at {bqrs_path}", "")
codeql_kernel/images/logo-32x32.png ADDED
codeql_kernel/images/logo-64x64.png ADDED
codeql_kernel/jsonrpc.py ADDED
@@ -0,0 +1,664 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ # coding: utf-8
2
+
3
+ """
4
+ Minimal python RPC implementation in a single file based on the JSON-RPC 2.0 specs from
5
+ http://www.jsonrpc.org/specification.
6
+ """
7
+
8
+
9
+ __author__ = "Marcel Rieger"
10
+ __email__ = "python-jsonrpyc@googlegroups.com"
11
+ __copyright__ = "Copyright 2016-2021, Marcel Rieger"
12
+ __credits__ = ["Marcel Rieger"]
13
+ __contact__ = "https://github.com/riga/jsonrpyc"
14
+ __license__ = "BSD-3-Clause"
15
+ __status__ = "Development"
16
+ __version__ = "1.1.1"
17
+ __all__ = ["RPC"]
18
+
19
+
20
+ import io
21
+ import json
22
+ import logging
23
+ import sys
24
+ import threading
25
+ import time
26
+ from queue import Queue
27
+
28
+
29
+ class Spec(object):
30
+ """
31
+ This class wraps methods that create JSON-RPC 2.0 compatible string representations of
32
+ request, response and error objects. All methods are class members, so you might never want to
33
+ create an instance of this class, but rather use the methods directly:
34
+
35
+ .. code-block:: python
36
+
37
+ Spec.request("my_method", 18) # the id is optional
38
+ # => '{"jsonrpc":"2.0","method":"my_method","id": 18}'
39
+
40
+ Spec.response(18, "some_result")
41
+ # => '{"jsonrpc":"2.0","id":18,"result":"some_result"}'
42
+
43
+ Spec.error(18, -32603)
44
+ # => '{"jsonrpc":"2.0","id":18,"error":{"code":-32603,"message":"Internal error"}}'
45
+ """
46
+
47
+ @classmethod
48
+ def check_id(cls, id, allow_empty=False):
49
+ """
50
+ Value check for *id* entries. When *allow_empty* is *True*, *id* is allowed to be *None*.
51
+ Raises a *TypeError* when *id* is neither an integer nor a string.
52
+ """
53
+ if (id is not None or not allow_empty) and not isinstance(id, (int, str)):
54
+ raise TypeError(
55
+ "id must be an integer or string, got {} ({})".format(id, type(id))
56
+ )
57
+
58
+ @classmethod
59
+ def check_method(cls, method):
60
+ """
61
+ Value check for *method* entries. Raises a *TypeError* when *method* is not a string.
62
+ """
63
+ if not isinstance(method, str):
64
+ raise TypeError(
65
+ "method must be a string, got {} ({})".format(method, type(method))
66
+ )
67
+
68
+ @classmethod
69
+ def check_code(cls, code):
70
+ """
71
+ Value check for *code* entries. Raises a *TypeError* when *code* is not an integer, or a
72
+ *KeyError* when there is no :py:class:`RPCError` subclass registered for that *code*.
73
+ """
74
+ if not isinstance(code, int):
75
+ raise TypeError("code must be an integer, got {} ({})".format(id, type(id)))
76
+
77
+ if not get_error(code):
78
+ raise ValueError("unknown code, got {} ({})".format(code, type(code)))
79
+
80
+ @classmethod
81
+ def request(cls, method, id=None, params=None):
82
+ """
83
+ Creates the string representation of a request that calls *method* with optional *params*
84
+ which are encoded by ``json.dumps``. When *id* is *None*, the request is considered a
85
+ notification.
86
+ """
87
+ try:
88
+ cls.check_method(method)
89
+ cls.check_id(id, allow_empty=True)
90
+ except Exception as e:
91
+ raise RPCInvalidRequest(str(e))
92
+
93
+ # start building the request string
94
+ req = '{{"jsonrpc":"2.0","method":"{}"'.format(method)
95
+
96
+ # add the id when given
97
+ if id is not None:
98
+ # encode string ids
99
+ if isinstance(id, str):
100
+ id = json.dumps(id)
101
+ req += ',"id":{}'.format(id)
102
+
103
+ # add parameters when given
104
+ if params is not None:
105
+ try:
106
+ req += ',"params":{}'.format(json.dumps(params))
107
+ except Exception as e:
108
+ raise RPCParseError(str(e))
109
+
110
+ # end the request string
111
+ req += "}"
112
+
113
+ return req
114
+
115
+ @classmethod
116
+ def response(cls, id, result):
117
+ """
118
+ Creates the string representation of a respone that was triggered by a request with *id*.
119
+ A *result* is required, even if it is *None*.
120
+ """
121
+ try:
122
+ cls.check_id(id)
123
+ except Exception as e:
124
+ raise RPCInvalidRequest(str(e))
125
+
126
+ # encode string ids
127
+ if isinstance(id, str):
128
+ id = json.dumps(id)
129
+
130
+ # build the response string
131
+ try:
132
+ res = '{{"jsonrpc":"2.0","id":{},"result":{}}}'.format(
133
+ id, json.dumps(result)
134
+ )
135
+ except Exception as e:
136
+ raise RPCParseError(str(e))
137
+
138
+ return res
139
+
140
+ @classmethod
141
+ def error(cls, id, code, data=None):
142
+ """
143
+ Creates the string representation of an error that occured while processing a request with
144
+ *id*. *code* must lead to a registered :py:class:`RPCError`. *data* might contain
145
+ additional, detailed error information and is encoded by ``json.dumps`` when set.
146
+ """
147
+ try:
148
+ cls.check_id(id)
149
+ cls.check_code(code)
150
+ except Exception as e:
151
+ raise RPCInvalidRequest(str(e))
152
+
153
+ # build the inner error data
154
+ message = get_error(code).title
155
+ err_data = '{{"code":{},"message":"{}"'.format(code, message)
156
+
157
+ # insert data when given
158
+ if data is not None:
159
+ try:
160
+ err_data += ',"data":{}}}'.format(json.dumps(data))
161
+ except Exception as e:
162
+ raise RPCParseError(str(e))
163
+ else:
164
+ err_data += "}"
165
+
166
+ # encode string ids
167
+ if isinstance(id, str):
168
+ id = json.dumps(id)
169
+
170
+ # start building the error string
171
+ err = '{{"jsonrpc":"2.0","id":{},"error":{}}}'.format(id, err_data)
172
+
173
+ return err
174
+
175
+
176
+ class RPC(object):
177
+ """
178
+ The main class of *jsonrpyc*. Instances of this class wrap an input stream *stdin* and an output
179
+ stream *stdout* in order to communicate with other services. A service is not even forced to be
180
+ written in Python as long as it strictly implements the JSON-RPC 2.0 specification. RPC
181
+ instances may wrap a *target* object. By means of a :py:class:`Watchdog` instance, incoming
182
+ requests are routed to methods of this object whose result might be sent back as a response.
183
+ The watchdog instance is created but not started yet, when *watch* is not *True*.
184
+ Example implementation:
185
+
186
+ *server.py*
187
+
188
+ .. code-block:: python
189
+
190
+ import jsonrpyc
191
+
192
+ class MyTarget(object):
193
+
194
+ def greet(self, name):
195
+ return f"Hi, {name}!"
196
+
197
+ jsonrpc.RPC(MyTarget())
198
+
199
+ *client.py*
200
+
201
+ .. code-block:: python
202
+
203
+ import jsonrpyc
204
+ from subprocess import Popen, PIPE
205
+
206
+ p = Popen(["python", "server.py"], stdin=PIPE, stdout=PIPE)
207
+ rpc = jsonrpyc.RPC(stdout=p.stdin, stdin=p.stdout)
208
+
209
+ # non-blocking remote procedure call with callback and js-like signature
210
+ def cb(err, res=None):
211
+ if err:
212
+ throw err
213
+ print(f"callback got: {res}")
214
+
215
+ rpc("greet", args=("John",), callback=cb)
216
+
217
+ # cb is called asynchronously which prints
218
+ # => "callback got: Hi, John!"
219
+
220
+ # blocking remote procedure call with 0.1s polling
221
+ print(rpc("greet", args=("John",), block=0.1))
222
+ # => "Hi, John!"
223
+
224
+ # shutdown the process
225
+ p.stdin.close()
226
+ p.stdout.close()
227
+ p.terminate()
228
+ p.wait()
229
+
230
+ .. py:attribute:: target
231
+
232
+ The wrapped target object. Might be *None* when no object is wrapped, e.g. for the *client*
233
+ RPC instance.
234
+
235
+ .. py:attribute:: stdin
236
+
237
+ The input stream, re-opened with ``"rb"``.
238
+
239
+ .. py:attribute:: stdout
240
+
241
+ The output stream, re-opened with ``"wb"``.
242
+
243
+ .. py:attribute:: watch
244
+
245
+ The :py:class:`Watchdog` instance that optionally watches *stdin* and dispatches incoming
246
+ requests.
247
+ """
248
+
249
+ EMPTY_RESULT = object()
250
+
251
+ def __init__(self, handlers=None, stdin=None, stdout=None, watch=True, **kwargs):
252
+ super(RPC, self).__init__()
253
+
254
+ self.handlers = handlers
255
+
256
+ # open streams
257
+ stdin = sys.stdin if stdin is None else stdin
258
+ stdout = sys.stdout if stdout is None else stdout
259
+ self.stdin = io.open(stdin.fileno(), "rb")
260
+ self.stdout = io.open(stdout.fileno(), "wb")
261
+
262
+ # other attributes
263
+ self._i = 0
264
+ self._callbacks = {}
265
+ self._results = {}
266
+
267
+ # create and optional start the watchdog
268
+ kwargs["start"] = watch
269
+ # kwargs.setdefault("daemon", handlers is None)
270
+ self.watchdog = Watchdog(self, **kwargs)
271
+
272
+ def __del__(self):
273
+ watchdog = getattr(self, "watchdog", None)
274
+ if watchdog:
275
+ watchdog.stop()
276
+
277
+ def __call__(self, *args, **kwargs):
278
+ """
279
+ Shorthand for :py:meth:`call`.
280
+ """
281
+ return self.call(*args, **kwargs)
282
+
283
+ def request(
284
+ self,
285
+ cmd,
286
+ args=(),
287
+ ):
288
+ """
289
+ Sends a request to the remote service and waits for the response
290
+ """
291
+ q = Queue()
292
+
293
+ def cb(err, resp):
294
+ q.put((err, resp))
295
+
296
+ self.call(cmd, args=args, callback=cb)
297
+ return q.get()
298
+
299
+ def call(self, method, args=(), kwargs=None, callback=None, block=0):
300
+ """
301
+ Performs an actual remote procedure call by writing a request representation (a string) to
302
+ the output stream. The remote RPC instance uses *method* to route to the actual method to
303
+ call with *args* and *kwargs*. When *callback* is set, it will be called with the result of
304
+ the remote call. When *block* is larger than *0*, the calling thread is blocked until the
305
+ result is received. In this case, *block* will be the poll interval, emulating synchronuous
306
+ return value behavior. When both *callback* is *None* and *block* is *0* or smaller, the
307
+ request is considered a notification and the remote RPC instance will not send a response.
308
+ """
309
+ # default kwargs
310
+ if kwargs is None:
311
+ kwargs = {}
312
+
313
+ # check if the call is a notification
314
+ is_notification = callback is None and block <= 0
315
+
316
+ # create a new id for requests expecting a response
317
+ id = None
318
+ if not is_notification:
319
+ self._i += 1
320
+ id = self._i
321
+
322
+ # register the callback
323
+ if callback is not None:
324
+ self._callbacks[id] = callback
325
+
326
+ # store an empty result for the meantime
327
+ if block > 0:
328
+ self._results[id] = self.EMPTY_RESULT
329
+
330
+ # create the request
331
+ req = Spec.request(method, id=id, params=args)
332
+ self._write(req)
333
+
334
+ # blocking return value behavior
335
+ if block > 0:
336
+ while True:
337
+ if self._results[id] != self.EMPTY_RESULT:
338
+ result = self._results[id]
339
+ del self._results[id]
340
+ if isinstance(result, Exception):
341
+ raise result
342
+ else:
343
+ return result
344
+ time.sleep(block)
345
+
346
+ def _handle(self, line):
347
+ """
348
+ Handles an incoming *line* and dispatches the parsed object to the request, response, or
349
+ error handlers.
350
+ """
351
+ obj = json.loads(line)
352
+
353
+ # dispatch to the correct handler
354
+ if "method" in obj:
355
+ # request
356
+ self._handle_request(obj)
357
+ elif "error" not in obj:
358
+ # response
359
+ self._handle_response(obj)
360
+ else:
361
+ # error
362
+ self._handle_error(obj)
363
+
364
+ def _handle_request(self, req):
365
+ """
366
+ Handles an incoming request *req*. When it containes an id, a response or error is sent
367
+ back.
368
+ """
369
+ logging.debug(f"Handling request to {req['method']}")
370
+ try:
371
+ method = self._route(req["method"])
372
+ result = method(req["params"])
373
+ if "id" in req:
374
+ res = Spec.response(req["id"], result)
375
+ self._write(res)
376
+ except Exception as e:
377
+ if "id" in req:
378
+ if isinstance(e, RPCError):
379
+ err = Spec.error(req["id"], e.code, e.data)
380
+ else:
381
+ err = Spec.error(req["id"], -32603, str(e))
382
+ self._write(err)
383
+
384
+ def _handle_response(self, res):
385
+ """
386
+ Handles an incoming successful response *res*. Blocking calls are resolved and registered
387
+ callbacks are invoked with the first error argument being set to *None*.
388
+ """
389
+ logging.debug(f"Handling response for {res['id']}: {res}")
390
+ # set the result
391
+ if res["id"] in self._results:
392
+ self._results[res["id"]] = res["result"]
393
+
394
+ # lookup and invoke the callback
395
+ if res["id"] in self._callbacks:
396
+ callback = self._callbacks[res["id"]]
397
+ del self._callbacks[res["id"]]
398
+ callback(None, res["result"])
399
+
400
+ def _handle_error(self, res):
401
+ """
402
+ Handles an incoming failed response *res*. Blocking calls throw an exception and
403
+ registered callbacks are invoked with an exception and the second result argument set to
404
+ *None*.
405
+ """
406
+ logging.debug(f"Handling error {res}")
407
+ # extract the error and create an actual error instance to raise
408
+ err = res["error"]
409
+ error = get_error(err["code"])(err.get("data", err["message"]))
410
+
411
+ # set the error
412
+ if res["id"] in self._results:
413
+ self._results[res["id"]] = error
414
+
415
+ # lookup and invoke the callback
416
+ if res["id"] in self._callbacks:
417
+ callback = self._callbacks[res["id"]]
418
+ del self._callbacks[res["id"]]
419
+ callback(error, None)
420
+
421
+ def _route(self, method):
422
+ if method in self.handlers.keys():
423
+ return self.handlers[method]
424
+ else:
425
+ raise RPCMethodNotFound(data=method)
426
+
427
+ def _write(self, s):
428
+ """
429
+ Writes a string *s* to the output stream.
430
+ """
431
+ msg = f"Content-Length: {len(s)}\n\n{s}"
432
+ logging.debug("SENT: \n" + str(msg) + "\n\n")
433
+ self.stdout.write(bytearray(msg, "utf-8"))
434
+ self.stdout.flush()
435
+
436
+
437
+ class Watchdog(threading.Thread):
438
+ """
439
+ This class represents a thread that watches the input stream of an :py:class:`RPC` instance for
440
+ incoming content and dispatches requests to it.
441
+
442
+ .. py:attribute:: rpc
443
+
444
+ The :py:class:`RPC` instance.
445
+
446
+ .. py:attribute:: name
447
+
448
+ The thread's name.
449
+
450
+ .. py:attribute:: interval
451
+
452
+ The polling interval of the run loop.
453
+
454
+ .. py:attribute:: daemon
455
+
456
+ The thread's daemon flag.
457
+ """
458
+
459
+ def __init__(self, rpc, name="watchdog", interval=0.1, daemon=True, start=True):
460
+ super(Watchdog, self).__init__()
461
+
462
+ # store attributes
463
+ self.rpc = rpc
464
+ self.name = name
465
+ self.interval = interval
466
+ self.daemon = daemon
467
+
468
+ # register a stop event
469
+ self._stop = threading.Event()
470
+
471
+ if start:
472
+ self.start()
473
+
474
+ def start(self):
475
+ """
476
+ Starts with thread's activity.
477
+ """
478
+ super(Watchdog, self).start()
479
+
480
+ def stop(self):
481
+ """
482
+ Stops with thread's activity.
483
+ """
484
+ self._stop.set()
485
+
486
+ def run(self):
487
+ # reset the stop event
488
+ self._stop.clear()
489
+
490
+ # stop here when stdin is not set or closed
491
+ if not self.rpc.stdin or self.rpc.stdin.closed:
492
+ return
493
+
494
+ # read new incoming lines
495
+ last_pos = 0
496
+ while not self._stop.is_set():
497
+ lines = None
498
+
499
+ # stop when stdin is closed
500
+ if self.rpc.stdin.closed:
501
+ break
502
+
503
+ # read from stdin depending on whether it is a tty or not
504
+ if self.rpc.stdin.isatty():
505
+ cur_pos = self.rpc.stdin.tell()
506
+ if cur_pos != last_pos:
507
+ self.rpc.stdin.seek(last_pos)
508
+ lines = self.rpc.stdin.readlines()
509
+ last_pos = self.rpc.stdin.tell()
510
+ self.rpc.stdin.seek(cur_pos)
511
+ else:
512
+ try:
513
+ header = self.rpc.stdin.readline()
514
+ header = header.decode("utf-8").strip()
515
+ if header.startswith("Content-Length:"):
516
+ length = 2 + int(header[len("Content-Length:") :])
517
+ lines = [self.rpc.stdin.read(length)]
518
+ except Exception:
519
+ # prevent residual race conditions occurring when stdin is closed externally
520
+ pass
521
+
522
+ # handle new lines if any
523
+ if lines:
524
+ for line in lines:
525
+ line = line.decode("utf-8").strip()
526
+ if line:
527
+ self.rpc._handle(line)
528
+ else:
529
+ self._stop.wait(self.interval)
530
+
531
+
532
+ class RPCError(Exception):
533
+
534
+ """
535
+ Base class for RPC errors.
536
+
537
+ .. py:attribute:: message
538
+
539
+ The message of this error, i.e., ``"<title> (<code>)[, data: <data>]"``.
540
+
541
+ .. py:attribute:: data
542
+
543
+ Additional data of this error. Setting the data attribute will also change the message
544
+ attribute.
545
+ """
546
+
547
+ def __init__(self, data=None):
548
+ # build the error message
549
+ message = "{} ({})".format(self.title, self.code)
550
+ if data is not None:
551
+ message += ", data: {}".format(data)
552
+ self.message = message
553
+
554
+ super(RPCError, self).__init__(message)
555
+
556
+ self.data = data
557
+
558
+ def __str__(self):
559
+ return self.message
560
+
561
+
562
+ error_map_distinct = {}
563
+ error_map_range = {}
564
+
565
+
566
+ def is_range(code):
567
+ return (
568
+ isinstance(code, tuple)
569
+ and len(code) == 2
570
+ and all(isinstance(i, int) for i in code)
571
+ and code[0] < code[1]
572
+ )
573
+
574
+
575
+ def register_error(cls):
576
+ """
577
+ Decorator that registers a new RPC error derived from :py:class:`RPCError`. The purpose of
578
+ error registration is to have a mapping of error codes/code ranges to error classes for faster
579
+ lookups during error creation.
580
+
581
+ .. code-block:: python
582
+
583
+ @register_error
584
+ class MyCustomRPCError(RPCError):
585
+ code = ...
586
+ title = "My custom error"
587
+ """
588
+ # it would be much cleaner to add a meta class to RPCError as a registry for codes
589
+ # but in CPython 2 exceptions aren't types, so simply provide a registry mechanism here
590
+ if not issubclass(cls, RPCError):
591
+ raise TypeError("'{}' is not a subclass of RPCError".format(cls))
592
+
593
+ code = cls.code
594
+
595
+ if isinstance(code, int):
596
+ error_map = error_map_distinct
597
+ elif is_range(code):
598
+ error_map = error_map_range
599
+ else:
600
+ raise TypeError("invalid RPC error code {}".format(code))
601
+
602
+ if code in error_map:
603
+ raise AttributeError("duplicate RPC error code {}".format(code))
604
+
605
+ error_map[code] = cls
606
+
607
+ return cls
608
+
609
+
610
+ def get_error(code):
611
+ """
612
+ Returns the RPC error class that was previously registered to *code*. *None* is returned when no
613
+ class could be found.
614
+ """
615
+ if code in error_map_distinct:
616
+ return error_map_distinct[code]
617
+
618
+ for (lower, upper), cls in error_map_range.items():
619
+ if lower <= code <= upper:
620
+ return cls
621
+
622
+ return None
623
+
624
+
625
+ @register_error
626
+ class RPCParseError(RPCError):
627
+
628
+ code = -32700
629
+ title = "Parse error"
630
+
631
+
632
+ @register_error
633
+ class RPCInvalidRequest(RPCError):
634
+
635
+ code = -32600
636
+ title = "Invalid Request"
637
+
638
+
639
+ @register_error
640
+ class RPCMethodNotFound(RPCError):
641
+
642
+ code = -32601
643
+ title = "Method not found"
644
+
645
+
646
+ @register_error
647
+ class RPCInvalidParams(RPCError):
648
+
649
+ code = -32602
650
+ title = "Invalid params"
651
+
652
+
653
+ @register_error
654
+ class RPCInternalError(RPCError):
655
+
656
+ code = -32603
657
+ title = "Internal error"
658
+
659
+
660
+ @register_error
661
+ class RPCServerError(RPCError):
662
+
663
+ code = (-32099, -32000)
664
+ title = "Server error"
codeql_kernel/kernel.json ADDED
@@ -0,0 +1,13 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ {
2
+ "argv": [
3
+ "python",
4
+ "-m",
5
+ "codeql_kernel",
6
+ "-f",
7
+ "{connection_file}"
8
+ ],
9
+ "display_name": "CodeQL",
10
+ "mimetype": "text/x-codeql",
11
+ "language": "codeql",
12
+ "name": "codeql"
13
+ }
codeql_kernel/kernel.py ADDED
@@ -0,0 +1,189 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import os
2
+ from io import StringIO
3
+ from tempfile import mkdtemp, mkstemp
4
+
5
+ import pandas as pd
6
+ import tree_sitter
7
+ from IPython.display import HTML
8
+ from metakernel import MetaKernel
9
+ import subprocess
10
+
11
+ from .codeql import QueryClient
12
+
13
+ __version__ = "0.0.1"
14
+
15
+
16
+ class CodeQLKernel(MetaKernel):
17
+ implementation = "CodeQL Kernel"
18
+ implementation_version = "1.0"
19
+ language = "ql"
20
+ language_version = "0.1"
21
+ banner = "CodeQL Kernel - Experimental"
22
+ language_info = {
23
+ "mimetype": "text/x-codeql",
24
+ "name": "codeql",
25
+ "file_extension": ".ql",
26
+ "help_links": MetaKernel.help_links,
27
+ }
28
+
29
+ def __init__(self, **kwargs):
30
+ # get absolute path of running script
31
+ here = os.path.dirname(os.path.abspath(__file__))
32
+ self.QL_LANGUAGE = tree_sitter.Language(
33
+ os.path.join(here, "tree-sitter-ql.so"), "ql"
34
+ )
35
+ self._select_query = self.QL_LANGUAGE.query(
36
+ "(moduleMember (select)) @select_statement"
37
+ )
38
+ self._predicate_query = self.QL_LANGUAGE.query(
39
+ """(moduleMember
40
+ (annotation name: (annotName) @aname (#eq? @aname "query")).
41
+ (classlessPredicate name: (predicateName) @pname)
42
+ ) @annotated_query """
43
+ )
44
+ self._parser = tree_sitter.Parser()
45
+ self._parser.set_language(self.QL_LANGUAGE)
46
+ self._context = ""
47
+
48
+ def on_progress(obj):
49
+ self.Display(obj["message"], clear_output=True)
50
+
51
+ def on_result(obj):
52
+ self.Display(
53
+ f"Query completed in {obj['evaluationTime']}!", clear_output=True
54
+ )
55
+
56
+ self._query_client: QueryClient = QueryClient(
57
+ on_progress=on_progress, on_result=on_result
58
+ )
59
+ MetaKernel.__init__(self, **kwargs)
60
+ print(kwargs)
61
+
62
+ def get_usage(self):
63
+ return "This is the CodeQL kernel."
64
+
65
+ def parse_cell(self, cell):
66
+ """
67
+ parse the cell code using tree-sitter
68
+
69
+ """
70
+ tree = self._parser.parse(bytes(cell, "utf8"))
71
+ select_statements = []
72
+ query_predicates = []
73
+ captures = self._select_query.captures(tree.root_node)
74
+ for capture in captures:
75
+ # capture[0] is the node, capture[1] is the capture name
76
+ if capture[1] == "select_statement":
77
+ start_point = capture[0].start_point
78
+ end_point = capture[0].end_point
79
+ select_statements.append((start_point, end_point))
80
+
81
+ captures = self._predicate_query.captures(tree.root_node)
82
+ for capture in captures:
83
+ # capture[0] is the node
84
+ # capture[1] is the capture name
85
+ if capture[1] == "annotated_query":
86
+ start_point = capture[0].start_point
87
+ end_point = capture[0].end_point
88
+ # extract the annotation name
89
+ # check if its a query predicate
90
+ for i, line in enumerate(cell.split("\n")):
91
+ if i == start_point[0]:
92
+ if (line[start_point[1]: start_point[1] + len("query")] == "query"):
93
+ query_predicates.append((start_point, end_point))
94
+ return (select_statements, query_predicates)
95
+
96
+ def evaluate(self, code, quick_eval=None):
97
+ """
98
+ Evaluate the given code and return the result.
99
+ """
100
+ try:
101
+ if not self._query_client._db_metadata:
102
+ self.Error_display("No database registered! Use %set_database to register a database.")
103
+ return
104
+
105
+ # create a temporary directory to hold the query pack and the query
106
+ qlpack = "\n".join(
107
+ [
108
+ "---",
109
+ "library: false",
110
+ "name: jupyter-kernel/temporary-qlpack",
111
+ "version: 0.0.1",
112
+ "dependencies:",
113
+ " codeql/{}-all: '*'",
114
+ "",
115
+ ]
116
+ ).format(self._query_client._db_metadata["languages"][0])
117
+ tmp_dir = mkdtemp(dir="/tmp", prefix="codeql_kernel")
118
+ with open(os.path.join(tmp_dir, "qlpack.yml"), "w") as f:
119
+ f.write(qlpack)
120
+ subprocess.run("codeql pack install", cwd=tmp_dir, shell=True)
121
+ fd, query_path = mkstemp(suffix=".ql", dir=tmp_dir, text=True)
122
+ os.write(fd, bytearray(code, "utf-8"))
123
+ os.close(fd)
124
+ self.Display("Running query ...", clear_output=True)
125
+ (err, resp) = self._query_client.run_query(
126
+ query_path, quick_eval=quick_eval
127
+ )
128
+ if err:
129
+ self.clear_output(wait=True)
130
+ self.Error_display(
131
+ "Error running query: {}".format(err)
132
+ )
133
+ else:
134
+ csv = StringIO(resp)
135
+ chunks = (chunk for chunk in pd.read_csv(csv, chunksize=5000))
136
+ df = pd.concat(chunks)
137
+ self.Display(HTML(df.to_html()), clear_output=True)
138
+
139
+ except Exception as e:
140
+ self.Error_display("Error running query: {}".format(e))
141
+
142
+ def do_execute_direct(self, code):
143
+ """
144
+ Execute the given code directly.
145
+ """
146
+ (select_statements, query_predicates) = self.parse_cell(code)
147
+ if len(query_predicates) == 1 and len(select_statements) == 0:
148
+ # we have exactly one query predicate:
149
+ # add cell to the context and evaluate the query predicate
150
+ offset = len(self._context.split("\n"))
151
+ self._context += code + "\n"
152
+ predicate = query_predicates[0][0]
153
+ pred_line = predicate[0]
154
+ pred_col = predicate[1]
155
+ cell_lines = code.split("\n")
156
+ words = cell_lines[pred_line].strip().split(" ")
157
+ position = {
158
+ "startLine": offset + pred_line,
159
+ "endLine": offset + pred_line,
160
+ "startColumn": pred_col + len(words[0]) + len(words[1]) + 3,
161
+ "endColumn": pred_col + len(words[0]) + len(words[1]) + 3,
162
+ }
163
+ self.Display("Evaluating predicate '" + words[2].split("(")[0] + "'", clear_output=True)
164
+ self.evaluate(self._context, quick_eval=position)
165
+ elif len(select_statements) == 1:
166
+ # we have exactly one select statement:
167
+ # add cell to the context and evaluate the whole context
168
+ self._context += code + "\n"
169
+ self.Display("Evaluating select statement ...", clear_output=True)
170
+ self.evaluate(self._context)
171
+ else:
172
+ self._context += code + "\n"
173
+
174
+ def repr(self, data):
175
+ return repr(data)
176
+
177
+ def do_shutdown(self, restart):
178
+ if self._query_client:
179
+ self._query_client.stop()
180
+ if restart:
181
+ self.Print("Restarting kernel...")
182
+ self.reload_magics()
183
+ self.restart_kernel()
184
+ self.Print("Done!")
185
+ super(CodeQLKernel, self).do_shutdown(restart)
186
+
187
+
188
+ if __name__ == "__main__":
189
+ CodeQLKernel.run_as_main()
codeql_kernel/magics/__init__.py ADDED
File without changes
codeql_kernel/magics/set_database_magic.py ADDED
@@ -0,0 +1,22 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ from metakernel import Magic
2
+ import os
3
+
4
+
5
+ class SetDatabaseMagic(Magic):
6
+
7
+ def line_set_database(self, path):
8
+ if not os.path.exists(path):
9
+ self.kernel.Error_display("Databae path does not exist: {}".format(path))
10
+ return
11
+ self.kernel.Display("Registering database ...", clear_output=False)
12
+ err = self.kernel._query_client.register_database(path)
13
+ if err:
14
+ self.kernel.Error_display("Error registering database: {}".format(err))
15
+ return
16
+ else:
17
+ self.kernel.Display("Database registered!", clear_output=True)
18
+ return
19
+
20
+
21
+ def register_magics(kernel):
22
+ kernel.register_magics(SetDatabaseMagic)
codeql_kernel/rawrpc.py ADDED
@@ -0,0 +1,131 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import io
2
+ import json
3
+ import threading
4
+ from queue import Queue
5
+ from subprocess import PIPE, Popen
6
+
7
+
8
+ class RPC(object):
9
+ def __init__(self, cmd):
10
+ self.cmd = cmd
11
+ self.start()
12
+ super(RPC, self).__init__()
13
+
14
+ def start(self):
15
+ self._proc: Popen = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
16
+
17
+ assert self._proc.stdin
18
+ assert self._proc.stdout
19
+ assert self._proc.stderr
20
+
21
+ self.stdin = io.open(self._proc.stdin.fileno(), "wb")
22
+ self.stdout = io.open(self._proc.stdout.fileno(), "rb")
23
+ self.stderr = io.open(self._proc.stderr.fileno(), "rb")
24
+
25
+ self._queue = Queue()
26
+
27
+ self.watchdog_stdout = Watchdog(self, name="stdout")
28
+ self.watchdog_stderr = Watchdog(self, name="stderr")
29
+
30
+ def stop(self):
31
+ if self.stdin and not self.stdin.closed:
32
+ self.stdin.close()
33
+ if self.stdout and not self.stdout.closed:
34
+ self.stdout.close()
35
+ if self.stderr and not self.stderr.closed:
36
+ self.stderr.close()
37
+ self.watchdog_stdout.stop()
38
+ self.watchdog_stderr.stop()
39
+ self._proc.terminate()
40
+ self._proc.wait()
41
+ self._proc.kill()
42
+
43
+ def restart(self):
44
+ self.stop()
45
+ self.start()
46
+
47
+ def request(self, cmd):
48
+ self._write(cmd)
49
+ return self._queue.get()
50
+
51
+ def _handle_stdout(self, resp):
52
+ try:
53
+ self._queue.put((None, json.loads(resp)))
54
+ except:
55
+ self._queue.put((None, resp))
56
+
57
+ def _handle_stderr(self, resp):
58
+ self._queue.put((resp, None))
59
+
60
+ def _write(self, s):
61
+ req = json.dumps(s)
62
+ req = req + "\0"
63
+ try:
64
+ self.stdin.write(bytearray(req, "utf-8"))
65
+ self.stdin.flush()
66
+ except:
67
+ pass
68
+
69
+
70
+ class Watchdog(threading.Thread):
71
+ def __init__(self, rpc, name="watchdog", interval=0.1):
72
+ super(Watchdog, self).__init__()
73
+
74
+ if name == "stderr":
75
+ self.stream = rpc.stderr
76
+ self.handle = rpc._handle_stderr
77
+ elif name == "stdout":
78
+ self.stream = rpc.stdout
79
+ self.handle = rpc._handle_stdout
80
+
81
+ # store attributes
82
+ self.rpc = rpc
83
+ self.name = name
84
+ self.interval = interval
85
+ self.daemon = True
86
+
87
+ # register a stop event
88
+ self._stop = threading.Event()
89
+
90
+ self.start()
91
+
92
+ def start(self):
93
+ super(Watchdog, self).start()
94
+
95
+ def stop(self):
96
+ self._stop.set()
97
+
98
+ def run(self):
99
+ # reset the stop event
100
+ self._stop.clear()
101
+
102
+ # stop here when stream is not set or closed
103
+ if not self.stream or self.stream.closed:
104
+ return
105
+
106
+ # read new incoming lines
107
+ while not self._stop.is_set():
108
+ resp = None
109
+
110
+ # stop when stream is closed
111
+ if self.stream.closed:
112
+ break
113
+
114
+ try:
115
+ resp = ""
116
+ while True:
117
+ c = self.stream.read(1).decode("utf-8")
118
+ if c == "\x00" and self.name == "stdout":
119
+ break
120
+ elif c == "\n" and self.name == "stderr":
121
+ break
122
+ elif not c:
123
+ # EOF
124
+ break
125
+ else:
126
+ resp = resp + c
127
+ except IOError:
128
+ # prevent residual race conditions occurring when stream is closed externally
129
+ pass
130
+ self.handle(resp)
131
+ self._stop.wait(self.interval)