Spaces:
Sleeping
Sleeping
| use std::fmt::Debug; | |
| use std::sync::Arc; | |
| use futures::Stream; | |
| use futures::StreamExt; | |
| use tokio::runtime::Builder; | |
| use tokio::{pin, select}; | |
| use super::ComponentRuntime; | |
| // use super::executor::StreamComponentExecutor; | |
| use super::sender::{self, Sender, Wrapper}; | |
| use super::{executor, ComponentContext}; | |
| use super::{executor::ComponentExecutor, Component, ComponentHandle, Handler, StreamHandler}; | |
| use std::sync::Mutex; | |
/// Handle to the component system.
///
/// The handle is a thin wrapper around shared state (`Arc<Mutex<Inner>>`), so
/// cloning it is cheap and every clone refers to the same `Inner`.
///
/// `Clone` is required: the methods on `System` call `self.clone()` to place an
/// owned `System` into each `ComponentContext` and `ComponentExecutor` they
/// build.
#[derive(Clone)]
pub(crate) struct System {
    inner: Arc<Mutex<Inner>>,
}

/// Shared state behind a [`System`] handle. It currently has no fields.
struct Inner {}
| impl System { | |
| pub(crate) fn new() -> System { | |
| System { | |
| inner: Arc::new(Mutex::new(Inner {})), | |
| } | |
| } | |
| pub(crate) fn start_component<C>(&mut self, mut component: C) -> ComponentHandle<C> | |
| where | |
| C: Component + Send + 'static, | |
| { | |
| let (tx, rx) = tokio::sync::mpsc::channel(component.queue_size()); | |
| let sender = Sender::new(tx); | |
| let cancel_token = tokio_util::sync::CancellationToken::new(); | |
| let _ = component.on_start(&ComponentContext { | |
| system: self.clone(), | |
| sender: sender.clone(), | |
| cancellation_token: cancel_token.clone(), | |
| }); | |
| let mut executor = ComponentExecutor::new( | |
| sender.clone(), | |
| cancel_token.clone(), | |
| component, | |
| self.clone(), | |
| ); | |
| match C::runtime() { | |
| ComponentRuntime::Global => { | |
| let join_handle = tokio::spawn(async move { executor.run(rx).await }); | |
| return ComponentHandle::new(cancel_token, Some(join_handle), sender); | |
| } | |
| ComponentRuntime::Dedicated => { | |
| println!("Spawning on dedicated thread"); | |
| // Spawn on a dedicated thread | |
| let mut rt = Builder::new_current_thread().enable_all().build().unwrap(); | |
| let join_handle = std::thread::spawn(move || { | |
| rt.block_on(async move { executor.run(rx).await }); | |
| }); | |
| // TODO: Implement Join for dedicated threads | |
| return ComponentHandle::new(cancel_token, None, sender); | |
| } | |
| } | |
| } | |
| pub(super) fn register_stream<C, S, M>(&self, stream: S, ctx: &ComponentContext<C>) | |
| where | |
| C: StreamHandler<M> + Handler<M>, | |
| M: Send + Debug + 'static, | |
| S: Stream + Send + Stream<Item = M> + 'static, | |
| { | |
| let ctx = ComponentContext { | |
| system: self.clone(), | |
| sender: ctx.sender.clone(), | |
| cancellation_token: ctx.cancellation_token.clone(), | |
| }; | |
| tokio::spawn(async move { stream_loop(stream, &ctx).await }); | |
| } | |
| } | |
| async fn stream_loop<C, S, M>(stream: S, ctx: &ComponentContext<C>) | |
| where | |
| C: StreamHandler<M> + Handler<M>, | |
| M: Send + Debug + 'static, | |
| S: Stream + Send + Stream<Item = M> + 'static, | |
| { | |
| pin!(stream); | |
| loop { | |
| select! { | |
| _ = ctx.cancellation_token.cancelled() => { | |
| break; | |
| } | |
| message = stream.next() => { | |
| match message { | |
| Some(message) => { | |
| let res = ctx.sender.send(message).await; | |
| match res { | |
| Ok(_) => {} | |
| Err(e) => { | |
| println!("Failed to send message: {:?}", e); | |
| // TODO: switch to logging | |
| // Terminate the stream | |
| break; | |
| } | |
| } | |
| }, | |
| None => { | |
| break; | |
| } | |
| } | |
| } | |
| } | |
| } | |
| } | |