HIVE-28537 Iceberg: allow only partition columns in the WHERE clause
Change-Id: Ic85efd70599413cdb96073c6cb50690fbc1c11b0
zratkai committed Oct 18, 2024
1 parent 13bec9b commit f7f2eb4
Showing 10 changed files with 204 additions and 42 deletions.
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -492,6 +492,7 @@ public enum ErrorMsg {
NONICEBERG_COMPACTION_WITH_FILTER_NOT_SUPPORTED(10440, "Compaction with filter is not allowed on non-Iceberg table {0}.{1}", true),
ICEBERG_COMPACTION_WITH_PART_SPEC_AND_FILTER_NOT_SUPPORTED(10441, "Compaction command with both partition spec and filter is not supported on Iceberg table {0}.{1}", true),
COMPACTION_THREAD_INITIALIZATION(10442, "Compaction thread failed during initialization", false),
ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED(10443, "Filter expression can contain only partition columns."),

//========================== 20000 range starts here ========================//

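For context, the new error code is what surfaces as SemanticException [Error 10443] in the negative test output below. The following is a minimal sketch of the kind of check that would raise it during analysis of ALTER TABLE ... COMPACT; the helper name, its parameters, and its call site are assumptions, since the analyzer change itself is not part of this excerpt:

// Hypothetical sketch, not part of this diff: reject compaction filters that
// reference anything other than partition columns.
private static void validateCompactionFilter(Set<String> filterColumns,
    List<FieldSchema> partitionKeys) throws SemanticException {
  // filterColumns: column names referenced in the WHERE clause (extraction assumed);
  // partitionKeys: result of getPartitionKeys(table, false), i.e. keys from all specs.
  Set<String> partitionColumns = partitionKeys.stream()
      .map(FieldSchema::getName)
      .collect(Collectors.toSet());
  if (!partitionColumns.containsAll(filterColumns)) {
    throw new SemanticException(
        ErrorMsg.ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED.getMsg());
  }
}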
@@ -2098,9 +2098,10 @@ public boolean canPerformMetadataDelete(org.apache.hadoop.hive.ql.metadata.Table
}

@Override
public List<FieldSchema> getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Table hmsTable) {
public List<FieldSchema> getPartitionKeys(org.apache.hadoop.hive.ql.metadata.Table hmsTable, boolean latestSpecOnly) {
Table icebergTable = IcebergTableUtil.getTable(conf, hmsTable.getTTable());
return IcebergTableUtil.getPartitionKeys(icebergTable, icebergTable.spec().specId());
return IcebergTableUtil.getPartitionKeys(icebergTable, latestSpecOnly);
}

@Override
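The new latestSpecOnly flag matters for filter validation: after partition evolution, older data files are still laid out by earlier specs, so compaction filters are checked against the union of partition columns from every spec, not just the current one. A usage sketch, with handler and hmsTable assumed to be wired to the ice_orc test table below (whose spec evolves from (company_id, dept_id) to (team_id)):

// Sketch only; variable wiring is assumed.
List<FieldSchema> current = handler.getPartitionKeys(hmsTable, true);  // team_id
List<FieldSchema> all = handler.getPartitionKeys(hmsTable, false);     // company_id, dept_id, team_id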
@@ -473,17 +473,22 @@ public static Expression generateExpressionFromPartitionSpec(Table table, Map<St
return finalExp;
}

public static List<FieldSchema> getPartitionKeys(Table table, int specId) {
Schema schema = table.specs().get(specId).schema();
List<FieldSchema> hiveSchema = HiveSchemaUtil.convert(schema);
Map<String, String> colNameToColType = hiveSchema.stream()
.collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType));
return table.specs().get(specId).fields().stream().map(partField ->
new FieldSchema(schema.findColumnName(partField.sourceId()),
colNameToColType.get(schema.findColumnName(partField.sourceId())),
String.format("Transform: %s", partField.transform().toString()))).collect(Collectors.toList());
public static List<FieldSchema> getPartitionKeys(Table table, boolean latestSpecOnly) {
// Collect the schema of each partition spec, optionally restricted to the current spec.
Map<Integer, Schema> schemaBySpecId = table.specs().values().stream()
.filter(spec -> !latestSpecOnly || table.spec().specId() == spec.specId())
.collect(Collectors.toMap(PartitionSpec::specId, PartitionSpec::schema));
// Build a per-spec lookup from column name to Hive column type.
Map<Integer, Map<String, String>> colNameToColTypeBySpecId = schemaBySpecId.entrySet().stream()
.collect(Collectors.toMap(Map.Entry::getKey, entry -> HiveSchemaUtil.convert(entry.getValue()).stream()
.collect(Collectors.toMap(FieldSchema::getName, FieldSchema::getType))));
// Emit one FieldSchema per partition field; distinct() collapses columns shared across specs.
return table.specs().values().stream().filter(spec -> !latestSpecOnly || table.spec().specId() == spec.specId())
.flatMap(partitionSpec -> partitionSpec.fields().stream().map(partField -> {
String columnName = schemaBySpecId.get(partitionSpec.specId()).findColumnName(partField.sourceId());
return new FieldSchema(columnName, colNameToColTypeBySpecId.get(partitionSpec.specId()).get(columnName),
String.format("Transform: %s", partField.transform()));
})).distinct().collect(Collectors.toList());
}

public static List<PartitionField> getPartitionFields(Table table) {
return table.specs().values().stream().flatMap(spec -> spec.fields()
.stream()).distinct().collect(Collectors.toList());
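To make the multi-spec behavior concrete, here is a small sketch against the same evolved table (the table handle is assumed); distinct() ensures a column appears only once even if several specs share it:

// icebergTable is assumed to be ice_orc, with one spec on (company_id, dept_id)
// and a later spec on (team_id), matching the test output further below.
List<FieldSchema> all = IcebergTableUtil.getPartitionKeys(icebergTable, false);
// -> company_id, dept_id, team_id; each FieldSchema comment reads "Transform: identity"
List<FieldSchema> latestOnly = IcebergTableUtil.getPartitionKeys(icebergTable, true);
// -> team_id only, since only the current default spec survives the filter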
@@ -0,0 +1,11 @@
set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc;

insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222);
insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444);
insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666);

alter table iceberg_orc_compaction COMPACT 'major' and wait where c in ('text1', 'text2');
@@ -65,11 +65,26 @@ delete from ice_orc where last_name in ('ln1', 'ln9');
delete from ice_orc where last_name in ('ln3', 'ln11');
delete from ice_orc where last_name in ('ln5', 'ln13');

alter table ice_orc set partition spec(team_id);
insert into ice_orc VALUES
('fn17', 'ln17', 1, 10, 100),
('fn18','ln18', 1, 10, 100);
insert into ice_orc VALUES
('fn19','ln19', 2, 11, 100),
('fn20','ln20', 2, 11, 100);
insert into ice_orc VALUES
('fn21','ln21', 3, 12, 100),
('fn22','ln22', 3, 12, 100);
insert into ice_orc VALUES
('fn23','ln23', 4, 13, 100),
('fn24','ln24', 4, 13, 100);


select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15');
alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15');
explain alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2);
alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2);

select * from ice_orc;
describe formatted ice_orc;
@@ -0,0 +1,33 @@
PREHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
FAILED: SemanticException [Error 10443]: Filter expression can contain only partition columns.
@@ -149,6 +149,61 @@ POSTHOOK: query: delete from ice_orc where last_name in ('ln5', 'ln13')
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
PREHOOK: query: alter table ice_orc set partition spec(team_id)
PREHOOK: type: ALTERTABLE_SETPARTSPEC
PREHOOK: Input: default@ice_orc
POSTHOOK: query: alter table ice_orc set partition spec(team_id)
POSTHOOK: type: ALTERTABLE_SETPARTSPEC
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES
('fn17', 'ln17', 1, 10, 100),
('fn18','ln18', 1, 10, 100)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES
('fn17', 'ln17', 1, 10, 100),
('fn18','ln18', 1, 10, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES
('fn19','ln19', 2, 11, 100),
('fn20','ln20', 2, 11, 100)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES
('fn19','ln19', 2, 11, 100),
('fn20','ln20', 2, 11, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES
('fn21','ln21', 3, 12, 100),
('fn22','ln22', 3, 12, 100)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES
('fn21','ln21', 3, 12, 100),
('fn22','ln22', 3, 12, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: insert into ice_orc VALUES
('fn23','ln23', 4, 13, 100),
('fn24','ln24', 4, 13, 100)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
POSTHOOK: query: insert into ice_orc VALUES
('fn23','ln23', 4, 13, 100),
('fn24','ln24', 4, 13, 100)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
PREHOOK: query: select * from ice_orc
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
@@ -162,7 +217,15 @@ fn12 ln12 2 11 100
fn14 ln14 3 12 100
fn15 ln15 4 13 100
fn16 ln16 4 13 100
fn17 ln17 1 10 100
fn18 ln18 1 10 100
fn19 ln19 2 11 100
fn2 ln2 1 10 100
fn20 ln20 2 11 100
fn21 ln21 3 12 100
fn22 ln22 3 12 100
fn23 ln23 4 13 100
fn24 ln24 4 13 100
fn4 ln4 2 11 100
fn6 ln6 3 12 100
fn7 ln7 4 13 100
@@ -182,8 +245,7 @@ company_id bigint

# Partition Transform Information
# col_name transform_type
company_id IDENTITY
dept_id IDENTITY
team_id IDENTITY

# Detailed Table Information
Database: default
@@ -192,24 +254,24 @@ Retention: 0
#### A masked pattern was here ####
Table Type: EXTERNAL_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"}
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\",\"COLUMN_STATS\":{\"company_id\":\"true\",\"dept_id\":\"true\",\"first_name\":\"true\",\"last_name\":\"true\",\"team_id\":\"true\"}}
EXTERNAL TRUE
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"added-position-delete-files\":\"2\",\"added-delete-files\":\"2\",\"added-files-size\":\"#Masked#\",\"added-position-deletes\":\"2\",\"changed-partition-count\":\"2\",\"total-records\":\"16\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"6\",\"total-position-deletes\":\"6\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-summary {\"added-data-files\":\"1\",\"added-records\":\"2\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"24\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"12\",\"total-delete-files\":\"6\",\"total-position-deletes\":\"6\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
default-partition-spec {\"spec-id\":2,\"fields\":[{\"name\":\"team_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1002}]}
format-version 2
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 8
numRows 10
numFiles 12
numRows 18
parquet.compression zstd
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 11
snapshot-count 15
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
@@ -226,11 +288,11 @@ InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: []
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
@@ -246,11 +308,11 @@ STAGE PLANS:
table name: default.ice_orc
blocking: true

PREHOOK: query: alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
PREHOOK: query: alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
@@ -267,7 +329,15 @@ fn12 ln12 2 11 100
fn14 ln14 3 12 100
fn15 ln15 4 13 100
fn16 ln16 4 13 100
fn17 ln17 1 10 100
fn18 ln18 1 10 100
fn19 ln19 2 11 100
fn2 ln2 1 10 100
fn20 ln20 2 11 100
fn21 ln21 3 12 100
fn22 ln22 3 12 100
fn23 ln23 4 13 100
fn24 ln24 4 13 100
fn4 ln4 2 11 100
fn6 ln6 3 12 100
fn7 ln7 4 13 100
@@ -287,8 +357,7 @@ company_id bigint

# Partition Transform Information
# col_name transform_type
company_id IDENTITY
dept_id IDENTITY
team_id IDENTITY

# Detailed Table Information
Database: default
@@ -302,19 +371,19 @@ Table Parameters:
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"added-data-files\":\"4\",\"deleted-data-files\":\"4\",\"removed-position-delete-files\":\"3\",\"removed-delete-files\":\"3\",\"added-records\":\"5\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"5\",\"total-records\":\"11\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"1\",\"total-position-deletes\":\"1\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-summary {\"added-data-files\":\"4\",\"deleted-data-files\":\"8\",\"removed-position-delete-files\":\"6\",\"removed-delete-files\":\"6\",\"added-records\":\"10\",\"deleted-records\":\"16\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"6\",\"changed-partition-count\":\"9\",\"total-records\":\"18\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
default-partition-spec {\"spec-id\":2,\"fields\":[{\"name\":\"team_id\",\"transform\":\"identity\",\"source-id\":4,\"field-id\":1002}]}
format-version 2
iceberg.orc.files.only true
#### A masked pattern was here ####
numFiles 8
numRows 10
numRows 18
parquet.compression zstd
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 15
snapshot-count 20
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
@@ -336,7 +405,8 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=4 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc team_id=10 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc team_id=11 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc team_id=12 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc team_id=13 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---