Spaces:
Sleeping
Sleeping
| import ast | |
| import random | |
| import copy | |
# Symmetric lookup table: each built-in name maps to the built-in that the
# "wrong_builtin" mutation substitutes for it (and vice versa).
BUILTIN_PAIRS = dict(
    min="max",
    max="min",
    any="all",
    all="any",
    sum="len",
    len="sum",
)
# Top-level module names that mutated code is not allowed to import.
BLOCKED_IMPORTS = ["os", "sys", "subprocess", "shutil", "pathlib"]


def is_safe_injection(code: str) -> bool:
    """Return ``False`` when *code* imports a module listed in BLOCKED_IMPORTS.

    Two complementary checks are applied:

    1. A text scan for ``import <name>`` / ``from <name>`` that requires word
       boundaries and tolerates any run of whitespace. It still rejects the
       phrase inside strings or comments, but no longer rejects unrelated
       modules that merely start with a blocked name (``ossaudiodev``).
    2. An AST scan of real import statements, which also catches forms the
       text scan cannot see, such as ``import json, os``.

    Args:
        code: Python source text to inspect.

    Returns:
        ``True`` when neither check finds a blocked import, ``False`` otherwise.
        Source that cannot be parsed is judged by the text scan alone.
    """
    import re  # local import: only this function needs it

    for blocked in BLOCKED_IMPORTS:
        pattern = rf"\b(?:import|from)\s+{re.escape(blocked)}\b"
        if re.search(pattern, code):
            return False

    try:
        tree = ast.parse(code)
    except (SyntaxError, ValueError):
        # Nothing more can be learned from unparseable text.
        return True

    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            roots = [alias.name.partition(".")[0] for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level == 0 and node.module:
            # level == 0 means an absolute import; relative imports name
            # package-local modules, not the blocked top-level ones.
            roots = [node.module.partition(".")[0]]
        else:
            continue
        if any(root in BLOCKED_IMPORTS for root in roots):
            return False
    return True
class BugInjectorVisitor(ast.NodeTransformer):
    """AST transformer that plants at most one bug of a requested kind.

    ``target_operator`` names the mutation to apply. The names handled are
    ``off_by_one``, ``wrong_operator``, ``wrong_builtin``,
    ``loop_boundary_shift``, ``condition_negation``, ``missing_base_case``,
    ``slice_boundary_corruption`` and ``variable_swap``; any other name leaves
    the tree untouched. Children are always visited before their parent is
    considered, and once ``mutated`` is set no further node is changed.
    Nodes are modified in place.
    """

    # (matched operator class, class of the operator that replaces it)
    _COMPARE_SWAPS = (
        (ast.Lt, ast.GtE),
        (ast.LtE, ast.Gt),
        (ast.Gt, ast.LtE),
        (ast.GtE, ast.Lt),
        (ast.Eq, ast.NotEq),
        (ast.NotEq, ast.Eq),
    )
    _BINOP_SWAPS = (
        (ast.Add, ast.Sub),
        (ast.Sub, ast.Add),
        (ast.Mult, ast.FloorDiv),
        (ast.Div, ast.Mult),
    )

    def __init__(self, target_operator: str):
        super().__init__()
        self.mutated = False
        self.target_operator = target_operator

    def _descend(self, node) -> bool:
        """Visit the children of *node*; return True if a mutation is still pending."""
        self.generic_visit(node)
        return not self.mutated

    @staticmethod
    def _swap(operator, table):
        """Return a fresh replacement for *operator* from *table*, or None."""
        for matched, substitute in table:
            if isinstance(operator, matched):
                return substitute()
        return None

    def visit_Constant(self, node):
        # off_by_one: nudge an integer literal by +1 or -1 (booleans excluded).
        pending = self._descend(node)
        literal = node.value
        if (
            pending
            and self.target_operator == "off_by_one"
            and isinstance(literal, int)
            and not isinstance(literal, bool)
        ):
            node.value = literal + random.choice([-1, 1])
            self.mutated = True
        return node

    def visit_Compare(self, node):
        # wrong_operator: only the first comparison operator is considered.
        if self._descend(node) and self.target_operator == "wrong_operator":
            replacement = self._swap(node.ops[0], self._COMPARE_SWAPS)
            if replacement is not None:
                node.ops[0] = replacement
                self.mutated = True
        return node

    def visit_BinOp(self, node):
        # wrong_operator: exchange the arithmetic operator.
        if self._descend(node) and self.target_operator == "wrong_operator":
            replacement = self._swap(node.op, self._BINOP_SWAPS)
            if replacement is not None:
                node.op = replacement
                self.mutated = True
        return node

    def visit_Call(self, node):
        pending = self._descend(node)
        callee = node.func
        if not pending or not isinstance(callee, ast.Name):
            return node

        if self.target_operator == "wrong_builtin" and callee.id in BUILTIN_PAIRS:
            # wrong built-in
            callee.id = BUILTIN_PAIRS[callee.id]
            self.mutated = True
        elif self.target_operator == "loop_boundary_shift" and callee.id == "range":
            # loop boundary shift:
            #   `range(n)`    -> `range(n+1)`
            #   `range(a, b)` -> `range(a-1, b)`
            arg_count = len(node.args)
            if arg_count in (1, 2):
                shift = ast.Add() if arg_count == 1 else ast.Sub()
                node.args[0] = ast.BinOp(
                    left=node.args[0], op=shift, right=ast.Constant(value=1)
                )
                self.mutated = True
        return node

    def visit_If(self, node):
        if not self._descend(node):
            return node

        if self.target_operator == "condition_negation":
            # `if x > 0` -> `if not x > 0`
            node.test = ast.UnaryOp(op=ast.Not(), operand=node.test)
            self.mutated = True
        elif self.target_operator == "missing_base_case":
            # Replace the first `return` directly inside the `if` body with `pass`.
            position = next(
                (
                    index
                    for index, statement in enumerate(node.body)
                    if isinstance(statement, ast.Return)
                ),
                None,
            )
            if position is not None:
                node.body[position] = ast.Pass()
                self.mutated = True
        return node

    def visit_Slice(self, node):
        if not self._descend(node):
            return node
        if self.target_operator != "slice_boundary_corruption":
            return node

        # Prefer shifting the lower bound up; otherwise pull the upper bound down.
        if node.lower is not None:
            node.lower = ast.BinOp(
                left=node.lower, op=ast.Add(), right=ast.Constant(value=1)
            )
            self.mutated = True
        elif node.upper is not None:
            node.upper = ast.BinOp(
                left=node.upper, op=ast.Sub(), right=ast.Constant(value=1)
            )
            self.mutated = True
        return node

    def visit_Name(self, node):
        # Names are never mutated; only recurse.
        self.generic_visit(node)
        return node

    def visit_Assign(self, node):
        # variable_swap: exchange the first two names of a tuple target,
        # e.g. `a, b = x, y` -> `b, a = x, y`.
        pending = self._descend(node)
        if (
            pending
            and self.target_operator == "variable_swap"
            and getattr(node, "targets", None)
        ):
            first_target = node.targets[0]
            if isinstance(first_target, ast.Tuple) and len(first_target.elts) >= 2:
                names = first_target.elts
                names[0], names[1] = names[1], names[0]
                self.mutated = True
        return node
def inject_bug(original_code: str, proposed_operator: str) -> tuple[str, bool]:
    """Apply one bug-injecting mutation to *original_code*.

    The mutation is accepted only when all of these hold:

    1. ``original_code`` parses.
    2. The mutated tree differs structurally from the original tree.
    3. The regenerated source passes ``is_safe_injection``.
    4. The regenerated source parses again.

    Args:
        original_code: Python source to mutate.
        proposed_operator: Name of the mutation handed to ``BugInjectorVisitor``.

    Returns:
        ``(mutated_code, True)`` on success, otherwise ``(original_code, False)``.
    """
    # 1. The input must be valid Python.
    try:
        tree = ast.parse(original_code)
    except SyntaxError:
        return original_code, False

    # Mutate a copy so `tree` stays available as the reference for step 2.
    injector = BugInjectorVisitor(proposed_operator)
    mutated_tree = injector.visit(copy.deepcopy(tree))

    # 2. Compare trees, not text: ast.unparse drops comments and normalises
    #    quotes/spacing, so its output rarely equals the original source even
    #    when no node was changed.
    if ast.dump(mutated_tree) == ast.dump(tree):
        return original_code, False

    ast.fix_missing_locations(mutated_tree)
    mutated_code = ast.unparse(mutated_tree)

    # 3. Blocked imports
    if not is_safe_injection(mutated_code):
        return original_code, False

    # 4. The regenerated source must still be valid Python.
    try:
        ast.parse(mutated_code)
    except SyntaxError:
        return original_code, False

    return mutated_code, True
| def infer_bug_operator(original_code: str, candidate_code: str) -> str | None: | |
| try: | |
| original_tree = ast.parse(original_code) | |
| candidate_tree = ast.parse(candidate_code) | |
| except SyntaxError: | |
| return None | |
| if ast.dump(original_tree) == ast.dump(candidate_tree): | |
| return None | |
| original_nodes = list(ast.walk(original_tree)) | |
| candidate_nodes = list(ast.walk(candidate_tree)) | |
| inferred = ( | |
| _infer_wrong_builtin(original_nodes, candidate_nodes) | |
| or _infer_loop_boundary_shift(original_nodes, candidate_nodes) | |
| or _infer_slice_boundary_corruption(original_nodes, candidate_nodes) | |
| or _infer_condition_negation(original_nodes, candidate_nodes) | |
| or _infer_wrong_operator(original_nodes, candidate_nodes) | |
| or _infer_off_by_one(original_nodes, candidate_nodes) | |
| ) | |
| return inferred | |
| def _infer_wrong_builtin(original_nodes: list[ast.AST], candidate_nodes: list[ast.AST]) -> str | None: | |
| for original_node, candidate_node in zip(original_nodes, candidate_nodes): | |
| if not isinstance(original_node, ast.Call) or not isinstance(candidate_node, ast.Call): | |
| continue | |
| if not isinstance(original_node.func, ast.Name) or not isinstance(candidate_node.func, ast.Name): | |
| continue | |
| expected = BUILTIN_PAIRS.get(original_node.func.id) | |
| if expected and candidate_node.func.id == expected: | |
| return "wrong_builtin" | |
| return None | |
| def _infer_loop_boundary_shift( | |
| original_nodes: list[ast.AST], | |
| candidate_nodes: list[ast.AST], | |
| ) -> str | None: | |
| for original_node, candidate_node in zip(original_nodes, candidate_nodes): | |
| if not isinstance(original_node, ast.Call) or not isinstance(candidate_node, ast.Call): | |
| continue | |
| if not isinstance(original_node.func, ast.Name) or original_node.func.id != "range": | |
| continue | |
| if not isinstance(candidate_node.func, ast.Name) or candidate_node.func.id != "range": | |
| continue | |
| if len(original_node.args) != len(candidate_node.args): | |
| continue | |
| for original_arg, candidate_arg in zip(original_node.args, candidate_node.args): | |
| if _is_shifted_by_one(original_arg, candidate_arg): | |
| return "loop_boundary_shift" | |
| return None | |
| def _infer_slice_boundary_corruption( | |
| original_nodes: list[ast.AST], | |
| candidate_nodes: list[ast.AST], | |
| ) -> str | None: | |
| for original_node, candidate_node in zip(original_nodes, candidate_nodes): | |
| if not isinstance(original_node, ast.Slice) or not isinstance(candidate_node, ast.Slice): | |
| continue | |
| if original_node.lower is not None and candidate_node.lower is not None: | |
| if _is_shifted_by_one(original_node.lower, candidate_node.lower): | |
| return "slice_boundary_corruption" | |
| if original_node.upper is not None and candidate_node.upper is not None: | |
| if _is_shifted_by_one(original_node.upper, candidate_node.upper): | |
| return "slice_boundary_corruption" | |
| return None | |
| def _infer_condition_negation( | |
| original_nodes: list[ast.AST], | |
| candidate_nodes: list[ast.AST], | |
| ) -> str | None: | |
| for original_node, candidate_node in zip(original_nodes, candidate_nodes): | |
| if not isinstance(original_node, ast.If) or not isinstance(candidate_node, ast.If): | |
| continue | |
| if ( | |
| isinstance(candidate_node.test, ast.UnaryOp) | |
| and isinstance(candidate_node.test.op, ast.Not) | |
| and ast.dump(candidate_node.test.operand) == ast.dump(original_node.test) | |
| ): | |
| return "condition_negation" | |
| return None | |
| def _infer_wrong_operator(original_nodes: list[ast.AST], candidate_nodes: list[ast.AST]) -> str | None: | |
| for original_node, candidate_node in zip(original_nodes, candidate_nodes): | |
| if isinstance(original_node, ast.Compare) and isinstance(candidate_node, ast.Compare): | |
| if original_node.ops and candidate_node.ops and type(original_node.ops[0]) is not type(candidate_node.ops[0]): | |
| return "wrong_operator" | |
| if isinstance(original_node, ast.BinOp) and isinstance(candidate_node, ast.BinOp): | |
| if type(original_node.op) is not type(candidate_node.op): | |
| return "wrong_operator" | |
| return None | |
| def _infer_off_by_one(original_nodes: list[ast.AST], candidate_nodes: list[ast.AST]) -> str | None: | |
| for original_node, candidate_node in zip(original_nodes, candidate_nodes): | |
| if not isinstance(original_node, ast.Constant) or not isinstance(candidate_node, ast.Constant): | |
| continue | |
| if isinstance(original_node.value, bool) or isinstance(candidate_node.value, bool): | |
| continue | |
| if isinstance(original_node.value, int) and isinstance(candidate_node.value, int): | |
| if abs(candidate_node.value - original_node.value) == 1: | |
| return "off_by_one" | |
| return None | |
def _is_shifted_by_one(original_node: ast.AST, candidate_node: ast.AST) -> bool:
    """Return True when *candidate_node* is ``<original_node> + 1`` or ``<original_node> - 1``.

    The candidate must be a binary operation whose left operand dumps
    identically to the original node, whose right operand is the constant 1,
    and whose operator is addition or subtraction.
    """
    return (
        isinstance(candidate_node, ast.BinOp)
        and ast.dump(candidate_node.left) == ast.dump(original_node)
        and isinstance(candidate_node.right, ast.Constant)
        and candidate_node.right.value == 1
        and isinstance(candidate_node.op, (ast.Add, ast.Sub))
    )