Spaces:
Running
Running
File size: 3,437 Bytes
287a0bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 |
use crate::{
config::{Configurable, WorkerConfig},
errors::ChromaError,
};
use super::{
config::{AssignmentPolicyConfig, HasherType},
rendezvous_hash::{assign, AssignmentError, Murmur3Hasher},
};
use async_trait::async_trait;
/*
===========================================
Interfaces
===========================================
*/
/// AssignmentPolicy is a trait that defines how to assign a key to a set of members.
/// # Notes
/// This trait mirrors the go and python versions of the assignment policy
/// interface.
/// # Methods
/// - assign: Assign a key to a topic.
/// - get_members: Get the members that can be assigned to.
/// - set_members: Set the members that can be assigned to.
/// # Notes
/// An assignment policy is not responsible for creating the topics it assigns to.
/// It is the responsibility of the caller to ensure that the topics exist.
/// An assignment policy must be Send.
pub(crate) trait AssignmentPolicy: Send {
fn assign(&self, key: &str) -> Result<String, AssignmentError>;
fn get_members(&self) -> Vec<String>;
fn set_members(&mut self, members: Vec<String>);
}
/*
===========================================
Implementation
===========================================
*/
pub(crate) struct RendezvousHashingAssignmentPolicy {
hasher: Murmur3Hasher,
members: Vec<String>,
}
impl RendezvousHashingAssignmentPolicy {
// Rust beginners note
// The reason we take String and not &str is because we need to put the strings into our
// struct, and we can't do that with references so rather than clone the strings, we just
// take ownership of them and put the responsibility on the caller to clone them if they
// need to. This is the general pattern we should follow in rust - put the burden of cloning
// on the caller, and if they don't need to clone, they can pass ownership.
pub(crate) fn new(
pulsar_tenant: String,
pulsar_namespace: String,
) -> RendezvousHashingAssignmentPolicy {
return RendezvousHashingAssignmentPolicy {
hasher: Murmur3Hasher {},
members: vec![],
};
}
pub(crate) fn set_members(&mut self, members: Vec<String>) {
self.members = members;
}
}
#[async_trait]
impl Configurable for RendezvousHashingAssignmentPolicy {
async fn try_from_config(worker_config: &WorkerConfig) -> Result<Self, Box<dyn ChromaError>> {
let assignment_policy_config = match &worker_config.assignment_policy {
AssignmentPolicyConfig::RendezvousHashing(config) => config,
};
let hasher = match assignment_policy_config.hasher {
HasherType::Murmur3 => Murmur3Hasher {},
};
return Ok(RendezvousHashingAssignmentPolicy {
hasher: hasher,
members: vec![],
});
}
}
impl AssignmentPolicy for RendezvousHashingAssignmentPolicy {
fn assign(&self, key: &str) -> Result<String, AssignmentError> {
let topics = self.get_members();
let topic = assign(key, topics, &self.hasher);
return topic;
}
fn get_members(&self) -> Vec<String> {
// This is not designed to be used frequently for now, nor is the number of members
// expected to be large, so we can just clone the members
return self.members.clone();
}
fn set_members(&mut self, members: Vec<String>) {
self.members = members;
}
}
|