import re from ..subprocess_code_interpreter import SubprocessCodeInterpreter class R(SubprocessCodeInterpreter): file_extension = 'r' proper_name = 'R' def __init__(self): super().__init__() self.start_cmd = 'R -q --vanilla' # Start R in quiet and vanilla mode def preprocess_code(self, code): """ Add active line markers Wrap in a tryCatch for better error handling in R Add end of execution marker """ lines = code.split('\n') processed_lines = [] for i, line in enumerate(lines, 1): # Add active line print processed_lines.append(f'cat("##active_line{i}##\\n");{line}') # Join lines to form the processed code processed_code = '\n'.join(processed_lines) # Wrap in a tryCatch for error handling and add end of execution marker processed_code = f""" tryCatch({{ {processed_code} }}, error=function(e){{ cat("## execution_error ##\\n", conditionMessage(e), "\\n"); }}) cat("## end_of_execution ##\\n"); """ # Count the number of lines of processed_code # (R echoes all code back for some reason, but we can skip it if we track this!) self.code_line_count = len(processed_code.split('\n')) - 1 return processed_code def line_postprocessor(self, line): # If the line count attribute is set and non-zero, decrement and skip the line if hasattr(self, 'code_line_count') and self.code_line_count > 0: self.code_line_count -= 1 return None if re.match(r'^(\s*>>>\s*|\s*\.\.\.\s*|\s*>\s*|\s*\+\s*|\s*)$', line): return None if 'R version' in line: # Startup message return None if line.strip().startswith('[1] "') and line.endswith( '"'): # For strings, trim quotation marks return line[5:-1].strip() if line.strip().startswith( '[1]'): # Normal R output prefix for non-string outputs return line[4:].strip() return line def detect_active_line(self, line): if '##active_line' in line: return int(line.split('##active_line')[1].split('##')[0]) return None def detect_end_of_execution(self, line): return '##end_of_execution##' in line or '## execution_error ##' in line