diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java
index a8a3f4102c..608f853abd 100644
--- a/src/java/org/apache/cassandra/service/StorageService.java
+++ b/src/java/org/apache/cassandra/service/StorageService.java
@@ -2647,7 +2647,7 @@ private void handleStateRemoving(InetAddress endpoint, String[] pieces)
             logger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?");
             try
             {
-                drain();
+                drainInternal();
             }
             catch (Exception e)
             {
@@ -4611,11 +4611,22 @@ public String getDrainProgress()
     /**
      * Shuts node off to writes, empties memtables and the commit log.
-     * There are two differences between drain and the normal shutdown hook:
-     * - Drain waits for in-progress streaming to complete
+     * There is one difference between drain and the normal shutdown hook:
      * - Drain flushes *all* columnfamilies (shutdown hook only flushes non-durable CFs)
      */
-    public synchronized void drain() throws IOException, InterruptedException, ExecutionException
+    public void drain() throws IOException, InterruptedException, ExecutionException
+    {
+        if (daemon.setupCompleted())
+        {
+            drainInternal();
+        }
+        else
+        {
+            throw new IllegalStateException("Cannot drain a node that is initializing or bootstrapping");
+        }
+    }
+
+    private synchronized void drainInternal() throws IOException, InterruptedException, ExecutionException
     {
         Stopwatch watch = Stopwatch.createStarted();
@@ -4633,10 +4644,10 @@ public synchronized void drain() throws IOException, InterruptedException, Execu
         ScheduledExecutors.optionalTasks.shutdown();
         Gossiper.instance.stop();
 
-        setMode(Mode.DRAINING, "shutting down MessageService", false);
+        setMode(Mode.DRAINING, "shutting down MessageService", true);
         MessagingService.instance().shutdown();
 
-        setMode(Mode.DRAINING, "clearing mutation stage", false);
+        setMode(Mode.DRAINING, "clearing mutation stage", true);
         counterMutationStage.shutdown();
         mutationStage.shutdown();
         counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
@@ -4644,7 +4655,7 @@ public synchronized void drain() throws IOException, InterruptedException, Execu
 
         StorageProxy.instance.verifyNoHintsInProgress();
 
-        setMode(Mode.DRAINING, "flushing column families", false);
+        setMode(Mode.DRAINING, "flushing column families", true);
         // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
         totalCFs = 0;
         for (Keyspace keyspace : Keyspace.nonSystem())
@@ -4666,6 +4677,7 @@ public synchronized void drain() throws IOException, InterruptedException, Execu
             remainingCFs--;
         }
 
+        setMode(Mode.DRAINING, "shutdown BatchlogManager", true);
         try
         {
            /* not clear this is reasonable time, but propagated from prior embedded behaviour */
@@ -4676,6 +4688,7 @@ public synchronized void drain() throws IOException, InterruptedException, Execu
             logger.error("Batchlog manager timed out shutting down", t);
         }
 
+        setMode(Mode.DRAINING, "shutdown CompactionManager", true);
        // Interrupt on going compaction and shutdown to prevent further compaction
        CompactionManager.instance.forceShutdown();
 
@@ -4684,6 +4697,7 @@ public synchronized void drain() throws IOException, InterruptedException, Execu
         // Flush system tables after stopping the batchlog manager and compactions since they both modify
         // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update
         // system tables, see SSTableReader.GlobalTidy)
+        setMode(Mode.DRAINING, "flush system tables", true);
         flushes.clear();
         for (Keyspace keyspace : Keyspace.system())
         {
@@ -4692,12 +4706,14 @@ public synchronized void drain() throws IOException, InterruptedException, Execu
         }
         FBUtilities.waitOnFutures(flushes);
 
+        setMode(Mode.DRAINING, "shutdown Commitlog", true);
         // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
         // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
         CommitLog.instance.forceRecycleAllSegments("Drain");
         CommitLog.instance.shutdownBlocking();
 
+        setMode(Mode.DRAINING, "shutdown non-periodic tasks", true);
         // wait for miscellaneous tasks like sstable and commitlog segment deletion
         ScheduledExecutors.nonPeriodicTasks.shutdown();
         if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES))
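The guard above changes how a drain request fails on a node that has not finished starting up: nodetool drain, which drives StorageService.drain() over JMX, now gets an immediate IllegalStateException instead of attempting to drain a half-initialized node. Below is a minimal sketch of such a JMX caller, assuming a local node with the default JMX port (7199); the class name is illustrative only, and depending on the JMX proxy the exception may arrive wrapped (e.g. in a RuntimeMBeanException).

    import javax.management.JMX;
    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    import org.apache.cassandra.service.StorageServiceMBean;

    public class DrainExample
    {
        public static void main(String[] args) throws Exception
        {
            // 7199 is Cassandra's default JMX port.
            JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://127.0.0.1:7199/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url))
            {
                MBeanServerConnection mbs = connector.getMBeanServerConnection();
                StorageServiceMBean ss = JMX.newMBeanProxy(mbs,
                                                           new ObjectName("org.apache.cassandra.db:type=StorageService"),
                                                           StorageServiceMBean.class);
                try
                {
                    ss.drain(); // flushes all column families and shuts the node off to writes
                }
                catch (RuntimeException e)
                {
                    // With this patch a node that is still initializing or
                    // bootstrapping rejects the drain instead of attempting it.
                    System.err.println("Drain rejected: " + e.getMessage());
                }
            }
        }
    }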
diff --git a/src/java/org/apache/cassandra/tools/SSTableExport.java b/src/java/org/apache/cassandra/tools/SSTableExport.java
index 5453d207c6..d36a0a4fad 100644
--- a/src/java/org/apache/cassandra/tools/SSTableExport.java
+++ b/src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -23,8 +23,11 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableSet;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 
 import org.apache.commons.cli.*;
@@ -49,6 +52,7 @@ public class SSTableExport
     private static final ObjectMapper jsonMapper = new ObjectMapper();
 
     private static final String KEY_OPTION = "k";
+    private static final String PREFIX_OPTION = "p";
     private static final String EXCLUDEKEY_OPTION = "x";
     private static final String ENUMERATEKEYS_OPTION = "e";
@@ -67,6 +71,11 @@ public class SSTableExport
         excludeKey.setArgs(500);
         options.addOption(excludeKey);
 
+        Option prefixKey = new Option(PREFIX_OPTION, true, "Prefix of row key");
+        // -p takes exactly one value: the key prefix to filter on.
+        prefixKey.setArgs(1);
+        options.addOption(prefixKey);
+
         Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
         options.addOption(optEnumerate);
@@ -305,6 +314,64 @@ public static void export(Descriptor desc, PrintStream outs, Collection<String>
         }
     }
 
+    public static void export(Descriptor desc, PrintStream outs, String prefix, String[] excludes, CFMetaData metadata) throws IOException
+    {
+        export(SSTableReader.open(desc), outs, prefix, excludes, metadata);
+    }
+
+    /**
+     * Exports rows whose key starts with the given prefix. Keys are scanned in
+     * sstable order, so this is only meaningful with an order-preserving
+     * partitioner (e.g. ByteOrderedPartitioner).
+     */
+    public static void export(SSTableReader sstable, PrintStream outs, String prefix, String[] excludes, CFMetaData metadata) throws IOException
+    {
+        Set<String> excludedKeys = excludes != null ? Arrays.stream(excludes).collect(Collectors.toSet()) : ImmutableSet.of();
+
+        try (RandomAccessReader dfile = sstable.openDataReader())
+        {
+            IPartitioner partitioner = sstable.partitioner;
+
+            outs.println("[");
+
+            DecoratedKey decoratedKey = partitioner.decorateKey(metadata.getKeyValidator().fromString(prefix));
+            // previous key, used to verify that the scan proceeds in order
+            DecoratedKey lastKey = null;
+            int keysCount = 0;
+            // hard safety cap on the number of exported rows
+            while (keysCount < 5000)
+            {
+                if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
+                    throw new IOException("Keys in sstable are out of order despite scanning in order! " + lastKey + " > " + decoratedKey);
+
+                // GE finds the first key at or after the prefix; after the first hit,
+                // GT advances past the current key (even when it was excluded).
+                RowIndexEntry entry = sstable.getPosition(decoratedKey, lastKey == null ? SSTableReader.Operator.GE : SSTableReader.Operator.GT);
+
+                if (entry == null)
+                    break;
+
+                lastKey = decoratedKey;
+
+                dfile.seek(entry.position);
+                ByteBuffer keyBytes = ByteBufferUtil.readWithShortLength(dfile);
+                decoratedKey = partitioner.decorateKey(keyBytes); // row key
+                String keyString = metadata.getKeyValidator().getString(keyBytes);
+
+                if (!keyString.startsWith(prefix))
+                    break;
+
+                if (excludedKeys.contains(keyString))
+                    continue;
+
+                DeletionInfo deletionInfo = new DeletionInfo(DeletionTime.serializer.deserialize(dfile));
+
+                Iterator<OnDiskAtom> atomIterator = sstable.metadata.getOnDiskIterator(dfile, sstable.descriptor.version);
+                checkStream(outs);
+
+                if (keysCount != 0)
+                    outs.println(",");
+                keysCount++;
+                serializeRow(deletionInfo, atomIterator, sstable.metadata, decoratedKey, outs);
+            }
+
+            outs.println("\n]");
+            outs.flush();
+        }
+    }
+
     // This is necessary to accommodate the test suite since you cannot open a Reader more
     // than once from within the same process.
     static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
@@ -404,6 +471,14 @@ public static void main(String[] args) throws ConfigurationException, IOExceptio
         String[] keys = cmd.getOptionValues(KEY_OPTION);
         String[] excludes = cmd.getOptionValues(EXCLUDEKEY_OPTION);
+        String prefix = cmd.getOptionValue(PREFIX_OPTION);
+
+        if ((keys != null) && (keys.length > 0) && (prefix != null))
+        {
+            System.err.println("Cannot specify keys (-k) and a key prefix (-p) at the same time");
+            System.exit(1);
+        }
+
         File fileOrDirectory = new File(cmd.getArgs()[0]);
 
         Schema.instance.loadFromDisk(false);
@@ -425,20 +500,20 @@ public static void main(String[] args) throws ConfigurationException, IOExceptio
                 i++;
                 String ssTableFileName = file.getAbsolutePath();
                 printStream.printf("\"%s\":", file.getName());
-                handleSingleSsTableFile(ssTableFileName, keys, excludes, printStream);
+                handleSingleSsTableFile(ssTableFileName, keys, excludes, prefix, printStream);
             }
             printStream.println("}");
         }
         else
         {
-            handleSingleSsTableFile(fileOrDirectory.getAbsolutePath(), keys, excludes, printStream);
+            handleSingleSsTableFile(fileOrDirectory.getAbsolutePath(), keys, excludes, prefix, printStream);
         }
 
         System.exit(0);
     }
 
-    private static void handleSingleSsTableFile(String ssTableFileName, String[] keys, String[] excludes, PrintStream printStream)
+    private static void handleSingleSsTableFile(String ssTableFileName, String[] keys, String[] excludes, String prefix, PrintStream printStream)
     {
         Descriptor descriptor = Descriptor.fromFilename(ssTableFileName);
@@ -483,6 +558,8 @@ private static void handleSingleSsTableFile(String ssTableFileName, String[] key
     {
         if ((keys != null) && (keys.length > 0))
             export(descriptor, printStream, Arrays.asList(keys), excludes, cfStore.metadata);
+        else if (prefix != null)
+            export(descriptor, printStream, prefix, excludes, cfStore.metadata);
         else
             export(descriptor, printStream, excludes);
     }
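With the option wired through main() and handleSingleSsTableFile(), prefix export can be driven from the command line (roughly bin/sstable2json <sstable>-Data.db -p 62616e, where the prefix is spelled the way the table's key validator renders keys, so hex for BytesType) or programmatically. Here is a sketch of the programmatic form; the class name, sstable path, keyspace, and table names are placeholders.

    import java.io.PrintStream;

    import org.apache.cassandra.config.CFMetaData;
    import org.apache.cassandra.config.Schema;
    import org.apache.cassandra.io.sstable.Descriptor;
    import org.apache.cassandra.tools.SSTableExport;
    import org.apache.cassandra.utils.ByteBufferUtil;

    public class PrefixExportExample
    {
        public static void main(String[] args) throws Exception
        {
            // The schema must be loaded before table metadata can be resolved,
            // exactly as SSTableExport.main() does before exporting.
            Schema.instance.loadFromDisk(false);

            // Placeholder sstable path; any -Data.db component works.
            Descriptor desc = Descriptor.fromFilename("/var/lib/cassandra/data/Keyspace1/Standard1/Keyspace1-Standard1-ka-1-Data.db");
            CFMetaData metadata = Schema.instance.getCFMetaData("Keyspace1", "Standard1");

            // For a BytesType key validator keys render as hex, so the prefix
            // must be hex as well ("ban" -> 62616e).
            String prefix = ByteBufferUtil.bytesToHex(ByteBufferUtil.bytes("ban"));

            // Export every row whose key starts with the prefix; null means no excluded keys.
            SSTableExport.export(desc, new PrintStream(System.out, true), prefix, null, metadata);
        }
    }

Because the scan walks keys in sstable order and stops at the first non-matching key, prefix export is only meaningful under an order-preserving partitioner such as ByteOrderedPartitioner; under Murmur3Partitioner the first key at or after the prefix's position generally does not match, and the export ends immediately.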
diff --git a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
index 355adacb13..7e6c0af1ac 100644
--- a/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
+++ b/test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -366,4 +366,51 @@ public void testAsciiKeyValidator() throws IOException, ParseException
         // check row key
         assertEquals("key", row.get("key"));
     }
+
+    @Test
+    public void testExportPrefixFilter() throws IOException, ParseException
+    {
+        File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
+        ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
+        SSTableWriter writer = SSTableWriter.create(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
+
+        // Add rows: only "ban" and "banana" match the prefix "ban"
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("apple"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("bam"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("ban"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("banana"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("bar"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("zebra"), cfamily);
+        cfamily.clear();
+
+        SSTableReader sstable = writer.finish(true);
+
+        // Export to JSON and verify
+        File tempJson = File.createTempFile("Standard1", ".json");
+        SSTableExport.export(sstable, new PrintStream(tempJson.getPath()), asHex("ban"), null, cfamily.metadata());
+
+        JSONArray json = (JSONArray) JSONValue.parseWithException(new FileReader(tempJson));
+        assertEquals("unexpected number of rows", 2, json.size());
+
+        JSONObject rowBan = (JSONObject) json.get(0);
+        assertEquals("unexpected row key", asHex("ban"), rowBan.get("key"));
+        JSONObject rowBanana = (JSONObject) json.get(1);
+        assertEquals("unexpected row key", asHex("banana"), rowBanana.get("key"));
+    }
 }