diff --git a/README.md b/README.md index 754ee0fb..c436a2e9 100644 --- a/README.md +++ b/README.md @@ -45,7 +45,7 @@ cargo x book If you changed public API in the reference solution, you might also need to synchronize it to the starter crate. To do this, use `cargo x sync`. -## Structure +## Code Structure * mini-lsm: the final solution code for <= week 2 * mini-lsm-mvcc: the final solution code for week 3 MVCC @@ -70,27 +70,38 @@ cargo run --bin compaction-simulator-ref cargo run --bin compaction-simulator-mvcc-ref ``` -## Progress +## Tutorial Structure -We are working on chapter 3 and more test cases for all existing contents. +We have 3 weeks + 1 extra week (in progress) for this tutorial. * Week 1: Storage Format + Engine Skeleton * Week 2: Compaction and Persistence * Week 3: Multi-Version Concurrency Control -* The Extra Week / Rest of Your Life: Optimizations (unlikely to be available in 2024...) - -✅: Finished \ -🚧: WIP and will likely be available soon - -| Week + Chapter | Topic | Solution | Starter Code | Writeup | -| -------------- | ----------------------------------------------- | -------- | ------------ | ------- | -| 3.1 | Timestamp Key Encoding | ✅ | ✅ | ✅ | -| 3.2 | Snapshot Read - Blocks, Memtables, and SSTs | ✅ | ✅ | ✅ | -| 3.3 | Snapshot Read - Engine Read Path | ✅ | ✅ | ✅ | -| 3.4 | Watermark and Garbage Collection | ✅ | ✅ | ✅ | -| 3.5 | Transactions and Optimistic Concurrency Control | ✅ | ✅ | ✅ | -| 3.6 | Serializable Snapshot Isolation | ✅ | ✅ | ✅ | -| 3.7 | Compaction Filter | 🚧 | | | +* The Extra Week / Rest of Your Life: Optimizations (unlikely to be available in 2024...) + +| Week + Chapter | Topic | +| -------------- | ----------------------------------------------------------- | +| 1.1 | Memtable | +| 1.2 | Merge Iterator | +| 1.3 | Block | +| 1.4 | Sorted String Table (SST) | +| 1.5 | Read Path | +| 1.6 | Write Path | +| 1.7 | SST Optimizations: Prefix Key Encoding + Bloom Filters | +| 2.1 | Compaction Implementation | +| 2.2 | Simple Compaction Strategy (Traditional Leveled Compaction) | +| 2.3 | Tiered Compaction Strategy (RocksDB Universal Compaction) | +| 2.4 | Leveled Compaction Strategy (RocksDB Leveled Compaction) | +| 2.5 | Manifest | +| 2.6 | Write-Ahead Log (WAL) | +| 2.7 | Batch Write and Checksums | +| 3.1 | Timestamp Key Encoding | +| 3.2 | Snapshot Read - Memtables and Timestamps | +| 3.3 | Snapshot Read - Transaction API | +| 3.4 | Watermark and Garbage Collection | +| 3.5 | Transactions and Optimistic Concurrency Control | +| 3.6 | Serializable Snapshot Isolation | +| 3.7 | Compaction Filters | ## License diff --git a/mini-lsm-book/src/SUMMARY.md b/mini-lsm-book/src/SUMMARY.md index aed4da92..ed66d7d8 100644 --- a/mini-lsm-book/src/SUMMARY.md +++ b/mini-lsm-book/src/SUMMARY.md @@ -29,7 +29,7 @@ - [Watermark and GC](./week3-04-watermark.md) - [Transaction and OCC](./week3-05-txn-occ.md) - [Serializable Snapshot Isolation](./week3-06-serializable.md) - - [Snack Time: Compaction Filter (WIP)](./week3-07-compaction-filter.md) + - [Snack Time: Compaction Filters](./week3-07-compaction-filter.md) - [The Rest of Your Life (TBD)](./week4-overview.md) --- diff --git a/mini-lsm-book/src/week3-07-compaction-filter.md b/mini-lsm-book/src/week3-07-compaction-filter.md index 82213a63..a9a5eea0 100644 --- a/mini-lsm-book/src/week3-07-compaction-filter.md +++ b/mini-lsm-book/src/week3-07-compaction-filter.md @@ -1,4 +1,46 @@ -# Snack Time: Compaction Filter +# Snack Time: Compaction Filters +Congratulations! You made it there! In the previous chapter, you made your LSM engine multi-version capable, and the users can use transaction APIs to interact with your storage engine. At the end of this week, we will implement some easy but important features of the storage engine. Welcome to Mini-LSM's week 3 snack time! + +In this chapter, we will generalize our compaction garbage collection logic to become compaction filters. + +For now, our compaction will simply retain the keys above the watermark and the latest version of the keys below the watermark. We can add some magic to the compaction process to help the user collect some unused data automatically as a background job. + +Consider a case that the user uses Mini-LSM to store database tables. Each row in the table are prefixed with the table name. For example, + +``` +table1_key1 -> row +table1_key2 -> row +table1_key3 -> row +table2_key1 -> row +table2_key2 -> row +``` + +Now the user executes `DROP TABLE table1`. The engine will need to clean up all the data beginning with `table1`. + +There are a lot of ways to achieve the goal. The user of Mini-LSM can scan all the keys beginning with `table1` and requests the engine to delete it. However, scanning a very large database might be slow, and it will generate the same number of delete tombstones as the existing keys. Therefore, scan-and-delete will not free up the space occupied by the dropped table -- instead, it will add more data to the engine and the space can only be reclaimed when the tombstones reach the bottom level of the engine. + +Or, they can create column families (we will talk about this in *rest of your life* chapter). They store each table in a column family, which is a standalone LSM state, and directly remove the SST files corresponding to the column family when the user drop the table. + +In this tutorial, we will implement the third approach: compaction filters. Compaction filters can be dynamically added to the engine at runtime. During the compaction, if a key matching the compaction filter is found, we can silently remove it in the background. Therefore, the user can attach a compaction filter of `prefix=table1` to the engine, and all these keys will be removed during compaction. + +## Task 1: Compaction Filter + +In this task, you will need to modify: + +``` +src/compact.rs +``` + +You can iterate all compaction filters in `LsmStorageInner::compaction_filters`. If the first version of the key below watermark matches the compaction filter, simply remove it instead of keeping it in the SST file. + +To run test cases, + +``` +cargo x copy-test --week 3 --day 7 +cargo x scheck +``` + +You can assume that the user will not get the keys within the prefix filter range. And, they will not scan the keys in the prefix range. Therefore, it is okay to return a wrong value when a user requests the keys in the prefix filter range (i.e., undefined behavior). {{#include copyright.md}} diff --git a/mini-lsm-mvcc/src/compact.rs b/mini-lsm-mvcc/src/compact.rs index 06df389e..cc52c63f 100644 --- a/mini-lsm-mvcc/src/compact.rs +++ b/mini-lsm-mvcc/src/compact.rs @@ -19,7 +19,7 @@ use crate::iterators::merge_iterator::MergeIterator; use crate::iterators::two_merge_iterator::TwoMergeIterator; use crate::iterators::StorageIterator; use crate::key::KeySlice; -use crate::lsm_storage::{LsmStorageInner, LsmStorageState}; +use crate::lsm_storage::{CompactionFilter, LsmStorageInner, LsmStorageState}; use crate::manifest::ManifestRecord; use crate::table::{SsTable, SsTableBuilder, SsTableIterator}; @@ -122,7 +122,8 @@ impl LsmStorageInner { let watermark = self.mvcc().watermark(); let mut last_key = Vec::::new(); let mut first_key_below_watermark = false; - while iter.is_valid() { + let compaction_filters = self.compaction_filters.lock().clone(); + 'outer: while iter.is_valid() { if builder.is_none() { builder = Some(SsTableBuilder::new(self.options.block_size)); } @@ -144,12 +145,26 @@ impl LsmStorageInner { continue; } - if same_as_last_key && iter.key().ts() <= watermark { - if !first_key_below_watermark { + if iter.key().ts() <= watermark { + if same_as_last_key && !first_key_below_watermark { iter.next()?; continue; } + first_key_below_watermark = false; + + if !compaction_filters.is_empty() { + for filter in &compaction_filters { + match filter { + CompactionFilter::Prefix(x) => { + if iter.key().key_ref().starts_with(x) { + iter.next()?; + continue 'outer; + } + } + } + } + } } let builder_inner = builder.as_mut().unwrap(); diff --git a/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs b/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs index 8488cd28..2ff2ce3f 100644 --- a/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs +++ b/mini-lsm-mvcc/src/iterators/two_merge_iterator.rs @@ -53,8 +53,10 @@ impl< fn key(&self) -> A::KeyType<'_> { if self.choose_a { + debug_assert!(self.a.is_valid()); self.a.key() } else { + debug_assert!(self.b.is_valid()); self.b.key() } } diff --git a/mini-lsm-mvcc/src/lsm_storage.rs b/mini-lsm-mvcc/src/lsm_storage.rs index c284b4a7..25786dfd 100644 --- a/mini-lsm-mvcc/src/lsm_storage.rs +++ b/mini-lsm-mvcc/src/lsm_storage.rs @@ -149,6 +149,11 @@ fn key_within(user_key: &[u8], table_begin: KeySlice, table_end: KeySlice) -> bo table_begin.key_ref() <= user_key && user_key <= table_end.key_ref() } +#[derive(Clone, Debug)] +pub enum CompactionFilter { + Prefix(Bytes), +} + /// The storage interface of the LSM tree. pub(crate) struct LsmStorageInner { pub(crate) state: Arc>>, @@ -160,6 +165,7 @@ pub(crate) struct LsmStorageInner { pub(crate) compaction_controller: CompactionController, pub(crate) manifest: Option, pub(crate) mvcc: Option, + pub(crate) compaction_filters: Arc>>, } /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM. @@ -243,6 +249,10 @@ impl MiniLsm { })) } + pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) { + self.inner.add_compaction_filter(compaction_filter) + } + pub fn get(&self, key: &[u8]) -> Result> { self.inner.get(key) } @@ -431,12 +441,18 @@ impl LsmStorageInner { manifest: Some(manifest), options: options.into(), mvcc: Some(LsmMvccInner::new(last_commit_ts)), + compaction_filters: Arc::new(Mutex::new(Vec::new())), }; storage.sync_dir()?; Ok(storage) } + pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) { + let mut compaction_filters = self.compaction_filters.lock(); + compaction_filters.push(compaction_filter); + } + pub fn sync(&self) -> Result<()> { self.state.read().memtable.sync_wal() } diff --git a/mini-lsm-mvcc/src/tests.rs b/mini-lsm-mvcc/src/tests.rs index ab4e4052..7b12c113 100644 --- a/mini-lsm-mvcc/src/tests.rs +++ b/mini-lsm-mvcc/src/tests.rs @@ -18,3 +18,4 @@ mod week3_day3; mod week3_day4; mod week3_day5; mod week3_day6; +mod week3_day7; diff --git a/mini-lsm-mvcc/src/tests/week3_day7.rs b/mini-lsm-mvcc/src/tests/week3_day7.rs new file mode 100644 index 00000000..bfbc05d7 --- /dev/null +++ b/mini-lsm-mvcc/src/tests/week3_day7.rs @@ -0,0 +1,70 @@ +use bytes::Bytes; +use tempfile::tempdir; + +use crate::{ + compact::CompactionOptions, + lsm_storage::{CompactionFilter, LsmStorageOptions, MiniLsm, WriteBatchRecord}, +}; + +use super::harness::{check_iter_result_by_key, construct_merge_iterator_over_storage}; + +#[test] +fn test_task3_mvcc_compaction() { + let dir = tempdir().unwrap(); + let options = LsmStorageOptions::default_for_week2_test(CompactionOptions::NoCompaction); + let storage = MiniLsm::open(&dir, options.clone()).unwrap(); + storage + .write_batch(&[ + WriteBatchRecord::Put("table1_a", "1"), + WriteBatchRecord::Put("table1_b", "1"), + WriteBatchRecord::Put("table1_c", "1"), + WriteBatchRecord::Put("table2_a", "1"), + WriteBatchRecord::Put("table2_b", "1"), + WriteBatchRecord::Put("table2_c", "1"), + ]) + .unwrap(); + storage.force_flush().unwrap(); + let snapshot0 = storage.new_txn().unwrap(); + storage + .write_batch(&[ + WriteBatchRecord::Put("table1_a", "2"), + WriteBatchRecord::Del("table1_b"), + WriteBatchRecord::Put("table1_c", "2"), + WriteBatchRecord::Put("table2_a", "2"), + WriteBatchRecord::Del("table2_b"), + WriteBatchRecord::Put("table2_c", "2"), + ]) + .unwrap(); + storage.force_flush().unwrap(); + storage.add_compaction_filter(CompactionFilter::Prefix(Bytes::from("table2_"))); + storage.force_full_compaction().unwrap(); + + let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read()); + check_iter_result_by_key( + &mut iter, + vec![ + (Bytes::from("table1_a"), Bytes::from("2")), + (Bytes::from("table1_a"), Bytes::from("1")), + (Bytes::from("table1_b"), Bytes::new()), + (Bytes::from("table1_b"), Bytes::from("1")), + (Bytes::from("table1_c"), Bytes::from("2")), + (Bytes::from("table1_c"), Bytes::from("1")), + (Bytes::from("table2_a"), Bytes::from("2")), + (Bytes::from("table2_b"), Bytes::new()), + (Bytes::from("table2_c"), Bytes::from("2")), + ], + ); + + drop(snapshot0); + + storage.force_full_compaction().unwrap(); + + let mut iter = construct_merge_iterator_over_storage(&storage.inner.state.read()); + check_iter_result_by_key( + &mut iter, + vec![ + (Bytes::from("table1_a"), Bytes::from("2")), + (Bytes::from("table1_c"), Bytes::from("2")), + ], + ); +} diff --git a/mini-lsm-starter/src/lsm_storage.rs b/mini-lsm-starter/src/lsm_storage.rs index 428ef25b..94c646f8 100644 --- a/mini-lsm-starter/src/lsm_storage.rs +++ b/mini-lsm-starter/src/lsm_storage.rs @@ -113,6 +113,11 @@ impl LsmStorageOptions { } } +#[derive(Clone, Debug)] +pub enum CompactionFilter { + Prefix(Bytes), +} + /// The storage interface of the LSM tree. pub(crate) struct LsmStorageInner { pub(crate) state: Arc>>, @@ -124,6 +129,7 @@ pub(crate) struct LsmStorageInner { pub(crate) compaction_controller: CompactionController, pub(crate) manifest: Option, pub(crate) mvcc: Option, + pub(crate) compaction_filters: Arc>>, } /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM. @@ -176,6 +182,10 @@ impl MiniLsm { self.inner.write_batch(batch) } + pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) { + self.inner.add_compaction_filter(compaction_filter) + } + pub fn get(&self, key: &[u8]) -> Result> { self.inner.get(key) } @@ -252,6 +262,7 @@ impl LsmStorageInner { manifest: None, options: options.into(), mvcc: None, + compaction_filters: Arc::new(Mutex::new(Vec::new())), }; Ok(storage) @@ -261,6 +272,11 @@ impl LsmStorageInner { unimplemented!() } + pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) { + let mut compaction_filters = self.compaction_filters.lock(); + compaction_filters.push(compaction_filter); + } + /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter. pub fn get(&self, _key: &[u8]) -> Result> { unimplemented!() diff --git a/mini-lsm/src/lsm_storage.rs b/mini-lsm/src/lsm_storage.rs index 738fdbde..0ee8497a 100644 --- a/mini-lsm/src/lsm_storage.rs +++ b/mini-lsm/src/lsm_storage.rs @@ -148,6 +148,11 @@ fn key_within(user_key: &[u8], table_begin: KeySlice, table_end: KeySlice) -> bo table_begin.raw_ref() <= user_key && user_key <= table_end.raw_ref() } +#[derive(Clone, Debug)] +pub enum CompactionFilter { + Prefix(Bytes), +} + /// The storage interface of the LSM tree. pub(crate) struct LsmStorageInner { pub(crate) state: Arc>>, @@ -160,6 +165,8 @@ pub(crate) struct LsmStorageInner { pub(crate) manifest: Option, #[allow(dead_code)] pub(crate) mvcc: Option, + #[allow(dead_code)] + pub(crate) compaction_filters: Arc>>, } /// A thin wrapper for `LsmStorageInner` and the user interface for MiniLSM. @@ -243,6 +250,10 @@ impl MiniLsm { })) } + pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) { + self.inner.add_compaction_filter(compaction_filter) + } + pub fn get(&self, key: &[u8]) -> Result> { self.inner.get(key) } @@ -418,6 +429,7 @@ impl LsmStorageInner { manifest: Some(manifest), options: options.into(), mvcc: None, + compaction_filters: Arc::new(Mutex::new(Vec::new())), }; storage.sync_dir()?; @@ -428,6 +440,11 @@ impl LsmStorageInner { self.state.read().memtable.sync_wal() } + pub fn add_compaction_filter(&self, compaction_filter: CompactionFilter) { + let mut compaction_filters = self.compaction_filters.lock(); + compaction_filters.push(compaction_filter); + } + /// Get a key from the storage. In day 7, this can be further optimized by using a bloom filter. pub fn get(&self, key: &[u8]) -> Result> { let snapshot = { diff --git a/mini-lsm/src/tests/harness.rs b/mini-lsm/src/tests/harness.rs index 4b0d7e30..afbe8db8 100644 --- a/mini-lsm/src/tests/harness.rs +++ b/mini-lsm/src/tests/harness.rs @@ -341,6 +341,9 @@ pub fn check_compaction_ratio(storage: Arc) { for idx in 1..level_size.len() { let prev_size = level_size[idx - 1]; let this_size = level_size[idx]; + if this_size == 0 { + continue; + } assert!( // do not add hard requirement on level size multiplier considering bloom filters... this_size as f64 / prev_size as f64 >= (level_size_multiplier as f64 - 0.5),