File size: 5,115 Bytes
287a0bc
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
// This implementation mirrors the rendezvous hash implementations
// in the Go and Python services.
// The Go implementation is located at go/internal/utils/rendezvous_hash.go
// The Python implementation is located at chromadb/utils/rendezvous_hash.py

use crate::errors::{ChromaError, ErrorCodes};
use std::io::Cursor;
use thiserror::Error;

use murmur3::murmur3_x64_128;

/// A trait for hashing a member and a key to a score.
///
/// `assign` calls `hash` once per candidate member and selects the member
/// whose score is highest for the given key.
pub(crate) trait Hasher {
    /// Compute the score of the `(member, key)` pair.
    ///
    /// # Errors
    /// Returns an `AssignmentError` if the score cannot be computed.
    fn hash(&self, member: &str, key: &str) -> Result<u64, AssignmentError>;
}

/// Errors that can be produced while assigning a key to a member.
#[derive(Error, Debug)]
pub(crate) enum AssignmentError {
    /// The key to assign was the empty string.
    #[error("Cannot assign empty key")]
    EmptyKey,
    /// There was no member available to assign the key to.
    #[error("No members to assign to")]
    NoMembers,
    /// Computing the score of a member failed.
    #[error("Error hashing member")]
    HashError,
}

impl ChromaError for AssignmentError {
    /// Map each assignment error to its error code: problems with the
    /// caller's input (empty key, no members) are `InvalidArgument`, while a
    /// hashing failure is `Internal`.
    fn code(&self) -> ErrorCodes {
        match self {
            Self::EmptyKey | Self::NoMembers => ErrorCodes::InvalidArgument,
            Self::HashError => ErrorCodes::Internal,
        }
    }
}

/// Assign a key to a member using the rendezvous hash algorithm.
///
/// Every member is scored against `key` with `hasher`, and the member with
/// the highest score wins. When several members share the highest score, the
/// one that appears first in `members` is returned.
/// # Arguments
/// - key: The key to assign.
/// - members: The members to assign to.
/// - hasher: The hasher to use.
/// # Returns
/// The member that the key was assigned to.
/// # Errors
/// - `AssignmentError::EmptyKey` if the key is empty.
/// - `AssignmentError::NoMembers` if there are no members to assign to.
/// - `AssignmentError::HashError` if there is an error hashing a member.
/// # Notes
/// This implementation mirrors the rendezvous hash implementation
/// in the go and python services.
///
/// A score of zero is a valid score: the first member is always accepted as
/// the initial candidate, so a non-empty member list never yields
/// `NoMembers`.
/// NOTE(review): confirm the Go and Python implementations treat an all-zero
/// score the same way.
pub(crate) fn assign<H: Hasher>(
    key: &str,
    members: impl IntoIterator<Item = impl AsRef<str>>,
    hasher: &H,
) -> Result<String, AssignmentError> {
    if key.is_empty() {
        return Err(AssignmentError::EmptyKey);
    }

    // Best candidate seen so far, stored as (score, member). `None` means no
    // member has been scored yet.
    let mut best: Option<(u64, _)> = None;

    for member in members {
        // Any hasher failure is reported as `HashError`, whatever the
        // underlying error was.
        let score = hasher
            .hash(member.as_ref(), key)
            .map_err(|_| AssignmentError::HashError)?;

        // The first member always becomes the candidate; later members must
        // strictly beat the current best, so ties keep the earliest member.
        let is_better = match &best {
            None => true,
            Some((best_score, _)) => score > *best_score,
        };
        if is_better {
            best = Some((score, member));
        }
    }

    best.map(|(_, member)| member.as_ref().to_string())
        .ok_or(AssignmentError::NoMembers)
}

/// Combine two 64-bit hashes into one well-mixed 64-bit score.
///
/// The inputs are XOR-ed together, then passed through alternating
/// shift-xor and wrapping-multiply steps so that every input bit influences
/// the result. The constants appear to be those of the MurmurHash3 64-bit
/// finalizer (verify against the reference implementation).
fn merge_hashes(x: u64, y: u64) -> u64 {
    const FIRST_MULTIPLIER: u64 = 0xFF51_AFD7_ED55_8CCD;
    const SECOND_MULTIPLIER: u64 = 0xC4CE_B9FE_1A85_EC53;

    // Fold the high bits down into the low bits.
    let shift_xor = |value: u64| value ^ (value >> 33);

    let stage_one = shift_xor(x ^ y).wrapping_mul(FIRST_MULTIPLIER);
    let stage_two = shift_xor(stage_one).wrapping_mul(SECOND_MULTIPLIER);
    shift_xor(stage_two)
}

/// A `Hasher` that scores a member/key pair by hashing each string with
/// `murmur3_x64_128` (seed 0) and merging the two results into a single `u64`.
pub(crate) struct Murmur3Hasher {}

impl Hasher for Murmur3Hasher {
    /// Score `member` against `key`.
    ///
    /// Both strings are hashed independently with `murmur3_x64_128` using
    /// seed 0, each 128-bit result is truncated to a `u64`, and the two
    /// values are combined with `merge_hashes`.
    ///
    /// # Errors
    /// Returns `AssignmentError::HashError` if hashing either string fails.
    fn hash(&self, member: &str, key: &str) -> Result<u64, AssignmentError> {
        // The murmur library returns a 128 bit hash, but only 64 bits are
        // needed; the `as u64` cast keeps the low 64 bits of the value.
        let member_bits = murmur3_x64_128(&mut Cursor::new(member), 0)
            .map_err(|_| AssignmentError::HashError)? as u64;
        let key_bits = murmur3_x64_128(&mut Cursor::new(key), 0)
            .map_err(|_| AssignmentError::HashError)? as u64;

        Ok(merge_hashes(member_bits, key_bits))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Deterministic hasher that ranks "a" < "b" < "c" and fails on any
    /// other member, regardless of the key.
    struct MockHasher {}

    impl Hasher for MockHasher {
        fn hash(&self, member: &str, _key: &str) -> Result<u64, AssignmentError> {
            let score = match member {
                "a" => 1,
                "b" => 2,
                "c" => 3,
                _ => return Err(AssignmentError::HashError),
            };
            Ok(score)
        }
    }

    /// The member with the highest mock score must be chosen.
    #[test]
    fn test_assign() {
        let assigned = assign("key", vec!["a", "b", "c"], &MockHasher {}).unwrap();
        assert_eq!(assigned, "c".to_string());
    }

    /// With the murmur3 hasher, 1000 keys spread over 10 members must land
    /// within `tolerance` of the ideal per-member share.
    #[test]
    fn test_even_distribution() {
        let member_count = 10;
        let num_keys = 1000;
        let tolerance = 25;
        let hasher = Murmur3Hasher {};

        let nodes: Vec<String> = (0..member_count)
            .map(|i| format!("member{}", i))
            .collect();

        // Number of keys assigned to each node, indexed like `nodes`.
        let mut counts = vec![0i32; member_count];
        for i in 0..num_keys {
            let key = format!("key_{}", i);
            let winner = assign(&key, &nodes, &hasher).unwrap();
            let index = nodes.iter().position(|node| *node == winner).unwrap();
            counts[index] += 1;
        }

        let expected = (num_keys / member_count) as i32;
        for count in counts {
            assert!((count - expected).abs() < tolerance);
        }
    }
}