HIVE-28537 Iceberg: allow only partition columns in the WHERE clause
Change-Id: Ic85efd70599413cdb96073c6cb50690fbc1c11b0
zratkai committed Oct 7, 2024
1 parent ff95fcd commit 5143c61
Showing 7 changed files with 86 additions and 15 deletions.
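In short, ALTER TABLE ... COMPACT ... WHERE now only compiles when the filter references partition columns; any other column makes the analyzer fail with the new error 10442. A minimal HiveQL sketch of the two cases, using the ice_orc table from the existing compaction test (company_id and dept_id are its partition columns, first_name is a regular column; the rejected statement is an illustrative variant, not part of the committed tests):

-- Accepted: every column in the filter is a partition column.
alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2);

-- Rejected after this change:
-- FAILED: SemanticException [Error 10442]: Filter expression can contain only partition columns.
alter table ice_orc COMPACT 'major' and wait where first_name in ('fn3', 'fn11');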
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -491,6 +491,7 @@ public enum ErrorMsg {
UNEXPECTED_PARTITION_TRANSFORM_SPEC(10437, "Partition transforms are only supported by Iceberg storage handler", true),
NONICEBERG_COMPACTION_WITH_FILTER_NOT_SUPPORTED(10440, "Compaction with filter is not allowed on non-Iceberg table {0}.{1}", true),
ICEBERG_COMPACTION_WITH_PART_SPEC_AND_FILTER_NOT_SUPPORTED(10441, "Compaction command with both partition spec and filter is not supported on Iceberg table {0}.{1}", true),
ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED(10442, "Filter expression can contain only partition columns."),

//========================== 20000 range starts here ========================//

@@ -0,0 +1,11 @@
set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc;

insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222);
insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444);
insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666);

alter table iceberg_orc_compaction COMPACT 'major' and wait where (c in ('text1', 'text2'));
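The WHERE clause above intentionally filters on c, a regular (non-partition) column, so compilation is expected to fail with the new 10442 error; the q.out below records exactly that. A filter restricted to the partition column d would still be accepted, for example (a hypothetical variant, not part of the committed test):

alter table iceberg_orc_compaction COMPACT 'major' and wait where d in (111, 222);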
@@ -68,8 +68,8 @@ delete from ice_orc where last_name in ('ln5', 'ln13');
select * from ice_orc;
describe formatted ice_orc;

explain alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15');
alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15');
explain alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2);
alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2);

select * from ice_orc;
describe formatted ice_orc;
@@ -0,0 +1,33 @@
PREHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
FAILED: SemanticException [Error 10442]: Filter expression can contain only partition columns.
@@ -226,11 +226,11 @@ InputFormat: org.apache.iceberg.mr.hive.HiveIcebergInputFormat
OutputFormat: org.apache.iceberg.mr.hive.HiveIcebergOutputFormat
Compressed: No
Sort Columns: []
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
PREHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
POSTHOOK: query: explain alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
@@ -246,11 +246,11 @@ STAGE PLANS:
table name: default.ice_orc
blocking: true

PREHOOK: query: alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
PREHOOK: query: alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait where team_id=10 or first_name in ('fn3', 'fn11') or last_name in ('ln7', 'ln15')
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait where company_id=100 or dept_id in (1,2)
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
@@ -302,7 +302,7 @@ Table Parameters:
bucketing_version 2
current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]}
current-snapshot-id #Masked#
current-snapshot-summary {\"added-data-files\":\"4\",\"deleted-data-files\":\"4\",\"removed-position-delete-files\":\"3\",\"removed-delete-files\":\"3\",\"added-records\":\"5\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"5\",\"total-records\":\"11\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"1\",\"total-position-deletes\":\"1\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-summary {\"added-data-files\":\"4\",\"deleted-data-files\":\"4\",\"removed-position-delete-files\":\"3\",\"removed-delete-files\":\"3\",\"added-records\":\"5\",\"deleted-records\":\"8\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"removed-position-deletes\":\"3\",\"changed-partition-count\":\"5\",\"total-records\":\"10\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"8\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"}
current-snapshot-timestamp-ms #Masked#
default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"company_id\",\"transform\":\"identity\",\"source-id\":5,\"field-id\":1000},{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]}
format-version 2
@@ -314,7 +314,7 @@ Table Parameters:
#### A masked pattern was here ####
rawDataSize 0
serialization.format 1
snapshot-count 15
snapshot-count 16
storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler
table_type ICEBERG
totalSize #Masked#
@@ -338,5 +338,6 @@ POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc company_id=100/dept_id=1 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=3 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc company_id=100/dept_id=4 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
@@ -33,6 +33,8 @@
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.RowResolver;
@@ -93,8 +95,10 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
case HiveParser.TOK_WHERE:
RowResolver rwsch = new RowResolver();
Map<String, String> colTypes = new HashMap<>();
Table table;
try {
for (FieldSchema fs : getDb().getTable(tableName).getCols()) {
table = getDb().getTable(tableName);
for (FieldSchema fs : table.getCols()) {
TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(fs.getType());
rwsch.put(tableName.getTable(), fs.getName(),
new ColumnInfo(fs.getName(), columnType, null, true));
@@ -106,6 +110,9 @@
TypeCheckCtx tcCtx = new TypeCheckCtx(rwsch);
ASTNode conds = (ASTNode) node.getChild(0);
filterExpr = ExprNodeTypeCheck.genExprNode(conds, tcCtx).get(conds);
if (!PartitionPruner.onlyContainsPartnCols(table, filterExpr)) {
throw new SemanticException(ErrorMsg.ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED);
}
break;
default:
break;
@@ -104,19 +104,19 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
* if the table is not partitioned, the function always returns true.
* condition.
*
* @param tab
* @param table
* the table object
* @param expr
* the pruner expression for the table
*/
public static boolean onlyContainsPartnCols(Table tab, ExprNodeDesc expr) {
if (!tab.isPartitioned() || (expr == null)) {
public static boolean onlyContainsPartnCols(Table table, ExprNodeDesc expr) {
if (!isPartitioned(table) || (expr == null)) {
return true;
}

if (expr instanceof ExprNodeColumnDesc) {
String colName = ((ExprNodeColumnDesc) expr).getColumn();
return tab.isPartitionKey(colName);
String columnName = ((ExprNodeColumnDesc) expr).getColumn();
return isPartitionKey(table, columnName);
}

// It cannot contain a non-deterministic function
Expand All @@ -130,7 +130,7 @@ public static boolean onlyContainsPartnCols(Table tab, ExprNodeDesc expr) {
List<ExprNodeDesc> children = expr.getChildren();
if (children != null) {
for (int i = 0; i < children.size(); i++) {
if (!onlyContainsPartnCols(tab, children.get(i))) {
if (!onlyContainsPartnCols(table, children.get(i))) {
return false;
}
}
Expand All @@ -139,6 +139,24 @@ public static boolean onlyContainsPartnCols(Table tab, ExprNodeDesc expr) {
return true;
}

public static boolean isPartitioned(Table table) {
if (table.getStorageHandler() != null && table.getStorageHandler().alwaysUnpartitioned()) {
return table.getStorageHandler().isPartitioned(table);
} else {
return table.isPartitioned();
}
}

public static boolean isPartitionKey(Table table, String columnName) {
columnName = columnName.toLowerCase();
List<FieldSchema> partitionKeys = table.getStorageHandler() != null && table.getStorageHandler().alwaysUnpartitioned() ?
table.getStorageHandler().getPartitionKeys(table) : table.getPartCols();
for (FieldSchema fieldSchema : partitionKeys) {
if (fieldSchema.getName().toLowerCase().equals(columnName)) {
return true;
}
}
return false;
}

/**
* Get the partition list for the TS operator that satisfies the partition pruner
* condition.
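The storage-handler fallback in the two new helpers is what makes the check work for Iceberg. For a table created as in the new q test, e.g.

create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc;

the partition column d lives in the Iceberg partition spec rather than as a Hive-native partition column, and the Iceberg storage handler advertises alwaysUnpartitioned(), so Table.isPartitioned() and Table.getPartCols() alone would not report it. isPartitioned(Table) and isPartitionKey(Table, String) therefore fall back to the handler's isPartitioned(table) and getPartitionKeys(table), which is what lets onlyContainsPartnCols accept filters on d while rejecting filters on a, b, or c.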
