File size: 3,437 Bytes
287a0bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use crate::{
    config::{Configurable, WorkerConfig},
    errors::ChromaError,
};

use super::{
    config::{AssignmentPolicyConfig, HasherType},
    rendezvous_hash::{assign, AssignmentError, Murmur3Hasher},
};
use async_trait::async_trait;

/*
===========================================
Interfaces
===========================================
*/

/// Defines how a key is assigned to one member out of a set of members.
///
/// # Notes
/// - This trait mirrors the go and python versions of the assignment policy
///   interface.
/// - An assignment policy is not responsible for creating the topics it assigns to.
///   It is the responsibility of the caller to ensure that the topics exist.
/// - An assignment policy must be `Send` (enforced by the supertrait bound).
///
/// # Methods
/// - `assign`: Assign a key to a topic.
/// - `get_members`: Get the members that can be assigned to.
/// - `set_members`: Set the members that can be assigned to.
pub(crate) trait AssignmentPolicy: Send {
    /// Assigns `key` to a member, returning the member's name on success or an
    /// `AssignmentError` if the assignment fails.
    fn assign(&self, key: &str) -> Result<String, AssignmentError>;
    /// Returns the members that keys can currently be assigned to, as an owned list.
    fn get_members(&self) -> Vec<String>;
    /// Sets the members that keys can be assigned to, taking ownership of `members`.
    fn set_members(&mut self, members: Vec<String>);
}

/*
===========================================
Implementation
===========================================
*/

/// Assignment policy state: a `Murmur3Hasher` together with the list of members
/// that keys may be assigned to.
pub(crate) struct RendezvousHashingAssignmentPolicy {
    /// Hasher passed to the rendezvous-hash `assign` function on each assignment.
    hasher: Murmur3Hasher,
    /// Members that keys can be assigned to. Both constructors in this file start
    /// with an empty list; it is populated through `set_members`.
    members: Vec<String>,
}

impl RendezvousHashingAssignmentPolicy {
    // Rust beginners note
    // As a general pattern, a constructor that needs to keep a string should take `String`
    // rather than `&str`: a struct cannot hold on to a borrowed value without lifetimes, so
    // instead of cloning internally we take ownership and leave it to the caller to clone
    // only if they still need their copy. Put the burden of cloning on the caller; if they
    // don't need to clone, they can simply pass ownership.

    /// Creates a policy that uses a `Murmur3Hasher` and starts with no members.
    ///
    /// # Arguments
    /// - `pulsar_tenant`: currently not used by this policy.
    /// - `pulsar_namespace`: currently not used by this policy.
    ///
    /// Both arguments are kept in the signature so that existing callers keep compiling.
    pub(crate) fn new(
        pulsar_tenant: String,
        pulsar_namespace: String,
    ) -> RendezvousHashingAssignmentPolicy {
        // Neither value is stored or read. Discard them explicitly so the signature can
        // stay unchanged without triggering the `unused_variables` warning.
        let _ = (pulsar_tenant, pulsar_namespace);
        RendezvousHashingAssignmentPolicy {
            hasher: Murmur3Hasher {},
            members: Vec::new(),
        }
    }

    /// Replaces the current member list with `members`, taking ownership of it.
    pub(crate) fn set_members(&mut self, members: Vec<String>) {
        self.members = members;
    }
}

#[async_trait]
impl Configurable for RendezvousHashingAssignmentPolicy {
    /// Builds the policy from the worker configuration.
    ///
    /// Reads `worker_config.assignment_policy`, selects the hasher named by its
    /// `hasher` field, and returns a policy with an empty member list. Every path
    /// visible here returns `Ok`.
    async fn try_from_config(worker_config: &WorkerConfig) -> Result<Self, Box<dyn ChromaError>> {
        // Both matches are kept exhaustive on purpose: adding a new policy or hasher
        // variant will fail to compile here until it is handled.
        let hasher = match &worker_config.assignment_policy {
            AssignmentPolicyConfig::RendezvousHashing(policy_config) => {
                match policy_config.hasher {
                    HasherType::Murmur3 => Murmur3Hasher {},
                }
            }
        };

        Ok(RendezvousHashingAssignmentPolicy {
            hasher,
            members: Vec::new(),
        })
    }
}

impl AssignmentPolicy for RendezvousHashingAssignmentPolicy {
    /// Assigns `key` to one of the current members by handing a copy of the member
    /// list and this policy's hasher to the rendezvous-hash `assign` function, and
    /// returns that function's result unchanged.
    fn assign(&self, key: &str) -> Result<String, AssignmentError> {
        // The bare `assign` here is the free function imported from `rendezvous_hash`,
        // not a recursive call to this method.
        assign(key, self.get_members(), &self.hasher)
    }

    /// Returns an owned copy of the member list.
    fn get_members(&self) -> Vec<String> {
        // Copying is acceptable for now: this is not meant to be called frequently and
        // the number of members is not expected to be large.
        self.members.clone()
    }

    /// Replaces the member list with `members`.
    fn set_members(&mut self, members: Vec<String>) {
        self.members = members;
    }
}