|
from ..subprocess_code_interpreter import SubprocessCodeInterpreter |
|
import re |
|
|
|
class R(SubprocessCodeInterpreter):
    """Interpreter front-end that feeds code to an ``R`` subprocess.

    The class instruments submitted code with textual markers (active line,
    execution error, end of execution) and filters the lines coming back
    from the process so that only meaningful output is returned.
    """

    file_extension = "r"
    proper_name = "R"

    # Lines consisting solely of a prompt/continuation token (">>>", "...",
    # ">", "+") or of whitespace; such lines carry no program output.
    _PROMPT_ONLY = re.compile(r'^(\s*>>>\s*|\s*\.\.\.\s*|\s*>\s*|\s*\+\s*|\s*)$')

    def __init__(self):
        """Initialise the base interpreter and set the R launch command."""
        super().__init__()
        self.start_cmd = "R -q --vanilla"
        # Number of upcoming output lines to discard; set by preprocess_code.
        self.code_line_count = 0

    def preprocess_code(self, code):
        """
        Add active line markers
        Wrap in a tryCatch for better error handling in R
        Add end of execution marker

        Args:
            code: R source text; it is split on "\n" and each resulting line
                is numbered starting from 1.

        Returns:
            The instrumented R code. As a side effect,
            ``self.code_line_count`` is set to the number of lines in the
            returned text minus one.
        """
        # Prefix every source line with a cat() call announcing its number.
        marked = "\n".join(
            f'cat("## active_line {number} ##\\n");{statement}'
            for number, statement in enumerate(code.split("\n"), 1)
        )

        # On error, print the error marker followed by the condition message;
        # the end-of-execution marker is printed after the tryCatch either way.
        processed_code = f"""
tryCatch({{
{marked}
}}, error=function(e){{
    cat("## execution_error ##\\n", conditionMessage(e), "\\n");
}})
cat("## end_of_execution ##\\n");
"""

        # line_postprocessor drops this many lines before passing output on
        # (presumably because the R session echoes the submitted code back --
        # TODO confirm against the subprocess handling in the base class).
        self.code_line_count = len(processed_code.split("\n")) - 1

        return processed_code

    def line_postprocessor(self, line):
        """Filter and tidy a single line of output from the R process.

        Args:
            line: One raw output line; it may carry surrounding whitespace
                such as a trailing newline.

        Returns:
            ``None`` when the line should be discarded (still inside the
            skip count, prompt-only/blank, or containing "R version"); the
            value with R's ``[1]`` vector prefix (and, for a quoted string,
            the surrounding quotes) removed; otherwise the line unchanged.
        """
        # Discard lines while the skip counter set by preprocess_code is active.
        # getattr keeps this safe on instances created without __init__.
        if getattr(self, "code_line_count", 0) > 0:
            self.code_line_count -= 1
            return None

        if self._PROMPT_ONLY.match(line):
            return None
        if "R version" in line:
            return None

        # Test and slice the *same* stripped text: checking the prefix on a
        # stripped copy but slicing the raw line mis-cuts any line with
        # leading whitespace or a trailing newline.
        text = line.strip()
        if len(text) >= 6 and text.startswith('[1] "') and text.endswith('"'):
            # Quoted string result: drop the prefix and both quotation marks.
            return text[5:-1].strip()
        if text.startswith("[1]"):
            # Non-string result: drop the "[1]" prefix only.
            return text[3:].strip()

        return line

    def detect_active_line(self, line):
        """Return the line number from an active-line marker, if present.

        Args:
            line: One output line.

        Returns:
            The integer between "## active_line " and " ##", or ``None`` when
            the marker is absent or what follows it is not an integer (e.g.
            program output that merely resembles a marker).
        """
        if "## active_line " not in line:
            return None
        candidate = line.split("## active_line ")[1].split(" ##")[0]
        try:
            return int(candidate)
        except ValueError:
            return None

    def detect_end_of_execution(self, line):
        """Return True if *line* contains the end-of-execution or error marker."""
        return "## end_of_execution ##" in line or "## execution_error ##" in line
|
|