Skip to content

Commit

Permalink
HADOOP-19291. RawLocalFileSystem to allow overlapping ranges (#7101)
Browse files Browse the repository at this point in the history
ChecksumFileSystem creates the chunked ranges based on the checksum chunk size and then calls
readVectored on Raw Local which may lead to overlapping ranges in some cases.

Contributed by: Mukund Thakur
  • Loading branch information
mukund-thakur committed Oct 9, 2024
1 parent af0b841 commit 23f45d0
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.VectoredReadUtils.validateAndSortRanges;
import static org.apache.hadoop.fs.VectoredReadUtils.sortRangeList;
import static org.apache.hadoop.fs.VectoredReadUtils.validateRangeRequest;
import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
Expand Down Expand Up @@ -320,10 +321,10 @@ public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws IOException {

// Validate, but do not pass in a file length as it may change.
List<? extends FileRange> sortedRanges = validateAndSortRanges(ranges,
Optional.empty());
List<? extends FileRange> sortedRanges = sortRangeList(ranges);
// Set up all of the futures, so that we can use them if things fail
for(FileRange range: sortedRanges) {
validateRangeRequest(range);
range.setData(new CompletableFuture<>());
}
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,13 @@ support -and fallback everywhere else.

The restriction "no overlapping ranges" was only initially enforced in
the S3A connector, which would raise `UnsupportedOperationException`.
Adding the range check as a precondition for all implementations guarantees
consistent behavior everywhere.
Adding the range check as a precondition for all implementations (Raw Local
being an exception) guarantees consistent behavior everywhere.
The reason Raw Local doesn't have this precondition is that `ChecksumFileSystem`
creates the chunked ranges based on the checksum chunk size and then calls
`readVectored()` on Raw Local, which may lead to overlapping ranges in some cases.
For details see [HADOOP-19291](https://issues.apache.org/jira/browse/HADOOP-19291).

For reliable use with older hadoop releases with the API: sort the list of ranges
and check for overlaps before calling `readVectored()`.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,23 +270,42 @@ public void testSomeRangesMergedSomeUnmerged() throws Exception {
}

/**
* Vectored IO doesn't support overlapping ranges.
* Most file systems won't support overlapping ranges.
* Currently, only Raw Local supports it.
*/
@Test
public void testOverlappingRanges() throws Exception {
verifyExceptionalVectoredRead(
getSampleOverlappingRanges(),
IllegalArgumentException.class);
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
verifyExceptionalVectoredRead(
getSampleOverlappingRanges(),
IllegalArgumentException.class);
} else {
try (FSDataInputStream in = openVectorFile()) {
List<FileRange> fileRanges = getSampleOverlappingRanges();
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET, 0);
returnBuffersToPoolPostRead(fileRanges, pool);
}
}
}

/**
* Same ranges are a special case of overlapping ranges.
*/
@Test
public void testSameRanges() throws Exception {
verifyExceptionalVectoredRead(
getSampleSameRanges(),
IllegalArgumentException.class);
if (!isSupported(VECTOR_IO_OVERLAPPING_RANGES)) {
verifyExceptionalVectoredRead(
getSampleSameRanges(),
IllegalArgumentException.class);
} else {
try (FSDataInputStream in = openVectorFile()) {
List<FileRange> fileRanges = getSampleSameRanges();
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET, 0);
returnBuffersToPoolPostRead(fileRanges, pool);
}
}
}

/**
Expand Down Expand Up @@ -329,10 +348,9 @@ public void testSomeRandomNonOverlappingRanges() throws Exception {
public void testConsecutiveRanges() throws Exception {
List<FileRange> fileRanges = new ArrayList<>();
final int offset = 500;
final int length = 100;
final int length = 2011;
range(fileRanges, offset, length);
range(fileRanges, 600, 200);
range(fileRanges, 800, 100);
range(fileRanges, offset + length, length);
try (FSDataInputStream in = openVectorFile()) {
in.readVectored(fileRanges, allocate);
validateVectoredReadResult(fileRanges, DATASET, 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -261,4 +261,6 @@ public interface ContractOptions {
* Does vector read check file length on open rather than in the read call?
*/
String VECTOR_IO_EARLY_EOF_CHECK = "vector-io-early-eof-check";

/**
 * Does vector read accept overlapping (including identical) ranges?
 * The vectored IO contract tests check this option: when it is not
 * supported they expect such a request to be rejected with an
 * {@code IllegalArgumentException}; when it is supported they expect
 * the read to succeed and validate the returned data.
 */
String VECTOR_IO_OVERLAPPING_RANGES = "vector-io-overlapping-ranges";
}
Original file line number Diff line number Diff line change
Expand Up @@ -142,4 +142,9 @@
<value>true</value>
</property>

<property>
<name>fs.contract.vector-io-overlapping-ranges</name>
<value>true</value>
</property>

</configuration>

0 comments on commit 23f45d0

Please sign in to comment.