dai
fold instructions
25268db
import os
import openai
import numpy as np
from tempfile import NamedTemporaryFile
import copy
import shapely
from shapely.geometry import *
from shapely.affinity import *
from omegaconf import OmegaConf
from moviepy.editor import ImageSequenceClip
import gradio as gr
from lmp import LMP, LMPFGen
from sim import PickPlaceEnv, LMP_wrapper
from consts import ALL_BLOCKS, ALL_BOWLS
from md_logger import MarkdownLogger
# API key used to pre-fill the key textbox in the UI; read from the
# OPEN_AI_SECRET environment variable (None when the variable is unset).
default_open_ai_key = os.getenv('OPEN_AI_SECRET')
# Suffixes appended to the instruction text by add_cot / add_clarification.
# They are also shown verbatim in the labels of the corresponding buttons.
chain_of_thought_affix = ' with a step by step explanation'
ask_for_clarification_affix = ' or ask for clarification if you feel unclear'
class DemoRunner:
    """Own one simulated pick-and-place environment and the LMPs that drive it.

    An instance is created per "Setup/Reset" click and kept in the Gradio
    session state. `setup` builds the environment and the LMP hierarchy;
    `run` executes one natural-language instruction against them.
    """

    def __init__(self):
        # Configuration is loaded once and deep-copied per make_LMP call.
        self._cfg = OmegaConf.to_container(OmegaConf.load('cfg.yaml'), resolve=True)
        self._env = None
        self._model_name = ''
        self._md_logger = MarkdownLogger()
        # Populated by setup(); declared here so the attribute always exists.
        self._lmp_tabletop_ui = None

    def make_LMP(self, env):
        """Build the LMP hierarchy for `env` and return the top-level LMP.

        Args:
            env: environment exposing `obj_name_to_id`; it is wrapped in
                `LMP_wrapper` to provide the API the LMPs call.

        Returns:
            The 'tabletop_ui' LMP, which handles high-level language commands.
        """
        # LMP env wrapper
        cfg = copy.deepcopy(self._cfg)
        cfg['env'] = {
            'init_objs': list(env.obj_name_to_id.keys()),
            'coords': cfg['tabletop_coords'],
        }
        # Every LMP uses the model selected in the UI.
        for lmp_cfg in cfg['lmps'].values():
            lmp_cfg['engine'] = self._model_name

        LMP_env = LMP_wrapper(env, cfg)

        # creating APIs that the LMPs can interact with
        fixed_vars = {'np': np}
        # Expose every public shapely.geometry / shapely.affinity name.
        # Looked up with getattr on the modules rather than eval() on the
        # name, so this does not depend on the file's wildcard imports.
        for module in (shapely.geometry, shapely.affinity):
            fixed_vars.update({name: getattr(module, name) for name in module.__all__})

        variable_vars = {
            k: getattr(LMP_env, k)
            for k in [
                'get_bbox', 'get_obj_pos', 'get_color', 'is_obj_visible', 'denormalize_xy',
                'put_first_on_second', 'get_obj_names',
                'get_corner_name', 'get_side_name',
            ]
        }
        # Robot speech is routed into the logger's message list, which run()
        # later appends to the chat history.
        variable_vars['say'] = lambda msg: self._md_logger.log_message(f'{msg}')

        # creating the function-generating LMP
        lmp_fgen = LMPFGen(cfg['lmps']['fgen'], fixed_vars, variable_vars, self._md_logger)

        # creating other low-level LMPs
        variable_vars.update({
            k: LMP(k, cfg['lmps'][k], lmp_fgen, fixed_vars, variable_vars, self._md_logger)
            for k in ['parse_obj_name', 'parse_position', 'parse_question', 'transform_shape_pts']
        })

        # creating the LMP that deals w/ high-level language commands
        lmp_tabletop_ui = LMP(
            'tabletop_ui', cfg['lmps']['tabletop_ui'], lmp_fgen, fixed_vars, variable_vars,
            self._md_logger,
        )
        return lmp_tabletop_ui

    def setup(self, api_key, model_name, n_blocks, n_bowls):
        """Create a fresh environment with random objects and build the LMPs.

        Args:
            api_key: OpenAI API key; assigned to the global `openai.api_key`.
            model_name: engine name written into every LMP config.
            n_blocks: number of blocks to place.
            n_bowls: number of bowls to place.

        Returns:
            (info, img): a markdown list of the chosen objects and the camera
            image of the freshly reset environment.
        """
        openai.api_key = api_key
        self._model_name = model_name
        self._env = PickPlaceEnv(render=True, high_res=True, high_frame_rate=False)

        # One draw of distinct indices is shared by blocks and bowls, so the
        # first min(n_blocks, n_bowls) blocks and bowls use the same indices.
        # NOTE(review): indices are drawn from len(ALL_BLOCKS) but also used
        # to index ALL_BOWLS -- assumes the two lists are equally long; confirm.
        list_idxs = np.random.choice(
            len(ALL_BLOCKS), size=max(n_blocks, n_bowls), replace=False)
        block_list = [ALL_BLOCKS[i] for i in list_idxs[:n_blocks]]
        bowl_list = [ALL_BOWLS[i] for i in list_idxs[:n_bowls]]
        obj_list = block_list + bowl_list

        self._env.reset(obj_list)
        self._lmp_tabletop_ui = self.make_LMP(self._env)

        info = '### Available Objects: \n- ' + '\n- '.join(obj_list)
        img = self._env.get_camera_image()
        return info, img

    def run(self, instruction, history):
        """Execute one instruction and report the outcome.

        Args:
            instruction: natural-language command passed to the top-level LMP.
            history: chat history list; mutated in place with the logger's
                messages and, if frames were recorded, a video entry.

        Returns:
            (log, image, history). `log` is an error/help string when setup
            has not run or the LMP raised (image is then None); otherwise it
            is the logger's log and `image` is the current camera image.
        """
        if self._env is None:
            return 'Please run setup first!', None, history

        self._env.cache_video = []
        self._md_logger.clear()

        try:
            self._lmp_tabletop_ui(instruction, f'objects = {self._env.object_list}')
        except Exception as e:
            # UI boundary: surface any failure as text instead of crashing.
            return f'Error: {e}', None, history

        # Update chat messages
        for message in self._md_logger.get_messages():
            history.append((None, message))

        if self._env.cache_video:
            rendered_clip = ImageSequenceClip(self._env.cache_video, fps=25)
            # Reserve the path with delete=False and close the handle before
            # writing. Taking `.name` from a discarded NamedTemporaryFile
            # deletes the file immediately and leaves only a guessable name.
            with NamedTemporaryFile(suffix='.mp4', delete=False) as tmp_video:
                video_file_name = tmp_video.name
            rendered_clip.write_videofile(video_file_name, fps=25)
            history.append((None, (video_file_name, )))

        return self._md_logger.get_log(), self._env.get_camera_image(), history
def setup(api_key, model_name, n_blocks, n_bowls):
if not api_key:
return 'Please enter your OpenAI API key!', None
if n_blocks + n_bowls == 0:
return 'Please select at least one object!', None
demo_runner = DemoRunner()
info, img = demo_runner.setup(api_key, model_name, n_blocks, n_bowls)
welcome_message = 'How can I help you?'
return info, img, demo_runner, [(None, welcome_message)], None
def run(demo_runner, chat_history):
    """Gradio handler that executes the most recent chat instruction.

    Always returns four values, one per wired output in this order:
    (generated-code markdown, simulation image, chat history, instruction
    textbox content).

    Args:
        demo_runner: the DemoRunner stored in session state, or None if the
            simulation has not been set up yet.
        chat_history: chat history whose last entry holds the user's
            instruction in its first slot.

    Returns:
        A 4-tuple as described above; the textbox content is always ''.
    """
    if demo_runner is None:
        # Same arity and order as the success path; the previous 5-value
        # return did not match the four wired outputs.
        return 'Please run setup first!', None, chat_history, ''

    instruction = chat_history[-1][0]
    log, image, updated_history = demo_runner.run(instruction, chat_history)
    return log, image, updated_history, ''
def submit_chat(chat_message, history):
    """Queue the user's message in the chat and clear the input box.

    The message is added to `history` in place as a [message, None] pair
    (no bot reply yet). Returns ('', history): the empty string replaces the
    textbox content and the same history list refreshes the chat widget.
    """
    pending_entry = [chat_message, None]
    history.extend([pending_entry])
    cleared_input = ''
    return cleared_input, history
def add_cot(chat_messsage):
    """Return the whitespace-stripped message with the chain-of-thought affix appended."""
    trimmed = chat_messsage.strip()
    return f'{trimmed}{chain_of_thought_affix}'
def add_clarification(chat_message):
    """Return the whitespace-stripped message with the ask-for-clarification affix appended."""
    trimmed = chat_message.strip()
    return f'{trimmed}{ask_for_clarification_affix}'
# Load the README text shown in the "Readme" accordion, skipping its first
# 12 lines (presumably a metadata header that should not be rendered --
# confirm against README.md).
# The encoding is pinned so the result does not depend on the platform default.
with open('README.md', 'r', encoding='utf-8') as f:
    for _ in range(12):
        # A default of None keeps a README shorter than 12 lines from raising
        # StopIteration at import time; readme_text is then simply ''.
        next(f, None)
    readme_text = f.read()
# UI definition. Components are created in the same order as before; the
# nesting below is what determines the on-page layout.
with gr.Blocks() as demo:
    # Per-session DemoRunner; None until setup succeeds.
    state = gr.State(None)

    with gr.Accordion('Readme', open=False):
        gr.Markdown(readme_text)
    gr.Markdown('# Interactive Demo')

    # --- Setup panel -------------------------------------------------------
    with gr.Row():
        with gr.Column():
            with gr.Row():
                inp_api_key = gr.Textbox(
                    value=default_open_ai_key,
                    label='OpenAI API Key (this is not stored anywhere)',
                    lines=1,
                )
                inp_model_name = gr.Dropdown(
                    label='Model Name',
                    choices=['text-davinci-003', 'code-davinci-002', 'text-davinci-002'],
                    value='text-davinci-003',
                )
            with gr.Row():
                inp_n_blocks = gr.Slider(
                    label='Number of Blocks', minimum=0, maximum=5, value=3, step=1)
                inp_n_bowls = gr.Slider(
                    label='Number of Bowls', minimum=0, maximum=5, value=3, step=1)

            btn_setup = gr.Button("Setup/Reset Simulation")
            info_setup = gr.Markdown(label='Setup Info')

    # --- Chat panel (left) and simulation view (right) ----------------------
    with gr.Row():
        with gr.Column():
            chat_box = gr.Chatbot()
            inp_instruction = gr.Textbox(label='Instruction', lines=1)
            examples = gr.Examples(
                [
                    'stack two of the blocks',
                    'what color is the rightmost block?',
                    'arrange the blocks into figure 3',
                    'put blocks into non-matching bowls',
                    'swap the positions of one block and another',
                ],
                inp_instruction,
            )
            btn_add_cot = gr.Button(f'+{chain_of_thought_affix} (chain-of-thought)')
            btn_add_cla = gr.Button(f'+{ask_for_clarification_affix} (conversation)')
            btn_run = gr.Button("Run (this may take 30+ seconds)")
            info_run = gr.Markdown(label='Generated Code')
        with gr.Column():
            img_setup = gr.Image(label='Current Simulation State')

    # --- Event wiring -------------------------------------------------------
    btn_setup.click(
        setup,
        inputs=[inp_api_key, inp_model_name, inp_n_blocks, inp_n_bowls],
        outputs=[info_setup, img_setup, state, chat_box, info_run],
    )
    btn_add_cot.click(add_cot, inp_instruction, inp_instruction)
    btn_add_cla.click(add_clarification, inp_instruction, inp_instruction)

    # Clicking "Run" and pressing Enter in the textbox trigger the same
    # two-step chain: push the message into the chat, then execute it.
    for trigger in (btn_run.click, inp_instruction.submit):
        trigger(
            submit_chat,
            [inp_instruction, chat_box],
            [inp_instruction, chat_box],
        ).then(
            run,
            inputs=[state, chat_box],
            outputs=[info_run, img_setup, chat_box, inp_instruction],
        )
def _launch_demo():
    """Print the Gradio version, enable the request queue, and start the app."""
    print(gr.__version__)
    demo.queue(concurrency_count=10)
    demo.launch()


if __name__ == '__main__':
    _launch_demo()