Skip to content

Commit

Permalink
Add prefix filter to sstable2json (#586)
Browse files Browse the repository at this point in the history
Co-authored-by: Daniel Guo <[email protected]>
  • Loading branch information
d-guo and Daniel Guo authored Nov 25, 2024
1 parent afa0513 commit 862a381
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 3 deletions.
83 changes: 80 additions & 3 deletions src/java/org/apache/cassandra/tools/SSTableExport.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,11 @@
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.stream.Collectors;

import com.google.common.collect.ImmutableSet;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.commons.cli.*;

Expand All @@ -49,6 +52,7 @@ public class SSTableExport
private static final ObjectMapper jsonMapper = new ObjectMapper();

private static final String KEY_OPTION = "k";
private static final String PREFIX_OPTION = "p";
private static final String EXCLUDEKEY_OPTION = "x";
private static final String ENUMERATEKEYS_OPTION = "e";

Expand All @@ -67,6 +71,11 @@ public class SSTableExport
excludeKey.setArgs(500);
options.addOption(excludeKey);

Option prefixKey = new Option(PREFIX_OPTION, true, "Prefix of row key");
// Number of times -p <prefix> can be passed on the command line.
prefixKey.setArgs(1);
options.addOption(prefixKey);

Option optEnumerate = new Option(ENUMERATEKEYS_OPTION, false, "enumerate keys only");
options.addOption(optEnumerate);

Expand Down Expand Up @@ -305,6 +314,64 @@ public static void export(Descriptor desc, PrintStream outs, Collection<String>
}
}

public static void export(Descriptor desc, PrintStream outs, String prefix, String[] excludes, CFMetaData metadata) throws IOException {
export(SSTableReader.open(desc), outs, prefix, excludes, metadata);
}

public static void export(SSTableReader sstable, PrintStream outs, String prefix, String[] excludes, CFMetaData metadata) throws IOException
{
Set<String> excludedKeys = excludes != null ? Arrays.stream(excludes).collect(Collectors.toSet()) : ImmutableSet.of();

try (RandomAccessReader dfile = sstable.openDataReader())
{
IPartitioner partitioner = sstable.partitioner;

outs.println("[");

DecoratedKey decoratedKey = partitioner.decorateKey(metadata.getKeyValidator().fromString(prefix));
// last key to compare order
DecoratedKey lastKey = null;
int keysCount = 0;
while (keysCount < 5000) {
if (lastKey != null && lastKey.compareTo(decoratedKey) > 0)
throw new IOException("Keys in sstable are out of order despite scanning in order! " + lastKey + " > " + decoratedKey);

RowIndexEntry entry = sstable.getPosition(decoratedKey, keysCount == 0 ? SSTableReader.Operator.GE : SSTableReader.Operator.GT);

if (entry == null)
break;

lastKey = decoratedKey;

dfile.seek(entry.position);
ByteBuffer keyBytes = ByteBufferUtil.readWithShortLength(dfile);
decoratedKey = partitioner.decorateKey(keyBytes); // row key
String keyString = metadata.getKeyValidator().getString(keyBytes);

if (!keyString.startsWith(prefix)) {
break;
}

if (excludedKeys.contains(keyString)) {
continue;
}

DeletionInfo deletionInfo = new DeletionInfo(DeletionTime.serializer.deserialize(dfile));

Iterator<OnDiskAtom> atomIterator = sstable.metadata.getOnDiskIterator(dfile, sstable.descriptor.version);
checkStream(outs);

if (keysCount != 0)
outs.println(",");
keysCount++;
serializeRow(deletionInfo, atomIterator, sstable.metadata, decoratedKey, outs);
}

outs.println("\n]");
outs.flush();
}
}

// This is necessary to accommodate the test suite since you cannot open a Reader more
// than once from within the same process.
static void export(SSTableReader reader, PrintStream outs, String[] excludes) throws IOException
Expand Down Expand Up @@ -404,6 +471,14 @@ public static void main(String[] args) throws ConfigurationException, IOExceptio

String[] keys = cmd.getOptionValues(KEY_OPTION);
String[] excludes = cmd.getOptionValues(EXCLUDEKEY_OPTION);
String prefix = cmd.getOptionValue(PREFIX_OPTION);

if ((keys != null) && (keys.length > 0) && (prefix != null))
{
System.err.println("Cannot specify keys and prefix at the same time");
System.exit(1);
}

File fileOrDirectory = new File(cmd.getArgs()[0]);

Schema.instance.loadFromDisk(false);
Expand All @@ -425,20 +500,20 @@ public static void main(String[] args) throws ConfigurationException, IOExceptio
i++;
String ssTableFileName = file.getAbsolutePath();
printStream.printf("\"%s\":", file.getName());
handleSingleSsTableFile(ssTableFileName, keys, excludes, printStream);
handleSingleSsTableFile(ssTableFileName, keys, excludes, prefix, printStream);
}
printStream.println("}");
}
else
{
handleSingleSsTableFile(fileOrDirectory.getAbsolutePath(), keys, excludes, printStream);
handleSingleSsTableFile(fileOrDirectory.getAbsolutePath(), keys, excludes, prefix, printStream);
}
}

System.exit(0);
}

private static void handleSingleSsTableFile(String ssTableFileName, String[] keys, String[] excludes, PrintStream printStream)
private static void handleSingleSsTableFile(String ssTableFileName, String[] keys, String[] excludes, String prefix, PrintStream printStream)
{
Descriptor descriptor = Descriptor.fromFilename(ssTableFileName);

Expand Down Expand Up @@ -483,6 +558,8 @@ private static void handleSingleSsTableFile(String ssTableFileName, String[] key
{
if ((keys != null) && (keys.length > 0))
export(descriptor, printStream, Arrays.asList(keys), excludes, cfStore.metadata);
else if (prefix != null)
export(descriptor, printStream, prefix, excludes, cfStore.metadata);
else
export(descriptor, printStream, excludes);
}
Expand Down
47 changes: 47 additions & 0 deletions test/unit/org/apache/cassandra/tools/SSTableExportTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -366,4 +366,51 @@ public void testAsciiKeyValidator() throws IOException, ParseException
// check row key
assertEquals("key", row.get("key"));
}

@Test
public void testExportPrefixFilter() throws IOException, ParseException
{
File tempSS = tempSSTableFile(KEYSPACE1, "Standard1");
ColumnFamily cfamily = ArrayBackedSortedColumns.factory.create(KEYSPACE1, "Standard1");
SSTableWriter writer = SSTableWriter.create(tempSS.getPath(), 2, ActiveRepairService.UNREPAIRED_SSTABLE, 0);

// Add rows
cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
writer.append(Util.dk("apple"), cfamily);
cfamily.clear();

cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
writer.append(Util.dk("bam"), cfamily);
cfamily.clear();

cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
writer.append(Util.dk("ban"), cfamily);
cfamily.clear();

cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
writer.append(Util.dk("banana"), cfamily);
cfamily.clear();

cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
writer.append(Util.dk("bar"), cfamily);
cfamily.clear();

cfamily.addColumn(Util.cellname("col"), ByteBufferUtil.bytes("val"), System.currentTimeMillis());
writer.append(Util.dk("zebra"), cfamily);
cfamily.clear();

SSTableReader sstable = writer.finish(true);

// Export to JSON and verify
File tempJson = File.createTempFile("Standard1", ".json");
SSTableExport.export(sstable, new PrintStream(tempJson.getPath()), asHex("ban"), null, cfamily.metadata());

JSONArray json = (JSONArray)JSONValue.parseWithException(new FileReader(tempJson));
assertEquals("unexpected number of rows", 2, json.size());

JSONObject rowBan = (JSONObject)json.get(0);
assertEquals("unexpected row key",asHex("ban"),rowBan.get("key"));
JSONObject rowBanana = (JSONObject)json.get(1);
assertEquals("unexpected row key",asHex("banana"),rowBanana.get("key"));
}
}

0 comments on commit 862a381

Please sign in to comment.