badalsahani's picture
feat: chroma initial deploy
287a0bc
use std::fmt::Debug;
use std::sync::Arc;
use futures::Stream;
use futures::StreamExt;
use tokio::runtime::Builder;
use tokio::{pin, select};
use super::ComponentRuntime;
// use super::executor::StreamComponentExecutor;
use super::sender::{self, Sender, Wrapper};
use super::{executor, ComponentContext};
use super::{executor::ComponentExecutor, Component, ComponentHandle, Handler, StreamHandler};
use std::sync::Mutex;
#[derive(Clone)]
pub(crate) struct System {
inner: Arc<Mutex<Inner>>,
}
struct Inner {}
impl System {
pub(crate) fn new() -> System {
System {
inner: Arc::new(Mutex::new(Inner {})),
}
}
pub(crate) fn start_component<C>(&mut self, mut component: C) -> ComponentHandle<C>
where
C: Component + Send + 'static,
{
let (tx, rx) = tokio::sync::mpsc::channel(component.queue_size());
let sender = Sender::new(tx);
let cancel_token = tokio_util::sync::CancellationToken::new();
let _ = component.on_start(&ComponentContext {
system: self.clone(),
sender: sender.clone(),
cancellation_token: cancel_token.clone(),
});
let mut executor = ComponentExecutor::new(
sender.clone(),
cancel_token.clone(),
component,
self.clone(),
);
match C::runtime() {
ComponentRuntime::Global => {
let join_handle = tokio::spawn(async move { executor.run(rx).await });
return ComponentHandle::new(cancel_token, Some(join_handle), sender);
}
ComponentRuntime::Dedicated => {
println!("Spawning on dedicated thread");
// Spawn on a dedicated thread
let mut rt = Builder::new_current_thread().enable_all().build().unwrap();
let join_handle = std::thread::spawn(move || {
rt.block_on(async move { executor.run(rx).await });
});
// TODO: Implement Join for dedicated threads
return ComponentHandle::new(cancel_token, None, sender);
}
}
}
pub(super) fn register_stream<C, S, M>(&self, stream: S, ctx: &ComponentContext<C>)
where
C: StreamHandler<M> + Handler<M>,
M: Send + Debug + 'static,
S: Stream + Send + Stream<Item = M> + 'static,
{
let ctx = ComponentContext {
system: self.clone(),
sender: ctx.sender.clone(),
cancellation_token: ctx.cancellation_token.clone(),
};
tokio::spawn(async move { stream_loop(stream, &ctx).await });
}
}
async fn stream_loop<C, S, M>(stream: S, ctx: &ComponentContext<C>)
where
C: StreamHandler<M> + Handler<M>,
M: Send + Debug + 'static,
S: Stream + Send + Stream<Item = M> + 'static,
{
pin!(stream);
loop {
select! {
_ = ctx.cancellation_token.cancelled() => {
break;
}
message = stream.next() => {
match message {
Some(message) => {
let res = ctx.sender.send(message).await;
match res {
Ok(_) => {}
Err(e) => {
println!("Failed to send message: {:?}", e);
// TODO: switch to logging
// Terminate the stream
break;
}
}
},
None => {
break;
}
}
}
}
}
}