GenSim / app.py
LeroyWaa's picture
update
d3f2fdb
raw
history blame
4.02 kB
import openai
import numpy as np
from tempfile import NamedTemporaryFile
import copy
import shapely
from shapely.geometry import *
from shapely.affinity import *
from omegaconf import OmegaConf
from moviepy.editor import ImageSequenceClip
import gradio as gr
from consts import ALL_BLOCKS, ALL_BOWLS
from md_logger import MarkdownLogger
import numpy as np
import os
import hydra
import random
import re
import openai
import IPython
import time
import pybullet as p
import traceback
from datetime import datetime
from pprint import pprint
import cv2
import re
import random
import json
from gensim.agent import Agent
from gensim.critic import Critic
from gensim.sim_runner import SimulationRunner
from gensim.memory import Memory
from gensim.utils import set_gpt_model, clear_messages
class DemoRunner:
def __init__(self):
self._env = None
def setup(self, api_key):
openai.api_key = api_key
cfg['model_output_dir'] = 'temp'
cfg['prompt_folder'] = 'topdown_task_generation_prompt_simple_singleprompt'
set_gpt_model(cfg['gpt_model'])
cfg['load_memory'] = True
cfg['task_description_candidate_num'] = 10
cfg['record']['save_video'] = True
memory = Memory(cfg)
agent = Agent(cfg, memory)
critic = Critic(cfg, memory)
self.simulation_runner = SimulationRunner(cfg, agent, critic, memory)
info = '### Build'
img = np.zeros((720, 640, 0))
return info, img
def run(self, instruction):
cfg['target_task_name'] = instruction
self._env.cache_video = []
self._md_logger.clear()
try:
self.simulation_runner.task_creation()
self.simulation_runner.simulate_task()
except Exception as e:
return f'Error: {e}', None, None
video_file_name = None
if self._env.cache_video:
rendered_clip = ImageSequenceClip(self._env.cache_video, fps=25)
video_file_name = NamedTemporaryFile(suffix='.mp4').name
rendered_clip.write_videofile(video_file_name, fps=25)
return self.simulation_runner.chat_log, self.simulation_runner.env.curr_video, video_file_name
def setup(api_key):
if not api_key:
return 'Please enter your OpenAI API key!', None, None
demo_runner = DemoRunner()
info, img = demo_runner.setup(api_key)
return info, img, demo_runner
def run(instruction, demo_runner):
if demo_runner is None:
return 'Please run setup first!', None, None
return demo_runner.run(instruction)
if __name__ == '__main__':
with open('README.md', 'r') as f:
for _ in range(12):
next(f)
readme_text = f.read()
with gr.Blocks() as demo:
state = gr.State(None)
gr.Markdown(readme_text)
gr.Markdown('# Interactive Demo')
with gr.Row():
with gr.Column():
with gr.Row():
inp_api_key = gr.Textbox(label='OpenAI API Key (this is not stored anywhere)', lines=1)
btn_setup = gr.Button("Setup/Reset Simulation")
info_setup = gr.Markdown(label='Setup Info')
with gr.Column():
img_setup = gr.Image(label='Current Simulation')
with gr.Row():
with gr.Column():
inp_instruction = gr.Textbox(label='Task Name', lines=1)
btn_run = gr.Button("Run (this may take 30+ seconds)")
info_run = gr.Markdown(label='Generated Code')
with gr.Column():
video_run = gr.Video(label='Video of Last Instruction')
btn_setup.click(
setup,
inputs=[inp_api_key],
outputs=[info_setup, img_setup, state]
)
btn_run.click(
run,
inputs=[inp_instruction, state],
outputs=[info_run, img_setup, video_run]
)
demo.queue().launch(show_error=True)