sohojoe committed on
Commit
ad67495
1 Parent(s): 62a21bd

Charles running between two applications

Browse files
.vscode/launch.json CHANGED
@@ -1,6 +1,17 @@
1
  {
2
- "version": "0.1.0",
 
 
 
3
  "configurations": [
 
 
 
 
 
 
 
 
4
  {
5
  "name": "debug streamlit",
6
  "type": "python",
@@ -11,6 +22,6 @@
11
  // "app.py"
12
  "d_app.py"
13
  ]
14
- }
15
  ]
16
- }
 
1
  {
2
+ // Use IntelliSense to learn about possible attributes.
3
+ // Hover to view descriptions of existing attributes.
4
+ // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
5
+ "version": "0.2.0",
6
  "configurations": [
7
+ {
8
+ "name": "Python: Current File",
9
+ "type": "python",
10
+ "request": "launch",
11
+ "program": "${file}",
12
+ "console": "integratedTerminal"
13
+ // "justMyCode": true
14
+ },
15
  {
16
  "name": "debug streamlit",
17
  "type": "python",
 
22
  // "app.py"
23
  "d_app.py"
24
  ]
25
+ }
26
  ]
27
+ }
charles_actor.py ADDED
@@ -0,0 +1,120 @@
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ import ray
2
+ import time
3
+ import asyncio
4
+ import os
5
+
6
@ray.remote
class CharlesActor:
    """Long-lived Ray actor that runs the Charles A/V processing loop.

    Heavy resources (the Streamlit A/V queue, Vosk speech-to-text, and the
    chat pipeline) are constructed lazily on the first call to ``start()``,
    which then loops forever pulling audio/video frames and forwarding
    finished prompts to the chat pipeline.  State strings are exposed so an
    external UI process can poll progress.
    """

    def __init__(self):
        # Only cheap flags here; heavy resources are built lazily in start().
        self._needs_init = True
        self._system_one_audio_history_output = ""
        self._state = "Initializing"

    def get_state(self):
        """Return the current human-readable state string."""
        return self._state

    def get_system_one_audio_history_output(self):
        """Return the markdown table of the most recently recognized prompts."""
        return self._system_one_audio_history_output

    async def _initalize_resources(self):
        # Initialize resources.  Imports are deferred so the actor can be
        # created cheaply before its heavy dependencies are loaded.
        # NOTE(review): method name keeps the original (misspelled)
        # identifier so any remote callers are unaffected.
        print("000")
        from streamlit_av_queue import StreamlitAVQueue
        self._streamlit_av_queue = StreamlitAVQueue()

        print("002")
        from speech_to_text_vosk import SpeechToTextVosk
        self._speech_to_text_vosk = SpeechToTextVosk()

        from chat_pipeline import ChatPipeline
        self._chat_pipeline = ChatPipeline()
        await self._chat_pipeline.start()

        # Scripted prompts for debugging; normally empty.
        self._debug_queue = [
            # "hello, how are you today?",
            # "hmm, interesting, tell me more about that.",
        ]
        print("010")
        # BUG FIX: this was `True`, which defeated the lazy-init guard in
        # start() and would re-run this whole method (rebuilding every
        # resource) on any subsequent start() call.
        self._needs_init = False
        self._state = "Initialized"

    async def start(self):
        """Main processing loop: consume A/V frames and feed the chat pipeline.

        Runs until the actor is killed.  Updates ``self._state`` with
        progress counters so callers can poll via ``get_state()``.
        """
        if self._needs_init:
            await self._initalize_resources()

        system_one_audio_history = []

        self._state = "Waiting for input"
        total_video_frames = 0
        total_audio_frames = 0
        loops = 0

        while True:
            # Drain any scripted debug prompts first.
            if len(self._debug_queue) > 0:
                prompt = self._debug_queue.pop(0)
                await self._chat_pipeline.enqueue(prompt)
            audio_frames = self._streamlit_av_queue.get_audio_frames()
            if len(audio_frames) > 0:
                total_audio_frames += len(audio_frames)
                # Concatenate all audio frames into a single buffer
                audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
                self._speech_to_text_vosk.add_speech_bytes(audio_buffer)
                prompt, speaker_finished = self._speech_to_text_vosk.get_text()
                if speaker_finished and len(prompt) > 0:
                    print(f"Prompt: {prompt}")
                    system_one_audio_history.append(prompt)
                    # Keep only the 10 most recent prompts for display.
                    if len(system_one_audio_history) > 10:
                        system_one_audio_history = system_one_audio_history[-10:]
                    table_content = "| System 1 Audio History |\n| --- |\n"
                    table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
                    self._system_one_audio_history_output = table_content
                    await self._chat_pipeline.enqueue(prompt)
            video_frames = self._streamlit_av_queue.get_video_frames()
            if len(video_frames) > 0:
                total_video_frames += len(video_frames)
                # for video_frame in video_frames:
                #     system_one_video_output.image(video_frame.to_ndarray())

            # update debug output
            if total_video_frames > 0 or total_audio_frames > 0:
                self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames"
            await asyncio.sleep(0.1)
            loops += 1
            self._state = f"Processed {total_video_frames} video frames and {total_audio_frames} audio frames, loops: {loops}"
87
if __name__ == "__main__":
    # Connect to an existing Ray cluster when RAY_ADDRESS is set,
    # otherwise start a local one; everything lives in one namespace
    # so the Streamlit app can look the actor up by name.
    if not ray.is_initialized():
        address = os.getenv('RAY_ADDRESS')
        if address:
            ray.init(address, namespace="project_charles")
        else:
            ray.init(namespace="project_charles")

    # get_if_exists=True reuses a previously created named actor
    # instead of raising if one is already registered.
    charles_actor = CharlesActor.options(
        name="CharlesActor",
        get_if_exists=True,
    ).remote()
    future = charles_actor.start.remote()

    try:
        while True:
            ready, _ = ray.wait([future], timeout=0)
            if not ready:
                # The start method is still running. You can poll for debug information here.
                time.sleep(1)
                state = charles_actor.get_state.remote()
                print(f"Charles is in state: {ray.get(state)}")
                continue
            # The start method has terminated. You can fetch the result (if any) with ray.get().
            # If the method raised an exception, it will be re-raised here.
            try:
                result = ray.get(future)
                print(f"The start method has terminated with result: {result}")
            except Exception as e:
                print(f"The start method raised an exception: {e}")
            break
    except KeyboardInterrupt:
        print("Script was manually terminated")
d_app.py CHANGED
@@ -27,7 +27,12 @@ webrtc_ctx = None
27
  # Initialize Ray
28
  import ray
29
  if not ray.is_initialized():
30
- ray.init()
 
 
 
 
 
31
 
32
 
33
 
@@ -46,35 +51,12 @@ async def main():
46
  system_one_audio_history_output = st.empty()
47
 
48
  # Initialize resources if not already done
49
- print("000")
50
  system_one_audio_status.write("Initializing streaming")
51
  if "streamlit_av_queue" not in st.session_state:
52
- print("001")
53
  from streamlit_av_queue import StreamlitAVQueue
54
  st.session_state.streamlit_av_queue = StreamlitAVQueue()
55
 
56
- if "speech_to_text_vosk" not in st.session_state:
57
- print("002")
58
- from speech_to_text_vosk import SpeechToTextVosk
59
- st.session_state.speech_to_text_vosk = SpeechToTextVosk()
60
-
61
- from chat_pipeline import ChatPipeline
62
- if "chat_pipeline" not in st.session_state:
63
- print("003")
64
- # from chat_pipeline import ChatPipeline
65
- # st.session_state.chat_pipeline = ChatPipeline()
66
- # await st.session_state.chat_pipeline.start()
67
- st.session_state.chat_pipeline = ChatPipeline()
68
- await st.session_state.chat_pipeline.start()
69
-
70
- if "debug_queue" not in st.session_state:
71
- st.session_state.debug_queue = [
72
- # "hello, how are you today?",
73
- # "hmm, interesting, tell me more about that.",
74
- ]
75
-
76
  system_one_audio_status.write("resources referecned")
77
- print("010")
78
 
79
 
80
 
@@ -93,6 +75,10 @@ async def main():
93
  exit
94
 
95
  system_one_audio_status.write("Initializing speech")
 
 
 
 
96
 
97
  try:
98
  while True:
@@ -100,62 +86,24 @@ async def main():
100
  system_one_audio_status.write("Stopped.")
101
  await asyncio.sleep(0.1)
102
  continue
103
- system_one_audio_status.write("Streaming.")
104
- if len(st.session_state.debug_queue) > 0:
105
- prompt = st.session_state.debug_queue.pop(0)
106
- await st.session_state.chat_pipeline.enqueue(prompt)
107
- audio_frames = st.session_state.streamlit_av_queue.get_audio_frames()
108
- if len(audio_frames) > 0:
109
- # Concatenate all audio frames into a single buffer
110
- audio_buffer = b"".join([buffer.tobytes() for buffer in audio_frames])
111
- st.session_state.speech_to_text_vosk.add_speech_bytes(audio_buffer)
112
- prompt, speaker_finished = st.session_state.speech_to_text_vosk.get_text()
113
- if speaker_finished and len(prompt) > 0:
114
- print(f"Prompt: {prompt}")
115
- system_one_audio_history.append(prompt)
116
- if len(system_one_audio_history) > 10:
117
- system_one_audio_history = system_one_audio_history[-10:]
118
- table_content = "| System 1 Audio History |\n| --- |\n"
119
- table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
120
- system_one_audio_history_output.markdown(table_content)
121
- await st.session_state.chat_pipeline.enqueue(prompt)
122
- video_frames = st.session_state.streamlit_av_queue.get_video_frames()
123
- if len(video_frames) > 0:
124
- # for video_frame in video_frames:
125
- # system_one_video_output.image(video_frame.to_ndarray())
126
  pass
 
 
 
127
  await asyncio.sleep(0.1)
128
 
129
- # try:
130
- # prompts = [
131
- # "hello, how are you today?",
132
- # "tell me about your shadow self?",
133
- # "hmm, interesting, tell me more about that.",
134
- # "wait, that is so interesting, what else?",
135
- # ]
136
- # for prompt in prompts:
137
- # system_one_audio_history.append(prompt)
138
- # if len(system_one_audio_history) > 10:
139
- # system_one_audio_history = system_one_audio_history[-10:]
140
- # table_content = "| System 1 Audio History |\n| --- |\n"
141
- # table_content += "\n".join([f"| {item} |" for item in reversed(system_one_audio_history)])
142
- # system_one_audio_history_output.markdown(table_content)
143
- # await chat_pipeline.enqueue(prompt)
144
-
145
  except Exception as e:
146
  print(f"An error occurred: {e}")
147
  traceback.print_exc()
148
  raise e
149
 
150
 
151
- # while True:
152
- # if webrtc_ctx.state.playing:
153
- # system_one_audio_status.write("Streaming.")
154
- # else:
155
- # system_one_audio_status.write("Stopped.")
156
- # await asyncio.sleep(0.5)
157
-
158
-
159
  if __name__ == "__main__":
160
  try:
161
  asyncio.run(main())
@@ -164,12 +112,6 @@ if __name__ == "__main__":
164
  del webrtc_ctx
165
  webrtc_ctx = None
166
  if "streamlit_av_queue" in st.session_state:
167
- del st.session_state.streamlit_av_queue
168
-
169
- if "speech_to_text_vosk" in st.session_state:
170
- del st.session_state.speech_to_text_vosk
171
-
172
- if "chat_pipeline" in st.session_state:
173
- del st.session_state.chat_pipeline
174
  finally:
175
  pass
 
27
  # Initialize Ray
28
  import ray
29
  if not ray.is_initialized():
30
+ # Try to connect to a running Ray cluster
31
+ ray_address = os.getenv('RAY_ADDRESS')
32
+ if ray_address:
33
+ ray.init(ray_address, namespace="project_charles")
34
+ else:
35
+ ray.init(namespace="project_charles")
36
 
37
 
38
 
 
51
  system_one_audio_history_output = st.empty()
52
 
53
  # Initialize resources if not already done
 
54
  system_one_audio_status.write("Initializing streaming")
55
  if "streamlit_av_queue" not in st.session_state:
 
56
  from streamlit_av_queue import StreamlitAVQueue
57
  st.session_state.streamlit_av_queue = StreamlitAVQueue()
58
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
59
  system_one_audio_status.write("resources referecned")
 
60
 
61
 
62
 
 
75
  exit
76
 
77
  system_one_audio_status.write("Initializing speech")
78
+
79
+ from charles_actor import CharlesActor
80
+ charles_actor = None
81
+
82
 
83
  try:
84
  while True:
 
86
  system_one_audio_status.write("Stopped.")
87
  await asyncio.sleep(0.1)
88
  continue
89
+ if charles_actor is None:
90
+ try:
91
+ charles_actor = ray.get_actor("CharlesActor")
92
+ system_one_audio_status.write("Charles is here.")
93
+ except ValueError as e:
94
+ system_one_audio_status.write("Charles is sleeping.")
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
95
  pass
96
+ if charles_actor is not None:
97
+ audio_history = ray.get(charles_actor.get_system_one_audio_history_output.remote())
98
+ system_one_audio_history_output.markdown(audio_history)
99
  await asyncio.sleep(0.1)
100
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
101
  except Exception as e:
102
  print(f"An error occurred: {e}")
103
  traceback.print_exc()
104
  raise e
105
 
106
 
 
 
 
 
 
 
 
 
107
  if __name__ == "__main__":
108
  try:
109
  asyncio.run(main())
 
112
  del webrtc_ctx
113
  webrtc_ctx = None
114
  if "streamlit_av_queue" in st.session_state:
115
+ del st.session_state.streamlit_av_queue
 
 
 
 
 
 
116
  finally:
117
  pass
input_av_queue_actor.py CHANGED
@@ -12,9 +12,15 @@ class InputAVQueueActor:
12
  self.video_queue = Queue(maxsize=100) # Adjust the size as needed
13
 
14
  def enqueue_video_frame(self, shared_tensor_ref):
 
 
 
15
  self.video_queue.put(shared_tensor_ref)
16
 
17
  def enqueue_audio_frame(self, shared_buffer_ref):
 
 
 
18
  self.audio_queue.put(shared_buffer_ref)
19
 
20
 
 
12
  self.video_queue = Queue(maxsize=100) # Adjust the size as needed
13
 
14
  def enqueue_video_frame(self, shared_tensor_ref):
15
+ if self.video_queue.full():
16
+ evicted_item = self.video_queue.get()
17
+ del evicted_item
18
  self.video_queue.put(shared_tensor_ref)
19
 
20
  def enqueue_audio_frame(self, shared_buffer_ref):
21
+ if self.audio_queue.full():
22
+ evicted_item = self.audio_queue.get()
23
+ del evicted_item
24
  self.audio_queue.put(shared_buffer_ref)
25
 
26
 
streamlit_av_queue.py CHANGED
@@ -13,11 +13,11 @@ import torch
13
  class StreamlitAVQueue:
14
  def __init__(self, audio_bit_rate=16000):
15
  self._audio_bit_rate = audio_bit_rate
16
- try:
17
- self.queue_actor = ray.get_actor("InputAVQueueActor")
18
- except ValueError as e:
19
- self.queue_actor = InputAVQueueActor.options(name="InputAVQueueActor").remote()
20
-
21
  async def queued_video_frames_callback(
22
  self,
23
  frames: List[av.AudioFrame],
 
13
  class StreamlitAVQueue:
14
  def __init__(self, audio_bit_rate=16000):
15
  self._audio_bit_rate = audio_bit_rate
16
+ self.queue_actor = InputAVQueueActor.options(
17
+ name="InputAVQueueActor",
18
+ get_if_exists=True,
19
+ ).remote()
20
+
21
  async def queued_video_frames_callback(
22
  self,
23
  frames: List[av.AudioFrame],