From 5146dbad59c8e4116ae3bbfe615b69c4848681ad Mon Sep 17 00:00:00 2001 From: Dmitriy Fingerman Date: Mon, 20 Jan 2025 04:17:52 -0500 Subject: [PATCH] HIVE-28600: Iceberg: Check whether the table/partition requires compaction before initiating one (Dmitriy Fingerman, reviewed by Denys Kuzmenko) Closes #5529 --- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 +- .../mr/hive/HiveIcebergOutputCommitter.java | 5 +- .../iceberg/mr/hive/IcebergTableUtil.java | 51 -- .../compaction/IcebergCompactionService.java | 10 + .../compaction/IcebergCompactionUtil.java | 97 +++ .../evaluator/HiveTableRuntime.java | 29 + .../evaluator/IcebergCompactionEvaluator.java | 129 ++++ .../amoro/CommonPartitionEvaluator.java | 421 ++++++++++++ .../amoro/CompatiblePropertyUtil.java | 80 +++ .../amoro/IcebergTableFileScanHelper.java | 54 ++ .../evaluator/amoro/OptimizingConfig.java | 204 ++++++ .../evaluator/amoro/OptimizingStatus.java | 41 ++ .../evaluator/amoro/OptimizingType.java | 35 + .../evaluator/amoro/PartitionEvaluator.java | 105 +++ .../amoro/ServerTableIdentifier.java | 140 ++++ .../evaluator/amoro/TableConfiguration.java | 39 ++ .../evaluator/amoro/TableFileScanHelper.java | 49 ++ .../evaluator/amoro/TableFormat.java | 40 ++ .../evaluator/amoro/TableIdentifier.java | 615 ++++++++++++++++++ .../evaluator/amoro/TableProperties.java | 55 ++ .../evaluator/amoro/TableRuntime.java | 56 ++ .../evaluator/amoro/TableRuntimeMeta.java | 109 ++++ ...r_compaction_partition_evolution_ordered.q | 9 +- ..._partition_evolution_w_dyn_spec_w_filter.q | 1 + ...n_partition_evolution_w_id_spec_w_filter.q | 1 + ...paction_single_partition_with_evolution2.q | 1 + ...mpaction_partition_evolution_ordered.q.out | 48 +- ...tition_evolution_w_dyn_spec_w_filter.q.out | 4 +- ...rtition_evolution_w_id_spec_w_filter.q.out | 10 +- 29 files changed, 2357 insertions(+), 84 deletions(-) create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/HiveTableRuntime.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/IcebergCompactionEvaluator.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/CommonPartitionEvaluator.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/CompatiblePropertyUtil.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/IcebergTableFileScanHelper.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingConfig.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingStatus.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingType.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/PartitionEvaluator.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/ServerTableIdentifier.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableConfiguration.java create mode 100644 
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableFileScanHelper.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableFormat.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableIdentifier.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableProperties.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableRuntime.java create mode 100644 iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableRuntimeMeta.java diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 1a55d749fa20..afa3bc7b8701 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -2240,7 +2240,8 @@ public static enum ConfVars { "If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"), HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false, "If this is set to true, then all the data files being read should be withing the table location"), - + HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE("hive.iceberg.compaction.target.file.size", "128mb", + new SizeValidator(), "Target file size for Iceberg compaction."), HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true, "If this is set the header for RCFiles will simply be RCF. If this is not\n" + "set the header will be that borrowed from sequence files, e.g. 
SEQ- followed\n" + diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java index 44049a30f4fa..533037669b0e 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java @@ -79,6 +79,7 @@ import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.mr.hive.compaction.IcebergCompactionService; +import org.apache.iceberg.mr.hive.compaction.IcebergCompactionUtil; import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter; import org.apache.iceberg.mr.hive.writer.WriterRegistry; import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting; @@ -598,8 +599,8 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s */ private void commitCompaction(Table table, Long snapshotId, long startTime, FilesForCommit results, String partitionPath) { - List existingDataFiles = IcebergTableUtil.getDataFiles(table, partitionPath); - List existingDeleteFiles = IcebergTableUtil.getDeleteFiles(table, partitionPath); + List existingDataFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath); + List existingDeleteFiles = IcebergCompactionUtil.getDeleteFiles(table, partitionPath); RewriteFiles rewriteFiles = table.newRewrite(); existingDataFiles.forEach(rewriteFiles::deleteFile); diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java index 42bafe0b9fce..8f876a727815 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java @@ -49,8 +49,6 @@ import org.apache.hadoop.hive.ql.plan.PlanUtils; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.session.SessionStateUtil; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.DeleteFile; import org.apache.iceberg.DeleteFiles; import org.apache.iceberg.FileScanTask; import org.apache.iceberg.ManageSnapshots; @@ -61,9 +59,7 @@ import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.Partitioning; import org.apache.iceberg.PartitionsTable; -import org.apache.iceberg.PositionDeletesScanTask; import org.apache.iceberg.RowLevelOperationMode; -import org.apache.iceberg.ScanTask; import org.apache.iceberg.Schema; import org.apache.iceberg.Snapshot; import org.apache.iceberg.SnapshotRef; @@ -80,7 +76,6 @@ import org.apache.iceberg.mr.Catalogs; import org.apache.iceberg.mr.InputFormatConfig; import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable; -import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.types.Conversions; import org.apache.iceberg.types.Type; @@ -412,52 +407,6 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy return data; } - /** - * Returns table's list of data files as following: - * 1. If the table is unpartitioned, returns all data files. - * 2. If partitionPath is not provided, returns all data files that belong to the non-latest partition spec. - * 3. 
If partitionPath is provided, returns all data files that belong to the corresponding partition. - * @param table the iceberg table - * @param partitionPath partition path - */ - public static List getDataFiles(Table table, String partitionPath) { - CloseableIterable fileScanTasks = - table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles(); - CloseableIterable filteredFileScanTasks = - CloseableIterable.filter(fileScanTasks, t -> { - DataFile file = t.asFileScanTask().file(); - return !table.spec().isPartitioned() || - partitionPath == null && file.specId() != table.spec().specId() || - partitionPath != null && - table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath); - }); - return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file())); - } - - /** - * Returns table's list of delete files as following: - * 1. If the table is unpartitioned, returns all delete files. - * 2. If partitionPath is not provided, returns all delete files that belong to the non-latest partition spec. - * 3. If partitionPath is provided, returns all delete files that belong to corresponding partition. - * @param table the iceberg table - * @param partitionPath partition path - */ - public static List getDeleteFiles(Table table, String partitionPath) { - Table deletesTable = - MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES); - CloseableIterable deletesScanTasks = deletesTable.newBatchScan().planFiles(); - CloseableIterable filteredDeletesScanTasks = - CloseableIterable.filter(deletesScanTasks, t -> { - DeleteFile file = ((PositionDeletesScanTask) t).file(); - return !table.spec().isPartitioned() || - partitionPath == null && file.specId() != table.spec().specId() || - partitionPath != null && - table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath); - }); - return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks, - t -> ((PositionDeletesScanTask) t).file())); - } - public static Expression generateExpressionFromPartitionSpec(Table table, Map partitionSpec) throws SemanticException { Map partitionFieldMap = getPartitionFields(table).stream() diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java index 455330eb5f18..bda63ef94edd 100644 --- a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hive.ql.txn.compactor.CompactorPipeline; import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil; import org.apache.hadoop.hive.ql.txn.compactor.service.CompactionService; +import org.apache.iceberg.mr.hive.IcebergTableUtil; +import org.apache.iceberg.mr.hive.compaction.evaluator.IcebergCompactionEvaluator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -50,6 +52,14 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception { } CompactorUtil.checkInterrupt(CLASS_NAME); + org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, table); + if (!IcebergCompactionEvaluator.isEligibleForCompaction(icebergTable, ci.partName, ci.type, conf)) { + LOG.info("Table={}{} doesn't meet requirements for compaction", 
table.getTableName(), + ci.partName == null ? "" : ", partition=" + ci.partName); + msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)); + return false; + } + if (ci.runAs == null) { ci.runAs = TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf); } diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java new file mode 100644 index 000000000000..4d8973b5d96a --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction; + +import java.util.List; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.MetadataTableType; +import org.apache.iceberg.MetadataTableUtils; +import org.apache.iceberg.PositionDeletesScanTask; +import org.apache.iceberg.ScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class IcebergCompactionUtil { + + private IcebergCompactionUtil() { + + } + + /** + * This method implements a common filter that is used in several places in Iceberg compaction code. + * Its aim is to determine if the provided file needs to be handled when compacting a partition whose path is equal to + * the provided partitionPath. Returns true when one of the following conditions is true, otherwise returns false: + * 1. table is unpartitioned + * 2. partitionPath is null and the file belongs to the non-latest partition spec + * 3. partitionPath is not null and the file belongs to the partition whose path is the partitionPath + * @param table the iceberg table + * @param partitionPath partition path + * @param file Data or Delete file + */ + public static boolean shouldIncludeForCompaction(Table table, String partitionPath, ContentFile file) { + return !table.spec().isPartitioned() || + partitionPath == null && file.specId() != table.spec().specId() || + partitionPath != null && + table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath); + } + + /** + * Returns table's list of data files as following: + * 1. If the table is unpartitioned, returns all data files. + * 2. If partitionPath is not provided, returns all data files that belong to the non-latest partition spec. + * 3. If partitionPath is provided, returns all data files that belong to the corresponding partition. 
+ * @param table the iceberg table + * @param partitionPath partition path + */ + public static List getDataFiles(Table table, String partitionPath) { + CloseableIterable fileScanTasks = + table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles(); + CloseableIterable filteredFileScanTasks = + CloseableIterable.filter(fileScanTasks, t -> { + DataFile file = t.asFileScanTask().file(); + return shouldIncludeForCompaction(table, partitionPath, file); + }); + return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file())); + } + + /** + * Returns table's list of delete files as following: + * 1. If the table is unpartitioned, returns all delete files. + * 2. If partitionPath is not provided, returns all delete files that belong to the non-latest partition spec. + * 3. If partitionPath is provided, returns all delete files that belong to corresponding partition. + * @param table the iceberg table + * @param partitionPath partition path + */ + public static List getDeleteFiles(Table table, String partitionPath) { + Table deletesTable = + MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES); + CloseableIterable deletesScanTasks = deletesTable.newBatchScan().planFiles(); + CloseableIterable filteredDeletesScanTasks = + CloseableIterable.filter(deletesScanTasks, t -> { + DeleteFile file = ((PositionDeletesScanTask) t).file(); + return shouldIncludeForCompaction(table, partitionPath, file); + }); + return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks, + t -> ((PositionDeletesScanTask) t).file())); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/HiveTableRuntime.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/HiveTableRuntime.java new file mode 100644 index 000000000000..24eab3e51668 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/HiveTableRuntime.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.mr.hive.compaction.evaluator; + +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntime; +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntimeMeta; + +public class HiveTableRuntime extends TableRuntime { + + public HiveTableRuntime(TableRuntimeMeta tableRuntimeMeta) { + super(tableRuntimeMeta); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/IcebergCompactionEvaluator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/IcebergCompactionEvaluator.java new file mode 100644 index 000000000000..7af1f4f6e355 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/IcebergCompactionEvaluator.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.util.Collections; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.metastore.api.CompactionType; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.mr.hive.compaction.IcebergCompactionUtil; +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.CommonPartitionEvaluator; +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.IcebergTableFileScanHelper; +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.OptimizingConfig; +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableConfiguration; +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableFileScanHelper; +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableFormat; +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntime; +import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntimeMeta; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class IcebergCompactionEvaluator { + + private static final long LAST_OPTIMIZE_TIME = 0; + private static final int TRIGGER_INTERVAL = 0; + + private IcebergCompactionEvaluator() { + + } + + private static final Logger LOG = LoggerFactory.getLogger(IcebergCompactionEvaluator.class); + + public static boolean isEligibleForCompaction(Table icebergTable, String partitionPath, + CompactionType compactionType, HiveConf conf) { + + if (icebergTable.currentSnapshot() == null) { + LOG.info("Table {}{} doesn't require compaction because it is empty", icebergTable, + partitionPath == null ? 
"" : " partition " + partitionPath); + return false; + } + + CommonPartitionEvaluator partitionEvaluator = createCommonPartitionEvaluator(icebergTable, partitionPath, conf); + + if (partitionEvaluator == null) { + return false; + } + + switch (compactionType) { + case MINOR: + return partitionEvaluator.isMinorNecessary(); + case MAJOR: + return partitionEvaluator.isFullNecessary() || partitionEvaluator.isMajorNecessary(); + default: + return false; + } + } + + private static TableRuntime createTableRuntime(Table icebergTable, HiveConf conf) { + long targetFileSizeBytes = HiveConf.getSizeVar(conf, + HiveConf.ConfVars.HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE); + + OptimizingConfig optimizingConfig = OptimizingConfig.parse(Collections.emptyMap()); + optimizingConfig.setTargetSize(targetFileSizeBytes); + optimizingConfig.setFullTriggerInterval(TRIGGER_INTERVAL); + optimizingConfig.setMinorLeastInterval(TRIGGER_INTERVAL); + + TableConfiguration tableConfig = new TableConfiguration(); + tableConfig.setOptimizingConfig(optimizingConfig); + + TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta(); + tableRuntimeMeta.setTableName(icebergTable.name()); + tableRuntimeMeta.setFormat(TableFormat.ICEBERG); + tableRuntimeMeta.setLastFullOptimizingTime(LAST_OPTIMIZE_TIME); + tableRuntimeMeta.setLastMinorOptimizingTime(LAST_OPTIMIZE_TIME); + tableRuntimeMeta.setTableConfig(tableConfig); + + return new HiveTableRuntime(tableRuntimeMeta); + } + + private static CommonPartitionEvaluator createCommonPartitionEvaluator(Table table, String partitionPath, + HiveConf conf) { + TableRuntime tableRuntime = createTableRuntime(table, conf); + + TableFileScanHelper tableFileScanHelper = new IcebergTableFileScanHelper(table, + table.currentSnapshot().snapshotId()); + CommonPartitionEvaluator evaluator = null; + try (CloseableIterable results = + tableFileScanHelper.scan()) { + for (TableFileScanHelper.FileScanResult fileScanResult : results) { + DataFile file = fileScanResult.file(); + if (IcebergCompactionUtil.shouldIncludeForCompaction(table, partitionPath, file)) { + PartitionSpec partitionSpec = table.specs().get(file.specId()); + Pair partition = Pair.of(partitionSpec.specId(), fileScanResult.file().partition()); + + if (evaluator == null) { + evaluator = new CommonPartitionEvaluator(tableRuntime, partition, System.currentTimeMillis()); + } + + evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles()); + } + } + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return evaluator; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/CommonPartitionEvaluator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/CommonPartitionEvaluator.java new file mode 100644 index 000000000000..16cd9ea2f599 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/CommonPartitionEvaluator.java @@ -0,0 +1,421 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +import java.util.List; +import java.util.Set; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.FileContent; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class CommonPartitionEvaluator implements PartitionEvaluator { + private static final Logger LOG = LoggerFactory.getLogger(CommonPartitionEvaluator.class); + + private final Set deleteFileSet = Sets.newHashSet(); + protected final TableRuntime tableRuntime; + + private final Pair partition; + protected final OptimizingConfig config; + protected final long fragmentSize; + protected final long minTargetSize; + protected final long planTime; + + private final boolean reachFullInterval; + + // fragment files + protected int fragmentFileCount = 0; + protected long fragmentFileSize = 0; + + // segment files + protected int rewriteSegmentFileCount = 0; + protected long rewriteSegmentFileSize = 0L; + protected int undersizedSegmentFileCount = 0; + protected long undersizedSegmentFileSize = 0; + protected int rewritePosSegmentFileCount = 0; + protected int combinePosSegmentFileCount = 0; + protected long rewritePosSegmentFileSize = 0L; + protected long min1SegmentFileSize = Integer.MAX_VALUE; + protected long min2SegmentFileSize = Integer.MAX_VALUE; + + // delete files + protected int equalityDeleteFileCount = 0; + protected long equalityDeleteFileSize = 0L; + protected int posDeleteFileCount = 0; + protected long posDeleteFileSize = 0L; + + private long cost = -1; + private Boolean necessary = null; + private OptimizingType optimizingType = null; + private String name; + + public CommonPartitionEvaluator( + TableRuntime tableRuntime, Pair partition, long planTime) { + this.partition = partition; + this.tableRuntime = tableRuntime; + this.config = tableRuntime.getOptimizingConfig(); + this.fragmentSize = config.getTargetSize() / config.getFragmentRatio(); + this.minTargetSize = (long) (config.getTargetSize() * config.getMinTargetSizeRatio()); + if (minTargetSize > config.getTargetSize() - fragmentSize) { + LOG.warn( + "The self-optimizing.min-target-size-ratio is set too large, some segment files will not be able to find " + + "the another merge file."); + } + this.planTime = planTime; + this.reachFullInterval = + config.getFullTriggerInterval() >= 0 && planTime - tableRuntime.getLastFullOptimizingTime() > + config.getFullTriggerInterval(); + } + + @Override + public Pair getPartition() { + return partition; + } + + protected boolean isFragmentFile(DataFile dataFile) { + return dataFile.fileSizeInBytes() <= fragmentSize; + } + + protected boolean isUndersizedSegmentFile(DataFile dataFile) { + return dataFile.fileSizeInBytes() > fragmentSize && dataFile.fileSizeInBytes() <= minTargetSize; + } + + 
@Override + public boolean addFile(DataFile dataFile, List> deletes) { + if (!config.isEnabled()) { + return false; + } + if (isFragmentFile(dataFile)) { + return addFragmentFile(dataFile, deletes); + } else if (isUndersizedSegmentFile(dataFile)) { + return addUndersizedSegmentFile(dataFile, deletes); + } else { + return addTargetSizeReachedFile(dataFile, deletes); + } + } + + private boolean isDuplicateDelete(ContentFile delete) { + boolean deleteExist = deleteFileSet.contains(delete.path().toString()); + if (!deleteExist) { + deleteFileSet.add(delete.path().toString()); + } + return deleteExist; + } + + private boolean addFragmentFile(DataFile dataFile, List> deletes) { + fragmentFileSize += dataFile.fileSizeInBytes(); + fragmentFileCount++; + + for (ContentFile delete : deletes) { + addDelete(delete); + } + return true; + } + + private boolean addUndersizedSegmentFile(DataFile dataFile, List> deletes) { + // Because UndersizedSegment can determine whether it is rewritten during the split task stage. + // So the calculated posDeleteFileCount, posDeleteFileSize, equalityDeleteFileCount, + // equalityDeleteFileSize are not accurate + for (ContentFile delete : deletes) { + addDelete(delete); + } + if (fileShouldRewrite(dataFile, deletes)) { + rewriteSegmentFileSize += dataFile.fileSizeInBytes(); + rewriteSegmentFileCount++; + return true; + } + + // Cache the size of the smallest two files + if (dataFile.fileSizeInBytes() < min1SegmentFileSize) { + min2SegmentFileSize = min1SegmentFileSize; + min1SegmentFileSize = dataFile.fileSizeInBytes(); + } else if (dataFile.fileSizeInBytes() < min2SegmentFileSize) { + min2SegmentFileSize = dataFile.fileSizeInBytes(); + } + + undersizedSegmentFileSize += dataFile.fileSizeInBytes(); + undersizedSegmentFileCount++; + return true; + } + + private boolean addTargetSizeReachedFile(DataFile dataFile, List> deletes) { + if (fileShouldRewrite(dataFile, deletes)) { + rewriteSegmentFileSize += dataFile.fileSizeInBytes(); + rewriteSegmentFileCount++; + for (ContentFile delete : deletes) { + addDelete(delete); + } + return true; + } + + if (segmentShouldRewritePos(dataFile, deletes)) { + rewritePosSegmentFileSize += dataFile.fileSizeInBytes(); + rewritePosSegmentFileCount++; + for (ContentFile delete : deletes) { + addDelete(delete); + } + return true; + } + + return false; + } + + protected boolean fileShouldFullOptimizing(DataFile dataFile, List> deleteFiles) { + if (config.isFullRewriteAllFiles()) { + return true; + } + // If a file is related any delete files or is not big enough, it should full optimizing + return !deleteFiles.isEmpty() || isFragmentFile(dataFile) || isUndersizedSegmentFile(dataFile); + } + + public boolean fileShouldRewrite(DataFile dataFile, List> deletes) { + if (isFullOptimizing()) { + return fileShouldFullOptimizing(dataFile, deletes); + } + if (isFragmentFile(dataFile)) { + return true; + } + // When Upsert writing is enabled in the Flink engine, both INSERT and UPDATE_AFTER will + // generate deletes files (Most are eq-delete), and eq-delete file will be associated + // with the data file before the current snapshot. + // The eq-delete does not accurately reflect how much data has been deleted in the current + // segment file (That is, whether the segment file needs to be rewritten). + // And the eq-delete file will be converted to pos-delete during minor optimizing, so only + // pos-delete record count is calculated here. 
+ return getPosDeletesRecordCount(deletes) > dataFile.recordCount() * config.getMajorDuplicateRatio(); + } + + public boolean segmentShouldRewritePos(DataFile dataFile, List> deletes) { + Preconditions.checkArgument(!isFragmentFile(dataFile), "Unsupported fragment file."); + if (deletes.stream().filter(delete -> delete.content() == FileContent.POSITION_DELETES).count() >= 2) { + combinePosSegmentFileCount++; + return true; + } + return deletes.stream().anyMatch(delete -> delete.content() == FileContent.EQUALITY_DELETES); + } + + protected boolean isFullOptimizing() { + return reachFullInterval(); + } + + private long getPosDeletesRecordCount(List> files) { + return files.stream() + .filter(file -> file.content() == FileContent.POSITION_DELETES) + .mapToLong(ContentFile::recordCount) + .sum(); + } + + private void addDelete(ContentFile delete) { + if (isDuplicateDelete(delete)) { + return; + } + if (delete.content() == FileContent.POSITION_DELETES) { + posDeleteFileCount++; + posDeleteFileSize += delete.fileSizeInBytes(); + } else { + equalityDeleteFileCount++; + equalityDeleteFileSize += delete.fileSizeInBytes(); + } + } + + @Override + public boolean isNecessary() { + if (necessary == null) { + if (isFullOptimizing()) { + necessary = isFullNecessary(); + } else { + necessary = isMajorNecessary() || isMinorNecessary(); + } + LOG.debug("{} necessary = {}, {}", name(), necessary, this); + } + return necessary; + } + + @Override + public long getCost() { + if (cost < 0) { + // We estimate that the cost of writing is 3 times that of reading. + // When rewriting the Position delete file, only the primary key field of the segment file + // will be read, so only one-tenth of the size is calculated based on the size. + cost = + (fragmentFileSize + rewriteSegmentFileSize + undersizedSegmentFileSize) * 4 + + rewritePosSegmentFileSize / 10 + posDeleteFileSize + equalityDeleteFileSize; + int fileCnt = + fragmentFileCount + rewriteSegmentFileCount + undersizedSegmentFileCount + rewritePosSegmentFileCount + + posDeleteFileCount + equalityDeleteFileCount; + cost += fileCnt * config.getOpenFileCost(); + } + return cost; + } + + @Override + public PartitionEvaluator.Weight getWeight() { + return new Weight(getCost()); + } + + @Override + public OptimizingType getOptimizingType() { + if (optimizingType == null) { + optimizingType = isFullNecessary() ? OptimizingType.FULL : isMajorNecessary() ? OptimizingType.MAJOR : + OptimizingType.MINOR; + LOG.debug("{} optimizingType = {} ", name(), optimizingType); + } + return optimizingType; + } + + /** + * Segment files has enough content. + * + *
1. The total size of all undersized segment files is greater than target size + * + *
2. There are two undersized segment file that can be merged into one + */ + public boolean enoughContent() { + return undersizedSegmentFileSize >= config.getTargetSize() && + min1SegmentFileSize + min2SegmentFileSize <= config.getTargetSize(); + } + + public boolean isMajorNecessary() { + return enoughContent() || rewriteSegmentFileCount > 0; + } + + public boolean isMinorNecessary() { + int smallFileCount = fragmentFileCount + equalityDeleteFileCount; + return smallFileCount >= config.getMinorLeastFileCount() || + smallFileCount > 1 && reachMinorInterval() || combinePosSegmentFileCount > 0; + } + + protected boolean reachMinorInterval() { + return config.getMinorLeastInterval() >= 0 && planTime - tableRuntime.getLastMinorOptimizingTime() > + config.getMinorLeastInterval(); + } + + protected boolean reachFullInterval() { + return reachFullInterval; + } + + public boolean isFullNecessary() { + if (!reachFullInterval()) { + return false; + } + return anyDeleteExist() || fragmentFileCount >= 2 || undersizedSegmentFileCount >= 2 || + rewriteSegmentFileCount > 0 || rewritePosSegmentFileCount > 0; + } + + protected String name() { + if (name == null) { + name = + String.format( + "partition %s of %s", partition, tableRuntime.getTableIdentifier().toString()); + } + return name; + } + + public boolean anyDeleteExist() { + return equalityDeleteFileCount > 0 || posDeleteFileCount > 0; + } + + @Override + public int getFragmentFileCount() { + return fragmentFileCount; + } + + @Override + public long getFragmentFileSize() { + return fragmentFileSize; + } + + @Override + public int getSegmentFileCount() { + return rewriteSegmentFileCount + undersizedSegmentFileCount + rewritePosSegmentFileCount; + } + + @Override + public long getSegmentFileSize() { + return rewriteSegmentFileSize + undersizedSegmentFileSize + rewritePosSegmentFileSize; + } + + @Override + public int getEqualityDeleteFileCount() { + return equalityDeleteFileCount; + } + + @Override + public long getEqualityDeleteFileSize() { + return equalityDeleteFileSize; + } + + @Override + public int getPosDeleteFileCount() { + return posDeleteFileCount; + } + + @Override + public long getPosDeleteFileSize() { + return posDeleteFileSize; + } + + public static class Weight implements PartitionEvaluator.Weight { + + private final long cost; + + public Weight(long cost) { + this.cost = cost; + } + + @Override + public int compareTo(PartitionEvaluator.Weight o) { + return Long.compare(this.cost, ((Weight) o).cost); + } + } + + @Override + public String toString() { + return MoreObjects.toStringHelper(this) + .add("partition", partition) + .add("config", config) + .add("fragmentSize", fragmentSize) + .add("undersizedSegmentSize", minTargetSize) + .add("planTime", planTime) + .add("lastMinorOptimizeTime", tableRuntime.getLastMinorOptimizingTime()) + .add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime()) + .add("lastFullOptimizeTime", tableRuntime.getLastFullOptimizingTime()) + .add("fragmentFileCount", fragmentFileCount) + .add("fragmentFileSize", fragmentFileSize) + .add("rewriteSegmentFileCount", rewriteSegmentFileCount) + .add("rewriteSegmentFileSize", rewriteSegmentFileSize) + .add("undersizedSegmentFileCount", undersizedSegmentFileCount) + .add("undersizedSegmentFileSize", undersizedSegmentFileSize) + .add("rewritePosSegmentFileCount", rewritePosSegmentFileCount) + .add("rewritePosSegmentFileSize", rewritePosSegmentFileSize) + .add("min1SegmentFileSize", min1SegmentFileSize) + .add("min2SegmentFileSize", min2SegmentFileSize) + 
.add("equalityDeleteFileCount", equalityDeleteFileCount) + .add("equalityDeleteFileSize", equalityDeleteFileSize) + .add("posDeleteFileCount", posDeleteFileCount) + .add("posDeleteFileSize", posDeleteFileSize) + .toString(); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/CompatiblePropertyUtil.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/CompatiblePropertyUtil.java new file mode 100644 index 000000000000..80d9134daa46 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/CompatiblePropertyUtil.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +import java.util.Map; +import org.apache.iceberg.util.PropertyUtil; + +/** PropertyUtil compatible with legacy properties */ +public class CompatiblePropertyUtil { + + private CompatiblePropertyUtil() { + } + + public static boolean propertyAsBoolean( + Map properties, String property, boolean defaultValue) { + return PropertyUtil.propertyAsBoolean( + properties, getCompatibleProperty(properties, property), defaultValue); + } + + public static double propertyAsDouble( + Map properties, String property, double defaultValue) { + return PropertyUtil.propertyAsDouble( + properties, getCompatibleProperty(properties, property), defaultValue); + } + + public static int propertyAsInt( + Map properties, String property, int defaultValue) { + return PropertyUtil.propertyAsInt( + properties, getCompatibleProperty(properties, property), defaultValue); + } + + public static long propertyAsLong( + Map properties, String property, long defaultValue) { + return PropertyUtil.propertyAsLong( + properties, getCompatibleProperty(properties, property), defaultValue); + } + + private static String getCompatibleProperty(Map properties, String property) { + String legacyProperty = getLegacyProperty(property); + if (legacyProperty != null && properties.containsKey(legacyProperty) && !properties.containsKey(property)) { + return legacyProperty; + } else { + return property; + } + } + + private static String getLegacyProperty(String property) { + if (property == null) { + return null; + } + switch (property) { + case TableProperties.ENABLE_SELF_OPTIMIZING: + return TableProperties.ENABLE_OPTIMIZE; + case TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT: + return TableProperties.MINOR_OPTIMIZE_TRIGGER_DELETE_FILE_COUNT; + case TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL: + return TableProperties.MINOR_OPTIMIZE_TRIGGER_MAX_INTERVAL; + case TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL: + return 
TableProperties.FULL_OPTIMIZE_TRIGGER_MAX_INTERVAL; + default: + return null; + } + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/IcebergTableFileScanHelper.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/IcebergTableFileScanHelper.java new file mode 100644 index 000000000000..4176c8df3e94 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/IcebergTableFileScanHelper.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.expressions.Expressions; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; + +public class IcebergTableFileScanHelper implements TableFileScanHelper { + private final Table table; + private Expression partitionFilter = Expressions.alwaysTrue(); + private final long snapshotId; + + public IcebergTableFileScanHelper(Table table, long snapshotId) { + this.table = table; + this.snapshotId = snapshotId; + } + + @Override + public CloseableIterable scan() { + return CloseableIterable.transform( + table.newScan().useSnapshot(snapshotId).filter(partitionFilter).planFiles(), + this::buildFileScanResult); + } + + protected FileScanResult buildFileScanResult(FileScanTask fileScanTask) { + return new FileScanResult(fileScanTask.file(), Lists.newArrayList(fileScanTask.deletes())); + } + + @Override + public TableFileScanHelper withPartitionFilter(Expression filter) { + this.partitionFilter = filter; + return this; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingConfig.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingConfig.java new file mode 100644 index 000000000000..7e8b541db15e --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingConfig.java @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import java.util.Map; + +/** Configuration for optimizing process scheduling and executing. */ +@JsonIgnoreProperties(ignoreUnknown = true) +public class OptimizingConfig { + + // self-optimizing.enabled + private boolean enabled; + + // self-optimizing.target-size + private long targetSize; + + // read.split.open-file-cost + private long openFileCost; + + // self-optimizing.fragment-ratio + private int fragmentRatio; + + // self-optimizing.min-target-size-ratio + private double minTargetSizeRatio; + + // self-optimizing.minor.trigger.file-count + private int minorLeastFileCount; + + // self-optimizing.minor.trigger.interval + private int minorLeastInterval; + + // self-optimizing.major.trigger.duplicate-ratio + private double majorDuplicateRatio; + + // self-optimizing.full.trigger.interval + private int fullTriggerInterval; + + // self-optimizing.full.rewrite-all-files + private boolean fullRewriteAllFiles; + + public OptimizingConfig() { + } + + public boolean isEnabled() { + return enabled; + } + + public OptimizingConfig setEnabled(boolean enabled) { + this.enabled = enabled; + return this; + } + + public long getTargetSize() { + return targetSize; + } + + public OptimizingConfig setTargetSize(long targetSize) { + this.targetSize = targetSize; + return this; + } + + public long getOpenFileCost() { + return openFileCost; + } + + public OptimizingConfig setOpenFileCost(long openFileCost) { + this.openFileCost = openFileCost; + return this; + } + + public int getFragmentRatio() { + return fragmentRatio; + } + + public OptimizingConfig setFragmentRatio(int fragmentRatio) { + this.fragmentRatio = fragmentRatio; + return this; + } + + public double getMinTargetSizeRatio() { + return minTargetSizeRatio; + } + + public OptimizingConfig setMinTargetSizeRatio(double minTargetSizeRatio) { + this.minTargetSizeRatio = minTargetSizeRatio; + return this; + } + + public int getMinorLeastFileCount() { + return minorLeastFileCount; + } + + public OptimizingConfig setMinorLeastFileCount(int minorLeastFileCount) { + this.minorLeastFileCount = minorLeastFileCount; + return this; + } + + public int getMinorLeastInterval() { + return minorLeastInterval; + } + + public OptimizingConfig setMinorLeastInterval(int minorLeastInterval) { + this.minorLeastInterval = minorLeastInterval; + return this; + } + + public double getMajorDuplicateRatio() { + return majorDuplicateRatio; + } + + public OptimizingConfig setMajorDuplicateRatio(double majorDuplicateRatio) { + this.majorDuplicateRatio = majorDuplicateRatio; + return this; + } + + public int getFullTriggerInterval() { + return fullTriggerInterval; + } + + public OptimizingConfig setFullTriggerInterval(int fullTriggerInterval) { + this.fullTriggerInterval = fullTriggerInterval; + return this; + } + + public boolean isFullRewriteAllFiles() { + return fullRewriteAllFiles; + } + + public OptimizingConfig setFullRewriteAllFiles(boolean fullRewriteAllFiles) { + this.fullRewriteAllFiles = fullRewriteAllFiles; + return 
this; + } + + public static OptimizingConfig parse(Map properties) { + return new OptimizingConfig() + .setEnabled( + CompatiblePropertyUtil.propertyAsBoolean( + properties, + TableProperties.ENABLE_SELF_OPTIMIZING, + TableProperties.ENABLE_SELF_OPTIMIZING_DEFAULT)) + .setFragmentRatio( + CompatiblePropertyUtil.propertyAsInt( + properties, + TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO, + TableProperties.SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT)) + .setMinTargetSizeRatio( + CompatiblePropertyUtil.propertyAsDouble( + properties, + TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO, + TableProperties.SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT)) + .setOpenFileCost( + CompatiblePropertyUtil.propertyAsLong( + properties, + TableProperties.SPLIT_OPEN_FILE_COST, + TableProperties.SPLIT_OPEN_FILE_COST_DEFAULT)) + .setTargetSize( + CompatiblePropertyUtil.propertyAsLong( + properties, + TableProperties.SELF_OPTIMIZING_TARGET_SIZE, + TableProperties.SELF_OPTIMIZING_TARGET_SIZE_DEFAULT)) + .setMinorLeastFileCount( + CompatiblePropertyUtil.propertyAsInt( + properties, + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT, + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT_DEFAULT)) + .setMinorLeastInterval( + CompatiblePropertyUtil.propertyAsInt( + properties, + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL, + TableProperties.SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL_DEFAULT)) + .setMajorDuplicateRatio( + CompatiblePropertyUtil.propertyAsDouble( + properties, + TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO, + TableProperties.SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO_DEFAULT)) + .setFullTriggerInterval( + CompatiblePropertyUtil.propertyAsInt( + properties, + TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL, + TableProperties.SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL_DEFAULT)) + .setFullRewriteAllFiles( + CompatiblePropertyUtil.propertyAsBoolean( + properties, + TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES, + TableProperties.SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT)); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingStatus.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingStatus.java new file mode 100644 index 000000000000..467cb3a5bea5 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingStatus.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +public enum OptimizingStatus { + FULL_OPTIMIZING("full", true), + MAJOR_OPTIMIZING("major", true), + MINOR_OPTIMIZING("minor", true), + COMMITTING("committing", true), + PLANNING("planning", false), + PENDING("pending", false), + IDLE("idle", false); + private final String displayValue; + + private final boolean isProcessing; + + OptimizingStatus(String displayValue, boolean isProcessing) { + this.displayValue = displayValue; + this.isProcessing = isProcessing; + } + + public boolean isProcessing() { + return isProcessing; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingType.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingType.java new file mode 100644 index 000000000000..842c0513c452 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/OptimizingType.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +public enum OptimizingType { + MINOR(OptimizingStatus.MINOR_OPTIMIZING), + MAJOR(OptimizingStatus.MAJOR_OPTIMIZING), + FULL(OptimizingStatus.FULL_OPTIMIZING); + + private final OptimizingStatus status; + + OptimizingType(OptimizingStatus status) { + this.status = status; + } + + public OptimizingStatus getStatus() { + return status; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/PartitionEvaluator.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/PartitionEvaluator.java new file mode 100644 index 000000000000..c103d2bc28a0 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/PartitionEvaluator.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +import java.util.List; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.util.Pair; + +/** PartitionEvaluator is used to evaluate whether a partition is necessary to be optimized. */ +public interface PartitionEvaluator { + + /** + * Weight determines the priority of partition execution, with higher weights having higher + * priority. + */ + interface Weight extends Comparable {} + + /** + * Get the partition represented by a Pair of {@link PartitionSpec#specId()} and partition {@link + * StructLike}. + * + * @return the Pair of the partition spec id and the partition + */ + Pair getPartition(); + + /** + * Add a Data file and its related Delete files to this evaluator + * + * @param dataFile - Data file + * @param deletes - Delete files + * @return true if the file is added successfully, false if the file will not be optimized + */ + boolean addFile(DataFile dataFile, List> deletes); + + /** + * Whether this partition is necessary to optimize. + * + * @return true for is necessary to optimize, false for not necessary + */ + boolean isNecessary(); + + /** + * Get the cost of optimizing for this partition. + * + * @return the cost of optimizing + */ + long getCost(); + + /** + * Get the weight of this partition which determines the priority of partition execution. + * + * @return the weight of this partition + */ + Weight getWeight(); + + /** + * Get the optimizing type of this partition. + * + * @return the OptimizingType + */ + OptimizingType getOptimizingType(); + + /** Get the count of fragment files involved in optimizing. */ + int getFragmentFileCount(); + + /** Get the total size of fragment files involved in optimizing. */ + long getFragmentFileSize(); + + /** Get the count of segment files involved in optimizing. */ + int getSegmentFileCount(); + + /** Get the total size of segment files involved in optimizing. */ + long getSegmentFileSize(); + + /** Get the count of equality delete files involved in optimizing. */ + int getEqualityDeleteFileCount(); + + /** Get the total size of equality delete files involved in optimizing. */ + long getEqualityDeleteFileSize(); + + /** Get the count of positional delete files involved in optimizing. */ + int getPosDeleteFileCount(); + + /** Get the total size of positional delete files involved in optimizing. */ + long getPosDeleteFileSize(); +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/ServerTableIdentifier.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/ServerTableIdentifier.java new file mode 100644 index 000000000000..74dcc99a3335 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/ServerTableIdentifier.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +import java.util.Objects; + +/** Server-side table identifier containing server-side id and table format. */ +public class ServerTableIdentifier { + + private Long id; + private String catalog; + private String database; + private String tableName; + private TableFormat format; + + // used by the MyBatis framework. + private ServerTableIdentifier() { + } + + private ServerTableIdentifier(TableIdentifier tableIdentifier, TableFormat format) { + this.catalog = tableIdentifier.getCatalog(); + this.database = tableIdentifier.getDatabase(); + this.tableName = tableIdentifier.getTableName(); + this.format = format; + } + + private ServerTableIdentifier( + String catalog, String database, String tableName, TableFormat format) { + this.catalog = catalog; + this.database = database; + this.tableName = tableName; + this.format = format; + } + + private ServerTableIdentifier( + Long id, String catalog, String database, String tableName, TableFormat format) { + this.id = id; + this.catalog = catalog; + this.database = database; + this.tableName = tableName; + this.format = format; + } + + public Long getId() { + return id; + } + + public String getCatalog() { + return catalog; + } + + public String getDatabase() { + return database; + } + + public String getTableName() { + return tableName; + } + + public TableFormat getFormat() { + return this.format; + } + + public void setId(Long id) { + this.id = id; + } + + public void setCatalog(String catalog) { + this.catalog = catalog; + } + + public void setDatabase(String database) { + this.database = database; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setFormat(TableFormat format) { + this.format = format; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ServerTableIdentifier that = (ServerTableIdentifier) o; + return Objects.equals(id, that.id) && Objects.equals(catalog, that.catalog) && + Objects.equals(database, that.database) && Objects.equals(tableName, that.tableName); + } + + @Override + public int hashCode() { + return Objects.hash(id, catalog, database, tableName); + } + + @Override + public String toString() { + return String.format("%s.%s.%s(tableId=%d)", catalog, database, tableName, id); + } + + public static ServerTableIdentifier of(TableIdentifier tableIdentifier, TableFormat format) { + return new ServerTableIdentifier(tableIdentifier, format); + } + + public static ServerTableIdentifier of( + String catalog, String database, String tableName, TableFormat format) { + return new ServerTableIdentifier(catalog, database, tableName, format); + } + + public static ServerTableIdentifier of( + Long id, String catalog, String database, String tableName, TableFormat format) { + return new ServerTableIdentifier(id, catalog, database, tableName, format); + } + + public TableIdentifier getIdentifier() { + return new TableIdentifier(catalog, database, tableName); + } +} diff --git 
a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableConfiguration.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableConfiguration.java new file mode 100644 index 000000000000..3f0f88f4f4ed --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableConfiguration.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; + +@JsonIgnoreProperties(ignoreUnknown = true) +public class TableConfiguration { + + private OptimizingConfig optimizingConfig; + + public TableConfiguration() { + } + + public OptimizingConfig getOptimizingConfig() { + return optimizingConfig; + } + + public TableConfiguration setOptimizingConfig(OptimizingConfig optimizingConfig) { + this.optimizingConfig = optimizingConfig; + return this; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableFileScanHelper.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableFileScanHelper.java new file mode 100644 index 000000000000..8fbc0fd24838 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableFileScanHelper.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +import java.util.List; +import org.apache.iceberg.ContentFile; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.io.CloseableIterable; + +public interface TableFileScanHelper { + class FileScanResult { + private final DataFile file; + private final List<ContentFile<?>> deleteFiles; + + public FileScanResult(DataFile file, List<ContentFile<?>> deleteFiles) { + this.file = file; + this.deleteFiles = deleteFiles; + } + + public DataFile file() { + return file; + } + + public List<ContentFile<?>> deleteFiles() { + return deleteFiles; + } + } + + CloseableIterable<FileScanResult> scan(); + + TableFileScanHelper withPartitionFilter(Expression partitionFilter); +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableFormat.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableFormat.java new file mode 100644 index 000000000000..89ffb773510c --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableFormat.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +/** + * Table formats supported by Amoro + * + * @since 0.4.0 + */ +public enum TableFormat { + ICEBERG, + MIXED_ICEBERG, + MIXED_HIVE, + PAIMON; + + public boolean in(TableFormat... tableFormats) { + for (TableFormat tableFormat : tableFormats) { + if (this == tableFormat) { + return true; + } + } + return false; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableIdentifier.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableIdentifier.java new file mode 100644 index 000000000000..6bcdcecfeb6b --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableIdentifier.java @@ -0,0 +1,615 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +@SuppressWarnings({"cast", "rawtypes", "unchecked", "unused", "checkstyle:WhitespaceAfter", "checkstyle:LineLength", + "checkstyle:VisibilityModifier", "checkstyle:TypeName", "checkstyle:WhitespaceAround", "checkstyle:NeedBraces", + "checkstyle:MemberName", "checkstyle:EmptyLineSeparator", "checkstyle:Indentation", "checkstyle:LeftCurly", + "checkstyle:CyclomaticComplexity", "checkstyle:LocalVariableName", "checkstyle:SimplifyBooleanExpression"}) +@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.13.0)", date = "2024-03-10") +public class TableIdentifier implements org.apache.thrift.TBase<TableIdentifier, TableIdentifier._Fields>, + java.io.Serializable, Cloneable, Comparable<TableIdentifier> { + private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = + new org.apache.thrift.protocol.TStruct("TableIdentifier"); + + private static final org.apache.thrift.protocol.TField CATALOG_FIELD_DESC = + new org.apache.thrift.protocol.TField("catalog", org.apache.thrift.protocol.TType.STRING, (short)1); + private static final org.apache.thrift.protocol.TField DATABASE_FIELD_DESC = + new org.apache.thrift.protocol.TField("database", org.apache.thrift.protocol.TType.STRING, (short)2); + private static final org.apache.thrift.protocol.TField TABLE_NAME_FIELD_DESC = + new org.apache.thrift.protocol.TField("tableName", org.apache.thrift.protocol.TType.STRING, (short)3); + + private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY + = new TableIdentifierStandardSchemeFactory(); + private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = + new TableIdentifierTupleSchemeFactory(); + + public @org.apache.thrift.annotation.Nullable String catalog; // required + public @org.apache.thrift.annotation.Nullable String database; // required + public @org.apache.thrift.annotation.Nullable String tableName; // required + + /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ + public enum _Fields implements org.apache.thrift.TFieldIdEnum { + CATALOG((short)1, "catalog"), + DATABASE((short)2, "database"), + TABLE_NAME((short)3, "tableName"); + + private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); + + static { + for (_Fields field : java.util.EnumSet.allOf(_Fields.class)) { + byName.put(field.getFieldName(), field); + } + } + + /** + * Find the _Fields constant that matches fieldId, or null if its not found. + */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByThriftId(int fieldId) { + switch(fieldId) { + case 1: // CATALOG + return CATALOG; + case 2: // DATABASE + return DATABASE; + case 3: // TABLE_NAME + return TABLE_NAME; + default: + return null; + } + } + + /** + * Find the _Fields constant that matches fieldId, throwing an exception + * if it is not found. + */ + public static _Fields findByThriftIdOrThrow(int fieldId) { + _Fields fields = findByThriftId(fieldId); + if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!"); + return fields; + } + + /** + * Find the _Fields constant that matches name, or null if its not found.
+ */ + @org.apache.thrift.annotation.Nullable + public static _Fields findByName(String name) { + return byName.get(name); + } + + private final short _thriftId; + private final String _fieldName; + + _Fields(short thriftId, String fieldName) { + _thriftId = thriftId; + _fieldName = fieldName; + } + + public short getThriftFieldId() { + return _thriftId; + } + + public String getFieldName() { + return _fieldName; + } + } + + // isset id assignments + public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; + static { + java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); + tmpMap.put(_Fields.CATALOG, new org.apache.thrift.meta_data.FieldMetaData("catalog", + org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.DATABASE, new org.apache.thrift.meta_data.FieldMetaData("database", + org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + tmpMap.put(_Fields.TABLE_NAME, new org.apache.thrift.meta_data.FieldMetaData("tableName", + org.apache.thrift.TFieldRequirementType.DEFAULT, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); + metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); + org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(TableIdentifier.class, metaDataMap); + } + + public TableIdentifier() { + } + + public TableIdentifier( + String catalog, + String database, + String tableName) + { + this(); + this.catalog = catalog; + this.database = database; + this.tableName = tableName; + } + + /** + * Performs a deep copy on other. 
+ */ + public TableIdentifier(TableIdentifier other) { + if (other.isSetCatalog()) { + this.catalog = other.catalog; + } + if (other.isSetDatabase()) { + this.database = other.database; + } + if (other.isSetTableName()) { + this.tableName = other.tableName; + } + } + + public TableIdentifier deepCopy() { + return new TableIdentifier(this); + } + + @Override + public void clear() { + this.catalog = null; + this.database = null; + this.tableName = null; + } + + @org.apache.thrift.annotation.Nullable + public String getCatalog() { + return this.catalog; + } + + public TableIdentifier setCatalog(@org.apache.thrift.annotation.Nullable String catalog) { + this.catalog = catalog; + return this; + } + + public void unsetCatalog() { + this.catalog = null; + } + + /** Returns true if field catalog is set (has been assigned a value) and false otherwise */ + public boolean isSetCatalog() { + return this.catalog != null; + } + + public void setCatalogIsSet(boolean value) { + if (!value) { + this.catalog = null; + } + } + + @org.apache.thrift.annotation.Nullable + public String getDatabase() { + return this.database; + } + + public TableIdentifier setDatabase(@org.apache.thrift.annotation.Nullable String database) { + this.database = database; + return this; + } + + public void unsetDatabase() { + this.database = null; + } + + /** Returns true if field database is set (has been assigned a value) and false otherwise */ + public boolean isSetDatabase() { + return this.database != null; + } + + public void setDatabaseIsSet(boolean value) { + if (!value) { + this.database = null; + } + } + + @org.apache.thrift.annotation.Nullable + public String getTableName() { + return this.tableName; + } + + public TableIdentifier setTableName(@org.apache.thrift.annotation.Nullable String tableName) { + this.tableName = tableName; + return this; + } + + public void unsetTableName() { + this.tableName = null; + } + + /** Returns true if field tableName is set (has been assigned a value) and false otherwise */ + public boolean isSetTableName() { + return this.tableName != null; + } + + public void setTableNameIsSet(boolean value) { + if (!value) { + this.tableName = null; + } + } + + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable Object value) { + switch (field) { + case CATALOG: + if (value == null) { + unsetCatalog(); + } else { + setCatalog((String)value); + } + break; + + case DATABASE: + if (value == null) { + unsetDatabase(); + } else { + setDatabase((String)value); + } + break; + + case TABLE_NAME: + if (value == null) { + unsetTableName(); + } else { + setTableName((String)value); + } + break; + + } + } + + @org.apache.thrift.annotation.Nullable + public Object getFieldValue(_Fields field) { + switch (field) { + case CATALOG: + return getCatalog(); + + case DATABASE: + return getDatabase(); + + case TABLE_NAME: + return getTableName(); + + } + throw new IllegalStateException(); + } + + /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */ + public boolean isSet(_Fields field) { + if (field == null) { + throw new IllegalArgumentException(); + } + + switch (field) { + case CATALOG: + return isSetCatalog(); + case DATABASE: + return isSetDatabase(); + case TABLE_NAME: + return isSetTableName(); + } + throw new IllegalStateException(); + } + + @Override + public boolean equals(Object that) { + if (that == null) + return false; + if (that instanceof TableIdentifier) + return this.equals((TableIdentifier)that); + return false; + } + + 
public boolean equals(TableIdentifier that) { + if (that == null) + return false; + if (this == that) + return true; + + boolean this_present_catalog = true && this.isSetCatalog(); + boolean that_present_catalog = true && that.isSetCatalog(); + if (this_present_catalog || that_present_catalog) { + if (!(this_present_catalog && that_present_catalog)) + return false; + if (!this.catalog.equals(that.catalog)) + return false; + } + + boolean this_present_database = true && this.isSetDatabase(); + boolean that_present_database = true && that.isSetDatabase(); + if (this_present_database || that_present_database) { + if (!(this_present_database && that_present_database)) + return false; + if (!this.database.equals(that.database)) + return false; + } + + boolean this_present_tableName = true && this.isSetTableName(); + boolean that_present_tableName = true && that.isSetTableName(); + if (this_present_tableName || that_present_tableName) { + if (!(this_present_tableName && that_present_tableName)) + return false; + if (!this.tableName.equals(that.tableName)) + return false; + } + + return true; + } + + @Override + public int hashCode() { + int hashCode = 1; + + hashCode = hashCode * 8191 + ((isSetCatalog()) ? 131071 : 524287); + if (isSetCatalog()) + hashCode = hashCode * 8191 + catalog.hashCode(); + + hashCode = hashCode * 8191 + ((isSetDatabase()) ? 131071 : 524287); + if (isSetDatabase()) + hashCode = hashCode * 8191 + database.hashCode(); + + hashCode = hashCode * 8191 + ((isSetTableName()) ? 131071 : 524287); + if (isSetTableName()) + hashCode = hashCode * 8191 + tableName.hashCode(); + + return hashCode; + } + + @Override + public int compareTo(TableIdentifier other) { + if (!getClass().equals(other.getClass())) { + return getClass().getName().compareTo(other.getClass().getName()); + } + + int lastComparison = 0; + + lastComparison = Boolean.valueOf(isSetCatalog()).compareTo(other.isSetCatalog()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetCatalog()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.catalog, other.catalog); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetDatabase()).compareTo(other.isSetDatabase()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetDatabase()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.database, other.database); + if (lastComparison != 0) { + return lastComparison; + } + } + lastComparison = Boolean.valueOf(isSetTableName()).compareTo(other.isSetTableName()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetTableName()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.tableName, other.tableName); + if (lastComparison != 0) { + return lastComparison; + } + } + return 0; + } + + @org.apache.thrift.annotation.Nullable + public _Fields fieldForId(int fieldId) { + return _Fields.findByThriftId(fieldId); + } + + public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException { + scheme(iprot).read(iprot, this); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException { + scheme(oprot).write(oprot, this); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder("TableIdentifier("); + boolean first = true; + + sb.append("catalog:"); + if (this.catalog == null) { + sb.append("null"); + } else { + sb.append(this.catalog); + } + first = false; + if (!first) sb.append(", "); + 
sb.append("database:"); + if (this.database == null) { + sb.append("null"); + } else { + sb.append(this.database); + } + first = false; + if (!first) sb.append(", "); + sb.append("tableName:"); + if (this.tableName == null) { + sb.append("null"); + } else { + sb.append(this.tableName); + } + first = false; + sb.append(")"); + return sb.toString(); + } + + public void validate() throws org.apache.thrift.TException { + // check for required fields + // check for sub-struct validity + } + + private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException { + try { + write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException { + try { + read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in))); + } catch (org.apache.thrift.TException te) { + throw new java.io.IOException(te); + } + } + + private static class TableIdentifierStandardSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public TableIdentifierStandardScheme getScheme() { + return new TableIdentifierStandardScheme(); + } + } + + private static class TableIdentifierStandardScheme extends org.apache.thrift.scheme.StandardScheme<TableIdentifier> { + + public void read(org.apache.thrift.protocol.TProtocol iprot, TableIdentifier struct) + throws org.apache.thrift.TException { + org.apache.thrift.protocol.TField schemeField; + iprot.readStructBegin(); + while (true) + { + schemeField = iprot.readFieldBegin(); + if (schemeField.type == org.apache.thrift.protocol.TType.STOP) { + break; + } + switch (schemeField.id) { + case 1: // CATALOG + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.catalog = iprot.readString(); + struct.setCatalogIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 2: // DATABASE + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.database = iprot.readString(); + struct.setDatabaseIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + case 3: // TABLE_NAME + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.tableName = iprot.readString(); + struct.setTableNameIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; + default: + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + iprot.readFieldEnd(); + } + iprot.readStructEnd(); + + // check for required fields of primitive type, which can't be checked in the validate method + struct.validate(); + } + + public void write(org.apache.thrift.protocol.TProtocol oprot, TableIdentifier struct) + throws org.apache.thrift.TException { + struct.validate(); + + oprot.writeStructBegin(STRUCT_DESC); + if (struct.catalog != null) { + oprot.writeFieldBegin(CATALOG_FIELD_DESC); + oprot.writeString(struct.catalog); + oprot.writeFieldEnd(); + } + if (struct.database != null) { + oprot.writeFieldBegin(DATABASE_FIELD_DESC); + oprot.writeString(struct.database); + oprot.writeFieldEnd(); + } + if (struct.tableName != null) { + oprot.writeFieldBegin(TABLE_NAME_FIELD_DESC); + oprot.writeString(struct.tableName); + oprot.writeFieldEnd(); + } + oprot.writeFieldStop(); + oprot.writeStructEnd(); + } +
+ } + + private static class TableIdentifierTupleSchemeFactory implements org.apache.thrift.scheme.SchemeFactory { + public TableIdentifierTupleScheme getScheme() { + return new TableIdentifierTupleScheme(); + } + } + + private static class TableIdentifierTupleScheme extends org.apache.thrift.scheme.TupleScheme<TableIdentifier> { + + @Override + public void write(org.apache.thrift.protocol.TProtocol prot, TableIdentifier struct) + throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetCatalog()) { + optionals.set(0); + } + if (struct.isSetDatabase()) { + optionals.set(1); + } + if (struct.isSetTableName()) { + optionals.set(2); + } + oprot.writeBitSet(optionals, 3); + if (struct.isSetCatalog()) { + oprot.writeString(struct.catalog); + } + if (struct.isSetDatabase()) { + oprot.writeString(struct.database); + } + if (struct.isSetTableName()) { + oprot.writeString(struct.tableName); + } + } + + @Override + public void read(org.apache.thrift.protocol.TProtocol prot, TableIdentifier struct) throws org.apache.thrift.TException { + org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; + java.util.BitSet incoming = iprot.readBitSet(3); + if (incoming.get(0)) { + struct.catalog = iprot.readString(); + struct.setCatalogIsSet(true); + } + if (incoming.get(1)) { + struct.database = iprot.readString(); + struct.setDatabaseIsSet(true); + } + if (incoming.get(2)) { + struct.tableName = iprot.readString(); + struct.setTableNameIsSet(true); + } + } + } + + private static <S extends org.apache.thrift.scheme.IScheme> S scheme(org.apache.thrift.protocol.TProtocol proto) { + return (org.apache.thrift.scheme.StandardScheme.class.equals(proto.getScheme()) ? + STANDARD_SCHEME_FACTORY : TUPLE_SCHEME_FACTORY).getScheme(); + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableProperties.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableProperties.java new file mode 100644 index 000000000000..cb3876b18bc5 --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableProperties.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +/** Reserved mixed-format table properties list.
*/ +public class TableProperties { + + private TableProperties() { + } + + /** table optimize related properties */ + public static final String ENABLE_SELF_OPTIMIZING = "self-optimizing.enabled"; + public static final boolean ENABLE_SELF_OPTIMIZING_DEFAULT = true; + public static final String SELF_OPTIMIZING_TARGET_SIZE = "self-optimizing.target-size"; + public static final long SELF_OPTIMIZING_TARGET_SIZE_DEFAULT = 134217728; // 128 MB + public static final String SELF_OPTIMIZING_FRAGMENT_RATIO = "self-optimizing.fragment-ratio"; + public static final int SELF_OPTIMIZING_FRAGMENT_RATIO_DEFAULT = 8; + public static final String SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO = "self-optimizing.min-target-size-ratio"; + public static final double SELF_OPTIMIZING_MIN_TARGET_SIZE_RATIO_DEFAULT = 0.75; + public static final String SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT = "self-optimizing.minor.trigger.file-count"; + public static final int SELF_OPTIMIZING_MINOR_TRIGGER_FILE_CNT_DEFAULT = 12; + public static final String SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL = "self-optimizing.minor.trigger.interval"; + public static final int SELF_OPTIMIZING_MINOR_TRIGGER_INTERVAL_DEFAULT = 3600000; // 1 h + public static final String SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO = + "self-optimizing.major.trigger.duplicate-ratio"; + public static final double SELF_OPTIMIZING_MAJOR_TRIGGER_DUPLICATE_RATIO_DEFAULT = 0.1; + public static final String SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL = "self-optimizing.full.trigger.interval"; + public static final int SELF_OPTIMIZING_FULL_TRIGGER_INTERVAL_DEFAULT = -1; // not trigger + public static final String SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES = "self-optimizing.full.rewrite-all-files"; + public static final boolean SELF_OPTIMIZING_FULL_REWRITE_ALL_FILES_DEFAULT = true; + + @Deprecated public static final String ENABLE_OPTIMIZE = "optimize.enable"; + @Deprecated public static final String FULL_OPTIMIZE_TRIGGER_MAX_INTERVAL = "optimize.full.trigger.max-interval"; + @Deprecated public static final String MINOR_OPTIMIZE_TRIGGER_MAX_INTERVAL = "optimize.minor.trigger.max-interval"; + @Deprecated public static final String MINOR_OPTIMIZE_TRIGGER_DELETE_FILE_COUNT = + "optimize.minor.trigger.delete-file-count"; + public static final String SPLIT_OPEN_FILE_COST = org.apache.iceberg.TableProperties.SPLIT_OPEN_FILE_COST; + public static final long SPLIT_OPEN_FILE_COST_DEFAULT = 4 * 1024 * 1024; // 4MB +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableRuntime.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableRuntime.java new file mode 100644 index 000000000000..c613154590fc --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableRuntime.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +public class TableRuntime { + + private final TableConfiguration tableConfiguration; + private final ServerTableIdentifier tableIdentifier; + private volatile long lastFullOptimizingTime; + private volatile long lastMinorOptimizingTime; + + protected TableRuntime(TableRuntimeMeta tableRuntimeMeta) { + this.tableIdentifier = + ServerTableIdentifier.of( + tableRuntimeMeta.getTableId(), + tableRuntimeMeta.getCatalogName(), + tableRuntimeMeta.getDbName(), + tableRuntimeMeta.getTableName(), + tableRuntimeMeta.getFormat()); + this.lastMinorOptimizingTime = tableRuntimeMeta.getLastMinorOptimizingTime(); + this.lastFullOptimizingTime = tableRuntimeMeta.getLastFullOptimizingTime(); + this.tableConfiguration = tableRuntimeMeta.getTableConfig(); + } + + public OptimizingConfig getOptimizingConfig() { + return tableConfiguration.getOptimizingConfig(); + } + + public ServerTableIdentifier getTableIdentifier() { + return tableIdentifier; + } + + public long getLastFullOptimizingTime() { + return lastFullOptimizingTime; + } + + public long getLastMinorOptimizingTime() { + return lastMinorOptimizingTime; + } +} diff --git a/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableRuntimeMeta.java b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableRuntimeMeta.java new file mode 100644 index 000000000000..110ac0068b1b --- /dev/null +++ b/iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/amoro/TableRuntimeMeta.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.iceberg.mr.hive.compaction.evaluator.amoro; + +public class TableRuntimeMeta { + + private String catalogName; + private long tableId; + private String dbName; + private String tableName; + private TableFormat format; + private long lastMinorOptimizingTime; + private long lastFullOptimizingTime; + private TableConfiguration tableConfig; + private String summary; + + public TableRuntimeMeta() { + + } + + public String getCatalogName() { + return catalogName; + } + + public String getTableName() { + return tableName; + } + + public TableFormat getFormat() { + return format; + } + + + public long getLastMinorOptimizingTime() { + return lastMinorOptimizingTime; + } + + public long getLastFullOptimizingTime() { + return lastFullOptimizingTime; + } + + public TableConfiguration getTableConfig() { + return tableConfig; + } + + public String getSummary() { + return summary; + } + + public void setCatalogName(String catalogName) { + this.catalogName = catalogName; + } + + public void setTableName(String tableName) { + this.tableName = tableName; + } + + public void setFormat(TableFormat format) { + this.format = format; + } + + public void setLastMinorOptimizingTime(long lastMinorOptimizingTime) { + this.lastMinorOptimizingTime = lastMinorOptimizingTime; + } + + public void setLastFullOptimizingTime(long lastFullOptimizingTime) { + this.lastFullOptimizingTime = lastFullOptimizingTime; + } + + public void setTableConfig(TableConfiguration tableConfig) { + this.tableConfig = tableConfig; + } + + public void setSummary(String summary) { + this.summary = summary; + } + + public long getTableId() { + return tableId; + } + + public void setTableId(long tableId) { + this.tableId = tableId; + } + + public String getDbName() { + return dbName; + } + + public void setDbName(String dbName) { + this.dbName = dbName; + } +} diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_ordered.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_ordered.q index da81a842abe0..a2bd38b63491 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_ordered.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_ordered.q @@ -11,6 +11,7 @@ -- Mask current-snapshot-timestamp-ms --! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +--! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! 
qt:replace:/^[0-9]/#Masked#/ -- Mask removed file size @@ -42,7 +43,9 @@ alter table ice_orc set partition spec(dept_id); insert into ice_orc VALUES ('fn5','ln5', 2, 20, 101), -('fn6','ln6', 2, 20, 101), +('fn6','ln6', 2, 20, 101); + +insert into ice_orc VALUES ('fn7','ln7', 2, 20, 101), ('fn8','ln8', 2, 20, 101); @@ -53,8 +56,8 @@ insert into ice_orc VALUES ('fn12','ln12', 3, 20, 101); select * from ice_orc where company_id = 100; -select * from ice_orc where dept_id = 2; -select * from ice_orc where dept_id = 3; +select * from ice_orc where dept_id = 2 order by first_name; +select * from ice_orc where dept_id = 3 order by first_name; describe formatted ice_orc; explain alter table ice_orc COMPACT 'major' and wait order by first_name desc; diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q index 76b2dbae41a1..fa9111f45f76 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q @@ -12,6 +12,7 @@ -- Mask current-snapshot-timestamp-ms --! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +--! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ -- Mask removed file size diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q index 53e915d09ca2..d256b200bad7 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q @@ -12,6 +12,7 @@ -- Mask current-snapshot-timestamp-ms --! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +--! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! qt:replace:/^[0-9]/#Masked#/ -- Mask removed file size diff --git a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution2.q b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution2.q index 6db0073e7337..31a82b9a4a1f 100644 --- a/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution2.q +++ b/iceberg/iceberg-handler/src/test/queries/positive/iceberg_major_compaction_single_partition_with_evolution2.q @@ -14,6 +14,7 @@ -- Mask current-snapshot-timestamp-ms --! qt:replace:/(\s+current-snapshot-timestamp-ms\s+)\S+(\s*)/$1#Masked#$2/ --! qt:replace:/(MAJOR\s+succeeded\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ +--! qt:replace:/(MAJOR\s+refused\s+)[a-zA-Z0-9\-\.\s+]+(\s+manual)/$1#Masked#$2/ -- Mask compaction id as they will be allocated in parallel threads --! 
qt:replace:/^[0-9]/#Masked#/ -- Mask iceberg version diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out index 3d1565ff9e57..58aa06f42586 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_ordered.q.out @@ -47,15 +47,23 @@ POSTHOOK: Input: default@ice_orc POSTHOOK: Output: default@ice_orc PREHOOK: query: insert into ice_orc VALUES ('fn5','ln5', 2, 20, 101), -('fn6','ln6', 2, 20, 101), +('fn6','ln6', 2, 20, 101) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@ice_orc +POSTHOOK: query: insert into ice_orc VALUES +('fn5','ln5', 2, 20, 101), +('fn6','ln6', 2, 20, 101) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@ice_orc +PREHOOK: query: insert into ice_orc VALUES ('fn7','ln7', 2, 20, 101), ('fn8','ln8', 2, 20, 101) PREHOOK: type: QUERY PREHOOK: Input: _dummy_database@_dummy_table PREHOOK: Output: default@ice_orc POSTHOOK: query: insert into ice_orc VALUES -('fn5','ln5', 2, 20, 101), -('fn6','ln6', 2, 20, 101), ('fn7','ln7', 2, 20, 101), ('fn8','ln8', 2, 20, 101) POSTHOOK: type: QUERY @@ -89,11 +97,11 @@ fn1 ln1 1 10 100 fn2 ln2 1 10 100 fn3 ln3 1 11 100 fn4 ln4 1 11 100 -PREHOOK: query: select * from ice_orc where dept_id = 2 +PREHOOK: query: select * from ice_orc where dept_id = 2 order by first_name PREHOOK: type: QUERY PREHOOK: Input: default@ice_orc #### A masked pattern was here #### -POSTHOOK: query: select * from ice_orc where dept_id = 2 +POSTHOOK: query: select * from ice_orc where dept_id = 2 order by first_name POSTHOOK: type: QUERY POSTHOOK: Input: default@ice_orc #### A masked pattern was here #### @@ -101,18 +109,18 @@ fn5 ln5 2 20 101 fn6 ln6 2 20 101 fn7 ln7 2 20 101 fn8 ln8 2 20 101 -PREHOOK: query: select * from ice_orc where dept_id = 3 +PREHOOK: query: select * from ice_orc where dept_id = 3 order by first_name PREHOOK: type: QUERY PREHOOK: Input: default@ice_orc #### A masked pattern was here #### -POSTHOOK: query: select * from ice_orc where dept_id = 3 +POSTHOOK: query: select * from ice_orc where dept_id = 3 order by first_name POSTHOOK: type: QUERY POSTHOOK: Input: default@ice_orc #### A masked pattern was here #### -fn9 ln9 3 20 101 fn10 ln10 3 20 101 fn11 ln11 3 20 101 fn12 ln12 3 20 101 +fn9 ln9 3 20 101 PREHOOK: query: describe formatted ice_orc PREHOOK: type: DESCTABLE PREHOOK: Input: default@ice_orc @@ -142,19 +150,19 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary 
{\"added-data-files\":\"1\",\"added-records\":\"4\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"3\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"added-records\":\"4\",\"added-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"4\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} current-snapshot-timestamp-ms #Masked# default-partition-spec {\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 iceberg.orc.files.only true #### A masked pattern was here #### - numFiles 3 + numFiles 4 numRows 12 parquet.compression zstd #### A masked pattern was here #### rawDataSize 0 serialization.format 1 - snapshot-count 3 + snapshot-count 4 storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler table_type ICEBERG totalSize #Masked# @@ -229,10 +237,10 @@ POSTHOOK: query: select * from ice_orc where company_id = 100 POSTHOOK: type: QUERY POSTHOOK: Input: default@ice_orc #### A masked pattern was here #### -fn4 ln4 1 11 100 -fn3 ln3 1 11 100 -fn2 ln2 1 10 100 fn1 ln1 1 10 100 +fn2 ln2 1 10 100 +fn3 ln3 1 11 100 +fn4 ln4 1 11 100 PREHOOK: query: select * from ice_orc where dept_id = 2 PREHOOK: type: QUERY PREHOOK: Input: default@ice_orc @@ -254,9 +262,9 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@ice_orc #### A masked pattern was here #### fn9 ln9 3 20 101 -fn12 ln12 3 20 101 -fn11 ln11 3 20 101 fn10 ln10 3 20 101 +fn11 ln11 3 20 101 +fn12 ln12 3 20 101 PREHOOK: query: describe formatted ice_orc PREHOOK: type: DESCTABLE PREHOOK: Input: default@ice_orc @@ -286,7 +294,7 @@ Table Parameters: bucketing_version 2 current-schema {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"first_name\",\"required\":false,\"type\":\"string\"},{\"id\":2,\"name\":\"last_name\",\"required\":false,\"type\":\"string\"},{\"id\":3,\"name\":\"dept_id\",\"required\":false,\"type\":\"long\"},{\"id\":4,\"name\":\"team_id\",\"required\":false,\"type\":\"long\"},{\"id\":5,\"name\":\"company_id\",\"required\":false,\"type\":\"long\"}]} current-snapshot-id #Masked# - current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"1\",\"added-records\":\"4\",\"deleted-records\":\"4\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"2\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"3\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} + current-snapshot-summary {\"added-data-files\":\"1\",\"deleted-data-files\":\"2\",\"added-records\":\"4\",\"deleted-records\":\"4\",\"added-files-size\":\"#Masked#\",\"removed-files-size\":\"#Masked#\",\"changed-partition-count\":\"1\",\"total-records\":\"12\",\"total-files-size\":\"#Masked#\",\"total-data-files\":\"3\",\"total-delete-files\":\"0\",\"total-position-deletes\":\"0\",\"total-equality-deletes\":\"0\",\"iceberg-version\":\"#Masked#\"} current-snapshot-timestamp-ms #Masked# default-partition-spec 
{\"spec-id\":1,\"fields\":[{\"name\":\"dept_id\",\"transform\":\"identity\",\"source-id\":3,\"field-id\":1001}]} format-version 2 @@ -298,7 +306,7 @@ Table Parameters: #### A masked pattern was here #### rawDataSize 0 serialization.format 1 - snapshot-count 6 + snapshot-count 5 storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler table_type ICEBERG totalSize #Masked# @@ -321,5 +329,5 @@ POSTHOOK: query: show compactions order by 'partition' POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId #Masked# default ice_orc dept_id=2 MAJOR succeeded #Masked# manual default 0 0 0 --- -#Masked# default ice_orc dept_id=3 MAJOR succeeded #Masked# manual default 0 0 0 --- -#Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc dept_id=3 MAJOR refused #Masked# manual default 0 0 0 --- +#Masked# default ice_orc --- MAJOR refused #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q.out index d5bdbcee0297..9d744547aced 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_dyn_spec_w_filter.q.out @@ -358,7 +358,7 @@ Table Parameters: #### A masked pattern was here #### rawDataSize 0 serialization.format 1 - snapshot-count 18 + snapshot-count 17 storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler table_type ICEBERG totalSize #Masked# @@ -381,7 +381,7 @@ POSTHOOK: query: show compactions order by 'partition' POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId #Masked# default ice_orc event_src_trunc=AAA/event_time_month=2024-08 MAJOR succeeded #Masked# manual default 0 0 0 --- -#Masked# default ice_orc event_src_trunc=AAA/event_time_month=2024-09 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc event_src_trunc=AAA/event_time_month=2024-09 MAJOR refused #Masked# manual default 0 0 0 --- #Masked# default ice_orc event_src_trunc=BBB/event_time_month=2024-07 MAJOR succeeded #Masked# manual default 0 0 0 --- #Masked# default ice_orc event_src_trunc=BBB/event_time_month=2024-08 MAJOR succeeded #Masked# manual default 0 0 0 --- #Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 --- diff --git a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q.out b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q.out index 7df4035b818a..99aa09be2ed0 100644 --- a/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q.out +++ b/iceberg/iceberg-handler/src/test/results/positive/llap/iceberg_major_compaction_partition_evolution_w_id_spec_w_filter.q.out @@ -383,7 +383,7 @@ Table Parameters: #### 
A masked pattern was here #### rawDataSize 0 serialization.format 1 - snapshot-count 20 + snapshot-count 16 storage_handler org.apache.iceberg.mr.hive.HiveIcebergStorageHandler table_type ICEBERG totalSize #Masked# @@ -405,8 +405,8 @@ PREHOOK: type: SHOW COMPACTIONS POSTHOOK: query: show compactions order by 'partition' POSTHOOK: type: SHOW COMPACTIONS CompactionId Database Table Partition Type State Worker host Worker Enqueue Time Start Time Duration(ms) HadoopJobId Error message Initiator host Initiator Pool name TxnId Next TxnId Commit Time Highest WriteId -#Masked# default ice_orc team_id=10 MAJOR succeeded #Masked# manual default 0 0 0 --- -#Masked# default ice_orc team_id=11 MAJOR succeeded #Masked# manual default 0 0 0 --- -#Masked# default ice_orc team_id=12 MAJOR succeeded #Masked# manual default 0 0 0 --- -#Masked# default ice_orc team_id=13 MAJOR succeeded #Masked# manual default 0 0 0 --- +#Masked# default ice_orc team_id=10 MAJOR refused #Masked# manual default 0 0 0 --- +#Masked# default ice_orc team_id=11 MAJOR refused #Masked# manual default 0 0 0 --- +#Masked# default ice_orc team_id=12 MAJOR refused #Masked# manual default 0 0 0 --- +#Masked# default ice_orc team_id=13 MAJOR refused #Masked# manual default 0 0 0 --- #Masked# default ice_orc --- MAJOR succeeded #Masked# manual default 0 0 0 ---