HIVE-28600: Iceberg: Check whether the table/partition requires compaction before initiating one (Dmitriy Fingerman, reviewed by Denys Kuzmenko)

Closes #5529
difin authored Jan 20, 2025
1 parent 2d3f047 commit 5146dba
Showing 29 changed files with 2,357 additions and 84 deletions.
3 changes: 2 additions & 1 deletion common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2240,7 +2240,8 @@ public static enum ConfVars {
"If this is set to true the URI for auth will have the default location masked with DEFAULT_TABLE_LOCATION"),
HIVE_ICEBERG_ALLOW_DATAFILES_IN_TABLE_LOCATION_ONLY("hive.iceberg.allow.datafiles.in.table.location.only", false,
"If this is set to true, then all the data files being read should be withing the table location"),

HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE("hive.iceberg.compaction.target.file.size", "128mb",
new SizeValidator(), "Target file size for Iceberg compaction."),
HIVE_USE_EXPLICIT_RCFILE_HEADER("hive.exec.rcfile.use.explicit.header", true,
"If this is set the header for RCFiles will simply be RCF. If this is not\n" +
"set the header will be that borrowed from sequence files, e.g. SEQ- followed\n" +
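
For context, a minimal sketch (not part of the commit) of reading the new size-typed property; the "256mb" override is illustrative, and getSizeVar resolves suffixed values such as "mb" or "gb" to bytes via the SizeValidator attached above.

import org.apache.hadoop.hive.conf.HiveConf;

public class TargetFileSizeSketch {
  public static void main(String[] args) {
    HiveConf conf = new HiveConf();
    // Hypothetical override of the default "128mb"; any SizeValidator-accepted suffix works.
    conf.set(HiveConf.ConfVars.HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE.varname, "256mb");
    // Resolves the suffixed value to bytes; the evaluator added below uses this same call.
    long targetBytes = HiveConf.getSizeVar(conf, HiveConf.ConfVars.HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE);
    System.out.println("Compaction target file size: " + targetBytes + " bytes");
  }
}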
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergOutputCommitter.java
@@ -79,6 +79,7 @@
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.mr.hive.compaction.IcebergCompactionService;
import org.apache.iceberg.mr.hive.compaction.IcebergCompactionUtil;
import org.apache.iceberg.mr.hive.writer.HiveIcebergWriter;
import org.apache.iceberg.mr.hive.writer.WriterRegistry;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
@@ -598,8 +599,8 @@ private void commitWrite(Table table, String branchName, Long snapshotId, long s
*/
private void commitCompaction(Table table, Long snapshotId, long startTime, FilesForCommit results,
String partitionPath) {
List<DataFile> existingDataFiles = IcebergTableUtil.getDataFiles(table, partitionPath);
List<DeleteFile> existingDeleteFiles = IcebergTableUtil.getDeleteFiles(table, partitionPath);
List<DataFile> existingDataFiles = IcebergCompactionUtil.getDataFiles(table, partitionPath);
List<DeleteFile> existingDeleteFiles = IcebergCompactionUtil.getDeleteFiles(table, partitionPath);

RewriteFiles rewriteFiles = table.newRewrite();
existingDataFiles.forEach(rewriteFiles::deleteFile);
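
For reference, the snapshot-rewrite pattern that commitCompaction follows can be sketched as below, assuming the compacted outputs are then registered with addFile and committed atomically; the helper and variable names are hypothetical, while newRewrite, deleteFile, addFile, and commit are existing Iceberg RewriteFiles calls.

import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.Table;

class CompactionCommitSketch {
  // Drops every pre-compaction data/delete file and registers the compacted
  // outputs so the table flips to the new files in one snapshot.
  static void commitRewrite(Table table, List<DataFile> existingDataFiles,
      List<DeleteFile> existingDeleteFiles, List<DataFile> compactedDataFiles) {
    RewriteFiles rewriteFiles = table.newRewrite();
    existingDataFiles.forEach(rewriteFiles::deleteFile);
    existingDeleteFiles.forEach(rewriteFiles::deleteFile);
    compactedDataFiles.forEach(rewriteFiles::addFile);
    rewriteFiles.commit(); // single replace snapshot
  }
}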
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/IcebergTableUtil.java
@@ -49,8 +49,6 @@
import org.apache.hadoop.hive.ql.plan.PlanUtils;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hadoop.hive.ql.session.SessionStateUtil;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.DeleteFiles;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.ManageSnapshots;
@@ -61,9 +59,7 @@
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.PartitionsTable;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.RowLevelOperationMode;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Snapshot;
import org.apache.iceberg.SnapshotRef;
@@ -80,7 +76,6 @@
import org.apache.iceberg.mr.Catalogs;
import org.apache.iceberg.mr.InputFormatConfig;
import org.apache.iceberg.relocated.com.google.common.collect.FluentIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Type;
@@ -412,52 +407,6 @@ public static PartitionData toPartitionData(StructLike sourceKey, Types.StructTy
return data;
}

/**
* Returns table's list of data files as following:
* 1. If the table is unpartitioned, returns all data files.
* 2. If partitionPath is not provided, returns all data files that belong to the non-latest partition spec.
* 3. If partitionPath is provided, returns all data files that belong to the corresponding partition.
* @param table the iceberg table
* @param partitionPath partition path
*/
public static List<DataFile> getDataFiles(Table table, String partitionPath) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return !table.spec().isPartitioned() ||
partitionPath == null && file.specId() != table.spec().specId() ||
partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath);
});
return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
}

/**
* Returns table's list of delete files as following:
* 1. If the table is unpartitioned, returns all delete files.
* 2. If partitionPath is not provided, returns all delete files that belong to the non-latest partition spec.
* 3. If partitionPath is provided, returns all delete files that belong to corresponding partition.
* @param table the iceberg table
* @param partitionPath partition path
*/
public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath) {
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
CloseableIterable<ScanTask> filteredDeletesScanTasks =
CloseableIterable.filter(deletesScanTasks, t -> {
DeleteFile file = ((PositionDeletesScanTask) t).file();
return !table.spec().isPartitioned() ||
partitionPath == null && file.specId() != table.spec().specId() ||
partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath);
});
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file()));
}

public static Expression generateExpressionFromPartitionSpec(Table table, Map<String, String> partitionSpec)
throws SemanticException {
Map<String, PartitionField> partitionFieldMap = getPartitionFields(table).stream()
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionService.java
@@ -25,6 +25,8 @@
import org.apache.hadoop.hive.ql.txn.compactor.CompactorPipeline;
import org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil;
import org.apache.hadoop.hive.ql.txn.compactor.service.CompactionService;
import org.apache.iceberg.mr.hive.IcebergTableUtil;
import org.apache.iceberg.mr.hive.compaction.evaluator.IcebergCompactionEvaluator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -50,6 +52,14 @@ public Boolean compact(Table table, CompactionInfo ci) throws Exception {
}
CompactorUtil.checkInterrupt(CLASS_NAME);

org.apache.iceberg.Table icebergTable = IcebergTableUtil.getTable(conf, table);
if (!IcebergCompactionEvaluator.isEligibleForCompaction(icebergTable, ci.partName, ci.type, conf)) {
LOG.info("Table={}{} doesn't meet requirements for compaction", table.getTableName(),
ci.partName == null ? "" : ", partition=" + ci.partName);
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
return false;
}

if (ci.runAs == null) {
ci.runAs = TxnUtils.findUserToRunAs(table.getSd().getLocation(), table, conf);
}
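
From the caller's side, the new gate boils down to a hedged sketch like the following; needsMajorCompaction is a hypothetical helper, while isEligibleForCompaction is the method this commit adds.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.compaction.evaluator.IcebergCompactionEvaluator;

class EligibilityGateSketch {
  // Mirrors the pre-flight check above: when this returns false, the service
  // marks the request refused instead of launching a compaction job.
  static boolean needsMajorCompaction(Table icebergTable, String partitionPath, HiveConf conf) {
    return IcebergCompactionEvaluator.isEligibleForCompaction(
        icebergTable, partitionPath, CompactionType.MAJOR, conf);
  }
}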
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/IcebergCompactionUtil.java
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.mr.hive.compaction;

import java.util.List;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.MetadataTableType;
import org.apache.iceberg.MetadataTableUtils;
import org.apache.iceberg.PositionDeletesScanTask;
import org.apache.iceberg.ScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

public class IcebergCompactionUtil {

private IcebergCompactionUtil() {

}

/**
* This method implements a common filter that is used in several places in Iceberg compaction code.
* Its aim is to determine whether the provided file needs to be handled when compacting a partition whose path equals
* the provided partitionPath. Returns true when one of the following conditions holds, otherwise returns false:
* 1. table is unpartitioned
* 2. partitionPath is null and the file belongs to the non-latest partition spec
* 3. partitionPath is not null and the file belongs to the partition whose path is the partitionPath
* @param table the iceberg table
* @param partitionPath partition path
* @param file Data or Delete file
*/
public static boolean shouldIncludeForCompaction(Table table, String partitionPath, ContentFile<?> file) {
return !table.spec().isPartitioned() ||
partitionPath == null && file.specId() != table.spec().specId() ||
partitionPath != null &&
table.specs().get(file.specId()).partitionToPath(file.partition()).equals(partitionPath);
}
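
// Illustrative truth table, assuming hypothetical spec ids where the table's
// latest partition spec id is 2:
//   unpartitioned table                             -> true for every file
//   partitionPath == null,  file.specId() == 1     -> true  (non-latest spec)
//   partitionPath == null,  file.specId() == 2     -> false (latest spec)
//   partitionPath == "dept=hr", file under dept=hr -> true; any other file -> false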

/**
* Returns the table's list of data files as follows:
* 1. If the table is unpartitioned, returns all data files.
* 2. If partitionPath is not provided, returns all data files that belong to the non-latest partition spec.
* 3. If partitionPath is provided, returns all data files that belong to the corresponding partition.
* @param table the iceberg table
* @param partitionPath partition path
*/
public static List<DataFile> getDataFiles(Table table, String partitionPath) {
CloseableIterable<FileScanTask> fileScanTasks =
table.newScan().useSnapshot(table.currentSnapshot().snapshotId()).ignoreResiduals().planFiles();
CloseableIterable<FileScanTask> filteredFileScanTasks =
CloseableIterable.filter(fileScanTasks, t -> {
DataFile file = t.asFileScanTask().file();
return shouldIncludeForCompaction(table, partitionPath, file);
});
return Lists.newArrayList(CloseableIterable.transform(filteredFileScanTasks, t -> t.file()));
}

/**
* Returns the table's list of delete files as follows:
* 1. If the table is unpartitioned, returns all delete files.
* 2. If partitionPath is not provided, returns all delete files that belong to the non-latest partition spec.
* 3. If partitionPath is provided, returns all delete files that belong to the corresponding partition.
* @param table the iceberg table
* @param partitionPath partition path
*/
public static List<DeleteFile> getDeleteFiles(Table table, String partitionPath) {
Table deletesTable =
MetadataTableUtils.createMetadataTableInstance(table, MetadataTableType.POSITION_DELETES);
CloseableIterable<ScanTask> deletesScanTasks = deletesTable.newBatchScan().planFiles();
CloseableIterable<ScanTask> filteredDeletesScanTasks =
CloseableIterable.filter(deletesScanTasks, t -> {
DeleteFile file = ((PositionDeletesScanTask) t).file();
return shouldIncludeForCompaction(table, partitionPath, file);
});
return Lists.newArrayList(CloseableIterable.transform(filteredDeletesScanTasks,
t -> ((PositionDeletesScanTask) t).file()));
}
}
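
A short usage sketch (not part of the commit) exercising the three filter modes documented above; "dept=hr" is an illustrative partition path, and passing null targets files written under a non-latest partition spec.

import java.util.List;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.compaction.IcebergCompactionUtil;

class CompactionScopeSketch {
  static void printScope(Table table) {
    // partitionPath == null: files that belong to a non-latest partition spec.
    List<DataFile> staleSpecFiles = IcebergCompactionUtil.getDataFiles(table, null);
    // Explicit partition path: data and position-delete files for that partition.
    List<DataFile> partitionFiles = IcebergCompactionUtil.getDataFiles(table, "dept=hr");
    List<DeleteFile> partitionDeletes = IcebergCompactionUtil.getDeleteFiles(table, "dept=hr");
    System.out.printf("stale-spec=%d, data=%d, deletes=%d%n",
        staleSpecFiles.size(), partitionFiles.size(), partitionDeletes.size());
  }
}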
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/HiveTableRuntime.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.mr.hive.compaction.evaluator;

import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntime;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntimeMeta;

public class HiveTableRuntime extends TableRuntime {

public HiveTableRuntime(TableRuntimeMeta tableRuntimeMeta) {
super(tableRuntimeMeta);
}
}
iceberg/iceberg-handler/src/main/java/org/apache/iceberg/mr/hive/compaction/evaluator/IcebergCompactionEvaluator.java
@@ -0,0 +1,129 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.iceberg.mr.hive.compaction.evaluator;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Collections;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.mr.hive.compaction.IcebergCompactionUtil;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.CommonPartitionEvaluator;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.IcebergTableFileScanHelper;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.OptimizingConfig;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableConfiguration;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableFileScanHelper;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableFormat;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntime;
import org.apache.iceberg.mr.hive.compaction.evaluator.amoro.TableRuntimeMeta;
import org.apache.iceberg.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IcebergCompactionEvaluator {

private static final long LAST_OPTIMIZE_TIME = 0;
private static final int TRIGGER_INTERVAL = 0;

private IcebergCompactionEvaluator() {

}

private static final Logger LOG = LoggerFactory.getLogger(IcebergCompactionEvaluator.class);

public static boolean isEligibleForCompaction(Table icebergTable, String partitionPath,
CompactionType compactionType, HiveConf conf) {

if (icebergTable.currentSnapshot() == null) {
LOG.info("Table {}{} doesn't require compaction because it is empty", icebergTable,
partitionPath == null ? "" : " partition " + partitionPath);
return false;
}

CommonPartitionEvaluator partitionEvaluator = createCommonPartitionEvaluator(icebergTable, partitionPath, conf);

if (partitionEvaluator == null) {
return false;
}

switch (compactionType) {
case MINOR:
return partitionEvaluator.isMinorNecessary();
case MAJOR:
return partitionEvaluator.isFullNecessary() || partitionEvaluator.isMajorNecessary();
default:
return false;
}
}

private static TableRuntime createTableRuntime(Table icebergTable, HiveConf conf) {
long targetFileSizeBytes = HiveConf.getSizeVar(conf,
HiveConf.ConfVars.HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE);

OptimizingConfig optimizingConfig = OptimizingConfig.parse(Collections.emptyMap());
optimizingConfig.setTargetSize(targetFileSizeBytes);
optimizingConfig.setFullTriggerInterval(TRIGGER_INTERVAL);
optimizingConfig.setMinorLeastInterval(TRIGGER_INTERVAL);

TableConfiguration tableConfig = new TableConfiguration();
tableConfig.setOptimizingConfig(optimizingConfig);

TableRuntimeMeta tableRuntimeMeta = new TableRuntimeMeta();
tableRuntimeMeta.setTableName(icebergTable.name());
tableRuntimeMeta.setFormat(TableFormat.ICEBERG);
tableRuntimeMeta.setLastFullOptimizingTime(LAST_OPTIMIZE_TIME);
tableRuntimeMeta.setLastMinorOptimizingTime(LAST_OPTIMIZE_TIME);
tableRuntimeMeta.setTableConfig(tableConfig);

return new HiveTableRuntime(tableRuntimeMeta);
}

private static CommonPartitionEvaluator createCommonPartitionEvaluator(Table table, String partitionPath,
HiveConf conf) {
TableRuntime tableRuntime = createTableRuntime(table, conf);

TableFileScanHelper tableFileScanHelper = new IcebergTableFileScanHelper(table,
table.currentSnapshot().snapshotId());
CommonPartitionEvaluator evaluator = null;
try (CloseableIterable<TableFileScanHelper.FileScanResult> results =
tableFileScanHelper.scan()) {
for (TableFileScanHelper.FileScanResult fileScanResult : results) {
DataFile file = fileScanResult.file();
if (IcebergCompactionUtil.shouldIncludeForCompaction(table, partitionPath, file)) {
PartitionSpec partitionSpec = table.specs().get(file.specId());
Pair<Integer, StructLike> partition = Pair.of(partitionSpec.specId(), fileScanResult.file().partition());

if (evaluator == null) {
evaluator = new CommonPartitionEvaluator(tableRuntime, partition, System.currentTimeMillis());
}

evaluator.addFile(fileScanResult.file(), fileScanResult.deleteFiles());
}
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return evaluator;
}
}
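
Putting it together, a hedged end-to-end sketch: tune the target file size, then ask the evaluator whether a MINOR pass is worthwhile; minorNeeded, the "64mb" override, and the "dept=hr" path are hypothetical.

import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.iceberg.Table;
import org.apache.iceberg.mr.hive.compaction.evaluator.IcebergCompactionEvaluator;

class EvaluatorUsageSketch {
  static boolean minorNeeded(Table icebergTable, HiveConf conf) {
    // A smaller target size makes more files look undersized, so minor
    // compaction triggers sooner; "64mb" is purely illustrative.
    conf.set(HiveConf.ConfVars.HIVE_ICEBERG_COMPACTION_TARGET_FILE_SIZE.varname, "64mb");
    return IcebergCompactionEvaluator.isEligibleForCompaction(
        icebergTable, "dept=hr", CompactionType.MINOR, conf);
  }
}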