Merge branch 'palantir-cassandra-2.2.18' into rh/lock-schema-bootstrap
rhuffy committed Nov 26, 2024
2 parents 63f60fe + 862a381 · commit 14a9998
Showing 3 changed files with 150 additions and 10 deletions.
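In brief: StorageService.drain() is now rejected until node setup completes (the old drain body moves to a private drainInternal(), with per-phase progress reported via setMode), and SSTableExport gains a -p row-key prefix filter plus a unit test covering the prefix scan.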
30 changes: 23 additions & 7 deletions src/java/org/apache/cassandra/service/StorageService.java
@@ -2647,7 +2647,7 @@ private void handleStateRemoving(InetAddress endpoint, String[] pieces)
             logger.info("Received removenode gossip about myself. Is this node rejoining after an explicit removenode?");
             try
             {
-                drain();
+                drainInternal();
             }
             catch (Exception e)
             {
@@ -4611,11 +4611,22 @@ public String getDrainProgress()

     /**
      * Shuts node off to writes, empties memtables and the commit log.
-     * There are two differences between drain and the normal shutdown hook:
-     * - Drain waits for in-progress streaming to complete
+     * There is one difference between drain and the normal shutdown hook:
      * - Drain flushes *all* columnfamilies (shutdown hook only flushes non-durable CFs)
      */
-    public synchronized void drain() throws IOException, InterruptedException, ExecutionException
+    public void drain() throws IOException, InterruptedException, ExecutionException
+    {
+        if (daemon.setupCompleted())
+        {
+            drainInternal();
+        }
+        else
+        {
+            throw new IllegalStateException("Cannot drain a node that is initializing or bootstrapping");
+        }
+    }
+
+    private synchronized void drainInternal() throws IOException, InterruptedException, ExecutionException
     {
         Stopwatch watch = Stopwatch.createStarted();

@@ -4633,18 +4644,18 @@ public synchronized void drain() throws IOException, InterruptedException, ExecutionException
         ScheduledExecutors.optionalTasks.shutdown();
         Gossiper.instance.stop();

-        setMode(Mode.DRAINING, "shutting down MessageService", false);
+        setMode(Mode.DRAINING, "shutting down MessageService", true);
         MessagingService.instance().shutdown();

-        setMode(Mode.DRAINING, "clearing mutation stage", false);
+        setMode(Mode.DRAINING, "clearing mutation stage", true);
         counterMutationStage.shutdown();
         mutationStage.shutdown();
         counterMutationStage.awaitTermination(3600, TimeUnit.SECONDS);
         mutationStage.awaitTermination(3600, TimeUnit.SECONDS);

         StorageProxy.instance.verifyNoHintsInProgress();

-        setMode(Mode.DRAINING, "flushing column families", false);
+        setMode(Mode.DRAINING, "flushing column families", true);
         // count CFs first, since forceFlush could block for the flushWriter to get a queue slot empty
         totalCFs = 0;
         for (Keyspace keyspace : Keyspace.nonSystem())
@@ -4666,6 +4677,7 @@ public synchronized void drain() throws IOException, InterruptedException, ExecutionException
             remainingCFs--;
         }

+        setMode(Mode.DRAINING, "shutdown BatchlogManager", true);
         try
         {
             /* not clear this is reasonable time, but propagated from prior embedded behaviour */
@@ -4676,6 +4688,7 @@ public synchronized void drain() throws IOException, InterruptedException, ExecutionException
             logger.error("Batchlog manager timed out shutting down", t);
         }

+        setMode(Mode.DRAINING, "shutdown CompactionManager", true);
         // Interrupt on going compaction and shutdown to prevent further compaction
         CompactionManager.instance.forceShutdown();

@@ -4684,6 +4697,7 @@ public synchronized void drain() throws IOException, InterruptedException, ExecutionException
         // Flush system tables after stopping the batchlog manager and compactions since they both modify
         // system tables (for example compactions can obsolete sstables and the tidiers in SSTableReader update
         // system tables, see SSTableReader.GlobalTidy)
+        setMode(Mode.DRAINING, "flush system tables", true);
         flushes.clear();
         for (Keyspace keyspace : Keyspace.system())
         {
@@ -4692,12 +4706,14 @@ public synchronized void drain() throws IOException, InterruptedException, ExecutionException
         }
         FBUtilities.waitOnFutures(flushes);

+        setMode(Mode.DRAINING, "shutdown Commitlog", true);
         // whilst we've flushed all the CFs, which will have recycled all completed segments, we want to ensure
         // there are no segments to replay, so we force the recycling of any remaining (should be at most one)
         CommitLog.instance.forceRecycleAllSegments("Drain");

         CommitLog.instance.shutdownBlocking();

+        setMode(Mode.DRAINING, "shutdown non-periodic tasks", true);
         // wait for miscellaneous tasks like sstable and commitlog segment deletion
         ScheduledExecutors.nonPeriodicTasks.shutdown();
         if (!ScheduledExecutors.nonPeriodicTasks.awaitTermination(1, MINUTES))
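A quick way to see the new guard from an operator's perspective: nodetool drain reaches this code through the StorageService MBean, so invoking drain against a node that is still initializing or bootstrapping should now fail fast with the IllegalStateException above instead of silently draining. A minimal probe sketch (assumptions: default JMX port 7199 on localhost, no JMX auth; the DrainProbe class name is illustrative):

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class DrainProbe
{
    public static void main(String[] args) throws Exception
    {
        JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:7199/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url))
        {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName storageService = new ObjectName("org.apache.cassandra.db:type=StorageService");
            // On a fully started node this drains as before; on a node that has not
            // finished setup it now propagates IllegalStateException to the caller.
            conn.invoke(storageService, "drain", new Object[0], new String[0]);
        }
    }
}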
83 changes: 80 additions & 3 deletions src/java/org/apache/cassandra/tools/SSTableExport.java
@@ -23,8 +23,11 @@
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.PrintStream;
+import java.nio.ByteBuffer;
 import java.util.*;
+import java.util.stream.Collectors;

+import com.google.common.collect.ImmutableSet;
 import org.apache.cassandra.io.sstable.format.SSTableReader;
 import org.apache.commons.cli.*;

@@ -49,6 +52,7 @@ public class SSTableExport
     private static final ObjectMapper jsonMapper = new ObjectMapper();

     private static final String KEY_OPTION = "k";
+    private static final String PREFIX_OPTION = "p";
     private static final String EXCLUDEKEY_OPTION = "x";
     private static final String ENUMERATEKEYS_OPTION = "e";

@@ -67,6 +71,11 @@ public class SSTableExport
         excludeKey.setArgs(500);
         options.addOption(excludeKey);

+        Option prefixKey = new Option(PREFIX_OPTION, true, "Prefix of row key");
+        // -p takes exactly one prefix value on the command line.
+        prefixKey.setArgs(1);
+        options.addOption(prefixKey);
+
         Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
         options.addOption(optEnumerate);

@@ -305,6 +314,64 @@ public static void export(Descriptor desc, PrintStream outs, Collection<String> toExport, String[] excludes, CFMetaData metadata) throws IOException
         }
     }

+    public static void export(Descriptor desc, PrintStream outs, String prefix, String[] excludes, CFMetaData metadata) throws IOException
+    {
+        export(SSTableReader.open(desc), outs, prefix, excludes, metadata);
+    }
+
+    public static void export(SSTableReader sstable, PrintStream outs, String prefix, String[] excludes, CFMetaData metadata) throws IOException
+    {
+        Set<String> excludedKeys = excludes != null ? Arrays.stream(excludes).collect(Collectors.toSet()) : ImmutableSet.of();
+
+        try (RandomAccessReader dfile = sstable.openDataReader())
+        {
+            IPartitioner partitioner = sstable.partitioner;
+
+            outs.println("[");
+
+            DecoratedKey decoratedKey = partitioner.decorateKey(metadata.getKeyValidator().fromString(prefix));
+            // last key to compare order
+            DecoratedKey lastKey = null;
+            int keysCount = 0;
+            while (keysCount < 5000)
+            {
+                if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
+                    throw new IOException("Keys in sstable are out of order despite scanning in order! " + lastKey + " > " + decoratedKey);
+
+                RowIndexEntry entry = sstable.getPosition(decoratedKey, lastKey == null ? SSTableReader.Operator.GE : SSTableReader.Operator.GT); // GE only for the initial seek; GT afterwards so an excluded first match cannot be re-read forever
+
+                if (entry == null)
+                    break;
+
+                lastKey = decoratedKey;
+
+                dfile.seek(entry.position);
+                ByteBuffer keyBytes = ByteBufferUtil.readWithShortLength(dfile);
+                decoratedKey = partitioner.decorateKey(keyBytes); // row key
+                String keyString = metadata.getKeyValidator().getString(keyBytes);
+
+                if (!keyString.startsWith(prefix))
+                    break;
+
+                if (excludedKeys.contains(keyString))
+                    continue;
+
+                DeletionInfo deletionInfo = new DeletionInfo(DeletionTime.serializer.deserialize(dfile));
+
+                Iterator<OnDiskAtom> atomIterator = sstable.metadata.getOnDiskIterator(dfile, sstable.descriptor.version);
+                checkStream(outs);
+
+                if (keysCount != 0)
+                    outs.println(",");
+                keysCount++;
+                serializeRow(deletionInfo, atomIterator, sstable.metadata, decoratedKey, outs);
+            }
+
+            outs.println("\n]");
+            outs.flush();
+        }
+    }
+
     // This is necessary to accommodate the test suite since you cannot open a Reader more
     // than once from within the same process.
     static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
@@ -404,6 +471,14 @@ public static void main(String[] args) throws ConfigurationException, IOException

         String[] keys = cmd.getOptionValues(KEY_OPTION);
         String[] excludes = cmd.getOptionValues(EXCLUDEKEY_OPTION);
+        String prefix = cmd.getOptionValue(PREFIX_OPTION);
+
+        if ((keys != null) && (keys.length > 0) && (prefix != null))
+        {
+            System.err.println("Cannot specify keys and prefix at the same time");
+            System.exit(1);
+        }
+
         File fileOrDirectory = new File(cmd.getArgs()[0]);

         Schema.instance.loadFromDisk(false);
@@ -425,20 +500,20 @@ public static void main(String[] args) throws ConfigurationException, IOException
                     i++;
                     String ssTableFileName = file.getAbsolutePath();
                     printStream.printf("\"%s\":", file.getName());
-                    handleSingleSsTableFile(ssTableFileName, keys, excludes, printStream);
+                    handleSingleSsTableFile(ssTableFileName, keys, excludes, prefix, printStream);
                 }
                 printStream.println("}");
             }
             else
             {
-                handleSingleSsTableFile(fileOrDirectory.getAbsolutePath(), keys, excludes, printStream);
+                handleSingleSsTableFile(fileOrDirectory.getAbsolutePath(), keys, excludes, prefix, printStream);
             }
         }

         System.exit(0);
     }

-    private static void handleSingleSsTableFile(String ssTableFileName, String[] keys, String[] excludes, PrintStream printStream)
+    private static void handleSingleSsTableFile(String ssTableFileName, String[] keys, String[] excludes, String prefix, PrintStream printStream)
     {
         Descriptor descriptor = Descriptor.fromFilename(ssTableFileName);

@@ -483,6 +558,8 @@ private static void handleSingleSsTableFile(String ssTableFileName, String[] keys, String[] excludes, PrintStream printStream)
         {
             if ((keys != null) && (keys.length > 0))
                 export(descriptor, printStream, Arrays.asList(keys), excludes, cfStore.metadata);
+            else if (prefix != null)
+                export(descriptor, printStream, prefix, excludes, cfStore.metadata);
             else
                 export(descriptor, printStream, excludes);
         }
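For context, a minimal sketch of driving the new prefix path programmatically (assumptions: the PrefixExportExample class name is illustrative, the node's schema is loadable from disk exactly as SSTableExport.main() does above, and the key validator is BytesType, so the prefix is the hex form of the key bytes — "62616e" for "ban"; a null excludes array is accepted by the new method):

import java.io.IOException;

import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.tools.SSTableExport;

public class PrefixExportExample
{
    public static void main(String[] args) throws IOException
    {
        // mirror the setup SSTableExport.main() performs before exporting
        Schema.instance.loadFromDisk(false);

        // args[0]: path to a *-Data.db file, e.g. .../ks1/cf1/ks1-cf1-ka-1-Data.db
        Descriptor desc = Descriptor.fromFilename(args[0]);
        CFMetaData metadata = Schema.instance.getCFMetaData(desc.ksname, desc.cfname);

        // export at most 5000 rows whose key starts with the hex prefix "62616e" ("ban")
        SSTableExport.export(desc, System.out, "62616e", null, metadata);
    }
}

Assuming this build wires SSTableExport into bin/sstable2json as upstream does, the CLI equivalent would be sstable2json <path to Data.db> -p 62616e. Note the scan stops at the first key that no longer matches the prefix and caps output at 5000 rows.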
47 changes: 47 additions & 0 deletions test/unit/org/apache/cassandra/tools/SSTableExportTest.java
@@ -366,4 +366,51 @@ public void testAsciiKeyValidator() throws IOException, ParseException
         // check row key
         assertEquals("key", row.get("key"));
     }
+
+    @Test
+    public void testExportPrefixFilter() throws IOException, ParseException
+    {
+        File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
+        ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
+        SSTableWriter writer = SSTableWriter.create(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE, 0);
+
+        // Add rows in key order; only "ban" and "banana" match the prefix "ban"
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("apple"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("bam"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("ban"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("banana"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("bar"), cfamily);
+        cfamily.clear();
+
+        cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
+        writer.append(Util.dk("zebra"), cfamily);
+        cfamily.clear();
+
+        SSTableReader sstable = writer.finish(true);
+
+        // Export to JSON and verify
+        File tempJson = File.createTempFile("Standard1", ".json");
+        SSTableExport.export(sstable, new PrintStream(tempJson.getPath()), asHex("ban"), null, cfamily.metadata());
+
+        JSONArray json = (JSONArray)JSONValue.parseWithException(new FileReader(tempJson));
+        assertEquals("unexpected number of rows", 2, json.size());
+
+        JSONObject rowBan = (JSONObject)json.get(0);
+        assertEquals("unexpected row key", asHex("ban"), rowBan.get("key"));
+        JSONObject rowBanana = (JSONObject)json.get(1);
+        assertEquals("unexpected row key", asHex("banana"), rowBanana.get("key"));
+    }
 }
