Skip to content

Commit

Permalink
KAFKA-17713: Don't generate snapshot when published metadata is not batch aligned (#17398)
Browse files Browse the repository at this point in the history

When MetadataBatchLoader handles a BeginTransactionRecord, it will publish the metadata that it has seen so far and not publish again until the transaction is ended or aborted. This means a partial record batch can be published. If a snapshot is generated during this time, the currently published metadata may not align with the end of a record batch. This causes problems with Raft replication, which expects a snapshot's offset to exactly precede a record batch boundary.

This patch enhances SnapshotGenerator to refuse to generate a snapshot if the metadata is not batch aligned.

Reviewers: David Arthur <[email protected]>
  • Loading branch information
kevin-wu24 authored Oct 10, 2024
1 parent b98aee3 commit 167e2f7
Show file tree
Hide file tree
Showing 17 changed files with 171 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public class MetadataVersionConfigValidatorTest {
.numBytes(88)
.build();
public static final MetadataProvenance TEST_PROVENANCE =
new MetadataProvenance(50, 3, 8000);
new MetadataProvenance(50, 3, 8000, true);

void testWith(MetadataVersion metadataVersion, KafkaConfig config, FaultHandler faultHandler) throws Exception {
try (MetadataVersionConfigValidator validator = new MetadataVersionConfigValidator(config, faultHandler)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ void testDescribeTopicPartitionsRequestWithEdgeCases() {
void updateKraftMetadataCache(KRaftMetadataCache kRaftMetadataCache, List<ApiMessage> records) {
MetadataImage image = kRaftMetadataCache.currentImage();
MetadataImage partialImage = new MetadataImage(
new MetadataProvenance(100L, 10, 1000L),
new MetadataProvenance(100L, 10, 1000L, true),
image.features(),
ClusterImage.EMPTY,
image.topics(),
Expand All @@ -508,7 +508,7 @@ void updateKraftMetadataCache(KRaftMetadataCache kRaftMetadataCache, List<ApiMes
);
MetadataDelta delta = new MetadataDelta.Builder().setImage(partialImage).build();
records.stream().forEach(record -> delta.replay(record));
kRaftMetadataCache.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L)));
kRaftMetadataCache.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L, true)));
}

private RequestChannel.Request buildRequest(AbstractRequest request,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ class ControllerRegistrationManagerTest {
}
}
}
val provenance = new MetadataProvenance(100, 200, 300)
val provenance = new MetadataProvenance(100, 200, 300, true)
val newImage = delta.apply(provenance)
val manifest = if (!prevImage.features().metadataVersion().equals(metadataVersion)) {
new SnapshotManifest(provenance, 1000)
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ object MetadataCacheTest {
// contains no brokers, but which contains the previous partitions.
val image = c.currentImage()
val partialImage = new MetadataImage(
new MetadataProvenance(100L, 10, 1000L),
new MetadataProvenance(100L, 10, 1000L, true),
image.features(),
ClusterImage.EMPTY,
image.topics(),
Expand Down Expand Up @@ -129,7 +129,7 @@ object MetadataCacheTest {
toRecords(topic).foreach(delta.replay)
}
records.foreach(delta.replay)
c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L)))
c.setImage(delta.apply(new MetadataProvenance(100L, 10, 1000L, true)))
}
case _ => throw new RuntimeException("Unsupported cache type")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6359,7 +6359,7 @@ class ReplicaManagerTest {
MetadataVersion.latestProduction(),
ZkMigrationState.NONE)
new MetadataImage(
new MetadataProvenance(100L, 10, 1000L),
new MetadataProvenance(100L, 10, 1000L, true),
featuresImageLatest,
ClusterImageTest.IMAGE1,
topicsImage,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,23 @@
* Information about the source of a metadata image.
*/
public final class MetadataProvenance {
public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, -1, -1L);
public static final MetadataProvenance EMPTY = new MetadataProvenance(-1L, -1, -1L, false);

private final long lastContainedOffset;
private final int lastContainedEpoch;
private final long lastContainedLogTimeMs;
private final boolean isOffsetBatchAligned;

public MetadataProvenance(
long lastContainedOffset,
int lastContainedEpoch,
long lastContainedLogTimeMs
long lastContainedLogTimeMs,
boolean isOffsetBatchAligned
) {
this.lastContainedOffset = lastContainedOffset;
this.lastContainedEpoch = lastContainedEpoch;
this.lastContainedLogTimeMs = lastContainedLogTimeMs;
this.isOffsetBatchAligned = isOffsetBatchAligned;
}

public OffsetAndEpoch snapshotId() {
Expand All @@ -59,6 +62,13 @@ public long lastContainedLogTimeMs() {
return lastContainedLogTimeMs;
}

/**
* Returns whether lastContainedOffset is the last offset in a record batch
*/
public boolean isOffsetBatchAligned() {
return isOffsetBatchAligned;
}

/**
* Returns the name that a snapshot with this provenance would have.
*/
Expand All @@ -72,14 +82,16 @@ public boolean equals(Object o) {
MetadataProvenance other = (MetadataProvenance) o;
return lastContainedOffset == other.lastContainedOffset &&
lastContainedEpoch == other.lastContainedEpoch &&
lastContainedLogTimeMs == other.lastContainedLogTimeMs;
lastContainedLogTimeMs == other.lastContainedLogTimeMs &&
isOffsetBatchAligned == other.isOffsetBatchAligned;
}

@Override
public int hashCode() {
return Objects.hash(lastContainedOffset,
lastContainedEpoch,
lastContainedLogTimeMs);
lastContainedLogTimeMs,
isOffsetBatchAligned);
}

@Override
Expand All @@ -88,6 +100,7 @@ public String toString() {
"lastContainedOffset=" + lastContainedOffset +
", lastContainedEpoch=" + lastContainedEpoch +
", lastContainedLogTimeMs=" + lastContainedLogTimeMs +
", isOffsetBatchAligned=" + isOffsetBatchAligned +
")";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ public long loadBatch(Batch<ApiMessageAndVersion> batch, LeaderAndEpoch leaderAn
// 1) this is not the first record in this batch
// 2) this is not the first batch since last emitting a delta
if (transactionState == TransactionState.STARTED_TRANSACTION && (indexWithinBatch > 0 || numBatches > 0)) {
MetadataProvenance provenance = new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
// Accumulated delta is aligned on batch boundaries iff the BeginTransactionRecord is the first record in a batch
MetadataProvenance provenance = new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs, indexWithinBatch == 0);
LogDeltaManifest manifest = LogDeltaManifest.newBuilder()
.provenance(provenance)
.leaderAndEpoch(leaderAndEpoch)
Expand All @@ -152,9 +153,10 @@ public long loadBatch(Batch<ApiMessageAndVersion> batch, LeaderAndEpoch leaderAn
.numBytes(numBytes) // This will be zero if we have not yet read a batch
.build();
if (log.isDebugEnabled()) {
log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us.",
log.debug("handleCommit: Generated a metadata delta between {} and {} from {} batch(es) in {} us. The delta is {}.",
image.offset(), manifest.provenance().lastContainedOffset(),
manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()),
provenance.isOffsetBatchAligned() ? "batch aligned" : "not batch aligned");
}
applyDeltaAndUpdate(delta, manifest);
transactionState = TransactionState.STARTED_TRANSACTION;
Expand All @@ -178,8 +180,8 @@ public long loadBatch(Batch<ApiMessageAndVersion> batch, LeaderAndEpoch leaderAn
* Flush the metadata accumulated in this batch loader if not in the middle of a transaction. The
* flushed metadata will be passed to the {@link MetadataUpdater} configured for this class.
*/
public void maybeFlushBatches(LeaderAndEpoch leaderAndEpoch) {
MetadataProvenance provenance = new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
public void maybeFlushBatches(LeaderAndEpoch leaderAndEpoch, boolean isOffsetBatchAligned) {
MetadataProvenance provenance = new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs, isOffsetBatchAligned);
LogDeltaManifest manifest = LogDeltaManifest.newBuilder()
.provenance(provenance)
.leaderAndEpoch(leaderAndEpoch)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
metrics.updateBatchSize(batch.records().size());
metrics.updateBatchProcessingTimeNs(elapsedNs);
}
batchLoader.maybeFlushBatches(currentLeaderAndEpoch);
batchLoader.maybeFlushBatches(currentLeaderAndEpoch, true);
} catch (Throwable e) {
// This is a general catch-all block where we don't expect to end up;
// failure-prone operations should have individual try/catch blocks around them.
Expand Down Expand Up @@ -434,7 +434,7 @@ SnapshotManifest loadSnapshot(
}
delta.finishSnapshot();
MetadataProvenance provenance = new MetadataProvenance(reader.lastContainedLogOffset(),
reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp());
reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp(), true);
return new SnapshotManifest(provenance,
time.nanoseconds() - startNs);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,17 @@ void publishLogDelta(
bytesSinceLastSnapshot += manifest.numBytes();
if (bytesSinceLastSnapshot >= maxBytesSinceLastSnapshot) {
if (eventQueue.isEmpty()) {
scheduleEmit("we have replayed at least " + maxBytesSinceLastSnapshot +
" bytes", newImage);
maybeScheduleEmit("we have replayed at least " + maxBytesSinceLastSnapshot +
" bytes", newImage, manifest.provenance().isOffsetBatchAligned());
} else if (log.isTraceEnabled()) {
log.trace("Not scheduling bytes-based snapshot because event queue is not empty yet.");
}
} else if (maxTimeSinceLastSnapshotNs != 0 &&
(time.nanoseconds() - lastSnapshotTimeNs >= maxTimeSinceLastSnapshotNs)) {
if (eventQueue.isEmpty()) {
scheduleEmit("we have waited at least " +
TimeUnit.NANOSECONDS.toMinutes(maxTimeSinceLastSnapshotNs) + " minute(s)", newImage);
maybeScheduleEmit("we have waited at least " +
TimeUnit.NANOSECONDS.toMinutes(maxTimeSinceLastSnapshotNs) +
" minute(s)", newImage, manifest.provenance().isOffsetBatchAligned());
} else if (log.isTraceEnabled()) {
log.trace("Not scheduling time-based snapshot because event queue is not empty yet.");
}
Expand All @@ -260,27 +261,30 @@ void publishLogDelta(
}
}

void scheduleEmit(
void maybeScheduleEmit(
String reason,
MetadataImage image
MetadataImage image,
boolean isOffsetBatchAligned
) {
resetSnapshotCounters();
eventQueue.append(() -> {
String currentDisabledReason = disabledReason.get();
if (currentDisabledReason != null) {
log.error("Not emitting {} despite the fact that {} because snapshots are " +
"disabled; {}", image.provenance().snapshotName(), reason,
currentDisabledReason);
} else {
String currentDisabledReason = disabledReason.get();
if (currentDisabledReason != null) {
log.error("Not emitting {} despite the fact that {} because snapshots are " +
"disabled; {}", image.provenance().snapshotName(), reason, currentDisabledReason);
} else if (!isOffsetBatchAligned) {
log.debug("Not emitting {} despite the fact that {} because snapshots are " +
"disabled; {}", image.provenance().snapshotName(), reason, "metadata image is not batch aligned");
} else {
eventQueue.append(() -> {
resetSnapshotCounters();
log.info("Creating new KRaft snapshot file {} because {}.",
image.provenance().snapshotName(), reason);
try {
emitter.maybeEmit(image);
} catch (Throwable e) {
faultHandler.handleFault("KRaft snapshot file generation error", e);
}
}
});
});
}
}

public void beginShutdown() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class MetadataImageTest {

static {
IMAGE1 = new MetadataImage(
new MetadataProvenance(100, 4, 2000),
new MetadataProvenance(100, 4, 2000, true),
FeaturesImageTest.IMAGE1,
ClusterImageTest.IMAGE1,
TopicsImageTest.IMAGE1,
Expand All @@ -66,7 +66,7 @@ public class MetadataImageTest {
RecordTestUtils.replayAll(DELTA1, DelegationTokenImageTest.DELTA1_RECORDS);

IMAGE2 = new MetadataImage(
new MetadataProvenance(200, 5, 4000),
new MetadataProvenance(200, 5, 4000, true),
FeaturesImageTest.IMAGE2,
ClusterImageTest.IMAGE2,
TopicsImageTest.IMAGE2,
Expand Down
Loading

0 comments on commit 167e2f7

Please sign in to comment.