Attempt to clean up index before remote transfer

If a node crashes during recovery, it may leave behind temporary files
that consume disk space the node may need in order to complete recovery.
So we attempt to clean up the index before transferring files from
a recovery source. We first call the store's `cleanupAndVerify`
method, which removes anything not referenced by the latest local
commit. If that fails because the local index isn't in a state to
produce a local commit, we fall back to removing temporary files
by name.

Closes elastic#104473
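
For orientation, here is a minimal, self-contained sketch of the fallback step using plain Lucene APIs. It is illustrative only, not the code from this commit: the class and method names are invented, and the "recovery." prefix is an assumption about the temporary-file prefix used during peer recovery. The actual implementation lives in the diffs below (the new cleanTempFiles method and the cleanup step added to the recovery preparation chain).

import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;
import org.apache.lucene.store.Lock;

import java.io.IOException;
import java.nio.file.Path;

// Illustrative sketch: when the local index cannot produce a commit (so a
// cleanupAndVerify-style cleanup is not possible), delete leftover recovery
// temp files purely by their name prefix. The prefix value is assumed here.
final class StrayRecoveryFileCleanup {
    private static final String ASSUMED_RECOVERY_PREFIX = "recovery.";

    static void deleteStrayRecoveryFiles(Path indexPath) throws IOException {
        try (Directory directory = FSDirectory.open(indexPath);
             Lock writeLock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
            // Holding the index write lock ensures no IndexWriter is modifying
            // the directory while we delete files.
            for (String fileName : directory.listAll()) {
                if (fileName.startsWith(ASSUMED_RECOVERY_PREFIX)) {
                    directory.deleteFile(fileName);
                }
            }
        }
    }
}
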
bcully committed Oct 18, 2024
1 parent 9770ab7 commit b27b0e7
Showing 3 changed files with 83 additions and 8 deletions.
@@ -19,14 +19,17 @@
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.indices.IndicesService;
import org.elasticsearch.indices.recovery.PeerRecoveryTargetService;
import org.elasticsearch.indices.recovery.RecoveryFileChunkRequest;
import org.elasticsearch.indices.recovery.RecoveryFilesInfoRequest;
import org.elasticsearch.node.RecoverySettingsChunkSizePlugin;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.transport.TransportService;

import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
@@ -72,16 +75,14 @@ public void testCancelRecoveryAndResume() throws Exception {
// we use 2 nodes, a lucky one and an unlucky one
// the lucky one holds the primary
// the unlucky one gets the replica and the truncated leftovers
NodeStats primariesNode = dataNodeStats.get(0);
NodeStats unluckyNode = dataNodeStats.get(1);
String primariesNode = dataNodeStats.get(0).getNode().getName();
String unluckyNode = dataNodeStats.get(1).getNode().getName();

// create the index and prevent allocation on any other nodes than the lucky one
// we have no replicas so far and make sure that we allocate the primary on the lucky node
assertAcked(
prepareCreate("test").setMapping("field1", "type=text", "the_id", "type=text")
.setSettings(
indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode.getNode().getName())
)
.setSettings(indexSettings(numberOfShards(), 0).put("index.routing.allocation.include._name", primariesNode))
); // only allocate on the lucky node

// index some docs and check if they are coming back
@@ -104,18 +105,43 @@ public void testCancelRecoveryAndResume() throws Exception {

final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean truncate = new AtomicBoolean(true);
IndicesService unluckyIndices = internalCluster().getInstance(IndicesService.class, unluckyNode);
for (NodeStats dataNode : dataNodeStats) {
MockTransportService.getInstance(dataNode.getNode().getName())
.addSendBehavior(
internalCluster().getInstance(TransportService.class, unluckyNode.getNode().getName()),
internalCluster().getInstance(TransportService.class, unluckyNode),
(connection, requestId, action, request, options) -> {
if (action.equals(PeerRecoveryTargetService.Actions.FILE_CHUNK)) {
RecoveryFileChunkRequest req = (RecoveryFileChunkRequest) request;
logger.info("file chunk [{}] lastChunk: {}", req, req.lastChunk());
if ((req.name().endsWith("cfs") || req.name().endsWith("fdt")) && req.lastChunk() && truncate.get()) {
// write some garbage in the shard directory so we can verify that it is cleaned up before we resend.
// This helps prevent recovery from failing due to lack of space from garbage left over from a previous
// recovery that crashed during file transmission.
// We can't look for the presence of the recovery temp files themselves because they are automatically
// cleaned up on clean shutdown by MultiFileWriter.
var shardId = req.shardId();
var shardPath = unluckyIndices.indexService(shardId.getIndex())
.getShard(shardId.getId())
.shardPath()
.resolveIndex();
var garbagePath = Files.createFile(shardPath.resolve("recovery.garbage"));
logger.info("writing garbage at: {}", garbagePath);
latch.countDown();
throw new RuntimeException("Caused some truncated files for fun and profit");
}
} else if (action.equals(PeerRecoveryTargetService.Actions.FILES_INFO) && truncate.get() == false) {
// verify on the second recovery attempt that the garbage has been removed before we fetch files again #104473
var shardId = ((RecoveryFilesInfoRequest) request).shardId();
var shardPath = unluckyIndices.indexService(shardId.getIndex())
.getShard(shardId.getId())
.shardPath()
.resolveIndex();
logger.info("checking for garbage at {}", shardPath.resolve("recovery.garbage"));
assertFalse(
"garbage file should have been cleaned before file transmission",
Files.exists(shardPath.resolve("recovery.garbage"))
);
}
connection.sendRequest(requestId, action, request, options);
}
@@ -128,14 +154,14 @@ public void testCancelRecoveryAndResume() throws Exception {
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 1)
.put(
"index.routing.allocation.include._name", // now allow allocation on all nodes
primariesNode.getNode().getName() + "," + unluckyNode.getNode().getName()
primariesNode + "," + unluckyNode
),
"test"
);

latch.await();

// at this point we got some truncated left overs on the replica on the unlucky node
// at this point we got some truncated leftovers on the replica on the unlucky node
// now we are allowing the recovery to allocate again and finish to see if we wipe the truncated files
truncate.compareAndSet(true, false);
ensureGreen("test");
@@ -397,6 +397,32 @@ record StartRecoveryRequestToSend(StartRecoveryRequest startRecoveryRequest, Str
}
indexShard.recoverLocallyUpToGlobalCheckpoint(ActionListener.assertOnce(l));
})
// peer recovery can consume a lot of disk space, so it's worth cleaning up locally ahead of the attempt
// operation runs only if the previous operation succeeded, and returns the previous operation's result.
// Failures at this stage aren't fatal, we can attempt to recover and then clean up again at the end. #104473
.andThenApply(startingSeqNo -> {
Store store = indexShard.store();
store.incRef();
try {
store.cleanupAndVerify("cleanup before peer recovery", indexShard.snapshotStoreMetadata());
} catch (IOException e) {
// if we can't read a snapshot, then we don't have enough intact data locally to create one.
// Make an attempt to clean up temporary files from the previous run. We could also just delete
// everything in the directory but in that case we might want to be a little more thoughtful about
// the exact exception triggered here. Corrupt indices might be recoverable without a full transfer, for
// example.
logger.info("cleanup before peer recovery failed on shard [{}]; exception: [{}]", indexShard.shardId(), e);
try {
recoveryTarget.cleanTempFiles();
} catch (IOException ie) {
// log and ignore. Recovery gets another chance to clean up after file transfer.
logger.warn("temporary file cleanup failed on shard [{}]; exception: [{}]", indexShard.shardId(), ie);
}
} finally {
store.decRef();
}
return startingSeqNo;
})
// now construct the start-recovery request
.andThenApply(startingSeqNo -> {
assert startingSeqNo == UNASSIGNED_SEQ_NO || recoveryTarget.state().getStage() == RecoveryState.Stage.TRANSLOG
@@ -13,6 +13,7 @@
import org.apache.lucene.index.CorruptIndexException;
import org.apache.lucene.index.IndexFormatTooNewException;
import org.apache.lucene.index.IndexFormatTooOldException;
import org.apache.lucene.index.IndexWriter;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.flush.FlushRequest;
@@ -624,6 +625,28 @@ public int read(byte[] b, int off, int len) throws IOException {
}
}

/**
* Clean any temporary files present in the index.
* In normal operation, recovery will not leave temporary files around after either successful or failed recovery.
* But if the process crashes, then it may. In this case, we want to delete temporary files from a previous attempt
* before starting again, in order to recover disk space we may need to complete the recovery (#104473).
* <p>
* Unlike {@link #cleanFiles} this has no dependency on a snapshot, and works in cases where a snapshot isn't available,
* e.g., when the index has temporary files but no complete snapshot.
*/
void cleanTempFiles() throws IOException {
var directory = store().directory();
try (var _lock = directory.obtainLock(IndexWriter.WRITE_LOCK_NAME)) {
for (var fileName : directory.listAll()) {
if (fileName.startsWith(RECOVERY_PREFIX)) {
// remove it
logger.debug("removing stray recovery file from directory {}: {}", indexShard.shardPath().resolveIndex(), fileName);
directory.deleteFile(fileName);
}
}
}
}

private void registerThrottleTime(long throttleTimeInNanos) {
state().getIndex().addTargetThrottling(throttleTimeInNanos);
indexShard.recoveryStats().addThrottleTime(throttleTimeInNanos);
