HIVE-28495: Iceberg: Upgrade iceberg version to 1.6.1 (Butao Zhang, reviewed by Denys Kuzmenko)

Closes #5436
zhangbutao authored Oct 2, 2024
1 parent 799b5cf commit ff95fcd
Showing 86 changed files with 366 additions and 630 deletions.
HiveCatalog.java
@@ -130,12 +130,9 @@ public List<TableIdentifier> listTables(Namespace namespace) {
           .map(t -> TableIdentifier.of(namespace, t))
           .collect(Collectors.toList());
     } else {
-      List<Table> tableObjects = clients.run(client -> client.getTableObjectsByName(database, tableNames));
-      tableIdentifiers = tableObjects.stream()
-          .filter(table -> table.getParameters() != null && BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE
-              .equalsIgnoreCase(table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
-          .map(table -> TableIdentifier.of(namespace, table.getTableName()))
-          .collect(Collectors.toList());
+      tableIdentifiers = listIcebergTables(
+          tableNames, namespace, BaseMetastoreTableOperations.ICEBERG_TABLE_TYPE_VALUE);
+
     }
 
     LOG.debug("Listing of namespace: {} resulted in the following tables: {}", namespace, tableIdentifiers);
@@ -209,20 +206,42 @@ public boolean dropTable(TableIdentifier identifier, boolean purge) {
 
   @Override
   public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
+    renameTableOrView(from, originalTo, HiveOperationsBase.ContentType.TABLE);
+  }
+
+  private List<TableIdentifier> listIcebergTables(
+      List<String> tableNames, Namespace namespace, String tableTypeProp)
+      throws TException, InterruptedException {
+    List<Table> tableObjects = clients.run(client -> client.getTableObjectsByName(namespace.level(0), tableNames));
+    return tableObjects.stream()
+        .filter(table -> table.getParameters() != null && tableTypeProp
+            .equalsIgnoreCase(table.getParameters().get(BaseMetastoreTableOperations.TABLE_TYPE_PROP)))
+        .map(table -> TableIdentifier.of(namespace, table.getTableName()))
+        .collect(Collectors.toList());
+  }
+
+  private void renameTableOrView(
+      TableIdentifier from,
+      TableIdentifier originalTo,
+      HiveOperationsBase.ContentType contentType) {
     if (!isValidIdentifier(from)) {
       throw new NoSuchTableException("Invalid identifier: %s", from);
     }
 
     TableIdentifier to = removeCatalogName(originalTo);
     Preconditions.checkArgument(isValidIdentifier(to), "Invalid identifier: %s", to);
+    if (!namespaceExists(to.namespace())) {
+      throw new NoSuchNamespaceException(
+          "Cannot rename %s to %s. Namespace does not exist: %s", from, to, to.namespace());
+    }
 
     String toDatabase = to.namespace().level(0);
     String fromDatabase = from.namespace().level(0);
     String fromName = from.name();
 
     try {
       Table table = clients.run(client -> client.getTable(fromDatabase, fromName));
-      HiveOperationsBase.validateTableIsIceberg(table, fullTableName(name, from));
+      validateTableIsIcebergTableOrView(contentType, table, CatalogUtil.fullTableName(name, from));
 
       table.setDbName(toDatabase);
       table.setTableName(to.name());
@@ -232,7 +251,7 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
         return null;
       });
 
-      LOG.info("Renamed table from {}, to {}", from, to);
+      LOG.info("Renamed {} from {}, to {}", contentType.value(), from, to);
 
     } catch (NoSuchObjectException e) {
       throw new NoSuchTableException("Table does not exist: %s", from);
@@ -254,6 +273,17 @@ public void renameTable(TableIdentifier from, TableIdentifier originalTo) {
     }
   }
 
+  private void validateTableIsIcebergTableOrView(
+      HiveOperationsBase.ContentType contentType, Table table, String fullName) {
+    switch (contentType) {
+      case TABLE:
+        HiveOperationsBase.validateTableIsIceberg(table, fullName);
+        break;
+      case VIEW:
+        throw new UnsupportedOperationException("View is not supported.");
+    }
+  }
+
   @Override
   public void createNamespace(Namespace namespace, Map<String, String> meta) {
     Preconditions.checkArgument(
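A minimal usage sketch, not part of the commit (hypothetical names; imports assumed): renames still enter through renameTable(), which now delegates with ContentType.TABLE, while the ContentType.VIEW branch throws UnsupportedOperationException.

    // Sketch: rename through the public API; assumes an initialized HiveCatalog.
    static void renameExample(HiveCatalog catalog) {
      catalog.renameTable(
          TableIdentifier.of("db", "src_tbl"),   // hypothetical identifiers
          TableIdentifier.of("db", "dst_tbl"));
    }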
HiveOperationsBase.java
@@ -23,11 +23,14 @@
 import java.util.Map;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.SerDeInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.iceberg.BaseMetastoreOperations;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.ClientPool;
+import org.apache.iceberg.Schema;
 import org.apache.iceberg.SchemaParser;
 import org.apache.iceberg.TableMetadata;
 import org.apache.iceberg.TableProperties;
@@ -56,6 +59,21 @@ public interface HiveOperationsBase {
 
   TableType tableType();
 
+  enum ContentType {
+    TABLE("Table"),
+    VIEW("View");
+
+    private final String value;
+
+    ContentType(String value) {
+      this.value = value;
+    }
+
+    public String value() {
+      return value;
+    }
+  }
+
   ClientPool<IMetaStoreClient, TException> metaClients();
 
   long maxHiveTablePropertySize();
@@ -64,6 +82,15 @@
 
   String table();
 
+  default Table loadHmsTable() throws TException, InterruptedException {
+    try {
+      return metaClients().run(client -> client.getTable(database(), table()));
+    } catch (NoSuchObjectException nte) {
+      LOG.trace("Table not found {}", database() + "." + table(), nte);
+      return null;
+    }
+  }
+
   default Map<String, String> hmsEnvContext(String metadataLocation) {
     return metadataLocation == null ? ImmutableMap.of() :
         ImmutableMap.of(
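A hedged sketch of the new default loadHmsTable() contract, not part of the commit: NoSuchObjectException is swallowed and mapped to null, so callers null-check instead of catching.

    // Sketch: create-vs-update decision based on the null contract.
    Table hmsTable = loadHmsTable();
    if (hmsTable == null) {
      // table absent in HMS: take the create path
    } else {
      // table present: take the update path
    }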
@@ -77,11 +104,11 @@ default boolean exposeInHmsProperties() {
     return maxHiveTablePropertySize() > 0;
   }
 
-  default void setSchema(TableMetadata metadata, Map<String, String> parameters) {
+  default void setSchema(Schema schema, Map<String, String> parameters) {
     parameters.remove(TableProperties.CURRENT_SCHEMA);
-    if (exposeInHmsProperties() && metadata.schema() != null) {
-      String schema = SchemaParser.toJson(metadata.schema());
-      setField(parameters, TableProperties.CURRENT_SCHEMA, schema);
+    if (exposeInHmsProperties() && schema != null) {
+      String jsonSchema = SchemaParser.toJson(schema);
+      setField(parameters, TableProperties.CURRENT_SCHEMA, jsonSchema);
     }
   }
 
@@ -119,12 +146,23 @@ default void persistTable(Table hmsTable, boolean updateHiveTable, String metada
     }
   }
 
+  /**
+   * @deprecated since 1.6.0, will be removed in 1.7.0; Use {@link #storageDescriptor(Schema,
+   *     String, boolean)} instead
+   */
+  @Deprecated
   static StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveEngineEnabled) {
+    return storageDescriptor(metadata.schema(), metadata.location(), hiveEngineEnabled);
+  }
+
+  static StorageDescriptor storageDescriptor(
+      Schema schema, String location, boolean hiveEngineEnabled) {
     final StorageDescriptor storageDescriptor = new StorageDescriptor();
-    storageDescriptor.setCols(HiveSchemaUtil.convert(metadata.schema()));
-    storageDescriptor.setLocation(metadata.location());
+    storageDescriptor.setCols(HiveSchemaUtil.convert(schema));
+    storageDescriptor.setLocation(location);
     SerDeInfo serDeInfo = new SerDeInfo();
     serDeInfo.setParameters(Maps.newHashMap());
+
     if (hiveEngineEnabled) {
       storageDescriptor.setInputFormat("org.apache.iceberg.mr.hive.HiveIcebergInputFormat");
       storageDescriptor.setOutputFormat("org.apache.iceberg.mr.hive.HiveIcebergOutputFormat");
@@ -134,6 +172,7 @@ static StorageDescriptor storageDescriptor(TableMetadata metadata, boolean hiveE
       storageDescriptor.setInputFormat("org.apache.hadoop.mapred.FileInputFormat");
       serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe");
     }
+
     storageDescriptor.setSerdeInfo(serDeInfo);
     return storageDescriptor;
   }
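Migration sketch for callers of the deprecated overload, not part of the commit (metadata and hiveEngineEnabled assumed in scope); it mirrors the doCommit() change further below:

    // before (deprecated since 1.6.0):
    StorageDescriptor sd = HiveOperationsBase.storageDescriptor(metadata, hiveEngineEnabled);
    // after: pass the schema and location explicitly.
    StorageDescriptor sd2 = HiveOperationsBase.storageDescriptor(
        metadata.schema(), metadata.location(), hiveEngineEnabled);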
@@ -149,6 +188,18 @@ static void cleanupMetadata(FileIO io, String commitStatus, String metadataLocat
     }
   }
 
+  static void cleanupMetadataAndUnlock(
+      FileIO io,
+      BaseMetastoreOperations.CommitStatus commitStatus,
+      String metadataLocation,
+      HiveLock lock) {
+    try {
+      cleanupMetadata(io, commitStatus.name(), metadataLocation);
+    } finally {
+      lock.unlock();
+    }
+  }
+
   default Table newHmsTable(String hmsTableOwner) {
     Preconditions.checkNotNull(hmsTableOwner, "'hmsOwner' parameter can't be null");
     final long currentTimeMillis = System.currentTimeMillis();
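The new static cleanupMetadataAndUnlock() pairs metadata cleanup with lock release in a single finally-safe call. A minimal sketch of the call pattern, not part of the commit (io, commitStatus, newMetadataLocation, and lock assumed in scope); it matches the use in HiveTableOperations below. Unlike the private helper it replaces, which routed through a null-checking doUnlock(), it calls lock.unlock() directly.

    try {
      // ... attempt the HMS commit, updating commitStatus along the way ...
    } finally {
      // clean up the new metadata file if the commit failed, then release the lock
      HiveOperationsBase.cleanupMetadataAndUnlock(io, commitStatus, newMetadataLocation, lock);
    }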
HiveSchemaConverter.java
@@ -43,7 +43,7 @@ class HiveSchemaConverter {
   private static final Logger LOG = LoggerFactory.getLogger(HiveSchemaConverter.class);
 
   private int id;
-  private boolean autoConvert;
+  private final boolean autoConvert;
 
   private HiveSchemaConverter(boolean autoConvert) {
     this.autoConvert = autoConvert;
HiveTableOperations.java
@@ -35,6 +35,7 @@
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hive.iceberg.com.fasterxml.jackson.core.JsonProcessingException;
+import org.apache.iceberg.BaseMetastoreOperations;
 import org.apache.iceberg.BaseMetastoreTableOperations;
 import org.apache.iceberg.ClientPool;
 import org.apache.iceberg.PartitionSpecParser;
@@ -173,7 +174,8 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
     boolean hiveEngineEnabled = hiveEngineEnabled(metadata, conf);
     boolean keepHiveStats = conf.getBoolean(ConfigProperties.KEEP_HIVE_STATS, false);
 
-    CommitStatus commitStatus = CommitStatus.FAILURE;
+    BaseMetastoreOperations.CommitStatus commitStatus =
+        BaseMetastoreOperations.CommitStatus.FAILURE;
     boolean updateHiveTable = false;
 
     HiveLock lock = lockObject(base);
@@ -195,7 +197,10 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
         LOG.debug("Committing new table: {}", fullName);
       }
 
-      tbl.setSd(HiveOperationsBase.storageDescriptor(metadata, hiveEngineEnabled)); // set to pickup any schema changes
+      tbl.setSd(HiveOperationsBase.storageDescriptor(
+          metadata.schema(),
+          metadata.location(),
+          hiveEngineEnabled)); // set to pickup any schema changes
 
       String metadataLocation = tbl.getParameters().get(METADATA_LOCATION_PROP);
       String baseMetadataLocation = base != null ? base.metadataFileLocation() : null;
@@ -233,9 +238,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
           tbl, updateHiveTable, hiveLockEnabled(base, conf) ? null : baseMetadataLocation);
       lock.ensureActive();
 
-      commitStatus = CommitStatus.SUCCESS;
+      commitStatus = BaseMetastoreOperations.CommitStatus.SUCCESS;
     } catch (LockException le) {
-      commitStatus = CommitStatus.UNKNOWN;
+      commitStatus = BaseMetastoreOperations.CommitStatus.UNKNOWN;
       throw new CommitStateUnknownException(
           "Failed to heartbeat for hive lock while " +
           "committing changes. This can lead to a concurrent commit attempt be able to overwrite this commit. " +
@@ -270,7 +275,9 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
 
         LOG.error("Cannot tell if commit to {}.{} succeeded, attempting to reconnect and check.",
             database, tableName, e);
-        commitStatus = checkCommitStatus(newMetadataLocation, metadata);
+        commitStatus =
+            BaseMetastoreOperations.CommitStatus.valueOf(
+                checkCommitStatus(newMetadataLocation, metadata).name());
         switch (commitStatus) {
           case SUCCESS:
             break;
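The valueOf(...name()) round-trip above bridges two distinct enums by constant name. A minimal sketch of the assumption it relies on, not part of the commit: checkCommitStatus() still returns the older BaseMetastoreTableOperations.CommitStatus, and both enums declare SUCCESS, FAILURE, and UNKNOWN, so the name-based conversion cannot throw.

    // Sketch: name-based bridge between the old and new CommitStatus enums.
    BaseMetastoreOperations.CommitStatus bridged =
        BaseMetastoreOperations.CommitStatus.valueOf(
            BaseMetastoreTableOperations.CommitStatus.SUCCESS.name()); // SUCCESS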
@@ -292,22 +299,12 @@ protected void doCommit(TableMetadata base, TableMetadata metadata) {
       throw new CommitFailedException(e);
 
     } finally {
-      cleanupMetadataAndUnlock(commitStatus, newMetadataLocation, lock);
+      HiveOperationsBase.cleanupMetadataAndUnlock(io(), commitStatus, newMetadataLocation, lock);
     }
 
     LOG.info("Committed to table {} with the new metadata location {}", fullName, newMetadataLocation);
   }
 
-  @VisibleForTesting
-  Table loadHmsTable() throws TException, InterruptedException {
-    try {
-      return metaClients.run(client -> client.getTable(database, tableName));
-    } catch (NoSuchObjectException nte) {
-      LOG.trace("Table not found {}", fullName, nte);
-      return null;
-    }
-  }
-
   private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableMetadata metadata,
       Set<String> obsoleteProps, boolean hiveEngineEnabled,
       Map<String, String> summary) {
@@ -352,7 +349,7 @@ private void setHmsTableParameters(String newMetadataLocation, Table tbl, TableM
     }
 
     setSnapshotStats(metadata, parameters);
-    setSchema(metadata, parameters);
+    setSchema(metadata.schema(), parameters);
     setPartitionSpec(metadata, parameters);
     setSortOrder(metadata, parameters);
 
@@ -445,15 +442,6 @@ public ClientPool<IMetaStoreClient, TException> metaClients() {
     return metaClients;
   }
 
-  private void cleanupMetadataAndUnlock(CommitStatus commitStatus, String metadataLocation,
-      HiveLock lock) {
-    try {
-      HiveOperationsBase.cleanupMetadata(io(), commitStatus.name(), metadataLocation);
-    } finally {
-      doUnlock(lock);
-    }
-  }
-
   void doUnlock(HiveLock lock) {
     if (lock != null) {
       try {