storage: add TestChunkedCompaction
WillemKauf committed Dec 3, 2024
1 parent d6abeca commit 2ca689c
Showing 1 changed file with 59 additions and 35 deletions.

src/v/storage/tests/compaction_e2e_test.cc
@@ -363,59 +363,83 @@ TEST_F(CompactionFixtureTest, TestDedupeMultiPass) {
     ASSERT_NO_FATAL_FAILURE(check_records(cardinality, num_segments - 1).get());
 }
 
-// Test that failing to index a single segment will result in the compaction
-// sliding window being reset, allowing compaction to potentially make progress
-// in the future instead of repeatedly failing to index the same segment.
-TEST_F(CompactionFixtureTest, TestFailToIndexOneSegmentResetWindow) {
-    constexpr auto num_segments = 1;
-    constexpr auto cardinality = 10;
-    size_t records_per_segment = cardinality;
-    generate_data(num_segments, cardinality, records_per_segment).get();
-
-    ss::abort_source never_abort;
-    auto& disk_log = dynamic_cast<storage::disk_log_impl&>(*log);
-
-    // Set the last compaction window start offset, just to be sure that its
-    // value is reset by failing to index a segment.
-    disk_log.set_last_compaction_window_start_offset(model::offset::max());
+TEST_F(CompactionFixtureTest, TestChunkedCompaction) {
+    constexpr auto num_segments = 3;
+    constexpr auto cardinality = 100;
+    size_t batches_per_segment = 5;
+    size_t records_per_batch = 10;
+    map_t latest_kv_map;
+    generate_data(
+      num_segments,
+      cardinality,
+      batches_per_segment,
+      records_per_batch,
+      0,
+      false,
+      &latest_kv_map)
+      .get();
 
     // Compact with a max keys value far below the number of keys in the
     // segment.
     bool did_compact = do_sliding_window_compact(
                          log->segments().back()->offsets().get_base_offset(),
                          std::nullopt,
-                         cardinality - 1)
+                         5)
                          .get();
 
-    ASSERT_FALSE(did_compact);
-    ASSERT_FALSE(
-      disk_log.get_last_compaction_window_start_offset().has_value());
+    ASSERT_TRUE(did_compact);
 
-    // Generate some more data in such a way that the first segment can be fully
-    // indexed after the next round of compaction.
-    generate_data(num_segments, cardinality, records_per_segment / 2).get();
+    {
+        tests::kafka_consume_transport consumer(make_kafka_client().get());
+        consumer.start().get();
+        auto consumed_kvs = consumer
+                              .consume_from_partition(
+                                topic_name,
+                                model::partition_id(0),
+                                model::offset(0))
+                              .get();
+        ASSERT_NO_FATAL_FAILURE();
 
-    // This round of compaction will fully index the last added segment, and
-    // deduplicate the keys in the first segment enough that it will no longer
-    // fail to be indexed.
-    did_compact = do_sliding_window_compact(
-                    log->segments().back()->offsets().get_base_offset(),
-                    std::nullopt,
-                    cardinality - 1)
-                    .get();
+        ASSERT_EQ(consumed_kvs.size(), latest_kv_map.size());
+
+        // Assert the key consumed is in the latest_kv_map.
+        for (const auto& kv : consumed_kvs) {
+            ASSERT_TRUE(latest_kv_map.contains(kv.key));
+            ASSERT_EQ(kv.val, latest_kv_map[kv.key]);
+        }
+    }
+    auto& disk_log = dynamic_cast<storage::disk_log_impl&>(*log);
+    const auto& segs = disk_log.segments();
+
+    ASSERT_TRUE(segs[0]->finished_self_compaction());
+    ASSERT_TRUE(segs[0]->finished_windowed_compaction());
+    ASSERT_FALSE(segs[0]->index().has_clean_compact_timestamp());
+
+    ASSERT_TRUE(segs[1]->finished_self_compaction());
+    ASSERT_TRUE(segs[1]->finished_windowed_compaction());
+    ASSERT_FALSE(segs[1]->index().has_clean_compact_timestamp());
+
+    ASSERT_TRUE(segs[2]->finished_self_compaction());
+    ASSERT_TRUE(segs[2]->finished_windowed_compaction());
+    ASSERT_TRUE(segs[2]->index().has_clean_compact_timestamp());
 
     ASSERT_TRUE(did_compact);
     ASSERT_TRUE(disk_log.get_last_compaction_window_start_offset().has_value());
     ASSERT_EQ(
       disk_log.get_last_compaction_window_start_offset().value(),
-      disk_log.segments()[1]->offsets().get_base_offset());
+      segs[2]->offsets().get_base_offset());
 
-    // Now, try to compact one last time.
+    // Compact again, with no limit on keys.
     did_compact = do_sliding_window_compact(
                     log->segments().back()->offsets().get_base_offset(),
                     std::nullopt,
-                    cardinality - 1)
+                    std::nullopt)
                     .get();
 
     ASSERT_TRUE(did_compact);
 
+    // Now the first two segments should be marked as clean.
+    ASSERT_TRUE(segs[0]->index().has_clean_compact_timestamp());
+    ASSERT_TRUE(segs[1]->index().has_clean_compact_timestamp());
+
     ASSERT_FALSE(
       disk_log.get_last_compaction_window_start_offset().has_value());
 }
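
For readers skimming the diff, the flow the new test exercises is: one compaction round with a small per-round key budget (5) that only brings the last segment to a clean state, then an unbounded round that finishes the first two segments and clears the saved window start offset. The sketch below generalizes that into a driver loop; it is illustrative only, not part of this commit, and it assumes nothing beyond the helpers visible above (do_sliding_window_compact, disk_log.get_last_compaction_window_start_offset()). The meaning of the second std::nullopt argument is not shown in the diff, so it is simply passed through as the test does.

    // Illustrative sketch, not part of the commit: repeatedly run
    // sliding-window compaction with a per-round key budget until the
    // persisted window start offset is cleared, i.e. the whole window
    // has been deduplicated.
    constexpr size_t max_keys_per_round = 5; // budget used by the test above
    for (;;) {
        bool did_compact = do_sliding_window_compact(
                             log->segments().back()->offsets().get_base_offset(),
                             std::nullopt, // second argument as in the test
                             max_keys_per_round)
                             .get();
        if (!did_compact) {
            break; // no further progress possible
        }
        // The test asserts that once this offset is cleared, the remaining
        // segments carry clean compaction timestamps.
        if (!disk_log.get_last_compaction_window_start_offset().has_value()) {
            break;
        }
    }

In the test itself this loop is effectively unrolled into exactly two rounds: the budgeted round, then a final round with std::nullopt (no key limit) that completes the window.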