Spaces:
				
			
			
	
			
			
		Build error
		
	
	
	
			
			
	
	
	
	
		
		
		Build error
		
	| use std::cmp; | |
| use std::collections::HashMap; | |
| use std::hash::{Hash, Hasher}; | |
| use std::path::{Path, PathBuf}; | |
| use std::sync::atomic::AtomicBool; | |
| use std::sync::Arc; | |
| use ahash::AHasher; | |
| use atomic_refcell::AtomicRefCell; | |
| use bitvec::macros::internal::funty::Integral; | |
| use common::cpu::CpuPermit; | |
| use common::types::PointOffsetType; | |
| use io::storage_version::StorageVersion; | |
| use tempfile::TempDir; | |
| use uuid::Uuid; | |
| use super::{ | |
| create_mutable_id_tracker, create_payload_storage, create_sparse_vector_index, | |
| create_sparse_vector_storage, create_vector_index, get_payload_index_path, | |
| get_vector_index_path, get_vector_storage_path, new_segment_path, open_segment_db, | |
| open_vector_storage, | |
| }; | |
| use crate::common::error_logging::LogError; | |
| use crate::common::operation_error::{check_process_stopped, OperationError, OperationResult}; | |
| use crate::entry::entry_point::SegmentEntry; | |
| use crate::id_tracker::immutable_id_tracker::ImmutableIdTracker; | |
| use crate::id_tracker::in_memory_id_tracker::InMemoryIdTracker; | |
| use crate::id_tracker::{IdTracker, IdTrackerEnum}; | |
| use crate::index::field_index::FieldIndex; | |
| use crate::index::sparse_index::sparse_vector_index::SparseVectorIndexOpenArgs; | |
| use crate::index::struct_payload_index::StructPayloadIndex; | |
| use crate::index::PayloadIndex; | |
| use crate::payload_storage::payload_storage_enum::PayloadStorageEnum; | |
| use crate::payload_storage::PayloadStorage; | |
| use crate::segment::{Segment, SegmentVersion}; | |
| use crate::segment_constructor::load_segment; | |
| use crate::types::{ | |
| ExtendedPointId, PayloadFieldSchema, PayloadKeyType, SegmentConfig, SegmentState, SeqNumberType, | |
| }; | |
| use crate::vector_storage::quantized::quantized_vectors::QuantizedVectors; | |
| use crate::vector_storage::{VectorStorage, VectorStorageEnum}; | |
| /// Structure for constructing segment out of several other segments | |
| pub struct SegmentBuilder { | |
| version: SeqNumberType, | |
| id_tracker: IdTrackerEnum, | |
| payload_storage: PayloadStorageEnum, | |
| vector_storages: HashMap<String, VectorStorageEnum>, | |
| segment_config: SegmentConfig, | |
| // The path, where fully created segment will be moved | |
| destination_path: PathBuf, | |
| // The temporary segment directory | |
| temp_dir: TempDir, | |
| indexed_fields: HashMap<PayloadKeyType, PayloadFieldSchema>, | |
| // Payload key to deframent data to | |
| defragment_keys: Vec<PayloadKeyType>, | |
| } | |
| impl SegmentBuilder { | |
| pub fn new( | |
| segment_path: &Path, | |
| temp_dir: &Path, | |
| segment_config: &SegmentConfig, | |
| ) -> OperationResult<Self> { | |
| // When we build a new segment, it is empty at first, | |
| // so we can ignore the `stopped` flag | |
| let stopped = AtomicBool::new(false); | |
| let temp_dir = create_temp_dir(temp_dir)?; | |
| let database = open_segment_db(temp_dir.path(), segment_config)?; | |
| let id_tracker = if segment_config.is_appendable() { | |
| IdTrackerEnum::MutableIdTracker(create_mutable_id_tracker(database.clone())?) | |
| } else { | |
| IdTrackerEnum::InMemoryIdTracker(InMemoryIdTracker::new()) | |
| }; | |
| let payload_storage = | |
| create_payload_storage(database.clone(), segment_config, segment_path)?; | |
| let mut vector_storages = HashMap::new(); | |
| for (vector_name, vector_config) in &segment_config.vector_data { | |
| let vector_storage_path = get_vector_storage_path(temp_dir.path(), vector_name); | |
| let vector_storage = open_vector_storage( | |
| &database, | |
| vector_config, | |
| &stopped, | |
| &vector_storage_path, | |
| vector_name, | |
| )?; | |
| vector_storages.insert(vector_name.to_owned(), vector_storage); | |
| } | |
| for (vector_name, _sparse_vector_config) in &segment_config.sparse_vector_data { | |
| // `_sparse_vector_config` should be used, once we are able to initialize storage with | |
| // different datatypes | |
| let vector_storage = | |
| create_sparse_vector_storage(database.clone(), vector_name, &stopped)?; | |
| vector_storages.insert(vector_name.to_owned(), vector_storage); | |
| } | |
| let destination_path = new_segment_path(segment_path); | |
| Ok(SegmentBuilder { | |
| version: Default::default(), // default version is 0 | |
| id_tracker, | |
| payload_storage, | |
| vector_storages, | |
| segment_config: segment_config.clone(), | |
| destination_path, | |
| temp_dir, | |
| indexed_fields: Default::default(), | |
| defragment_keys: vec![], | |
| }) | |
| } | |
| pub fn set_defragment_keys(&mut self, keys: Vec<PayloadKeyType>) { | |
| self.defragment_keys = keys; | |
| } | |
| pub fn remove_indexed_field(&mut self, field: &PayloadKeyType) { | |
| self.indexed_fields.remove(field); | |
| } | |
| pub fn add_indexed_field(&mut self, field: PayloadKeyType, schema: PayloadFieldSchema) { | |
| self.indexed_fields.insert(field, schema); | |
| } | |
| /// Get ordering value from the payload index | |
| /// | |
| /// Ordering value is used to sort points to keep points with the same payload together | |
| /// Under the assumption that points are queried together, this will reduce the number of | |
| /// random disk reads. | |
| /// | |
| /// Note: This value doesn't guarantee strict ordering in ambiguous cases. | |
| /// It should only be used in optimization purposes, not for correctness. | |
| fn _get_ordering_value(internal_id: PointOffsetType, indices: &[FieldIndex]) -> u64 { | |
| let mut ordering = 0; | |
| for payload_index in indices { | |
| match payload_index { | |
| FieldIndex::IntMapIndex(index) => { | |
| if let Some(numbers) = index.get_values(internal_id) { | |
| for number in numbers { | |
| ordering = ordering.wrapping_add(*number as u64); | |
| } | |
| } | |
| break; | |
| } | |
| FieldIndex::KeywordIndex(index) => { | |
| if let Some(keywords) = index.get_values(internal_id) { | |
| for keyword in keywords { | |
| let mut hasher = AHasher::default(); | |
| keyword.hash(&mut hasher); | |
| ordering = ordering.wrapping_add(hasher.finish()); | |
| } | |
| } | |
| break; | |
| } | |
| FieldIndex::IntIndex(index) => { | |
| if let Some(numbers) = index.get_values(internal_id) { | |
| for number in numbers { | |
| ordering = ordering.wrapping_add(number as u64); | |
| } | |
| } | |
| break; | |
| } | |
| FieldIndex::FloatIndex(index) => { | |
| if let Some(numbers) = index.get_values(internal_id) { | |
| for number in numbers { | |
| // Bit-level conversion of f64 to u64 preserves ordering | |
| // (for positive numbers) | |
| // | |
| // 0.001 -> 4562254508917369340 | |
| // 0.01 -> 4576918229304087675 | |
| // 0.05 -> 4587366580439587226 | |
| // 0.1 -> 4591870180066957722 | |
| // 1 -> 4607182418800017408 | |
| // 2 -> 4611686018427387904 | |
| // 10 -> 4621819117588971520 | |
| ordering = ordering.wrapping_add(number.to_bits()); | |
| } | |
| } | |
| break; | |
| } | |
| FieldIndex::DatetimeIndex(index) => { | |
| if let Some(dates) = index.get_values(internal_id) { | |
| for date in dates { | |
| ordering = ordering.wrapping_add(date as u64); | |
| } | |
| } | |
| break; | |
| } | |
| FieldIndex::UuidMapIndex(index) => { | |
| if let Some(ids) = index.get_values(internal_id) { | |
| uuid_hash(&mut ordering, ids.copied()); | |
| } | |
| break; | |
| } | |
| FieldIndex::UuidIndex(index) => { | |
| if let Some(ids) = index.get_values(internal_id) { | |
| uuid_hash(&mut ordering, ids); | |
| } | |
| break; | |
| } | |
| FieldIndex::GeoIndex(_) => {} | |
| FieldIndex::FullTextIndex(_) => {} | |
| FieldIndex::BinaryIndex(_) => {} | |
| } | |
| } | |
| ordering | |
| } | |
| /// Update current segment builder with all (not deleted) vectors and payload from `segments`. | |
| /// Also defragments if the `defragment_key` is set. | |
| /// However only points in the same call get defragmented and grouped together. | |
| /// Therefore this function should only be called once, unless this behavior is desired. | |
| /// | |
| /// # Result | |
| /// | |
| /// * `bool` - if `true` - data successfully added, if `false` - process was interrupted | |
| /// | |
| pub fn update(&mut self, segments: &[&Segment], stopped: &AtomicBool) -> OperationResult<bool> { | |
| if segments.is_empty() { | |
| return Ok(true); | |
| } | |
| let mut merged_points: HashMap<ExtendedPointId, PositionedPointMetadata> = HashMap::new(); | |
| for (segment_index, segment) in segments.iter().enumerate() { | |
| for external_id in segment.iter_points() { | |
| let version = segment.point_version(external_id).unwrap_or(0); | |
| merged_points | |
| .entry(external_id) | |
| .and_modify(|entry| { | |
| if entry.version < version { | |
| entry.segment_index = segment_index; | |
| entry.version = version; | |
| } | |
| }) | |
| .or_insert_with(|| { | |
| let internal_id = segment.get_internal_id(external_id).unwrap(); | |
| PositionedPointMetadata { | |
| segment_index, | |
| internal_id, | |
| external_id, | |
| version, | |
| ordering: 0, | |
| } | |
| }); | |
| } | |
| } | |
| let payloads: Vec<_> = segments.iter().map(|i| i.payload_index.borrow()).collect(); | |
| let mut points_to_insert: Vec<_> = merged_points.into_values().collect(); | |
| for defragment_key in &self.defragment_keys { | |
| for point_data in &mut points_to_insert { | |
| let Some(payload_indices) = payloads[point_data.segment_index] | |
| .field_indexes | |
| .get(defragment_key) | |
| else { | |
| continue; | |
| }; | |
| point_data.ordering = point_data.ordering.wrapping_add(Self::_get_ordering_value( | |
| point_data.internal_id, | |
| payload_indices, | |
| )); | |
| } | |
| } | |
| if !self.defragment_keys.is_empty() { | |
| points_to_insert.sort_unstable_by_key(|i| i.ordering); | |
| } | |
| let src_segment_max_version = segments.iter().map(|i| i.version()).max().unwrap(); | |
| self.version = cmp::max(self.version, src_segment_max_version); | |
| let vector_storages: Vec<_> = segments.iter().map(|i| &i.vector_data).collect(); | |
| let mut new_internal_range = None; | |
| for (vector_name, vector_storage) in &mut self.vector_storages { | |
| check_process_stopped(stopped)?; | |
| let other_vector_storages = vector_storages | |
| .iter() | |
| .map(|i| { | |
| let other_vector_storage = i.get(vector_name).ok_or_else(|| { | |
| OperationError::service_error(format!( | |
| "Cannot update from other segment because if missing vector name {vector_name}" | |
| )) | |
| })?; | |
| Ok(other_vector_storage.vector_storage.borrow()) | |
| }) | |
| .collect::<Result<Vec<_>, OperationError>>()?; | |
| let mut iter = points_to_insert.iter().map(|point_data| { | |
| let other_vector_storage = &other_vector_storages[point_data.segment_index]; | |
| let vec = other_vector_storage.get_vector(point_data.internal_id); | |
| let vector_deleted = other_vector_storage.is_deleted_vector(point_data.internal_id); | |
| (vec, vector_deleted) | |
| }); | |
| let internal_range = vector_storage.update_from(&mut iter, stopped)?; | |
| match &new_internal_range { | |
| Some(new_internal_range) => { | |
| if new_internal_range != &internal_range { | |
| return Err(OperationError::service_error( | |
| "Internal ids range mismatch between self segment vectors and other segment vectors", | |
| )); | |
| } | |
| } | |
| None => new_internal_range = Some(internal_range), | |
| } | |
| } | |
| if let Some(new_internal_range) = new_internal_range { | |
| let internal_id_iter = new_internal_range.zip(points_to_insert.iter()); | |
| for (new_internal_id, point_data) in internal_id_iter { | |
| check_process_stopped(stopped)?; | |
| let old_internal_id = point_data.internal_id; | |
| let other_payload = | |
| payloads[point_data.segment_index].get_payload(old_internal_id)?; | |
| match self.id_tracker.internal_id(point_data.external_id) { | |
| Some(existing_internal_id) => { | |
| debug_assert!( | |
| false, | |
| "This code should not be reachable, cause points were resolved with `merged_points`" | |
| ); | |
| let existing_external_version = self | |
| .id_tracker | |
| .internal_version(existing_internal_id) | |
| .unwrap(); | |
| let remove_id = if existing_external_version < point_data.version { | |
| // Other version is the newest, remove the existing one and replace | |
| self.id_tracker.drop(point_data.external_id)?; | |
| self.id_tracker | |
| .set_link(point_data.external_id, new_internal_id)?; | |
| self.id_tracker | |
| .set_internal_version(new_internal_id, point_data.version)?; | |
| self.payload_storage.clear(existing_internal_id)?; | |
| existing_internal_id | |
| } else { | |
| // Old version is still good, do not move anything else | |
| // Mark newly added vector as removed | |
| new_internal_id | |
| }; | |
| for vector_storage in self.vector_storages.values_mut() { | |
| vector_storage.delete_vector(remove_id)?; | |
| } | |
| } | |
| None => { | |
| self.id_tracker | |
| .set_link(point_data.external_id, new_internal_id)?; | |
| self.id_tracker | |
| .set_internal_version(new_internal_id, point_data.version)?; | |
| } | |
| } | |
| // Propagate payload to new segment | |
| if !other_payload.is_empty() { | |
| self.payload_storage.set(new_internal_id, &other_payload)?; | |
| } | |
| } | |
| } | |
| for payload in payloads { | |
| for (field, payload_schema) in payload.indexed_fields() { | |
| self.indexed_fields.insert(field, payload_schema); | |
| } | |
| } | |
| Ok(true) | |
| } | |
| pub fn build(self, permit: CpuPermit, stopped: &AtomicBool) -> Result<Segment, OperationError> { | |
| let (temp_dir, destination_path) = { | |
| let SegmentBuilder { | |
| version, | |
| id_tracker, | |
| payload_storage, | |
| mut vector_storages, | |
| segment_config, | |
| destination_path, | |
| temp_dir, | |
| indexed_fields, | |
| defragment_keys: _, | |
| } = self; | |
| let appendable_flag = segment_config.is_appendable(); | |
| payload_storage.flusher()()?; | |
| let payload_storage_arc = Arc::new(AtomicRefCell::new(payload_storage)); | |
| let id_tracker = match id_tracker { | |
| IdTrackerEnum::InMemoryIdTracker(in_memory_id_tracker) => { | |
| let (versions, mappings) = in_memory_id_tracker.into_internal(); | |
| let immutable_id_tracker = | |
| ImmutableIdTracker::new(temp_dir.path(), &versions, mappings)?; | |
| IdTrackerEnum::ImmutableIdTracker(immutable_id_tracker) | |
| } | |
| IdTrackerEnum::MutableIdTracker(_) => id_tracker, | |
| IdTrackerEnum::ImmutableIdTracker(_) => { | |
| unreachable!("ImmutableIdTracker should not be used for building segment") | |
| } | |
| }; | |
| id_tracker.mapping_flusher()()?; | |
| id_tracker.versions_flusher()()?; | |
| let id_tracker_arc = Arc::new(AtomicRefCell::new(id_tracker)); | |
| // Arc permit to share it with each vector store | |
| let permit = Arc::new(permit); | |
| let mut quantized_vectors = Self::update_quantization( | |
| &segment_config, | |
| &vector_storages, | |
| temp_dir.path(), | |
| &permit, | |
| stopped, | |
| )?; | |
| let mut vector_storages_arc = HashMap::new(); | |
| for vector_name in segment_config.vector_data.keys() { | |
| let Some(vector_storage) = vector_storages.remove(vector_name) else { | |
| return Err(OperationError::service_error(format!( | |
| "Vector storage for vector name {vector_name} not found on segment build" | |
| ))); | |
| }; | |
| vector_storage.flusher()()?; | |
| let vector_storage_arc = Arc::new(AtomicRefCell::new(vector_storage)); | |
| vector_storages_arc.insert(vector_name.to_owned(), vector_storage_arc); | |
| } | |
| for vector_name in segment_config.sparse_vector_data.keys() { | |
| let Some(vector_storage) = vector_storages.remove(vector_name) else { | |
| return Err(OperationError::service_error(format!( | |
| "Vector storage for vector name {vector_name} not found on sparse segment build" | |
| ))); | |
| }; | |
| vector_storage.flusher()()?; | |
| let vector_storage_arc = Arc::new(AtomicRefCell::new(vector_storage)); | |
| vector_storages_arc.insert(vector_name.to_owned(), vector_storage_arc); | |
| } | |
| let payload_index_path = get_payload_index_path(temp_dir.path()); | |
| let mut payload_index = StructPayloadIndex::open( | |
| payload_storage_arc, | |
| id_tracker_arc.clone(), | |
| vector_storages_arc.clone(), | |
| &payload_index_path, | |
| appendable_flag, | |
| )?; | |
| for (field, payload_schema) in indexed_fields { | |
| payload_index.set_indexed(&field, payload_schema)?; | |
| check_process_stopped(stopped)?; | |
| } | |
| payload_index.flusher()()?; | |
| let payload_index_arc = Arc::new(AtomicRefCell::new(payload_index)); | |
| for (vector_name, vector_config) in &segment_config.vector_data { | |
| let vector_storage_arc = vector_storages_arc.remove(vector_name).unwrap(); | |
| let vector_index_path = get_vector_index_path(temp_dir.path(), vector_name); | |
| let quantized_vectors = quantized_vectors.remove(vector_name); | |
| let quantized_vectors_arc = Arc::new(AtomicRefCell::new(quantized_vectors)); | |
| create_vector_index( | |
| vector_config, | |
| &vector_index_path, | |
| id_tracker_arc.clone(), | |
| vector_storage_arc, | |
| payload_index_arc.clone(), | |
| quantized_vectors_arc, | |
| Some(permit.clone()), | |
| stopped, | |
| )?; | |
| } | |
| for (vector_name, sparse_vector_config) in &segment_config.sparse_vector_data { | |
| let vector_index_path = get_vector_index_path(temp_dir.path(), vector_name); | |
| let vector_storage_arc = vector_storages_arc.remove(vector_name).unwrap(); | |
| create_sparse_vector_index(SparseVectorIndexOpenArgs { | |
| config: sparse_vector_config.index, | |
| id_tracker: id_tracker_arc.clone(), | |
| vector_storage: vector_storage_arc.clone(), | |
| payload_index: payload_index_arc.clone(), | |
| path: &vector_index_path, | |
| stopped, | |
| tick_progress: || (), | |
| })?; | |
| } | |
| // We're done with CPU-intensive tasks, release CPU permit | |
| debug_assert_eq!( | |
| Arc::strong_count(&permit), | |
| 1, | |
| "Must release CPU permit Arc everywhere", | |
| ); | |
| drop(permit); | |
| // Finalize the newly created segment by saving config and version | |
| Segment::save_state( | |
| &SegmentState { | |
| version: Some(version), | |
| config: segment_config, | |
| }, | |
| temp_dir.path(), | |
| )?; | |
| // After version is saved, segment can be loaded on restart | |
| SegmentVersion::save(temp_dir.path())?; | |
| // All temp data is evicted from RAM | |
| (temp_dir, destination_path) | |
| }; | |
| // Move fully constructed segment into collection directory and load back to RAM | |
| std::fs::rename(temp_dir.into_path(), &destination_path) | |
| .describe("Moving segment data after optimization")?; | |
| let loaded_segment = load_segment(&destination_path, stopped)?.ok_or_else(|| { | |
| OperationError::service_error(format!( | |
| "Segment loading error: {}", | |
| destination_path.display() | |
| )) | |
| })?; | |
| Ok(loaded_segment) | |
| } | |
| fn update_quantization( | |
| segment_config: &SegmentConfig, | |
| vector_storages: &HashMap<String, VectorStorageEnum>, | |
| temp_path: &Path, | |
| permit: &CpuPermit, | |
| stopped: &AtomicBool, | |
| ) -> OperationResult<HashMap<String, QuantizedVectors>> { | |
| let config = segment_config.clone(); | |
| let mut quantized_vectors_map = HashMap::new(); | |
| for (vector_name, vector_storage) in vector_storages { | |
| let Some(vector_config) = config.vector_data.get(vector_name) else { | |
| continue; | |
| }; | |
| let is_appendable = vector_config.is_appendable(); | |
| // Don't build quantization for appendable vectors | |
| if is_appendable { | |
| continue; | |
| } | |
| let max_threads = permit.num_cpus as usize; | |
| if let Some(quantization) = config.quantization_config(vector_name) { | |
| let segment_path = temp_path; | |
| check_process_stopped(stopped)?; | |
| let vector_storage_path = get_vector_storage_path(segment_path, vector_name); | |
| let quantized_vectors = QuantizedVectors::create( | |
| vector_storage, | |
| quantization, | |
| &vector_storage_path, | |
| max_threads, | |
| stopped, | |
| )?; | |
| quantized_vectors_map.insert(vector_name.to_owned(), quantized_vectors); | |
| } | |
| } | |
| Ok(quantized_vectors_map) | |
| } | |
| } | |
| fn uuid_hash<I>(hash: &mut u64, ids: I) | |
| where | |
| I: Iterator<Item = u128>, | |
| { | |
| for id in ids { | |
| let uuid = Uuid::from_u128(id); | |
| // Not all Uuid versions hold timestamp data. The most common version, v4 for example is completely | |
| // random and can't be sorted. To still allow defragmentation, we assume that usually the same | |
| // version gets used for a payload key and implement an alternative sorting criteria, that just | |
| // takes the Uuids bytes to group equal Uuids together. | |
| if let Some(timestamp) = uuid.get_timestamp() { | |
| *hash = hash.wrapping_add(timestamp.to_gregorian().0); | |
| } else { | |
| // First part of u128 | |
| *hash = hash.wrapping_add((id >> 64) as u64); | |
| // Second part of u128 | |
| *hash = hash.wrapping_add(id as u64); | |
| } | |
| } | |
| } | |
| fn create_temp_dir(parent_path: &Path) -> Result<TempDir, OperationError> { | |
| // Ensure parent path exists | |
| std::fs::create_dir_all(parent_path) | |
| .and_then(|_| TempDir::with_prefix_in("segment_builder_", parent_path)) | |
| .map_err(|err| { | |
| OperationError::service_error(format!( | |
| "Could not create temp directory in `{}`: {}", | |
| parent_path.display(), | |
| err | |
| )) | |
| }) | |
| } | |
| /// Internal point ID and metadata of a point. | |
| struct PositionedPointMetadata { | |
| segment_index: usize, | |
| internal_id: PointOffsetType, | |
| external_id: ExtendedPointId, | |
| version: SeqNumberType, | |
| ordering: u64, | |
| } | |