From 6fd129b2113254d3b8bed45147dcaa08b34f84ff Mon Sep 17 00:00:00 2001
From: Sagar Sumit
Date: Mon, 6 Jan 2025 16:30:49 +0530
Subject: [PATCH] address few minor comments and code cleanup

---
 .../metadata/HoodieTableMetadataUtil.java     | 71 +++----------------
 .../metadata/TestHoodieTableMetadataUtil.java | 10 +--
 ...tMetadataUtilRLIandSIRecordGeneration.java | 26 -------
 3 files changed, 15 insertions(+), 92 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 3484bc78ffb2..ca6ffb7a53e4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -813,7 +813,7 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
           .filter(writeStat -> writeStat.getPath().endsWith(baseFileFormat.getFileExtension()))
           .collect(Collectors.toList());
       List<HoodieWriteStat> logFileWriteStats = writeStats.stream()
-          .filter(writeStat -> FSUtils.isLogFile(writeStat.getPath()))
+          .filter(writeStat -> FSUtils.isLogFile(new StoragePath(writeStat.getPath())))
          .collect(Collectors.toList());
       // Ensure that only one of base file or log file write stats exists
       checkState(baseFileWriteStats.isEmpty() || logFileWriteStats.isEmpty(),
@@ -861,7 +861,7 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
             allRecords.addAll(deletedRecords);
             return allRecords.iterator();
           }
-          // No base file or log file write stats found
+          LOG.warn("No base file or log file write stats found for fileId: {}", fileId);
           return Collections.emptyIterator();
         });

@@ -886,7 +886,7 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
   }

   /**
-   * Get the deleted keys from the merged log files. The logic is as below. Suppose:
+   * Get the revived and deleted keys from the merged log files. The logic is as below. Suppose:
    * <pre>
   *   A = Set of keys that are valid (not deleted) in the previous log files merged
   *   <br>
   *   B = Set of keys that are valid in all log files including current log file merged
   *   <br>
   *   C = Set of keys that are deleted in the current log file
@@ -921,6 +921,11 @@ public static Pair<Set<String>, Set<String>> getRevivedAndDeletedKeysFromMergedL
          .collect(Collectors.toSet());
      return Pair.of(Collections.emptySet(), deletedKeys);
    }
+    return getRevivedAndDeletedKeys(dataTableMetaClient, instantTime, engineType, logFilePaths, finalWriterSchemaOpt, logFilePathsWithoutCurrentLogFiles);
+  }
+
+  private static Pair<Set<String>, Set<String>> getRevivedAndDeletedKeys(HoodieTableMetaClient dataTableMetaClient, String instantTime, EngineType engineType, List<String> logFilePaths,
+                                                                         Option<Schema> finalWriterSchemaOpt, List<String> logFilePathsWithoutCurrentLogFiles) {
    // Fetch log records for all log files
    Map<String, HoodieRecord> allLogRecords = getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);

@@ -1036,62 +1041,6 @@ public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> rec
    }, parallelism).values();
  }

-  @VisibleForTesting
-  public static HoodieData<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext,
-                                                                 HoodieCommitMetadata commitMetadata,
-                                                                 HoodieMetadataConfig metadataConfig,
-                                                                 HoodieTableMetaClient dataTableMetaClient,
-                                                                 String instantTime) {
-    List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
-        .flatMap(Collection::stream).collect(Collectors.toList());
-
-    if (allWriteStats.isEmpty()) {
-      return engineContext.emptyHoodieData();
-    }
-
-    try {
-      int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
-      String basePath = dataTableMetaClient.getBasePath().toString();
-      HoodieFileFormat baseFileFormat = dataTableMetaClient.getTableConfig().getBaseFileFormat();
-      // SI cannot support logs having inserts with current offering. So, lets validate that.
-      if (allWriteStats.stream().anyMatch(writeStat -> {
-        String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
-        return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
-      })) {
-        throw new HoodieIOException("Secondary index cannot support logs having inserts with current offering. Can you drop secondary index.");
-      }
-
-      // we might need to set some additional variables if we need to process log files.
-      boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
-        String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
-        return FSUtils.isLogFile(fileName);
-      });
-      Option<Schema> writerSchemaOpt = Option.empty();
-      if (anyLogFiles) {
-        writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
-      }
-      int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
-      StorageConfiguration<?> storageConfiguration = dataTableMetaClient.getStorageConf();
-      Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
-      return engineContext.parallelize(allWriteStats, parallelism)
-          .flatMap(writeStat -> {
-            HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration);
-            StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
-            // handle base files
-            if (writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
-              return BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, storage).iterator();
-            } else if (FSUtils.isLogFile(fullFilePath)) {
-              // for logs, every entry is either an update or a delete
-              return getRecordKeys(Collections.singletonList(fullFilePath.toString()), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime, true, true).iterator();
-            } else {
-              throw new HoodieIOException("Found unsupported file type " + fullFilePath + ", while generating MDT records");
-            }
-          });
-    } catch (Exception e) {
-      throw new HoodieException("Failed to fetch deleted record keys while preparing MDT records", e);
-    }
-  }
-
   @VisibleForTesting
   public static Set<String> getRecordKeys(List<String> logFilePaths, HoodieTableMetaClient datasetMetaClient,
                                           Option<Schema> writerSchemaOpt, int maxBufferSize,
@@ -2407,7 +2356,7 @@ public static HoodieData<HoodieRecord> convertWriteStatsToSecondaryIndexRecords(
        recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
      } else {
        StoragePath previousBaseFile = previousFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
-        List<String> logFiles = previousFileSliceForFileId.getLogFiles().map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
+        List<String> logFiles = previousFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
        recordKeyToSecondaryKeyForPreviousFileSlice = getRecordKeyToSecondaryKey(dataMetaClient, engineType, logFiles, tableSchema, partition, Option.ofNullable(previousBaseFile), indexDefinition, instantTime);
      }

@@ -2415,7 +2364,7 @@ public static HoodieData<HoodieRecord> convertWriteStatsToSecondaryIndexRecords(
      FileSlice currentFileSliceForFileId = latestIncludingInflightFileSlices.stream().filter(fs -> fs.getFileId().equals(fileId)).findFirst()
          .orElseThrow(() -> new HoodieException("Could not find any file slice for fileId " + fileId));
      StoragePath currentBaseFile = currentFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
-      List<String> logFilesIncludingInflight = currentFileSliceForFileId.getLogFiles().map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
+      List<String> logFilesIncludingInflight = currentFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
      Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice = getRecordKeyToSecondaryKey(dataMetaClient, engineType, logFilesIncludingInflight, tableSchema, partition,
          Option.ofNullable(currentBaseFile), indexDefinition, instantTime);
      // Need to find what secondary index record should be deleted, and what should be inserted.
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 5aadd3634d4d..7e04c6abe0c6 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -596,7 +596,7 @@ public void testComputeRevivedAndDeletedKeys() {
    // Test Input Sets
    Set<String> validKeysForPreviousLogs = new HashSet<>(Arrays.asList("K1", "K2", "K3"));
    Set<String> deletedKeysForPreviousLogs = new HashSet<>(Arrays.asList("K4", "K5"));
-    Set<String> validKeysForAllLogs = new HashSet<>(Arrays.asList("K2", "K3", "K6"));
+    Set<String> validKeysForAllLogs = new HashSet<>(Arrays.asList("K2", "K4", "K6")); // revived: K4, deleted: K1
    Set<String> deletedKeysForAllLogs = new HashSet<>(Arrays.asList("K1", "K5", "K7"));

    // Expected Results
@@ -608,7 +608,7 @@ public void testComputeRevivedAndDeletedKeys() {
    assertEquals(expectedRevivedKeys, result.getKey());
    assertEquals(expectedDeletedKeys, result.getValue());

-    // Case 1: All keys remain valid
+    // Case 1: All keys remain valid, just updates, no deletes or revives
    Set<String> allValidKeys = new HashSet<>(Arrays.asList("K1", "K2", "K3"));
    Set<String> allEmpty = Collections.emptySet();
    result = computeRevivedAndDeletedKeys(allValidKeys, allEmpty, allValidKeys, allEmpty);
@@ -620,10 +620,10 @@ public void testComputeRevivedAndDeletedKeys() {
    assertEquals(Collections.emptySet(), result.getKey());
    assertEquals(allValidKeys, result.getValue());

-    // Case 3: New keys added in the current log file - K9
-    result = computeRevivedAndDeletedKeys(allValidKeys, allEmpty, new HashSet<>(Arrays.asList("K1", "K2", "K3", "K8")), new HashSet<>(Arrays.asList("K4", "K9")));
+    // Case 3: Delete K3
+    result = computeRevivedAndDeletedKeys(allValidKeys, allEmpty, new HashSet<>(Arrays.asList("K1", "K2")), new HashSet<>(Collections.singletonList("K3")));
    assertEquals(Collections.emptySet(), result.getKey());
-    assertEquals(new HashSet<>(Arrays.asList("K4", "K9")), result.getValue());
+    assertEquals(new HashSet<>(Collections.singletonList("K3")), result.getValue());

    // Case 4: Empty input sets
    result = computeRevivedAndDeletedKeys(Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
index 1e0a7915f097..d3b5e1471664 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/client/functional/TestMetadataUtilRLIandSIRecordGeneration.java
@@ -29,12 +29,9 @@
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
-import org.apache.hudi.common.util.CommitUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -64,7 +61,6 @@
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRecordKeys;
-import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRevivedAndDeletedKeysFromMergedLogs;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.reduceByKeys;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -264,19 +260,6 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
    assertNoWriteErrors(writeStatuses3);
    assertRLIandSIRecordGenerationAPIs(inserts3, updates3, deletes3, writeStatuses3, finalCommitTime3, writeConfig);

-    // lets validate that if any log file contains inserts, fetching keys will fail.
-    HoodieWriteStat writeStat = writeStatuses3.get(1).getStat();
-    writeStat.setNumInserts(5);
-    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.singletonList(writeStat), Collections.emptyMap(),
-        Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");
-
-    try {
-      getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3);
-      fail("Should not have reached here");
-    } catch (Exception e) {
-      // no op
-    }
-
    // trigger compaction
    Option<String> compactionInstantOpt = client.scheduleCompaction(Option.empty());
    assertTrue(compactionInstantOpt.isPresent());
@@ -328,15 +311,6 @@ private void assertRLIandSIRecordGenerationAPIs(List<HoodieRecord> inserts3, Lis
    assertListEquality(expectedRLIInserts, actualInserts);
    assertListEquality(expectedRLIDeletes, actualDeletes);
    assertListEquality(expectedUpatesAndDeletes, actualUpdatesAndDeletes);
-    HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatuses3.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()), Collections.emptyMap(),
-        Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");
-
-    // validate HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated for entire CommitMetadata which is used in SI code path.
-    List<String> updatedOrDeletedKeys =
-        new ArrayList<>(getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3).collectAsList());
-    List<String> expectedUpdatesOrDeletes = new ArrayList<>(expectedUpdates);
-    expectedUpdatesOrDeletes.addAll(expectedRLIDeletes);
-    assertListEquality(expectedUpatesAndDeletes, updatedOrDeletedKeys);
  }

  @Test
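
Note: the expectations asserted in testComputeRevivedAndDeletedKeys above reduce to two plain set intersections over
the A, B, C sets described in the javadoc of getRevivedAndDeletedKeysFromMergedLogs. The snippet below is not part of
the patch; it is a minimal, self-contained sketch of that logic (the class name and printouts are illustrative only),
assuming revived keys = deletedKeysForPreviousLogs ∩ validKeysForAllLogs and deleted keys =
validKeysForPreviousLogs ∩ deletedKeysForAllLogs, which matches the "revived: K4, deleted: K1" comment in the test.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    // Minimal sketch (not Hudi code): revived/deleted key computation over the test's input sets.
    public class RevivedAndDeletedKeysSketch {
      public static void main(String[] args) {
        // Inputs mirror the first case in testComputeRevivedAndDeletedKeys.
        Set<String> validKeysForPreviousLogs = new HashSet<>(Arrays.asList("K1", "K2", "K3"));
        Set<String> deletedKeysForPreviousLogs = new HashSet<>(Arrays.asList("K4", "K5"));
        Set<String> validKeysForAllLogs = new HashSet<>(Arrays.asList("K2", "K4", "K6"));
        Set<String> deletedKeysForAllLogs = new HashSet<>(Arrays.asList("K1", "K5", "K7"));

        // Revived: deleted before the current log file, valid again once it is merged in.
        Set<String> revivedKeys = new HashSet<>(deletedKeysForPreviousLogs);
        revivedKeys.retainAll(validKeysForAllLogs);

        // Deleted: valid before the current log file, deleted once it is merged in.
        Set<String> deletedKeys = new HashSet<>(validKeysForPreviousLogs);
        deletedKeys.retainAll(deletedKeysForAllLogs);

        System.out.println("revived = " + revivedKeys); // prints [K4]
        System.out.println("deleted = " + deletedKeys); // prints [K1]
      }
    }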