martinjosifoski committed
Commit 2dbfb72
1 Parent(s): a27d5a9

Initial commit.

DirectCode.py ADDED
@@ -0,0 +1,4 @@
+ # ToDo:
+
+ class DirectCode:
+     pass
RockPaperScissorsJudge.py ADDED
@@ -0,0 +1,46 @@
+ from flows.base_flows import Flow
+ from typing import Dict
+
+ from .RockPaperScissorsPlayer import RockPaperScissorsPlayer
+
+
+ class RockPaperScissorsJudge(Flow):
+
+     def __init__(self, **kwargs):
+         super(RockPaperScissorsJudge, self).__init__(**kwargs)
+
+         self.flow_state["A"] = RockPaperScissorsPlayer(name="Player A", description="RockPaperScissorsPlayer")
+         self.flow_state["B"] = RockPaperScissorsPlayer(name="Player B", description="RockPaperScissorsPlayer")
+         self.flow_state["A_score"] = 0
+         self.flow_state["B_score"] = 0
+         self.flow_state["n_party_played"] = 0
+
+     def run(self, input_data, expected_outputs) -> Dict:
+         flow_a = self.flow_state["A"]
+         flow_b = self.flow_state["B"]
+
+         for _ in range(3):
+             A_task = self.package_task_message(flow_a, "run", {}, expected_outputs=["choice"])
+             B_task = self.package_task_message(flow_b, "run", {}, expected_outputs=["choice"])
+             # play another round
+             A_output = flow_a(A_task)
+             self._log_message(A_output)
+             B_output = flow_b(B_task)
+             self._log_message(B_output)
+
+             A_choice = A_output.data["choice"]
+             B_choice = B_output.data["choice"]
+
+             self._update_state({"n_party_played": self.flow_state["n_party_played"] + 1})
+
+             if A_choice == B_choice:
+                 # neither has won
+                 pass
+             elif (A_choice == "rock" and B_choice == "scissors"
+                   or A_choice == "paper" and B_choice == "rock"
+                   or A_choice == "scissors" and B_choice == "paper"):
+                 self._update_state({"A_score": self.flow_state["A_score"] + 1})
+             else:
+                 self._update_state({"B_score": self.flow_state["B_score"] + 1})
+
+         return self._get_keys_from_state(expected_outputs, allow_class_namespace=False)
RockPaperScissorsPlayer.py ADDED
@@ -0,0 +1,15 @@
+ from flows.base_flows import Flow
+ from typing import List
+
+ import random
+
+
+ class RockPaperScissorsPlayer(Flow):
+
+     def __init__(self, **kwargs):
+         super(RockPaperScissorsPlayer, self).__init__(**kwargs)
+
+     def run(self, input_data, expected_outputs: List[str] = None):
+         choice = random.choice(["rock", "paper", "scissors"])
+
+         return {"choice": choice}
__init__.py ADDED
File without changes
pip_requirements.py ADDED
@@ -0,0 +1 @@
+ # ToDo
src/__init__.py ADDED
File without changes
src/datasets/__init__.py ADDED
File without changes
src/datasets/schema.py ADDED
@@ -0,0 +1,74 @@
+ from typing import List, Tuple, Dict
+
+
+ def assert_test_format_codeforces(tests: List[Tuple[List[str], str]]):
+     assert isinstance(tests, list) or tests is None
+     if tests is None:
+         return
+     for test in tests:
+         assert isinstance(test, list)
+         assert len(test) == 2
+         inputs, outputs = test
+         assert isinstance(inputs, list)
+         assert isinstance(outputs, str)
+         for input in inputs:
+             assert isinstance(input, str)
+
+
+ def assert_entry_format_codeforces(obj: Dict):
+     # each data point must follow the same schema
+     assert isinstance(obj["id"], str)  # contest + problem_name = id, will not change when formatting changes
+     assert isinstance(obj["id_hash"], str)  # hashsum of all entries, any change to obj will change this
+     assert isinstance(obj["contest"], int)
+     assert isinstance(obj["problem_name"], str)
+     assert isinstance(obj["problem_url"], str)
+     assert isinstance(obj["solution_url"], str)
+
+     assert isinstance(obj["header"], str)
+     assert isinstance(obj["problem_description"], str)
+     assert isinstance(obj["input_description"], str)
+     assert isinstance(obj["output_description"], str)
+     assert isinstance(obj["note"], str) or obj["note"] is None
+
+     assert isinstance(obj["difficulty"], int)
+     assert isinstance(obj["tags"], list)
+     assert isinstance(obj["working_solution"], str)  # can be empty
+
+     assert_test_format_codeforces(obj["public_tests_io"])
+     assert_test_format_codeforces(obj["public_tests_individual_io"])
+     assert_test_format_codeforces(obj["hidden_tests_io"])
+
+
+ def assert_test_format_leetcode(tests: List[Tuple[List[str], str]]):
+     pass
+     # ToDo: Uncomment after the test format is updated
+     # assert isinstance(tests, list)
+     # for test in tests:
+     #     assert isinstance(test, tuple)
+     #     assert len(test) == 2
+     #     x, y = test
+     #     assert isinstance(x, str)
+     #     assert isinstance(y, str)
+
+
+ def assert_entry_format_leetcode(obj: Dict):
+     # each data point must follow the same schema
+     assert isinstance(obj["id"], str)  # contest + problem_name = id, will not change when formatting changes
+     assert isinstance(obj["id_hash"], str)  # hashsum of all entries, any change to obj will change this
+     assert isinstance(obj["index"], int)
+     assert isinstance(obj["problem_name"], str)
+     assert isinstance(obj["problem_url"], str)
+
+     assert isinstance(obj["problem_description"], str)
+     assert isinstance(obj["constraints"], str)
+     assert isinstance(obj["python_stub"], str)
+     assert isinstance(obj["difficulty"], str) and obj["difficulty"] in {"easy", "medium", "hard"}
+
+     # ToDo: Should be added
+     # assert isinstance(obj['tags'], list)
+     # assert isinstance(obj['solution_url'], str)
+     # assert isinstance(obj['working_solution'], str)  # can be empty
+
+     # ToDo: Uncomment after the test format is updated
+     # assert_test_format_leetcode(obj['public_tests_io'])
+     # assert_test_format_leetcode(obj['hidden_tests_io'])
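To make the Codeforces test schema concrete, here is a made-up example (hypothetical data, not taken from the dataset) that passes assert_test_format_codeforces: each test is a two-element list of stdin lines and the expected stdout string.

from src.datasets.schema import assert_test_format_codeforces

tests = [
    [["3", "1 2 3"], "6"],    # [stdin lines], expected output
    [["2", "10 20"], "30"],
]
assert_test_format_codeforces(tests)  # passes silently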
src/evaluation/__init__.py ADDED
File without changes
src/evaluation/testing_utils_codeforces.py ADDED
@@ -0,0 +1,445 @@
+ # This is based heavily on the huggingface APPS metric
+ import re
+
+ # to run the solution files we're using a timing based approach
+ import signal
+ import sys
+
+ # for capturing the stdout
+ from io import StringIO
+ from typing import List, Tuple
+
+ # used for testing the code that reads from input
+ from unittest.mock import patch, mock_open
+
+ import numpy as np
+ from pyext import RuntimeModule
+ from wrapt_timeout_decorator import timeout as wrapt_timeout
+ import threading
+
+ from src.datasets.schema import assert_test_format_codeforces
+
+ import logging
+
+ log = logging.getLogger()
+ lock = threading.Lock()
+
+
+ def evaluate_solution_for_problem(
+     candidate_solution,
+     hidden_tests_io=None,
+     public_tests_io=None,
+     timeout=10,
+     debug=False,
+     add_extra_imports=False,
+     allow_truncated_io=False,
+ ):
+     with lock:
+         """See the readme for the output format of this function."""
+         if hidden_tests_io is None:
+             hidden_tests_io = []
+         if public_tests_io is None:
+             public_tests_io = []
+
+         if candidate_solution is None:
+             results_dict = {
+                 "compilation_status": False,
+                 "compilation_error_message": "No code was provided.",
+                 "timeout_error": False,
+                 "hidden_tests_results": [
+                     {
+                         "status": False,
+                         "error_message": "No code was provided.",
+                         "generated_output": None,
+                         "input": test[0],
+                         "expected_output": test[1],
+                     }
+                     for test in hidden_tests_io
+                 ],
+                 "public_tests_results": [
+                     {
+                         "status": False,
+                         "error_message": "No code was provided.",
+                         "generated_output": None,
+                         "input": test[0],
+                         "expected_output": test[1],
+                     }
+                     for test in public_tests_io
+                 ],
+             }
+             return results_dict
+
+         @wrapt_timeout(timeout, use_signals=False)
+         def run_tests():
+             hidden_tests_results = check_correctness(
+                 candidate_solution, hidden_tests_io, timeout, debug, add_extra_imports, allow_truncated_io
+             )
+             public_tests_results = check_correctness(
+                 candidate_solution, public_tests_io, timeout, debug, add_extra_imports, allow_truncated_io
+             )
+
+             return hidden_tests_results, public_tests_results
+
+         try:
+             hidden_tests_results, public_tests_results = run_tests()
+             timeout_error_occurred = False
+         except BaseException as e:
+             log.info(e)
+             hidden_tests_results = {}
+             public_tests_results = {}
+
+             hidden_tests_results["compilation_status"] = True
+             public_tests_results["compilation_status"] = True
+             timeout_error_occurred = True
+             hidden_tests_results["error_message"] = "Timeout error."
+
+             hidden_tests_results["results"] = [
+                 {
+                     "status": False,
+                     "error_message": hidden_tests_results["error_message"],
+                     "generated_output": None,
+                     "input": test[0],
+                     "expected_output": test[1],
+                 }
+                 for test in hidden_tests_io
+             ]
+             public_tests_results["results"] = [
+                 {
+                     "status": False,
+                     "error_message": hidden_tests_results["error_message"],
+                     "generated_output": None,
+                     "input": test[0],
+                     "expected_output": test[1],
+                 }
+                 for test in public_tests_io
+             ]
+
+         # the compilation status shouldn't depend on the tests
+         assert hidden_tests_results["compilation_status"] == public_tests_results["compilation_status"]
+
+         results_dict = {
+             "compilation_status": hidden_tests_results["compilation_status"],
+             "compilation_error_message": hidden_tests_results["error_message"],
+             "timeout_error": timeout_error_occurred,
+             "hidden_tests_results": hidden_tests_results["results"],
+             "public_tests_results": public_tests_results["results"],
+         }
+
+         return results_dict
+
+
+ def check_correctness(
+     candidate_solution: str,
+     tests: List[Tuple[List[str], str]],
+     timeout: int = 6000,
+     debug=True,
+     add_extra_imports=False,
+     allow_truncated_io=True,
+ ):
+     """
+     wrapping the testing code in a global timeout, based on huggingface code
+     """
+
+     assert_test_format_codeforces(tests)
+     inputs, outputs = [], []
+     if len(tests) > 0:
+         inputs, outputs = zip(*tests)
+
+     compilation_error, results = run_test(
+         candidate_solution, inputs, outputs, timeout, debug, add_extra_imports, allow_truncated_io
+     )
+
+     assert len(results) == len(inputs)
+
+     for result in results:
+         assert isinstance(result["generated_output"], str) or result["generated_output"] is None
+         assert isinstance(result["status"], bool)
+         assert isinstance(result["error_message"], str) or result["error_message"] is None
+         assert isinstance(result["input"], list)
+         assert isinstance(result["expected_output"], str)
+
+     compilation_status = compilation_error == ""
+     if compilation_status:
+         compilation_error = None
+
+     return {"compilation_status": compilation_status, "error_message": compilation_error, "results": results}
+
+
+ class TimeoutException(Exception):
+     pass
+
+
+ def timeout_handler(signum, frame):
+     log.info("alarm went off")
+     # return
+     raise TimeoutException
+
+
+ signal.signal(signal.SIGALRM, timeout_handler)
+
+
+ # used to capture stdout as a list
+ # from https://stackoverflow.com/a/16571630/6416660
+ # alternative use redirect_stdout() from contextlib
+ class Capturing(list):
+     def __enter__(self):
+         self._stdout = sys.stdout
+         sys.stdout = self._stringio = StringIO()
+         # Make closing the StringIO a no-op
+         self._stringio.close = lambda x: 1
+         return self
+
+     def __exit__(self, *args):
+         self.extend(self._stringio.getvalue().splitlines())
+         del self._stringio  # free up some memory
+         sys.stdout = self._stdout
+
+ def run_test(code, inputs, outputs, timeout: int = 6000, debug=True, add_extra_imports=False, allow_truncated_io=True):
+     """
+     runs the code and tries to match inputs and outputs
+     the scraped testcases may be incomplete
+     if allow_truncated_io==True, then we ignore an EOF exception at the end of the generated output
+     """
+     # Disable functionalities that can make destructive changes to the test.
+
+     results = []
+
+     if isinstance(code, list):
+         tmp_test = code
+     elif isinstance(code, str):
+         tmp_test = code.split("\n")
+     else:
+         raise AssertionError("code must be provided as list of lines or string with \\n linebreaks.")
+
+     # parse the code into code and imports
+     import_lines = []
+     future_import_lines = []
+     code_lines = []
+     for x in tmp_test:
+         if (not x.startswith("from ")) and (not x.startswith("import ")):
+             code_lines.append("\t" + x + "\n")
+         else:
+             if "__future__" in x:
+                 future_import_lines.append(x + "\n")
+             else:
+                 import_lines.append(x + "\n")
+
+     # assemble a new solution snippet which wraps the generated solution in a function code()
+     new_test = "stdin = sys.stdin\nstdout = sys.stdout\n"
+     new_test += '__name__="__main__"\n'
+     new_test += "def code():\n"
+     for line in code_lines:
+         new_test += line
+
+     sol = "\n".join(future_import_lines)
+     sol += "import sys\n"
+     if add_extra_imports:
+         sol += "import time\nimport itertools\nfrom itertools import accumulate, product, permutations, combinations\nimport collections\nfrom collections import Counter, OrderedDict, deque, defaultdict, ChainMap\nfrom functools import lru_cache\nimport math\nfrom math import sqrt, sin, cos, tan, ceil, fabs, floor, gcd, exp, log, log2\nimport fractions\nfrom typing import List, Tuple\nimport numpy as np\nimport random\nimport heapq\nfrom heapq import *\n"
+     sol += "\n".join(import_lines) + "\n" + new_test
+
+     if debug:
+         log.info(f"sol = {sol}")
+     method_name = "code"
+     signal.alarm(timeout)
+
+     # convert the solution snippet into a pyext runtime module
+     sol_module = None
+     try:
+         sol_module = RuntimeModule.from_string("tmp_sol", "", sol)
+         signal.alarm(0)
+     except Exception as e:
+         signal.alarm(0)
+         if debug:
+             log.info(f"type 1 compilation error = {e}")
+         for inp, out in zip(inputs, outputs):
+             # consider all inputs failed
+             results.append(
+                 {
+                     "status": False,
+                     "input": inp,
+                     "expected_output": out,
+                     "generated_output": None,
+                     "error_message": repr(e),
+                 }
+             )
+         return repr(e), results
+
+     assert sol_module is not None
+     signal.alarm(0)
+
+     try:
+         method = getattr(sol_module, method_name)  # get_attr second arg must be str
+     except:
+         signal.alarm(0)
+         e = sys.exc_info()
+         log.info(f"unable to get function error = {e}")
+
+         for inp, out in zip(inputs, outputs):
+             # consider all inputs failed
+             results.append(
+                 {
+                     "status": False,
+                     "input": inp,
+                     "expected_output": out,
+                     "generated_output": None,
+                     "error_message": repr(e),
+                 }
+             )
+         return repr(e), results
+
+     # go through all tests, call our runtime module with the inputs
+     # then compare with the reference output
+     for index, (test_input, reference_output) in enumerate(zip(inputs, outputs)):
+
+         result_object = {
+             "input": test_input,
+             "expected_output": reference_output,
+         }
+
+         # if the last token of the input is truncated and marked with "..." we delete it
+         input_truncated = False
+         if "".join(test_input).strip().endswith("...") and allow_truncated_io:
+             test_input = test_input[:-1]
+             input_truncated = True
+
+         # sometimes the last input token is ""
+         # if len(test_input)>0:
+         #     if test_input[-1]=="":
+         #         test_input = test_input[:-1]
+
+         error_code = None
+         with Capturing() as generated_output:
+             try:
+                 call_method(method, test_input)
+                 # reset the alarm
+                 signal.alarm(0)
+             except Exception as e:
+                 # runtime error or took too long
+                 signal.alarm(0)
+                 error_code = e
+                 if debug:
+                     log.info(f"Call-based runtime error or time limit exceeded error = {repr(e)}{e}")
+         signal.alarm(0)
+
+         # in some cases we run into truncated tests
+         # in such cases we expect the error code to be None, EOFError or ValueError
+         if (
+             (input_truncated or reference_output.strip().endswith("..."))
+             and allow_truncated_io
+             and (error_code is None or isinstance(error_code, EOFError) or isinstance(error_code, ValueError))
+         ):
+
+             generated_output = generated_output[:-1]
+             reference_output = reference_output.rstrip("...")
+             if len(generated_output) == 0:
+                 # no output left, we pass by default
+                 result_object.update(
+                     **{
+                         "status": True,
+                         "generated_output": "\n".join(generated_output),
+                         "error_message": None,
+                     }
+                 )
+                 results.append(result_object)
+             else:
+                 result_object.update(
+                     **{
+                         "status": string_compare(generated_output, reference_output, True),
+                         "generated_output": "\n".join(generated_output),
+                         "error_message": None,
+                     }
+                 )
+                 results.append(result_object)
+
+         # if the input and output are not truncated, we don't allow any errors
+         elif error_code is not None:
+             result_object.update(**{"status": False, "generated_output": None, "error_message": repr(error_code)})
+             results.append(result_object)
+         # finally, if there are no errors, we expect the output to match the reference output
+         else:
+             # the execution went well, let's compare the outputs
+             result_object.update(
+                 **{
+                     "status": string_compare(generated_output, reference_output, False),
+                     "generated_output": "\n".join(generated_output),
+                     "error_message": None,
+                 }
+             )
+             results.append(result_object)
+
+     return "", results
+
+
+ def string_compare(candidate, correct, truncate_output=False, floating_point_accuracy=0.01):
+     candidate = [o.strip().lower() for o in candidate]
+     correct = correct.strip().lower()
+
+     # normalize whitespace
+     candidate = "\n".join(candidate)
+     candidate = re.sub("\s+", " ", candidate).strip()
+     correct = re.sub("\s+", " ", correct).strip()
+
+     # split into individual tokens
+     candidate = candidate.split(" ")
+     correct = correct.split(" ")
+
+     # some tests may be truncated, if we allow this we don't enforce equal length of inputs/outputs
+     if not truncate_output:
+         if not len(candidate) == len(correct):
+             return False
+
+     # if we allow truncated io, the last token of the output may have been corrupted
+     if truncate_output:
+         correct = correct[:-1]
+
+     # when zip is used for lists of unequal length it will give as many pairs as there are items in the shorter list
+     for left, right in zip(candidate, correct):
+         if left == right:
+             continue
+
+         try:
+             int_left = int(left)
+             int_right = int(right)
+             if int_left == int_right:
+                 continue
+         except ValueError:
+             pass
+
+         try:
+             float_left = float(left)
+             float_right = float(right)
+             if np.abs(float_left - float_right) < floating_point_accuracy:
+                 continue
+         except ValueError:
+             pass
+
+         return False
+
+     return True
+
+
+ def call_method(method, inputs):
+     if isinstance(inputs, list):
+         inputs = "\n".join(inputs)
+
+     inputs_line_iterator = iter(inputs.split("\n"))
+
+     # sys.setrecursionlimit(10000)
+
+     # @patch('builtins.input', side_effect=inputs.split("\n"))
+     @patch("builtins.open", mock_open(read_data=inputs))
+     @patch("sys.stdin", StringIO(inputs))
+     @patch("sys.stdin.readline", lambda *args: next(inputs_line_iterator))
+     @patch("sys.stdin.readlines", lambda *args: inputs.split("\n"))
+     @patch("sys.stdin.read", lambda *args: inputs)
+     # @patch('sys.stdout.write', print)
+     def _inner_call_method(_method):
+         try:
+             return _method()
+         except SystemExit as e:
+             pass
+         finally:
+             pass
+
+     return _inner_call_method(method)
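A minimal usage sketch of evaluate_solution_for_problem, assuming the repository root is on PYTHONPATH and the pyext and wrapt_timeout_decorator dependencies are installed; the candidate solution and test case below are toy values chosen for illustration, not part of the commit:

from src.evaluation.testing_utils_codeforces import evaluate_solution_for_problem

# Toy candidate: read two integers from stdin and print their sum.
candidate = "a, b = map(int, input().split())\nprint(a + b)"

# One public test in the schema's format: [stdin lines, expected stdout].
public_tests = [[["1 2"], "3"]]

report = evaluate_solution_for_problem(candidate, public_tests_io=public_tests)
print(report["compilation_status"], report["public_tests_results"][0]["status"])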