Spaces:
Running
Running
use async_trait::async_trait; | |
use figment::providers::{Env, Format, Serialized, Yaml}; | |
use serde::Deserialize; | |
use crate::errors::ChromaError; | |
/// Config file read by `RootConfig::load`, relative to the current working directory.
const DEFAULT_CONFIG_PATH: &str = "./chroma_config.yaml";
/// Prefix that every environment-variable override must carry (e.g. `CHROMA_WORKER__MY_IP`).
const ENV_PREFIX: &str = "CHROMA_";
/// # Description | |
/// The RootConfig for all chroma services this is a YAML file that | |
/// is shared between all services, and secondarily, fields can be | |
/// populated from environment variables. The environment variables | |
/// are prefixed with CHROMA_ and are uppercase. Values in the envionment | |
/// variables take precedence over values in the YAML file. | |
/// By default, it is read from the current working directory, | |
/// with the filename chroma_config.yaml. | |
pub(crate) struct RootConfig { | |
// The root config object wraps the worker config object so that | |
// we can share the same config file between multiple services. | |
pub worker: WorkerConfig, | |
} | |
impl RootConfig { | |
/// # Description | |
/// Load the config from the default location. | |
/// # Returns | |
/// The config object. | |
/// # Panics | |
/// - If the config file cannot be read. | |
/// - If the config file is not valid YAML. | |
/// - If the config file does not contain the required fields. | |
/// - If the config file contains invalid values. | |
/// - If the environment variables contain invalid values. | |
/// # Notes | |
/// The default location is the current working directory, with the filename chroma_config.yaml. | |
/// The environment variables are prefixed with CHROMA_ and are uppercase. | |
/// Values in the envionment variables take precedence over values in the YAML file. | |
pub(crate) fn load() -> Self { | |
return Self::load_from_path(DEFAULT_CONFIG_PATH); | |
} | |
/// # Description | |
/// Load the config from a specific location. | |
/// # Arguments | |
/// - path: The path to the config file. | |
/// # Returns | |
/// The config object. | |
/// # Panics | |
/// - If the config file cannot be read. | |
/// - If the config file is not valid YAML. | |
/// - If the config file does not contain the required fields. | |
/// - If the config file contains invalid values. | |
/// - If the environment variables contain invalid values. | |
/// # Notes | |
/// The environment variables are prefixed with CHROMA_ and are uppercase. | |
/// Values in the envionment variables take precedence over values in the YAML file. | |
pub(crate) fn load_from_path(path: &str) -> Self { | |
// Unfortunately, figment doesn't support environment variables with underscores. So we have to map and replace them. | |
// Excluding our own environment variables, which are prefixed with CHROMA_. | |
let mut f = figment::Figment::from(Env::prefixed("CHROMA_").map(|k| match k { | |
k if k == "num_indexing_threads" => k.into(), | |
k if k == "my_ip" => k.into(), | |
k => k.as_str().replace("__", ".").into(), | |
})); | |
if std::path::Path::new(path).exists() { | |
f = figment::Figment::from(Yaml::file(path)).merge(f); | |
} | |
// Apply defaults - this seems to be the best way to do it. | |
// https://github.com/SergioBenitez/Figment/issues/77#issuecomment-1642490298 | |
f = f.join(Serialized::default( | |
"worker.num_indexing_threads", | |
num_cpus::get(), | |
)); | |
let res = f.extract(); | |
match res { | |
Ok(config) => return config, | |
Err(e) => panic!("Error loading config: {}", e), | |
} | |
} | |
} | |
/// # Description | |
/// The primary config for the worker service. | |
/// ## Description of parameters | |
/// - my_ip: The IP address of the worker service. Used for memberlist assignment. Must be provided | |
/// - num_indexing_threads: The number of indexing threads to use. If not provided, defaults to the number of cores on the machine. | |
/// - pulsar_tenant: The pulsar tenant to use. Must be provided. | |
/// - pulsar_namespace: The pulsar namespace to use. Must be provided. | |
/// - assignment_policy: The assignment policy to use. Must be provided. | |
/// # Notes | |
/// In order to set the enviroment variables, you must prefix them with CHROMA_WORKER__<FIELD_NAME>. | |
/// For example, to set my_ip, you would set CHROMA_WORKER__MY_IP. | |
/// Each submodule that needs to be configured from the config object should implement the Configurable trait and | |
/// have its own field in this struct for its Config struct. | |
pub(crate) struct WorkerConfig { | |
pub(crate) my_ip: String, | |
pub(crate) my_port: u16, | |
pub(crate) num_indexing_threads: u32, | |
pub(crate) pulsar_tenant: String, | |
pub(crate) pulsar_namespace: String, | |
pub(crate) pulsar_url: String, | |
pub(crate) kube_namespace: String, | |
pub(crate) assignment_policy: crate::assignment::config::AssignmentPolicyConfig, | |
pub(crate) memberlist_provider: crate::memberlist::config::MemberlistProviderConfig, | |
pub(crate) ingest: crate::ingest::config::IngestConfig, | |
pub(crate) sysdb: crate::sysdb::config::SysDbConfig, | |
pub(crate) segment_manager: crate::segment::config::SegmentManagerConfig, | |
pub(crate) storage: crate::storage::config::StorageConfig, | |
} | |
/// # Description | |
/// A trait for configuring a struct from a config object. | |
/// # Notes | |
/// This trait is used to configure structs from the config object. | |
/// Components that need to be configured from the config object should implement this trait. | |
pub(crate) trait Configurable { | |
async fn try_from_config(worker_config: &WorkerConfig) -> Result<Self, Box<dyn ChromaError>> | |
where | |
Self: Sized; | |
} | |
#[cfg(test)]
mod tests {
    use super::*;
    use figment::Jail;

    /// A worker config in which every field, including the defaulted
    /// `num_indexing_threads`, is set explicitly.
    const FULL_CONFIG: &str = r#"
        worker:
            my_ip: "192.0.0.1"
            my_port: 50051
            num_indexing_threads: 4
            pulsar_tenant: "public"
            pulsar_namespace: "default"
            pulsar_url: "pulsar://localhost:6650"
            kube_namespace: "chroma"
            assignment_policy:
                RendezvousHashing:
                    hasher: Murmur3
            memberlist_provider:
                CustomResource:
                    memberlist_name: "worker-memberlist"
                    queue_size: 100
            ingest:
                queue_size: 100
            sysdb:
                Grpc:
                    host: "localhost"
                    port: 50051
            segment_manager:
                storage_path: "/tmp"
            storage:
                S3:
                    bucket: "chroma"
    "#;

    #[test]
    fn test_config_from_default_path() {
        Jail::expect_with(|jail| {
            jail.create_file("chroma_config.yaml", FULL_CONFIG)?;
            let config = RootConfig::load();
            assert_eq!(config.worker.my_ip, "192.0.0.1");
            assert_eq!(config.worker.my_port, 50051);
            assert_eq!(config.worker.num_indexing_threads, 4);
            assert_eq!(config.worker.pulsar_tenant, "public");
            assert_eq!(config.worker.pulsar_namespace, "default");
            assert_eq!(config.worker.kube_namespace, "chroma");
            Ok(())
        });
    }

    #[test]
    fn test_config_from_specific_path() {
        Jail::expect_with(|jail| {
            jail.create_file("random_path.yaml", FULL_CONFIG)?;
            let config = RootConfig::load_from_path("random_path.yaml");
            assert_eq!(config.worker.my_ip, "192.0.0.1");
            assert_eq!(config.worker.my_port, 50051);
            assert_eq!(config.worker.num_indexing_threads, 4);
            assert_eq!(config.worker.pulsar_tenant, "public");
            assert_eq!(config.worker.pulsar_namespace, "default");
            assert_eq!(config.worker.kube_namespace, "chroma");
            Ok(())
        });
    }

    // `load` panics when a required field is missing, so this test expects a panic.
    #[test]
    #[should_panic]
    fn test_config_missing_required_field() {
        Jail::expect_with(|jail| {
            jail.create_file(
                "chroma_config.yaml",
                r#"
                worker:
                    num_indexing_threads: 4
                "#,
            )?;
            let _ = RootConfig::load();
            Ok(())
        });
    }

    // `num_indexing_threads` is omitted, so it must fall back to the core count.
    #[test]
    fn test_missing_default_field() {
        Jail::expect_with(|jail| {
            jail.create_file(
                "chroma_config.yaml",
                r#"
                worker:
                    my_ip: "192.0.0.1"
                    my_port: 50051
                    pulsar_tenant: "public"
                    pulsar_namespace: "default"
                    kube_namespace: "chroma"
                    pulsar_url: "pulsar://localhost:6650"
                    assignment_policy:
                        RendezvousHashing:
                            hasher: Murmur3
                    memberlist_provider:
                        CustomResource:
                            memberlist_name: "worker-memberlist"
                            queue_size: 100
                    ingest:
                        queue_size: 100
                    sysdb:
                        Grpc:
                            host: "localhost"
                            port: 50051
                    segment_manager:
                        storage_path: "/tmp"
                    storage:
                        S3:
                            bucket: "chroma"
                "#,
            )?;
            let config = RootConfig::load();
            assert_eq!(config.worker.my_ip, "192.0.0.1");
            assert_eq!(config.worker.num_indexing_threads, num_cpus::get() as u32);
            Ok(())
        });
    }

    // Scalar fields come only from the environment; the file supplies the nested configs.
    #[test]
    fn test_config_with_env_override() {
        Jail::expect_with(|jail| {
            jail.set_env("CHROMA_WORKER__MY_IP", "192.0.0.1");
            jail.set_env("CHROMA_WORKER__MY_PORT", 50051);
            jail.set_env("CHROMA_WORKER__PULSAR_TENANT", "A");
            jail.set_env("CHROMA_WORKER__PULSAR_NAMESPACE", "B");
            jail.set_env("CHROMA_WORKER__KUBE_NAMESPACE", "C");
            jail.set_env("CHROMA_WORKER__PULSAR_URL", "pulsar://localhost:6650");
            jail.create_file(
                "chroma_config.yaml",
                r#"
                worker:
                    assignment_policy:
                        RendezvousHashing:
                            hasher: Murmur3
                    memberlist_provider:
                        CustomResource:
                            memberlist_name: "worker-memberlist"
                            queue_size: 100
                    ingest:
                        queue_size: 100
                    sysdb:
                        Grpc:
                            host: "localhost"
                            port: 50051
                    segment_manager:
                        storage_path: "/tmp"
                    storage:
                        S3:
                            bucket: "chroma"
                "#,
            )?;
            let config = RootConfig::load();
            assert_eq!(config.worker.my_ip, "192.0.0.1");
            assert_eq!(config.worker.my_port, 50051);
            assert_eq!(config.worker.num_indexing_threads, num_cpus::get() as u32);
            assert_eq!(config.worker.pulsar_tenant, "A");
            assert_eq!(config.worker.pulsar_namespace, "B");
            assert_eq!(config.worker.kube_namespace, "C");
            Ok(())
        });
    }
}