HIVE-28537 Iceberg: allow only partition columns in the WHERE clause
Change-Id: Ic85efd70599413cdb96073c6cb50690fbc1c11b0
zratkai committed Oct 3, 2024
1 parent 799b5cf commit ad45a83
Showing 5 changed files with 84 additions and 7 deletions.
1 change: 1 addition & 0 deletions common/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -491,6 +491,7 @@ public enum ErrorMsg {
UNEXPECTED_PARTITION_TRANSFORM_SPEC(10437, "Partition transforms are only supported by Iceberg storage handler", true),
NONICEBERG_COMPACTION_WITH_FILTER_NOT_SUPPORTED(10440, "Compaction with filter is not allowed on non-Iceberg table {0}.{1}", true),
ICEBERG_COMPACTION_WITH_PART_SPEC_AND_FILTER_NOT_SUPPORTED(10441, "Compaction command with both partition spec and filter is not supported on Iceberg table {0}.{1}", true),
ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED(10442, "Filter expression can contain only partition columns."),

//========================== 20000 range starts here ========================//

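For reference, a minimal sketch (not part of this commit) of how the new constant surfaces to a user; it assumes Hive's existing SemanticException(ErrorMsg) constructor, which the analyzer change below uses:

import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.parse.SemanticException;

public class ErrorMsg10442Demo {
  public static void main(String[] args) {
    // Expected to print the canonical message that the negative .q.out below
    // records as: FAILED: SemanticException [Error 10442]: Filter expression
    // can contain only partition columns.
    SemanticException e =
        new SemanticException(ErrorMsg.ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED);
    System.out.println(e.getMessage());
  }
}
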
@@ -0,0 +1,11 @@
set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc;

insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222);
insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444);
insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666);

alter table iceberg_orc_compaction COMPACT 'major' and wait where (c in ('text1', 'text2'));
@@ -0,0 +1,33 @@
PREHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: create table iceberg_orc_compaction (a int, b int, c string) partitioned by (d int) stored by iceberg stored as orc
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (1, 11, "text1", 111),(2,22,"text2",222)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (3, 33, "text3", 333),(4,44,"text4",444)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
PREHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@iceberg_orc_compaction
POSTHOOK: query: insert into iceberg_orc_compaction values (5, 55, "text5", 555),(6,66,"text6",666)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@iceberg_orc_compaction
FAILED: SemanticException [Error 10442]: Filter expression can contain only partition columns.
@@ -19,6 +19,7 @@
package org.apache.hadoop.hive.ql.ddl.table.storage.compact;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.antlr.runtime.tree.Tree;
@@ -32,14 +33,22 @@
import org.apache.hadoop.hive.ql.ddl.table.AbstractAlterTableAnalyzer;
import org.apache.hadoop.hive.ql.exec.ColumnInfo;
import org.apache.hadoop.hive.ql.exec.TaskFactory;
import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.metadata.Hive;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.RowResolver;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.parse.type.ExprNodeTypeCheck;
import org.apache.hadoop.hive.ql.parse.type.TypeCheckCtx;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;

@@ -93,8 +102,10 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
case HiveParser.TOK_WHERE:
RowResolver rwsch = new RowResolver();
Map<String, String> colTypes = new HashMap<>();
Table table;
try {
table = getDb().getTable(tableName);
for (FieldSchema fs : table.getCols()) {
TypeInfo columnType = TypeInfoUtils.getTypeInfoFromTypeString(fs.getType());
rwsch.put(tableName.getTable(), fs.getName(),
new ColumnInfo(fs.getName(), columnType, null, true));
@@ -106,6 +117,9 @@ protected void analyzeCommand(TableName tableName, Map<String, String> partition
TypeCheckCtx tcCtx = new TypeCheckCtx(rwsch);
ASTNode conds = (ASTNode) node.getChild(0);
filterExpr = ExprNodeTypeCheck.genExprNode(conds, tcCtx).get(conds);
// HIVE-28537: a compaction filter may reference partition columns only.
if (!PartitionPruner.onlyContainsPartnCols(table, filterExpr)) {
throw new SemanticException(ErrorMsg.ALTER_TABLE_COMPACTION_NON_PARTITIONED_COLUMN_NOT_ALLOWED);
}
break;
default:
break;
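For intuition, a hypothetical sketch (not part of the commit) of the ExprNodeDesc tree that the check above walks for the negative test's WHERE clause; the constructors and GenericUDFIn are Hive's existing public API:

import java.util.Arrays;

import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class CompactionFilterTreeDemo {
  public static void main(String[] args) throws UDFArgumentException {
    // "c" is a data column of iceberg_orc_compaction, not a partition column.
    ExprNodeDesc col = new ExprNodeColumnDesc(
        TypeInfoFactory.stringTypeInfo, "c", "iceberg_orc_compaction", false);
    // Tree for: where (c in ('text1', 'text2'))
    ExprNodeDesc in = ExprNodeGenericFuncDesc.newInstance(new GenericUDFIn(),
        Arrays.asList(col,
            new ExprNodeConstantDesc("text1"), new ExprNodeConstantDesc("text2")));
    // onlyContainsPartnCols recurses into the IN call's children, reaches the
    // column "c", finds it is not a partition key, and returns false; the
    // analyzer then raises SemanticException 10442.
    System.out.println(in.getExprString());
  }
}

A filter that names only the partition column, e.g. where (d in (111, 222)), should pass the same walk, so the compaction request proceeds.
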
@@ -104,19 +104,19 @@ public ParseContext transform(ParseContext pctx) throws SemanticException {
* if the table is not partitioned, the function always returns true.
*
* @param table
* the table object
* @param expr
* the pruner expression for the table
*/
public static boolean onlyContainsPartnCols(Table table, ExprNodeDesc expr) {
if (!isPartitioned(table) || (expr == null)) {
return true;
}

if (expr instanceof ExprNodeColumnDesc) {
String columnName = ((ExprNodeColumnDesc) expr).getColumn();
return isPartitionKey(table, columnName);
}

// It cannot contain a non-deterministic function
@@ -130,7 +130,7 @@ public static boolean onlyContainsPartnCols(Table tab, ExprNodeDesc expr) {
List<ExprNodeDesc> children = expr.getChildren();
if (children != null) {
for (int i = 0; i < children.size(); i++) {
if (!onlyContainsPartnCols(table, children.get(i))) {
return false;
}
}
@@ -139,6 +139,24 @@ public static boolean onlyContainsPartnCols(Table tab, ExprNodeDesc expr) {
return true;
}

/**
* Whether the table is partitioned. For tables whose partitioning is managed by a
* storage handler (e.g. Iceberg), the handler is consulted instead of the metastore.
*/
public static boolean isPartitioned(Table table) {
if (table.getStorageHandler() != null && table.getStorageHandler().alwaysUnpartitioned()) {
return table.getStorageHandler().isPartitioned(table);
} else {
return table.isPartitioned();
}
}

/**
* Whether the given column is a partition key of the table, matched case-insensitively
* against the metastore partition columns or the storage handler's partition keys.
*/
public static boolean isPartitionKey(Table table, String columnName) {
List<FieldSchema> partitionKeys = table.getStorageHandler() != null && table.getStorageHandler().alwaysUnpartitioned() ?
table.getStorageHandler().getPartitionKeys(table) : table.getPartCols();
for (FieldSchema fieldSchema : partitionKeys) {
if (fieldSchema.getName().equalsIgnoreCase(columnName)) {
return true;
}
}
return false;
}

/**
* Get the partition list for the TS operator that satisfies the partition pruner
* condition.
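To exercise the new helpers, a hypothetical unit-test sketch (assuming Mockito and JUnit 4; the test class and setup are illustrative, not part of the commit). An Iceberg table's storage handler reports alwaysUnpartitioned() as true, so its partition keys must come from the handler rather than the metastore:

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.Collections;

import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.optimizer.ppr.PartitionPruner;
import org.junit.Test;

public class TestPartitionPrunerHelpers {
  @Test
  public void partitionKeysComeFromTheStorageHandler() {
    Table table = mock(Table.class);
    HiveStorageHandler handler = mock(HiveStorageHandler.class);
    when(table.getStorageHandler()).thenReturn(handler);
    when(handler.alwaysUnpartitioned()).thenReturn(true);
    when(handler.isPartitioned(table)).thenReturn(true);
    when(handler.getPartitionKeys(table))
        .thenReturn(Collections.singletonList(new FieldSchema("d", "int", null)));

    assertTrue(PartitionPruner.isPartitioned(table));
    assertTrue(PartitionPruner.isPartitionKey(table, "D"));  // matching is case-insensitive
    assertFalse(PartitionPruner.isPartitionKey(table, "c")); // data column, not a key
  }
}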
