Spaces:
Sleeping
Sleeping
File size: 2,378 Bytes
09321b6 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 |
import re
from ..subprocess_code_interpreter import SubprocessCodeInterpreter
class R(SubprocessCodeInterpreter):
file_extension = 'r'
proper_name = 'R'
def __init__(self):
super().__init__()
self.start_cmd = 'R -q --vanilla' # Start R in quiet and vanilla mode
def preprocess_code(self, code):
"""
Add active line markers
Wrap in a tryCatch for better error handling in R
Add end of execution marker
"""
lines = code.split('\n')
processed_lines = []
for i, line in enumerate(lines, 1):
# Add active line print
processed_lines.append(f'cat("##active_line{i}##\\n");{line}')
# Join lines to form the processed code
processed_code = '\n'.join(processed_lines)
# Wrap in a tryCatch for error handling and add end of execution marker
processed_code = f"""
tryCatch({{
{processed_code}
}}, error=function(e){{
cat("## execution_error ##\\n", conditionMessage(e), "\\n");
}})
cat("## end_of_execution ##\\n");
"""
# Count the number of lines of processed_code
# (R echoes all code back for some reason, but we can skip it if we track this!)
self.code_line_count = len(processed_code.split('\n')) - 1
return processed_code
def line_postprocessor(self, line):
# If the line count attribute is set and non-zero, decrement and skip the line
if hasattr(self, 'code_line_count') and self.code_line_count > 0:
self.code_line_count -= 1
return None
if re.match(r'^(\s*>>>\s*|\s*\.\.\.\s*|\s*>\s*|\s*\+\s*|\s*)$', line):
return None
if 'R version' in line: # Startup message
return None
if line.strip().startswith('[1] "') and line.endswith(
'"'): # For strings, trim quotation marks
return line[5:-1].strip()
if line.strip().startswith(
'[1]'): # Normal R output prefix for non-string outputs
return line[4:].strip()
return line
def detect_active_line(self, line):
if '##active_line' in line:
return int(line.split('##active_line')[1].split('##')[0])
return None
def detect_end_of_execution(self, line):
return '##end_of_execution##' in line or '## execution_error ##' in line
|