Skip to content

Commit

Permalink
Review comments Oct 17.
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitriy Fingerman committed Oct 17, 2024
1 parent 2110fc7 commit afced96
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ set hive.llap.io.enabled=true;
set hive.vectorized.execution.enabled=true;
set hive.optimize.shared.work.merge.ts.schema=true;

create database ice_comp with dbproperties('hive.compactor.worker.pool'='iceberg');
use ice_comp;

create table ice_orc (
first_name string,
last_name string,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,15 @@
PREHOOK: query: create database ice_comp with dbproperties('hive.compactor.worker.pool'='iceberg')
PREHOOK: type: CREATEDATABASE
PREHOOK: Output: database:ice_comp
POSTHOOK: query: create database ice_comp with dbproperties('hive.compactor.worker.pool'='iceberg')
POSTHOOK: type: CREATEDATABASE
POSTHOOK: Output: database:ice_comp
PREHOOK: query: use ice_comp
PREHOOK: type: SWITCHDATABASE
PREHOOK: Input: database:ice_comp
POSTHOOK: query: use ice_comp
POSTHOOK: type: SWITCHDATABASE
POSTHOOK: Input: database:ice_comp
PREHOOK: query: create table ice_orc (
first_name string,
last_name string,
Expand All @@ -6,8 +18,8 @@ PREHOOK: query: create table ice_orc (
stored by iceberg stored as orc
tblproperties ('format-version'='2')
PREHOOK: type: CREATETABLE
PREHOOK: Output: database:default
PREHOOK: Output: default@ice_orc
PREHOOK: Output: database:ice_comp
PREHOOK: Output: ice_comp@ice_orc
POSTHOOK: query: create table ice_orc (
first_name string,
last_name string,
Expand All @@ -16,113 +28,113 @@ POSTHOOK: query: create table ice_orc (
stored by iceberg stored as orc
tblproperties ('format-version'='2')
POSTHOOK: type: CREATETABLE
POSTHOOK: Output: database:default
POSTHOOK: Output: default@ice_orc
POSTHOOK: Output: database:ice_comp
POSTHOOK: Output: ice_comp@ice_orc
PREHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
PREHOOK: Output: ice_comp@ice_orc
POSTHOOK: query: insert into ice_orc VALUES ('fn1','ln1', 1)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
POSTHOOK: Output: ice_comp@ice_orc
PREHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
PREHOOK: Output: ice_comp@ice_orc
POSTHOOK: query: insert into ice_orc VALUES ('fn2','ln2', 1)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
POSTHOOK: Output: ice_comp@ice_orc
PREHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
PREHOOK: Output: ice_comp@ice_orc
POSTHOOK: query: insert into ice_orc VALUES ('fn3','ln3', 1)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
POSTHOOK: Output: ice_comp@ice_orc
PREHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1)
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc
PREHOOK: Output: ice_comp@ice_orc
POSTHOOK: query: insert into ice_orc VALUES ('fn4','ln4', 1)
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc
POSTHOOK: Output: ice_comp@ice_orc
PREHOOK: query: delete from ice_orc where last_name in ('ln3', 'ln4')
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
PREHOOK: Input: ice_comp@ice_orc
#### A masked pattern was here ####
POSTHOOK: query: delete from ice_orc where last_name in ('ln3', 'ln4')
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
POSTHOOK: Input: ice_comp@ice_orc
#### A masked pattern was here ####
PREHOOK: query: alter table ice_orc set partition spec(dept_id)
PREHOOK: type: ALTERTABLE_SETPARTSPEC
PREHOOK: Input: default@ice_orc
PREHOOK: Input: ice_comp@ice_orc
POSTHOOK: query: alter table ice_orc set partition spec(dept_id)
POSTHOOK: type: ALTERTABLE_SETPARTSPEC
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
POSTHOOK: Input: ice_comp@ice_orc
POSTHOOK: Output: ice_comp@ice_orc
PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc@dept_id=2
PREHOOK: Output: ice_comp@ice_orc@dept_id=2
POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn5','ln5')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc@dept_id=2
POSTHOOK: Output: ice_comp@ice_orc@dept_id=2
PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc@dept_id=2
PREHOOK: Output: ice_comp@ice_orc@dept_id=2
POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn6','ln6')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc@dept_id=2
POSTHOOK: Output: ice_comp@ice_orc@dept_id=2
PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc@dept_id=2
PREHOOK: Output: ice_comp@ice_orc@dept_id=2
POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn7','ln7')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc@dept_id=2
POSTHOOK: Output: ice_comp@ice_orc@dept_id=2
PREHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8')
PREHOOK: type: QUERY
PREHOOK: Input: _dummy_database@_dummy_table
PREHOOK: Output: default@ice_orc@dept_id=2
PREHOOK: Output: ice_comp@ice_orc@dept_id=2
POSTHOOK: query: insert into ice_orc PARTITION(dept_id=2) VALUES ('fn8','ln8')
POSTHOOK: type: QUERY
POSTHOOK: Input: _dummy_database@_dummy_table
POSTHOOK: Output: default@ice_orc@dept_id=2
POSTHOOK: Output: ice_comp@ice_orc@dept_id=2
PREHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8')
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
PREHOOK: Input: ice_comp@ice_orc
#### A masked pattern was here ####
POSTHOOK: query: delete from ice_orc where last_name in ('ln7', 'ln8')
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
POSTHOOK: Input: ice_comp@ice_orc
#### A masked pattern was here ####
PREHOOK: query: select * from ice_orc
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
PREHOOK: Input: ice_comp@ice_orc
#### A masked pattern was here ####
POSTHOOK: query: select * from ice_orc
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
POSTHOOK: Input: ice_comp@ice_orc
#### A masked pattern was here ####
fn1 ln1 1
fn2 ln2 1
fn5 ln5 2
fn6 ln6 2
PREHOOK: query: describe formatted ice_orc
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@ice_orc
PREHOOK: Input: ice_comp@ice_orc
POSTHOOK: query: describe formatted ice_orc
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@ice_orc
POSTHOOK: Input: ice_comp@ice_orc
# col_name data_type comment
first_name string
last_name string
Expand All @@ -133,7 +145,7 @@ dept_id bigint
dept_id IDENTITY

# Detailed Table Information
Database: default
Database: ice_comp
#### A masked pattern was here ####
Retention: 0
#### A masked pattern was here ####
Expand Down Expand Up @@ -180,30 +192,30 @@ POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
PREHOOK: query: alter table ice_orc COMPACT 'major' and wait
PREHOOK: type: ALTERTABLE_COMPACT
PREHOOK: Input: default@ice_orc
PREHOOK: Output: default@ice_orc
PREHOOK: Input: ice_comp@ice_orc
PREHOOK: Output: ice_comp@ice_orc
POSTHOOK: query: alter table ice_orc COMPACT 'major' and wait
POSTHOOK: type: ALTERTABLE_COMPACT
POSTHOOK: Input: default@ice_orc
POSTHOOK: Output: default@ice_orc
POSTHOOK: Input: ice_comp@ice_orc
POSTHOOK: Output: ice_comp@ice_orc
PREHOOK: query: select * from ice_orc
PREHOOK: type: QUERY
PREHOOK: Input: default@ice_orc
PREHOOK: Input: ice_comp@ice_orc
#### A masked pattern was here ####
POSTHOOK: query: select * from ice_orc
POSTHOOK: type: QUERY
POSTHOOK: Input: default@ice_orc
POSTHOOK: Input: ice_comp@ice_orc
#### A masked pattern was here ####
fn1 ln1 1
fn2 ln2 1
fn5 ln5 2
fn6 ln6 2
PREHOOK: query: describe formatted ice_orc
PREHOOK: type: DESCTABLE
PREHOOK: Input: default@ice_orc
PREHOOK: Input: ice_comp@ice_orc
POSTHOOK: query: describe formatted ice_orc
POSTHOOK: type: DESCTABLE
POSTHOOK: Input: default@ice_orc
POSTHOOK: Input: ice_comp@ice_orc
# col_name data_type comment
first_name string
last_name string
Expand All @@ -214,7 +226,7 @@ dept_id bigint
dept_id IDENTITY

# Detailed Table Information
Database: default
Database: ice_comp
#### A masked pattern was here ####
Retention: 0
#### A masked pattern was here ####
Expand Down Expand Up @@ -259,5 +271,5 @@ PREHOOK: type: SHOW COMPACTIONS
POSTHOOK: query: show compactions order by 'partition'
POSTHOOK: type: SHOW COMPACTIONS
CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId
#Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---
#Masked# ice_comp ice_orc dept_id=2 MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
#Masked# ice_comp ice_orc --- MAJOR succeeded #Masked# manual iceberg 0 0 0 ---
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ public TestIcebergLlapLocalCompactorCliDriver(String name, File qfile) {

/**
 * Runs the single parameterized q-file test case {@code name} through the shared
 * CLI adapter for the Iceberg LLAP local compactor driver.
 */
@Test
public void testCliDriver() throws Exception {
// NOTE(review): the surrounding commit appears to move setUp() out of the per-test
// path — confirm calling adapter.setUp() here does not duplicate suite-level init.
adapter.setUp();
adapter.runTest(name, qfile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.hadoop.hive.ql.ddl.table.storage.compact;

import org.apache.commons.lang3.ObjectUtils;
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
Expand All @@ -39,6 +40,7 @@
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.MetadataCache;

import java.util.List;
import java.util.ArrayList;
Expand All @@ -54,6 +56,8 @@
*/
public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDesc> {

private static MetadataCache metadataCache = new MetadataCache(true);

/**
 * Creates the DDL operation backing ALTER TABLE ... COMPACT.
 *
 * @param context shared DDL operation context (configuration, metastore access)
 * @param desc    descriptor carrying the compaction request details (type, pool, properties)
 */
public AlterTableCompactOperation(DDLOperationContext context, AlterTableCompactDesc desc) {
super(context, desc);
}
Expand Down Expand Up @@ -88,8 +92,8 @@ else if (desc.getPartitionSpec() != null) {
CompactionRequest compactionRequest = new CompactionRequest(table.getDbName(), table.getTableName(),
compactionTypeStr2ThriftType(desc.getCompactionType()));

compactionRequest.setPoolName(desc.getPoolName() != null ? desc.getPoolName() :
CompactorUtil.getCompactPoolName(context.getConf(), table));
compactionRequest.setPoolName(ObjectUtils.defaultIfNull(desc.getPoolName(),
CompactorUtil.getPoolName(context.getConf(), table.getTTable(), metadataCache)));
compactionRequest.setProperties(desc.getProperties());
compactionRequest.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
compactionRequest.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.txn.compactor;

import com.google.common.collect.Maps;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -56,7 +57,6 @@
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.utils.StringableMap;
import org.apache.hadoop.hive.ql.ddl.DDLUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.shims.HadoopShims;
Expand Down Expand Up @@ -567,15 +567,14 @@ public static Map<String, Integer> getPoolConf(HiveConf hiveConf) {
return poolConf;
}

public static String getCompactPoolName(HiveConf conf, org.apache.hadoop.hive.ql.metadata.Table table)
throws Exception {
String poolName;
Map<String, String> params = table.getParameters();
poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
public static String getPoolName(HiveConf conf, Table t, MetadataCache metadataCache) throws Exception {
Map<String, String> params = ObjectUtils.defaultIfNull(t.getParameters(), Collections.emptyMap());
String poolName = params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
if (StringUtils.isBlank(poolName)) {
params = CompactorUtil.resolveDatabase(conf, table.getDbName()).getParameters();
poolName = params == null ? null : params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
}
params = ObjectUtils.defaultIfNull(metadataCache.computeIfAbsent(t.getDbName(),
() -> resolveDatabase(conf, t.getDbName())).getParameters(), Collections.emptyMap());
poolName = params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
}
return poolName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void run() {
}

Table t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci));
ci.poolName = getPoolName(ci, t);
ci.poolName = CompactorUtil.getPoolName(conf, t, metadataCache);
Partition p = resolvePartition(ci);
if (p == null && ci.partName != null) {
LOG.info("Can't find partition " + ci.getFullPartitionName() +
Expand Down Expand Up @@ -213,16 +213,6 @@ protected boolean isCacheEnabled() {
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
}

/**
 * Determines the compaction worker pool for {@code t}: the table-level
 * {@code hive.compactor.worker.pool} property wins; otherwise the same property
 * on the owning database (resolved through the metadata cache) is returned.
 * May return {@code null} when neither level configures a pool.
 */
private String getPoolName(CompactionInfo ci, Table t) throws Exception {
  Map<String, String> tableParams = t.getParameters();
  String pool = tableParams == null ? null : tableParams.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
  if (!StringUtils.isBlank(pool)) {
    return pool;
  }
  // Table did not set a usable pool name: consult the database, cached by db name.
  Map<String, String> dbParams =
      metadataCache.computeIfAbsent(ci.dbname, () -> resolveDatabase(ci)).getParameters();
  return dbParams == null ? null : dbParams.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
}

/** Resolves the metastore database named by {@code ci.dbname}, delegating to CompactorUtil. */
private Database resolveDatabase(CompactionInfo ci) throws MetaException, NoSuchObjectException {
return CompactorUtil.resolveDatabase(conf, ci.dbname);
}
Expand Down

0 comments on commit afced96

Please sign in to comment.