use std::fmt::Debug; use std::sync::Arc; use futures::Stream; use futures::StreamExt; use tokio::runtime::Builder; use tokio::{pin, select}; use super::ComponentRuntime; // use super::executor::StreamComponentExecutor; use super::sender::{self, Sender, Wrapper}; use super::{executor, ComponentContext}; use super::{executor::ComponentExecutor, Component, ComponentHandle, Handler, StreamHandler}; use std::sync::Mutex; #[derive(Clone)] pub(crate) struct System { inner: Arc>, } struct Inner {} impl System { pub(crate) fn new() -> System { System { inner: Arc::new(Mutex::new(Inner {})), } } pub(crate) fn start_component(&mut self, mut component: C) -> ComponentHandle where C: Component + Send + 'static, { let (tx, rx) = tokio::sync::mpsc::channel(component.queue_size()); let sender = Sender::new(tx); let cancel_token = tokio_util::sync::CancellationToken::new(); let _ = component.on_start(&ComponentContext { system: self.clone(), sender: sender.clone(), cancellation_token: cancel_token.clone(), }); let mut executor = ComponentExecutor::new( sender.clone(), cancel_token.clone(), component, self.clone(), ); match C::runtime() { ComponentRuntime::Global => { let join_handle = tokio::spawn(async move { executor.run(rx).await }); return ComponentHandle::new(cancel_token, Some(join_handle), sender); } ComponentRuntime::Dedicated => { println!("Spawning on dedicated thread"); // Spawn on a dedicated thread let mut rt = Builder::new_current_thread().enable_all().build().unwrap(); let join_handle = std::thread::spawn(move || { rt.block_on(async move { executor.run(rx).await }); }); // TODO: Implement Join for dedicated threads return ComponentHandle::new(cancel_token, None, sender); } } } pub(super) fn register_stream(&self, stream: S, ctx: &ComponentContext) where C: StreamHandler + Handler, M: Send + Debug + 'static, S: Stream + Send + Stream + 'static, { let ctx = ComponentContext { system: self.clone(), sender: ctx.sender.clone(), cancellation_token: ctx.cancellation_token.clone(), }; tokio::spawn(async move { stream_loop(stream, &ctx).await }); } } async fn stream_loop(stream: S, ctx: &ComponentContext) where C: StreamHandler + Handler, M: Send + Debug + 'static, S: Stream + Send + Stream + 'static, { pin!(stream); loop { select! { _ = ctx.cancellation_token.cancelled() => { break; } message = stream.next() => { match message { Some(message) => { let res = ctx.sender.send(message).await; match res { Ok(_) => {} Err(e) => { println!("Failed to send message: {:?}", e); // TODO: switch to logging // Terminate the stream break; } } }, None => { break; } } } } } }