Spaces:
Running
Running
File size: 1,652 Bytes
287a0bc |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
use std::ops::Deref;
// mirrors chromadb/utils/messageid.py
use num_bigint::BigInt;
use pulsar::{consumer::data::MessageData, proto::MessageIdData};
use crate::types::SeqId;
/// Crate-local newtype around pulsar's `MessageData`.
///
/// Wrapping the foreign type lets this crate implement its own conversions for it;
/// the `Deref` impl in this file exposes the wrapped message's `id` field directly.
pub(crate) struct PulsarMessageIdWrapper(pub(crate) MessageData);
impl Deref for PulsarMessageIdWrapper {
type Target = MessageIdData;
fn deref(&self) -> &Self::Target {
&self.0.id
}
}
/// Packs the components of a Pulsar message id into a single big integer.
///
/// Each component is first biased into offset-binary form (see
/// <https://en.wikipedia.org/wiki/Offset_binary>) so that the packed value keeps the
/// ordering of the original components, then placed in its own bit range:
///
/// | bits        | component                     |
/// |-------------|-------------------------------|
/// | `128..`     | `ledger_id + 2^63`            |
/// | `64..128`   | `entry_id + 2^63`             |
/// | `32..64`    | `batch_index + 2^31`          |
/// | `0..32`     | `partition + 2^31`            |
///
/// A missing `batch_index` or `partition` is treated as `0`.
///
/// NOTE(review): the layout assumes the biased `entry_id` fits in 64 bits and the biased
/// `batch_index` / `partition` fit in 32 bits — confirm against the pulsar field ranges.
pub(crate) fn pulsar_to_int(message_id: PulsarMessageIdWrapper) -> SeqId {
    // Bias each field so that the smallest representable value maps to zero.
    let ledger_id = BigInt::from(message_id.ledger_id) + BigInt::from(2).pow(63);
    let entry_id = BigInt::from(message_id.entry_id) + BigInt::from(2).pow(63);
    let batch_index =
        BigInt::from(message_id.batch_index.unwrap_or(0)) + BigInt::from(2).pow(31);
    let partition = BigInt::from(message_id.partition.unwrap_or(0)) + BigInt::from(2).pow(31);

    // Shift amounts are chosen so the fields never overlap: partition takes the low
    // 32 bits, batch_index the next 32, entry_id the next 64, ledger_id everything above.
    // Overlapping ranges would let `|` merge bits from different fields and break both
    // uniqueness and ordering of the result.
    ledger_id << 128 | entry_id << 64 | batch_index << 32 | partition
}
// We can't implement From directly on the pulsar type because we don't own it.
// So the pattern is to wrap it in a newtype and implement From for that,
// and implement Deref for the newtype to reach the underlying type.
/// Converts a wrapped Pulsar message id into a `SeqId`.
///
/// This is a thin adapter over `pulsar_to_int`, so `wrapper.into()` and
/// `SeqId::from(wrapper)` yield exactly what that function returns.
impl From<PulsarMessageIdWrapper> for SeqId {
    fn from(message_id: PulsarMessageIdWrapper) -> Self {
        pulsar_to_int(message_id)
    }
}
|