"""Interactive Gradio demo: an LLM ("code as policies") drives a simulated
tabletop robot that picks and places blocks and bowls.

The user types natural-language instructions into a chat box; a hierarchy of
LMPs (language-model programs) translates them into calls against the
simulated environment, and the resulting manipulation is rendered to video
and appended to the chat transcript.
"""
import copy
import os
from tempfile import NamedTemporaryFile

import numpy as np
import openai
import shapely
# The star imports are load-bearing: make_LMP() exposes every name listed in
# shapely.geometry.__all__ / shapely.affinity.__all__ to the LMPs via eval().
from shapely.geometry import *
from shapely.affinity import *
from omegaconf import OmegaConf
from moviepy.editor import ImageSequenceClip
import gradio as gr

from lmp import LMP, LMPFGen
from sim import PickPlaceEnv, LMP_wrapper
from consts import ALL_BLOCKS, ALL_BOWLS
from md_logger import MarkdownLogger

# Optional default API key taken from the environment so the textbox can be
# pre-filled; the user may still override it in the UI.
default_open_ai_key = os.getenv('OPEN_AI_SECRET')

# Suffixes appended to the instruction by the two helper buttons below.
chain_of_thought_affix = ' with a step by step explanation'
ask_for_clarification_affix = ' or ask for clarification if you feel unclear'


class DemoRunner:
    """Owns the simulation environment and the LMP stack for one session."""

    def __init__(self):
        # Resolve the OmegaConf config into a plain dict so it can be
        # deep-copied and mutated per setup() call.
        self._cfg = OmegaConf.to_container(OmegaConf.load('cfg.yaml'), resolve=True)
        self._env = None
        self._model_name = ''
        self._md_logger = MarkdownLogger()

    def make_LMP(self, env):
        """Build the LMP hierarchy for `env` and return the top-level
        'tabletop_ui' LMP that handles natural-language commands."""
        # LMP env wrapper config: which objects exist and the table extents.
        cfg = copy.deepcopy(self._cfg)
        cfg['env'] = {
            'init_objs': list(env.obj_name_to_id.keys()),
            'coords': cfg['tabletop_coords'],
        }
        for vs in cfg['lmps'].values():
            vs['engine'] = self._model_name

        LMP_env = LMP_wrapper(env, cfg)

        # APIs that the LMP-generated code can call. eval() resolves the
        # shapely names brought into this module by the star imports above.
        fixed_vars = {'np': np}
        fixed_vars.update({
            name: eval(name)
            for name in shapely.geometry.__all__ + shapely.affinity.__all__
        })
        variable_vars = {
            k: getattr(LMP_env, k)
            for k in [
                'get_bbox', 'get_obj_pos', 'get_color', 'is_obj_visible',
                'denormalize_xy', 'put_first_on_second', 'get_obj_names',
                'get_corner_name', 'get_side_name',
            ]
        }
        # `say` routes robot speech into the chat transcript.
        variable_vars['say'] = lambda msg: self._md_logger.log_message(f'{msg}')

        # Function-generating LMP, used by the others to synthesize helpers.
        lmp_fgen = LMPFGen(cfg['lmps']['fgen'], fixed_vars, variable_vars,
                           self._md_logger)

        # Low-level LMPs, callable from higher-level generated code.
        variable_vars.update({
            k: LMP(k, cfg['lmps'][k], lmp_fgen, fixed_vars, variable_vars,
                   self._md_logger)
            for k in ['parse_obj_name', 'parse_position', 'parse_question',
                      'transform_shape_pts']
        })

        # Top-level LMP that deals with high-level language commands.
        lmp_tabletop_ui = LMP(
            'tabletop_ui', cfg['lmps']['tabletop_ui'], lmp_fgen,
            fixed_vars, variable_vars, self._md_logger,
        )
        return lmp_tabletop_ui

    def setup(self, api_key, model_name, n_blocks, n_bowls):
        """(Re)create the simulation with randomly chosen objects.

        Returns a (markdown_info, camera_image) pair for the UI.
        """
        openai.api_key = api_key
        self._model_name = model_name

        self._env = PickPlaceEnv(render=True, high_res=True, high_frame_rate=False)
        # NOTE(review): indices are drawn against len(ALL_BLOCKS) and reused
        # for ALL_BOWLS — assumes both lists are at least
        # max(n_blocks, n_bowls) long; confirm against consts.py.
        list_idxs = np.random.choice(len(ALL_BLOCKS),
                                     size=max(n_blocks, n_bowls), replace=False)
        block_list = [ALL_BLOCKS[i] for i in list_idxs[:n_blocks]]
        bowl_list = [ALL_BOWLS[i] for i in list_idxs[:n_bowls]]
        obj_list = block_list + bowl_list
        self._env.reset(obj_list)

        self._lmp_tabletop_ui = self.make_LMP(self._env)

        info = '### Available Objects: \n- ' + '\n- '.join(obj_list)
        img = self._env.get_camera_image()
        return info, img

    def run(self, instruction, history):
        """Execute one instruction through the LMP stack.

        Returns (log_markdown, camera_image, chat_history); on failure the
        first element is an error string and the image is None.
        """
        if self._env is None:
            return 'Please run setup first!', None, history

        self._env.cache_video = []
        self._md_logger.clear()

        try:
            self._lmp_tabletop_ui(instruction,
                                  f'objects = {self._env.object_list}')
        except Exception as e:
            # Surface the failure in the UI instead of crashing the server.
            return f'Error: {e}', None, history

        # Surface everything the robot "said" as chat messages.
        for message in self._md_logger.get_messages():
            history.append((None, message))

        if self._env.cache_video:
            rendered_clip = ImageSequenceClip(self._env.cache_video, fps=25)
            # delete=False: we only need a unique path here — the original
            # `NamedTemporaryFile(...).name` deleted the file as soon as the
            # object was garbage-collected, leaving a name-reuse race.
            with NamedTemporaryFile(suffix='.mp4', delete=False) as tmp_file:
                video_file_name = tmp_file.name
            rendered_clip.write_videofile(video_file_name, fps=25)
            history.append((None, (video_file_name,)))

        return self._md_logger.get_log(), self._env.get_camera_image(), history


def setup(api_key, model_name, n_blocks, n_bowls):
    """Gradio callback: validate inputs and build a fresh DemoRunner.

    Returns one value per wired output component:
    (info_setup, img_setup, state, chat_box, info_run).
    """
    # Error paths must still match the 5 declared outputs (the original
    # returned only 2 values here, which breaks the gradio wiring).
    if not api_key:
        return 'Please enter your OpenAI API key!', None, None, None, None
    if n_blocks + n_bowls == 0:
        return 'Please select at least one object!', None, None, None, None

    demo_runner = DemoRunner()
    info, img = demo_runner.setup(api_key, model_name, n_blocks, n_bowls)
    welcome_message = 'How can I help you?'
    return info, img, demo_runner, [(None, welcome_message)], None


def run(demo_runner, chat_history):
    """Gradio callback: run the newest user message through the LMP stack.

    Returns one value per wired output component:
    (info_run, img_setup, chat_box, inp_instruction).
    """
    if demo_runner is None:
        # 4 values to match the 4 declared outputs (the original returned 5).
        return 'Please run setup first!', None, chat_history, None
    # submit_chat ran first, so the newest user message is the instruction.
    instruction = chat_history[-1][0]
    # The trailing '' clears the instruction textbox.
    return *demo_runner.run(instruction, chat_history), ''


def submit_chat(chat_message, history):
    """Append the user's message to the chat and clear the textbox."""
    history += [[chat_message, None]]
    return '', history


def add_cot(chat_message):
    """Append the chain-of-thought suffix to the instruction text."""
    return chat_message.strip() + chain_of_thought_affix


def add_clarification(chat_message):
    """Append the ask-for-clarification suffix to the instruction text."""
    return chat_message.strip() + ask_for_clarification_affix


# Skip the first 12 lines of the README (presumably front matter — TODO
# confirm) and show the remainder inside the demo's accordion.
with open('README.md', 'r') as f:
    for _ in range(12):
        next(f)
    readme_text = f.read()


with gr.Blocks() as demo:
    # Per-session DemoRunner instance (None until setup has been run).
    state = gr.State(None)

    with gr.Accordion('Readme', open=False):
        gr.Markdown(readme_text)

    gr.Markdown('# Interactive Demo')
    with gr.Row():
        with gr.Column():
            with gr.Row():
                inp_api_key = gr.Textbox(
                    value=default_open_ai_key,
                    label='OpenAI API Key (this is not stored anywhere)',
                    lines=1)
                inp_model_name = gr.Dropdown(
                    label='Model Name',
                    choices=['text-davinci-003', 'code-davinci-002',
                             'text-davinci-002'],
                    value='text-davinci-003')
            with gr.Row():
                inp_n_blocks = gr.Slider(label='Number of Blocks',
                                         minimum=0, maximum=5, value=3, step=1)
                inp_n_bowls = gr.Slider(label='Number of Bowls',
                                        minimum=0, maximum=5, value=3, step=1)

            btn_setup = gr.Button("Setup/Reset Simulation")
            info_setup = gr.Markdown(label='Setup Info')

    with gr.Row():
        with gr.Column():
            chat_box = gr.Chatbot()
            inp_instruction = gr.Textbox(label='Instruction', lines=1)
            examples = gr.Examples(
                [
                    'stack two of the blocks',
                    'what color is the rightmost block?',
                    'arrange the blocks into figure 3',
                    'put blocks into non-matching bowls',
                    'swap the positions of one block and another',
                ],
                inp_instruction,
            )
            btn_add_cot = gr.Button(
                f'+{chain_of_thought_affix} (chain-of-thought)')
            btn_add_cla = gr.Button(
                f'+{ask_for_clarification_affix} (conversation)')
            btn_run = gr.Button("Run (this may take 30+ seconds)")
            info_run = gr.Markdown(label='Generated Code')
        with gr.Column():
            img_setup = gr.Image(label='Current Simulation State')

    btn_setup.click(
        setup,
        inputs=[inp_api_key, inp_model_name, inp_n_blocks, inp_n_bowls],
        outputs=[info_setup, img_setup, state, chat_box, info_run],
    )
    btn_add_cot.click(add_cot, inp_instruction, inp_instruction)
    btn_add_cla.click(add_clarification, inp_instruction, inp_instruction)
    # Both the Run button and pressing Enter in the textbox first record the
    # message (submit_chat), then execute it (run).
    btn_run.click(
        submit_chat,
        [inp_instruction, chat_box],
        [inp_instruction, chat_box],
    ).then(
        run,
        inputs=[state, chat_box],
        outputs=[info_run, img_setup, chat_box, inp_instruction],
    )
    inp_instruction.submit(
        submit_chat,
        [inp_instruction, chat_box],
        [inp_instruction, chat_box],
    ).then(
        run,
        inputs=[state, chat_box],
        outputs=[info_run, img_setup, chat_box, inp_instruction],
    )


if __name__ == '__main__':
    print(gr.__version__)
    demo.queue(concurrency_count=10)
    demo.launch()