sohojoe commited on
Commit
09ede70
1 Parent(s): d1b5501

fix start up issues; should be more stable

Browse files
Files changed (3) hide show
  1. app.py +67 -65
  2. charles_actor.py +13 -8
  3. streamlit_av_queue.py +1 -1
app.py CHANGED
@@ -1,6 +1,7 @@
1
  import asyncio
2
  from collections import deque
3
  import os
 
4
  import threading
5
  import time
6
  import traceback
@@ -16,6 +17,7 @@ from sample_utils.turn import get_ice_servers
16
  import json
17
  from typing import List
18
  import subprocess
 
19
 
20
  st.set_page_config(layout="wide")
21
 
@@ -27,25 +29,50 @@ load_dotenv()
27
 
28
  webrtc_ctx = None
29
 
30
- # Initialize Ray
31
- import ray
32
  def init_ray():
33
  try:
34
  subprocess.check_output(["ray", "start", "--head"])
35
  except Exception as e:
36
- print (e)
37
- # Try to connect to a running Ray cluster
38
- ray_address = os.getenv('RAY_ADDRESS')
39
- if ray_address:
40
- ray.init(ray_address, namespace="project_charles")
41
- else:
42
- ray.init(namespace="project_charles")
43
-
44
-
45
- if not ray.is_initialized():
46
- init_ray()
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
47
 
48
  async def main():
 
 
 
 
 
 
 
 
 
 
49
  st.title("Project Charles")
50
 
51
  col1, col2 = st.columns(2)
@@ -58,26 +85,22 @@ async def main():
58
  looking = st.checkbox("Look", value=False)
59
  charles_actor_debug_output = st.empty()
60
  environment_state_ouput = st.empty()
61
- with nested_col3:
62
- if st.button('Reboot Charles'):
63
- st.write('Killing RAY...')
64
- subprocess.check_output(["ray", "start", "--head"])
65
- st.write('Restarting RAY...')
66
- init_ray()
67
- charles_actor = None
68
- st.write('Reboot Charles')
69
 
70
  with col2:
71
- if "streamlit_av_queue" not in st.session_state:
72
- from streamlit_av_queue import StreamlitAVQueue
73
- st.session_state.streamlit_av_queue = StreamlitAVQueue()
74
-
75
  playing = st.checkbox("Playing", value=True)
76
  webrtc_ctx = webrtc_streamer(
77
  key="charles",
78
  desired_playing_state=playing,
79
- queued_audio_frames_callback=st.session_state.streamlit_av_queue.queued_audio_frames_callback,
80
- queued_video_frames_callback=st.session_state.streamlit_av_queue.queued_video_frames_callback,
81
  mode=WebRtcMode.SENDRECV,
82
  media_stream_constraints={
83
  "video": True,
@@ -97,49 +120,30 @@ async def main():
97
  if not webrtc_ctx.state.playing:
98
  exit
99
 
100
- from charles_actor import CharlesActor
101
- charles_actor = None
102
-
103
-
104
  try:
105
  while True:
106
- if "streamlit_av_queue" in st.session_state:
107
- st.session_state.streamlit_av_queue.set_looking_listening(looking, listening)
108
  if not webrtc_ctx.state.playing:
109
  system_one_audio_status.write("Camera has stopped.")
110
  await asyncio.sleep(0.1)
111
  continue
112
  if charles_actor is None:
113
- try:
114
- charles_actor = ray.get_actor("CharlesActor")
115
- system_one_audio_status.write("Charles is here.")
116
- except ValueError as e:
117
- if 'charles_actor_proc' in locals() or 'charles_actor_proc' in globals() and charles_actor_proc is not None:
118
- system_one_audio_status.write("Waiting for instance of Charles...")
119
- else:
120
- await asyncio.sleep(0.5) # give ray a chance to start
121
- charles_actor_proc = subprocess.Popen(["python3", "charles_actor.py"])
122
- system_one_audio_status.write("Creating an instance of Charles...")
123
- await asyncio.sleep(0.5)
124
- # import platform
125
- # print (f"platform:{platform.system()}")
126
- # if platform.system() == 'Linux':
127
- # charles_actor_proc = subprocess.Popen(["python3", "charles_actor.py"])
128
- # system_one_audio_status.write("Creating Charles.")
129
- # else:
130
- # system_one_audio_status.write("Charles is sleeping.")
131
- pass
132
- if charles_actor is not None:
133
- try:
134
- # new_environment_state = await charles_actor.get_environment_state.remote()
135
- # environment_state_ouput.markdown(f"{new_environment_state}")
136
- charles_debug_str = await charles_actor.get_charles_actor_debug_output.remote()
137
- charles_actor_debug_output.markdown(charles_debug_str)
138
- state = await charles_actor.get_state.remote()
139
- system_one_audio_status.write(state)
140
- except Exception as e:
141
- # assume we disconnected
142
- charles_actor = None
143
  await asyncio.sleep(0.1)
144
 
145
  except Exception as e:
@@ -155,7 +159,5 @@ if __name__ == "__main__":
155
  if webrtc_ctx is not None:
156
  del webrtc_ctx
157
  webrtc_ctx = None
158
- if "streamlit_av_queue" in st.session_state:
159
- del st.session_state.streamlit_av_queue
160
  finally:
161
  pass
 
1
  import asyncio
2
  from collections import deque
3
  import os
4
+ import sys
5
  import threading
6
  import time
7
  import traceback
 
17
  import json
18
  from typing import List
19
  import subprocess
20
+ import ray
21
 
22
  st.set_page_config(layout="wide")
23
 
 
29
 
30
  webrtc_ctx = None
31
 
32
+
33
+ @st.cache_resource
34
  def init_ray():
35
  try:
36
  subprocess.check_output(["ray", "start", "--head"])
37
  except Exception as e:
38
+ print (f"app.py init_ray: {e}")
39
+ # Connect to a running Ray cluster
40
+ while not ray.is_initialized():
41
+ time.sleep(0.1)
42
+ ray_address = os.getenv('RAY_ADDRESS')
43
+ if ray_address:
44
+ ray.init(ray_address, namespace="project_charles")
45
+ else:
46
+ ray.init(namespace="project_charles")
47
+
48
+ @st.cache_resource
49
+ def get_charles_actor():
50
+ charles_actor_instance = None
51
+ charles_actor_proc = subprocess.Popen([sys.executable, "charles_actor.py"])
52
+ while charles_actor_instance == None:
53
+ try:
54
+ charles_actor_instance = ray.get_actor("CharlesActor")
55
+ except ValueError as e:
56
+ time.sleep(0.1) # give the subprocess a chance to start
57
+ return charles_actor_instance
58
+
59
+ @st.cache_resource
60
+ def get_streamlit_av_queue():
61
+ from streamlit_av_queue import StreamlitAVQueue
62
+ streamlit_av_queue_instance = StreamlitAVQueue()
63
+ return streamlit_av_queue_instance
64
 
65
  async def main():
66
+ # Initialize Ray
67
+ ray_status = init_ray()
68
+ while not ray.is_initialized():
69
+ await asyncio.sleep(0.1)
70
+ # get ray actors
71
+ charles_actor = get_charles_actor()
72
+ await asyncio.sleep(0.1)
73
+ streamlit_av_queue = get_streamlit_av_queue()
74
+ await asyncio.sleep(0.1)
75
+
76
  st.title("Project Charles")
77
 
78
  col1, col2 = st.columns(2)
 
85
  looking = st.checkbox("Look", value=False)
86
  charles_actor_debug_output = st.empty()
87
  environment_state_ouput = st.empty()
88
+ # with nested_col3:
89
+ # if st.button('Reboot Charles'):
90
+ # st.write('Killing RAY...')
91
+ # subprocess.check_output(["ray", "start", "--head"])
92
+ # st.write('Restarting RAY...')
93
+ # init_ray()
94
+ # charles_actor = None
95
+ # st.write('Reboot Charles')
96
 
97
  with col2:
 
 
 
 
98
  playing = st.checkbox("Playing", value=True)
99
  webrtc_ctx = webrtc_streamer(
100
  key="charles",
101
  desired_playing_state=playing,
102
+ queued_audio_frames_callback=streamlit_av_queue.queued_audio_frames_callback,
103
+ queued_video_frames_callback=streamlit_av_queue.queued_video_frames_callback,
104
  mode=WebRtcMode.SENDRECV,
105
  media_stream_constraints={
106
  "video": True,
 
120
  if not webrtc_ctx.state.playing:
121
  exit
122
 
 
 
 
 
123
  try:
124
  while True:
 
 
125
  if not webrtc_ctx.state.playing:
126
  system_one_audio_status.write("Camera has stopped.")
127
  await asyncio.sleep(0.1)
128
  continue
129
  if charles_actor is None:
130
+ system_one_audio_status.write("Looking for Charles actor...")
131
+ charles_actor = get_charles_actor()
132
+ if charles_actor is None:
133
+ await asyncio.sleep(0.1)
134
+ continue
135
+ system_one_audio_status.write("Found Charles actor.")
136
+ try:
137
+ # new_environment_state = await charles_actor.get_environment_state.remote()
138
+ # environment_state_ouput.markdown(f"{new_environment_state}")
139
+ streamlit_av_queue.set_looking_listening(looking, listening)
140
+ charles_debug_str = await charles_actor.get_charles_actor_debug_output.remote()
141
+ charles_actor_debug_output.markdown(charles_debug_str)
142
+ state = await charles_actor.get_state.remote()
143
+ system_one_audio_status.write(state)
144
+ except Exception as e:
145
+ # assume we disconnected
146
+ charles_actor = None
 
 
 
 
 
 
 
 
 
 
 
 
 
147
  await asyncio.sleep(0.1)
148
 
149
  except Exception as e:
 
159
  if webrtc_ctx is not None:
160
  del webrtc_ctx
161
  webrtc_ctx = None
 
 
162
  finally:
163
  pass
charles_actor.py CHANGED
@@ -7,6 +7,7 @@ from clip_transform import CLIPTransform
7
  from environment_state_actor import EnvironmentStateActor, EnvironmentState
8
  from agent_state_actor import AgentStateActor
9
  import asyncio
 
10
 
11
  @ray.remote
12
  class CharlesActor:
@@ -186,20 +187,24 @@ class CharlesActor:
186
  loops+=1
187
  self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. {vector_debug}"
188
 
189
- async def main():
190
- if not ray.is_initialized():
191
- # Try to connect to a running Ray cluster
 
 
 
 
 
192
  ray_address = os.getenv('RAY_ADDRESS')
193
- import subprocess
194
- try:
195
- subprocess.check_output(["ray", "start", "--head"])
196
- except Exception as e:
197
- print (e)
198
  if ray_address:
199
  ray.init(ray_address, namespace="project_charles")
200
  else:
201
  ray.init(namespace="project_charles")
202
 
 
 
 
 
203
  charles_actor = CharlesActor.options(
204
  name="CharlesActor",
205
  get_if_exists=True,
 
7
  from environment_state_actor import EnvironmentStateActor, EnvironmentState
8
  from agent_state_actor import AgentStateActor
9
  import asyncio
10
+ import subprocess
11
 
12
  @ray.remote
13
  class CharlesActor:
 
187
  loops+=1
188
  self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}. loops per second: {loops/(time.time()-start_time):.2f}. {vector_debug}"
189
 
190
+ def init_ray():
191
+ try:
192
+ subprocess.check_output(["ray", "start", "--head"])
193
+ except Exception as e:
194
+ print (f"charles_actor.py init_ray: {e}")
195
+ # Connect to a running Ray cluster
196
+ while not ray.is_initialized():
197
+ time.sleep(0.1)
198
  ray_address = os.getenv('RAY_ADDRESS')
 
 
 
 
 
199
  if ray_address:
200
  ray.init(ray_address, namespace="project_charles")
201
  else:
202
  ray.init(namespace="project_charles")
203
 
204
+ async def main():
205
+ if not ray.is_initialized():
206
+ init_ray()
207
+
208
  charles_actor = CharlesActor.options(
209
  name="CharlesActor",
210
  get_if_exists=True,
streamlit_av_queue.py CHANGED
@@ -15,7 +15,7 @@ class StreamlitAVQueue:
15
  self._output_channels = 2
16
  self._audio_bit_rate = audio_bit_rate
17
  self._listening = True
18
- self._looking = True
19
  self._lock = threading.Lock()
20
  self.queue_actor = WebRtcAVQueueActor.options(
21
  name="WebRtcAVQueueActor",
 
15
  self._output_channels = 2
16
  self._audio_bit_rate = audio_bit_rate
17
  self._listening = True
18
+ self._looking = False
19
  self._lock = threading.Lock()
20
  self.queue_actor = WebRtcAVQueueActor.options(
21
  name="WebRtcAVQueueActor",