Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazily allocate the vec in the Sorter #43

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 88 additions & 42 deletions src/sorter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,19 +185,26 @@ impl<MF, CC: ChunkCreator> SorterBuilder<MF, CC> {
}

/// Stores entries memory efficiently in a buffer.
struct Entries {
/// The internal buffer that contains the bounds of the buffer
/// on the front and the key and data bytes on the back of it.
///
/// [----bounds---->--remaining--<--key+data--]
///
buffer: EntryBoundAlignedBuffer,

/// The amount of bytes stored in the buffer.
entries_len: usize,

/// The number of bounds stored in the buffer.
bounds_count: usize,
enum Entries {
/// No allocation is performed yet, the buffer will be initialized when needed.
NotInitialized {
/// The size the buffer must allocate when effectively initialized.
buffer_capacity: usize,
},
/// The buffer is allocated and used by the sorter.
Initialized {
/// The internal buffer that contains the bounds of the buffer
/// on the front and the key and data bytes on the back of it.
///
/// [----bounds---->--remaining--<--key+data--]
buffer: EntryBoundAlignedBuffer,

/// The amount of bytes stored in the buffer.
entries_len: usize,

/// The number of bounds stored in the buffer.
bounds_count: usize,
},
}

impl Entries {
Expand All @@ -211,13 +218,15 @@ impl Entries {
/// If you want to be sure about the amount of memory used you can use
/// the `fits` method.
pub fn with_capacity(capacity: usize) -> Self {
Self { buffer: EntryBoundAlignedBuffer::new(capacity), entries_len: 0, bounds_count: 0 }
Self::NotInitialized { buffer_capacity: capacity }
}

/// Clear the entries.
pub fn clear(&mut self) {
self.entries_len = 0;
self.bounds_count = 0;
if let Self::Initialized { entries_len, bounds_count, .. } = self {
*entries_len = 0;
*bounds_count = 0;
}
}

/// Inserts a new entry into the buffer, if there is not
Expand All @@ -226,26 +235,28 @@ impl Entries {
assert!(key.len() <= u32::max_value() as usize);
assert!(data.len() <= u32::max_value() as usize);

let Self::Initialized { buffer, entries_len, bounds_count } = self.initialize();

if self.fits(key, data) {
// We store the key and data bytes one after the other at the back of the buffer.
self.entries_len += key.len() + data.len();
let entries_start = self.buffer.len() - self.entries_len;
self.buffer[entries_start..][..key.len()].copy_from_slice(key);
self.buffer[entries_start + key.len()..][..data.len()].copy_from_slice(data);
entries_len += key.len() + data.len();
let entries_start = buffer.len() - entries_len;
buffer[entries_start..][..key.len()].copy_from_slice(key);
buffer[entries_start + key.len()..][..data.len()].copy_from_slice(data);

let bound = EntryBound {
key_start: self.entries_len,
key_start: entries_len,
key_length: key.len() as u32,
data_length: data.len() as u32,
};

// We store the bounds at the front of the buffer and grow from the end to the start
// of it. We interpret the front of the buffer as a slice of EntryBounds + 1 entry
// that is not assigned and replace it with the new one we want to insert.
let bounds_end = (self.bounds_count + 1) * size_of::<EntryBound>();
let bounds = cast_slice_mut::<_, EntryBound>(&mut self.buffer[..bounds_end]);
bounds[self.bounds_count] = bound;
self.bounds_count += 1;
let bounds_end = (bounds_count + 1) * size_of::<EntryBound>();
let bounds = cast_slice_mut::<_, EntryBound>(&mut buffer[..bounds_end]);
bounds[bounds_count] = bound;
bounds_count += 1;
} else {
self.reallocate_buffer();
self.insert(key, data);
Expand All @@ -254,36 +265,55 @@ impl Entries {

/// Returns `true` if inserting this entry will not trigger a reallocation.
pub fn fits(&self, key: &[u8], data: &[u8]) -> bool {
// The number of memory aligned EntryBounds that we can store.
let aligned_bounds_count = unsafe { self.buffer.align_to::<EntryBound>().1.len() };
let remaining_aligned_bounds = aligned_bounds_count - self.bounds_count;
match *self {
Self::NotInitialized { buffer_capacity } => {
// ...
}
Self::Initialized { buffer, entries_len, bounds_count } => {
// The number of memory aligned EntryBounds that we can store.
let aligned_bounds_count = unsafe { buffer.align_to::<EntryBound>().1.len() };
let remaining_aligned_bounds = aligned_bounds_count - bounds_count;

self.remaining() >= Self::entry_size(key, data) && remaining_aligned_bounds >= 1
self.remaining() >= Self::entry_size(key, data) && remaining_aligned_bounds >= 1
}
}
}

/// Simply returns the size of the internal buffer.
pub fn memory_usage(&self) -> usize {
self.buffer.len()
match self {
Self::NotInitialized { .. } => 0,
Self::Initialized { buffer, .. } => buffer.len(),
}
}

/// Sorts the entry bounds by the entries keys, after a sort
/// the `iter` method will yield the entries sorted.
pub fn sort_by_key(&mut self, algorithm: SortAlgorithm) {
let bounds_end = self.bounds_count * size_of::<EntryBound>();
let (bounds, tail) = self.buffer.split_at_mut(bounds_end);
let bounds = cast_slice_mut::<_, EntryBound>(bounds);
let sort = match algorithm {
SortAlgorithm::Stable => <[EntryBound]>::sort_by_key,
SortAlgorithm::Unstable => <[EntryBound]>::sort_unstable_by_key,
};
sort(bounds, |b: &EntryBound| &tail[tail.len() - b.key_start..][..b.key_length as usize]);
if let Self::Initialized { buffer, entries_len, bounds_count } = *self {
let bounds_end = bounds_count * size_of::<EntryBound>();
let (bounds, tail) = buffer.split_at_mut(bounds_end);
let bounds = cast_slice_mut::<_, EntryBound>(bounds);
let sort = match algorithm {
SortAlgorithm::Stable => <[EntryBound]>::sort_by_key,
SortAlgorithm::Unstable => <[EntryBound]>::sort_unstable_by_key,
};
sort(bounds, |b: &EntryBound| {
&tail[tail.len() - b.key_start..][..b.key_length as usize]
});
}
}

/// Returns an iterator over the keys and datas.
pub fn iter(&self) -> impl Iterator<Item = (&[u8], &[u8])> + '_ {
let bounds_end = self.bounds_count * size_of::<EntryBound>();
let (bounds, tail) = self.buffer.split_at(bounds_end);
let bounds = cast_slice::<_, EntryBound>(bounds);
let (bounds, tail) = match self {
Self::NotInitialized { .. } => (&[][..], &[][..]),
Self::Initialized { buffer, bounds_count, .. } => {
let bounds_end = bounds_count * size_of::<EntryBound>();
let (bounds, tail) = buffer.split_at(bounds_end);
(cast_slice::<_, EntryBound>(bounds), tail)
}
};

bounds.iter().map(move |b| {
let entries_start = tail.len() - b.key_start;
Expand All @@ -304,7 +334,12 @@ impl Entries {

/// The remaining amount of bytes before we need to reallocate a new buffer.
fn remaining(&self) -> usize {
self.buffer.len() - self.entries_len - self.bounds_count * size_of::<EntryBound>()
match *self {
Self::NotInitialized { buffer_capacity } => buffer_capacity,
Self::Initialized { buffer, entries_len, bounds_count } => {
buffer.len() - entries_len - bounds_count * size_of::<EntryBound>()
}
}
}

/// The size that this entry will need to be stored in the buffer.
Expand All @@ -327,6 +362,17 @@ impl Entries {

self.buffer = new_buffer;
}

fn initialize(mut self) -> Self {
match self {
Self::NotInitialized { buffer_capacity } => Self::Initialized {
buffer: EntryBoundAlignedBuffer::new(buffer_capacity),
entries_len: 0,
bounds_count: 0,
},
otherwise => otherwise,
}
}
}

#[derive(Default, Copy, Clone, Pod, Zeroable)]
Expand Down