mod assignment; mod config; mod errors; mod index; mod ingest; mod memberlist; mod segment; mod server; mod storage; mod sysdb; mod system; mod types; use config::Configurable; use memberlist::MemberlistProvider; use crate::sysdb::sysdb::SysDb; mod chroma_proto { tonic::include_proto!("chroma"); } pub async fn worker_entrypoint() { let config = config::RootConfig::load(); // Create all the core components and start them // TODO: This should be handled by an Application struct and we can push the config into it // for now we expose the config to pub and inject it into the components // The two root components are ingest, and the gRPC server let mut system: system::System = system::System::new(); let mut ingest = match ingest::Ingest::try_from_config(&config.worker).await { Ok(ingest) => ingest, Err(err) => { println!("Failed to create ingest component: {:?}", err); return; } }; let mut memberlist = match memberlist::CustomResourceMemberlistProvider::try_from_config(&config.worker).await { Ok(memberlist) => memberlist, Err(err) => { println!("Failed to create memberlist component: {:?}", err); return; } }; let mut scheduler = ingest::RoundRobinScheduler::new(); let segment_manager = match segment::SegmentManager::try_from_config(&config.worker).await { Ok(segment_manager) => segment_manager, Err(err) => { println!("Failed to create segment manager component: {:?}", err); return; } }; let mut segment_ingestor_receivers = Vec::with_capacity(config.worker.num_indexing_threads as usize); for _ in 0..config.worker.num_indexing_threads { let segment_ingestor = segment::SegmentIngestor::new(segment_manager.clone()); let segment_ingestor_handle = system.start_component(segment_ingestor); let recv = segment_ingestor_handle.receiver(); segment_ingestor_receivers.push(recv); } let mut worker_server = match server::WorkerServer::try_from_config(&config.worker).await { Ok(worker_server) => worker_server, Err(err) => { println!("Failed to create worker server component: {:?}", err); return; } }; worker_server.set_segment_manager(segment_manager.clone()); // Boot the system // memberlist -> ingest -> scheduler -> NUM_THREADS x segment_ingestor -> segment_manager // server <- segment_manager for recv in segment_ingestor_receivers { scheduler.subscribe(recv); } let mut scheduler_handler = system.start_component(scheduler); ingest.subscribe(scheduler_handler.receiver()); let mut ingest_handle = system.start_component(ingest); let recv = ingest_handle.receiver(); memberlist.subscribe(recv); let mut memberlist_handle = system.start_component(memberlist); let server_join_handle = tokio::spawn(async move { crate::server::WorkerServer::run(worker_server).await; }); // Join on all handles let _ = tokio::join!( ingest_handle.join(), memberlist_handle.join(), scheduler_handler.join(), ); }