Skip to content

Commit

Permalink
Core: Prototype for DVs
Browse files Browse the repository at this point in the history
  • Loading branch information
aokolnychyi committed Oct 18, 2024
1 parent 33b33f3 commit daeb847
Show file tree
Hide file tree
Showing 38 changed files with 1,382 additions and 46 deletions.
10 changes: 10 additions & 0 deletions api/src/main/java/org/apache/iceberg/ContentFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,16 @@ default Long fileSequenceNumber() {
return null;
}

/**
 * Returns the offset in the file where the content starts.
 *
 * <p>This default implementation always returns {@code null}; implementations that track an
 * offset are expected to override it.
 *
 * @return the content offset, or {@code null} when no offset is available
 */
default Long contentOffset() {
return null;
}

/**
 * Returns the length, in bytes, of the referenced content stored in the file.
 *
 * <p>This default implementation always returns {@code null}; implementations that track a
 * content size are expected to override it.
 *
 * @return the content size in bytes, or {@code null} when no size is available
 */
default Long contentSizeInBytes() {
return null;
}

/**
* Copies this file. Manifest readers can reuse file instances; use this method to copy data when
* collecting files from tasks.
Expand Down
18 changes: 16 additions & 2 deletions api/src/main/java/org/apache/iceberg/DataFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,23 @@ public interface DataFile extends ContentFile<DataFile> {
Types.NestedField SORT_ORDER_ID =
optional(140, "sort_order_id", IntegerType.get(), "Sort order ID");
Types.NestedField SPEC_ID = optional(141, "spec_id", IntegerType.get(), "Partition spec ID");
// Optional fields with IDs 143-145: a referenced data file path plus the offset and size of the
// content stored in the file.
// NOTE(review): ID 142 is not assigned to any field visible here -- confirm it is reserved
// elsewhere before relying on the "NEXT ID TO ASSIGN" marker.
Types.NestedField REFERENCED_DATA_FILE =
optional(143, "referenced_data_file", StringType.get(), "Referenced data file path");
Types.NestedField CONTENT_OFFSET =
optional(
144, "content_offset", LongType.get(), "The offset in the file where the content starts");
Types.NestedField CONTENT_SIZE =
optional(
145,
"content_size_in_bytes",
LongType.get(),
"The length of referenced content stored in the file");

int PARTITION_ID = 102;
String PARTITION_NAME = "partition";
String PARTITION_DOC = "Partition data tuple, schema based on the partition spec";

// NEXT ID TO ASSIGN: 142
// NEXT ID TO ASSIGN: 146

static StructType getType(StructType partitionType) {
// IDs start at 100 to leave room for changes to ManifestEntry
Expand All @@ -124,7 +135,10 @@ static StructType getType(StructType partitionType) {
KEY_METADATA,
SPLIT_OFFSETS,
EQUALITY_IDS,
SORT_ORDER_ID);
SORT_ORDER_ID,
REFERENCED_DATA_FILE,
CONTENT_OFFSET,
CONTENT_SIZE);
}

/**
Expand Down
4 changes: 4 additions & 0 deletions api/src/main/java/org/apache/iceberg/DeleteFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,4 +31,8 @@ public interface DeleteFile extends ContentFile<DeleteFile> {
default List<Long> splitOffsets() {
return null;
}

/**
 * Returns the data file referenced by this delete file.
 *
 * <p>This default implementation always returns {@code null}.
 * NOTE(review): presumably the value is the path of the referenced data file -- confirm
 * against the implementations.
 *
 * @return the referenced data file, or {@code null} when none is recorded
 */
default String referencedDataFile() {
return null;
}
}
1 change: 1 addition & 0 deletions api/src/main/java/org/apache/iceberg/FileFormat.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

/** Enum of supported file formats. */
public enum FileFormat {
PUFFIN("puffin", false),
ORC("orc", true),
PARQUET("parquet", true),
AVRO("avro", true),
Expand Down
7 changes: 4 additions & 3 deletions api/src/main/java/org/apache/iceberg/util/DeleteFileSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -97,13 +97,14 @@ public boolean equals(Object o) {
}

DeleteFileWrapper that = (DeleteFileWrapper) o;
// this needs to be updated once deletion vector support is added
return Objects.equals(file.location(), that.file.location());
return Objects.equals(file.location(), that.file.location())
&& Objects.equals(file.contentOffset(), that.file.contentOffset())
&& Objects.equals(file.contentSizeInBytes(), that.file.contentSizeInBytes());
}

@Override
public int hashCode() {
return Objects.hashCode(file.location());
return Objects.hash(file.location(), file.contentOffset(), file.contentSizeInBytes());
}

@Override
Expand Down
49 changes: 48 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseFile.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,9 @@ public PartitionData copy() {
private int[] equalityIds = null;
private byte[] keyMetadata = null;
private Integer sortOrderId;
private String referencedDataFile = null;
private Long contentOffset = null;
private Long contentSizeInBytes = null;

// cached schema
private transient Schema avroSchema = null;
Expand Down Expand Up @@ -108,6 +111,9 @@ public PartitionData copy() {
DataFile.SPLIT_OFFSETS,
DataFile.EQUALITY_IDS,
DataFile.SORT_ORDER_ID,
DataFile.REFERENCED_DATA_FILE,
DataFile.CONTENT_OFFSET,
DataFile.CONTENT_SIZE,
MetadataColumns.ROW_POSITION);

/** Used by Avro reflection to instantiate this class when reading manifest files. */
Expand Down Expand Up @@ -149,7 +155,10 @@ public PartitionData copy() {
List<Long> splitOffsets,
int[] equalityFieldIds,
Integer sortOrderId,
ByteBuffer keyMetadata) {
ByteBuffer keyMetadata,
String referencedDataFile,
Long contentOffset,
Long contentSizeInBytes) {
super(BASE_TYPE.fields().size());
this.partitionSpecId = specId;
this.content = content;
Expand Down Expand Up @@ -178,6 +187,9 @@ public PartitionData copy() {
this.equalityIds = equalityFieldIds;
this.sortOrderId = sortOrderId;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
this.referencedDataFile = referencedDataFile;
this.contentOffset = contentOffset;
this.contentSizeInBytes = contentSizeInBytes;
}

/**
Expand Down Expand Up @@ -230,6 +242,9 @@ public PartitionData copy() {
this.sortOrderId = toCopy.sortOrderId;
this.dataSequenceNumber = toCopy.dataSequenceNumber;
this.fileSequenceNumber = toCopy.fileSequenceNumber;
this.referencedDataFile = toCopy.referencedDataFile;
this.contentOffset = toCopy.contentOffset;
this.contentSizeInBytes = toCopy.contentSizeInBytes;
}

/** Constructor for Java serialization. */
Expand Down Expand Up @@ -339,6 +354,15 @@ protected <T> void internalSet(int pos, T value) {
this.sortOrderId = (Integer) value;
return;
case 17:
this.referencedDataFile = value != null ? value.toString() : null;
return;
case 18:
this.contentOffset = (Long) value;
return;
case 19:
this.contentSizeInBytes = (Long) value;
return;
case 20:
this.fileOrdinal = (long) value;
return;
default:
Expand Down Expand Up @@ -388,6 +412,12 @@ private Object getByPos(int basePos) {
case 16:
return sortOrderId;
case 17:
return referencedDataFile;
case 18:
return contentOffset;
case 19:
return contentSizeInBytes;
case 20:
return fileOrdinal;
default:
throw new UnsupportedOperationException("Unknown field ordinal: " + basePos);
Expand Down Expand Up @@ -514,6 +544,20 @@ public Integer sortOrderId() {
return sortOrderId;
}

/**
 * Returns the referenced data file stored on this file.
 *
 * @return the value of the {@code referencedDataFile} field, which is {@code null} unless it was
 *     set
 */
public String referencedDataFile() {
return referencedDataFile;
}

/**
 * Returns the content offset stored on this file.
 *
 * @return the value of the {@code contentOffset} field, which is {@code null} unless it was set
 */
@Override
public Long contentOffset() {
return contentOffset;
}

/**
 * Returns the content size, in bytes, stored on this file.
 *
 * @return the value of the {@code contentSizeInBytes} field, which is {@code null} unless it was
 *     set
 */
@Override
public Long contentSizeInBytes() {
return contentSizeInBytes;
}

/**
 * Copies {@code map} into a {@code SerializableMap}.
 *
 * @param map the map to copy
 * @param keys when {@code null}, the map is copied via {@code SerializableMap.copyOf}; otherwise
 *     the copy is produced by {@code SerializableMap.filteredCopyOf} with these keys
 * @return the copy produced by {@code SerializableMap}
 */
private static <K, V> Map<K, V> copyMap(Map<K, V> map, Set<K> keys) {
  // No key filter requested: take a plain copy.
  if (keys == null) {
    return SerializableMap.copyOf(map);
  }

  return SerializableMap.filteredCopyOf(map, keys);
}
Expand Down Expand Up @@ -565,6 +609,9 @@ public String toString() {
.add("sort_order_id", sortOrderId)
.add("data_sequence_number", dataSequenceNumber == null ? "null" : dataSequenceNumber)
.add("file_sequence_number", fileSequenceNumber == null ? "null" : fileSequenceNumber)
.add("referenced_data_file", referencedDataFile == null ? "null" : referencedDataFile)
.add("content_offset", contentOffset == null ? "null" : contentOffset)
.add("content_size_in_bytes", contentSizeInBytes == null ? "null" : contentSizeInBytes)
.toString();
}
}
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseFileScanTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.iceberg.expressions.ResidualEvaluator;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.util.ContentFileUtil;

public class BaseFileScanTask extends BaseContentScanTask<FileScanTask, DataFile>
implements FileScanTask {
Expand Down Expand Up @@ -79,7 +80,7 @@ private long deletesSizeBytes() {
if (deletesSizeBytes == 0L && deletes.length > 0) {
long size = 0L;
for (DeleteFile deleteFile : deletes) {
size += deleteFile.fileSizeInBytes();
size += ContentFileUtil.contentSizeInBytes(deleteFile);
}
this.deletesSizeBytes = size;
}
Expand Down Expand Up @@ -182,7 +183,7 @@ private long deletesSizeBytes() {
if (deletesSizeBytes == 0L && fileScanTask.filesCount() > 1) {
long size = 0L;
for (DeleteFile deleteFile : fileScanTask.deletes()) {
size += deleteFile.fileSizeInBytes();
size += ContentFileUtil.contentSizeInBytes(deleteFile);
}
this.deletesSizeBytes = size;
}
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseRowDelta.java
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,10 @@ protected void validate(TableMetadata base, Snapshot parent) {
if (validateNewDeleteFiles) {
validateNoNewDeleteFiles(base, startingSnapshotId, conflictDetectionFilter, parent);
}

if (base.formatVersion() >= 3 && addsDeleteFiles()) {
validateAddedDVs(base, startingSnapshotId, conflictDetectionFilter, parent);
}
}
}
}
3 changes: 3 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
"partition",
"key_metadata",
"split_offsets",
"referenced_data_file",
"content_offset",
"content_size_in_bytes",
"equality_ids");

protected static final List<String> DELETE_SCAN_WITH_STATS_COLUMNS =
Expand Down
26 changes: 25 additions & 1 deletion core/src/main/java/org/apache/iceberg/ContentFileParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ class ContentFileParser {
private static final String SPLIT_OFFSETS = "split-offsets";
private static final String EQUALITY_IDS = "equality-ids";
private static final String SORT_ORDER_ID = "sort-order-id";
private static final String REFERENCED_DATA_FILE = "referenced-data-file";
private static final String CONTENT_OFFSET = "content-offset";
private static final String CONTENT_SIZE = "content-size-in-bytes";

private ContentFileParser() {}

Expand Down Expand Up @@ -90,6 +93,14 @@ static void toJson(ContentFile<?> contentFile, PartitionSpec spec, JsonGenerator

generator.writeNumberField(FILE_SIZE, contentFile.fileSizeInBytes());

if (contentFile.contentOffset() != null) {
generator.writeNumberField(CONTENT_OFFSET, contentFile.contentOffset());
}

if (contentFile.contentSizeInBytes() != null) {
generator.writeNumberField(CONTENT_SIZE, contentFile.contentSizeInBytes());
}

metricsToJson(contentFile, generator);

if (contentFile.keyMetadata() != null) {
Expand All @@ -109,6 +120,13 @@ static void toJson(ContentFile<?> contentFile, PartitionSpec spec, JsonGenerator
generator.writeNumberField(SORT_ORDER_ID, contentFile.sortOrderId());
}

if (contentFile instanceof DeleteFile) {
DeleteFile deleteFile = (DeleteFile) contentFile;
if (deleteFile.referencedDataFile() != null) {
generator.writeStringField(REFERENCED_DATA_FILE, deleteFile.referencedDataFile());
}
}

generator.writeEndObject();
}

Expand Down Expand Up @@ -140,11 +158,14 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
}

long fileSizeInBytes = JsonUtil.getLong(FILE_SIZE, jsonNode);
Long contentOffset = JsonUtil.getLongOrNull(CONTENT_OFFSET, jsonNode);
Long contentSizeInBytes = JsonUtil.getLongOrNull(CONTENT_SIZE, jsonNode);
Metrics metrics = metricsFromJson(jsonNode);
ByteBuffer keyMetadata = JsonUtil.getByteBufferOrNull(KEY_METADATA, jsonNode);
List<Long> splitOffsets = JsonUtil.getLongListOrNull(SPLIT_OFFSETS, jsonNode);
int[] equalityFieldIds = JsonUtil.getIntArrayOrNull(EQUALITY_IDS, jsonNode);
Integer sortOrderId = JsonUtil.getIntOrNull(SORT_ORDER_ID, jsonNode);
String referencedDataFile = JsonUtil.getStringOrNull(REFERENCED_DATA_FILE, jsonNode);

if (fileContent == FileContent.DATA) {
return new GenericDataFile(
Expand All @@ -169,7 +190,10 @@ static ContentFile<?> fromJson(JsonNode jsonNode, PartitionSpec spec) {
equalityFieldIds,
sortOrderId,
splitOffsets,
keyMetadata);
keyMetadata,
referencedDataFile,
contentOffset,
contentSizeInBytes);
}
}

Expand Down
Loading

0 comments on commit daeb847

Please sign in to comment.