From ad45a83807fe7c82390cadb98e7d51205754b7a7 Mon Sep 17 00:00:00 2001
From: Zoltan Ratkai
Date: Thu, 3 Oct 2024 15:24:33 +0200
Subject: [PATCH] HIVE-28537 Iceberg: allow only partition columns in the WHERE clause

Change-Id: Ic85efd70599413cdb96073c6cb50690fbc1c11b0
---
 .../org/apache/hadoop/hive/ql/ErrorMsg.java   |  1 +
 ...tion_with_non_partition_column_in_filter.q | 11 +++++++
 ..._with_non_partition_column_in_filter.q.out | 33 +++++++++++++++++++
 .../compact/AlterTableCompactAnalyzer.java    | 16 ++++++++-
 .../ql/optimizer/ppr/PartitionPruner.java     | 30 +++++++++++++----
 5 files changed, 84 insertions(+), 7 deletions(-)
 create mode 100644 iceberg/iceberg-handler/src/test/queries/negative/iceberg_major_compaction_with_non_partition_column_in_filter.q
 create mode 100644 iceberg/iceberg-handler/src/test/results/negative/iceberg_major_compaction_with_non_partition_column_in_filter.q.out

diff --git a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 12dfd1b8e0ec..06f6004e23bd 100644
--- a/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -491,6 +491,7 @@ public enum ErrorMsg {
   UNEXPECTED_PARTITION_TRANSFORM_SPEC(10437, "Partition transforms are only supported by Iceberg storage handler", true),
   NONICEBERG_COMPACTION_WITH_FILTER_NOT_SUPPORTED(10440, "Compaction with filter is not allowed on non-Iceberg table {0}.{1}", true),
   ICEBERG_COMPACTION_WITH_PART_SPEC_AND_FILTER_NOT_SUPPORTED(10441, "Compaction command with both partition spec and filter is not supported on Iceberg table {0}.{1}", true),
+  ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED(10442, "Filter expression can contain only partition columns."),
 
   //========================== 20000 range starts here ========================//
 
diff --git a/iceberg/iceberg-handler/src/test/queries/negative/iceberg_major_compaction_with_non_partition_column_in_filter.q b/iceberg/iceberg-handler/src/test/queries/negative/iceberg_major_compaction_with_non_partition_column_in_filter.q
new file mode 100644
index 000000000000..aedecff94f7c
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/queries/negative/iceberg_major_compaction_with_non_partition_column_in_filter.q
@@ -0,0 +1,11 @@
+set hive.llap.io.enabled=true;
+set hive.vectorized.execution.enabled=true;
+set hive.optimize.shared.work.merge.ts.schema=true;
+
+create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc;
+
+insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222);
+insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444);
+insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666);
+
+alter table iceberg_orc_compaction COMPACT 'major' and wait where (c in ('text1', 'text2'));
diff --git a/iceberg/iceberg-handler/src/test/results/negative/iceberg_major_compaction_with_non_partition_column_in_filter.q.out b/iceberg/iceberg-handler/src/test/results/negative/iceberg_major_compaction_with_non_partition_column_in_filter.q.out
new file mode 100644
index 000000000000..782ae665e624
--- /dev/null
+++ b/iceberg/iceberg-handler/src/test/results/negative/iceberg_major_compaction_with_non_partition_column_in_filter.q.out
@@ -0,0 +1,33 @@
+PREHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@iceberg_orc_compaction
+POSTHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@iceberg_orc_compaction
+PREHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@iceberg_orc_compaction
+POSTHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@iceberg_orc_compaction
+PREHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@iceberg_orc_compaction
+POSTHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@iceberg_orc_compaction
+PREHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@iceberg_orc_compaction
+POSTHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@iceberg_orc_compaction
+FAILED: SemanticException [Error 10442]: Filter expression can contain only partition columns.
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java
index 69e0a77a2d54..328dac65e5ca 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.ql.ddl.table.storage.compact;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import org.antlr.runtime.tree.Tree;
@@ -32,7 +33,14 @@
 import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableAnalyzer;
 import org.apache.hadoop.hive.ql.exec.ColumnInfo;
 import org.apache.hadoop.hive.ql.exec.TaskFactory;
+import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
+import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
+import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
+import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
 import org.apache.hadoop.hive.ql.parse.ASTNode;
 import org.apache.hadoop.hive.ql.parse.HiveParser;
 import org.apache.hadoop.hive.ql.parse.RowResolver;
@@ -40,6 +48,7 @@
 import org.apache.hadoop.hive.ql.parse.type.ExprNodeTypeCheck;
 import org.apache.hadoop.hive.ql.parse.type.TypeCheckCtx;
 import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
@@ -93,8 +102,10 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
       case HiveParser.TOK_WHERE:
         RowResolver rwsch = new RowResolver();
         Map<String, TypeInfo> colTypes = new HashMap<>();
+        Table table;
         try {
-          for (FieldSchema fs : getDb().getTable(tableName).getCols()) {
+          table = getDb().getTable(tableName);
+          for (FieldSchema fs : table.getCols()) {
             TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(fs.getType());
             rwsch.put(tableName.getTable(), fs.getName(),
                 new ColumnInfo(fs.getName(), columnType, null, true));
@@ -106,6 +117,9 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
         TypeCheckCtx tcCtx = new TypeCheckCtx(rwsch);
         ASTNode conds = (ASTNode) node.getChild(0);
         filterExpr = ExprNodeTypeCheck.genExprNode(conds, tcCtx).get(conds);
+        if (!PartitionPruner.onlyContainsPartnCols(table, filterExpr)) {
+          throw new SemanticException(ErrorMsg.ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED);
+        }
         break;
       default:
         break;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
index 5ccf16af75fa..11d6591f277d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/ppr/PartitionPruner.java
@@ -104,19 +104,19 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
    * if the table is not partitioned, the function always returns true.
    * condition.
    *
-   * @param tab
+   * @param table
    *          the table object
    * @param expr
    *          the pruner expression for the table
    */
-  public static boolean onlyContainsPartnCols(Table tab, ExprNodeDesc expr) {
-    if (!tab.isPartitioned() || (expr == null)) {
+  public static boolean onlyContainsPartnCols(Table table, ExprNodeDesc expr) {
+    if (!isPartitioned(table) || (expr == null)) {
       return true;
     }
 
     if (expr instanceof ExprNodeColumnDesc) {
-      String colName = ((ExprNodeColumnDesc) expr).getColumn();
-      return tab.isPartitionKey(colName);
+      String columnName = ((ExprNodeColumnDesc) expr).getColumn();
+      return isPartitionKey(table, columnName);
     }
 
     // It cannot contain a non-deterministic function
@@ -130,7 +130,7 @@ public static boolean onlyContainsPartnCols(Table tab, ExprNodeDesc expr) {
     List<ExprNodeDesc> children = expr.getChildren();
     if (children != null) {
       for (int i = 0; i < children.size(); i++) {
-        if (!onlyContainsPartnCols(tab, children.get(i))) {
+        if (!onlyContainsPartnCols(table, children.get(i))) {
           return false;
         }
       }
@@ -139,6 +139,24 @@ public static boolean onlyContainsPartnCols(Table tab, ExprNodeDesc expr) {
     return true;
   }
 
+  public static boolean isPartitioned(Table table) {
+    if (table.getStorageHandler() != null && table.getStorageHandler().alwaysUnpartitioned()) {
+      return table.getStorageHandler().isPartitioned(table);
+    } else {
+      return table.isPartitioned();
+    }
+  }
+
+  public static boolean isPartitionKey(Table table, String columnName) {
+    List<FieldSchema> partitionKeys = table.getStorageHandler() != null && table.getStorageHandler().alwaysUnpartitioned() ?
+        table.getStorageHandler().getPartitionKeys(table) : table.getPartCols();
+    for (FieldSchema fieldSchema : partitionKeys) {
+      if (fieldSchema.getName().equalsIgnoreCase(columnName)) {
+        return true;
+      }
+    }
+    return false;
+  }
   /**
   * Get the partition list for the TS operator that satisfies the partition pruner
   * condition.
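
For contrast with the negative test above, here is a sketch (not part of the patch) of the kind of filter the new check accepts: every column referenced in the WHERE clause resolves, via PartitionPruner.isPartitionKey, to the Iceberg partition column d, so onlyContainsPartnCols returns true and analysis proceeds.

-- assumes the iceberg_orc_compaction table created in the .q test above
alter table iceberg_orc_compaction COMPACT 'major' and wait where (d in (111, 222));

-- adding the non-partition column c back into the filter would fail again with:
-- FAILED: SemanticException [Error 10442]: Filter expression can contain only partition columns.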