address few minor comments and code cleanup
codope committed Jan 6, 2025
1 parent 627d50b commit 6fd129b
Showing 3 changed files with 15 additions and 92 deletions.
@@ -813,7 +813,7 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
.filter(writeStat -> writeStat.getPath().endsWith(baseFileFormat.getFileExtension()))
.collect(Collectors.toList());
List<HoodieWriteStat> logFileWriteStats = writeStats.stream()
.filter(writeStat -> FSUtils.isLogFile(writeStat.getPath()))
.filter(writeStat -> FSUtils.isLogFile(new StoragePath(writeStats.get(0).getPath())))
.collect(Collectors.toList());
// Ensure that only one of base file or log file write stats exists
checkState(baseFileWriteStats.isEmpty() || logFileWriteStats.isEmpty(),
@@ -861,7 +861,7 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
allRecords.addAll(deletedRecords);
return allRecords.iterator();
}
// No base file or log file write stats found
LOG.warn("No base file or log file write stats found for fileId: {}", fileId);
return Collections.emptyIterator();
});
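As context for the two hunks above: the record index path splits a file group's write stats into base-file and log-file stats, expects only one of the two groups to be non-empty, and (with this commit) logs a warning when neither is present. The following is a minimal sketch of that split and invariant check, using simplified, hypothetical types rather than the actual Hudi classes:

```java
import java.util.List;
import java.util.stream.Collectors;

class WriteStatSplitSketch {

  // Hypothetical stand-in for HoodieWriteStat; only the file path matters here.
  static class WriteStat {
    final String path;
    WriteStat(String path) { this.path = path; }
  }

  static void splitAndValidate(List<WriteStat> writeStats, String baseFileExtension, String fileId) {
    List<WriteStat> baseFileWriteStats = writeStats.stream()
        .filter(s -> s.path.endsWith(baseFileExtension))   // e.g. ".parquet"
        .collect(Collectors.toList());
    List<WriteStat> logFileWriteStats = writeStats.stream()
        .filter(s -> s.path.contains(".log."))             // simplified log-file check
        .collect(Collectors.toList());
    // Invariant from the hunk above: a single commit should not produce both base file
    // and log file write stats for the same file group.
    if (!baseFileWriteStats.isEmpty() && !logFileWriteStats.isEmpty()) {
      throw new IllegalStateException("Both base and log file write stats found for fileId: " + fileId);
    }
    if (baseFileWriteStats.isEmpty() && logFileWriteStats.isEmpty()) {
      // The commit replaces a silent return with a warning in this case.
      System.err.println("No base file or log file write stats found for fileId: " + fileId);
    }
  }
}
```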

@@ -886,7 +886,7 @@ public static HoodieData<HoodieRecord> convertMetadataToRecordIndexRecords(Hoodi
}

/**
* Get the deleted keys from the merged log files. The logic is as below. Suppose:
* Get the revived and deleted keys from the merged log files. The logic is as below. Suppose:
* <li>A = Set of keys that are valid (not deleted) in the previous log files merged</li>
* <li>B = Set of keys that are valid in all log files including current log file merged</li>
* <li>C = Set of keys that are deleted in the current log file</li>
@@ -921,6 +921,11 @@ public static Pair<Set<String>, Set<String>> getRevivedAndDeletedKeysFromMergedL
.collect(Collectors.toSet());
return Pair.of(Collections.emptySet(), deletedKeys);
}
return getRevivedAndDeletedKeys(dataTableMetaClient, instantTime, engineType, logFilePaths, finalWriterSchemaOpt, logFilePathsWithoutCurrentLogFiles);
}

private static Pair<Set<String>, Set<String>> getRevivedAndDeletedKeys(HoodieTableMetaClient dataTableMetaClient, String instantTime, EngineType engineType, List<String> logFilePaths,
Option<Schema> finalWriterSchemaOpt, List<String> logFilePathsWithoutCurrentLogFiles) {
// Fetch log records for all log files
Map<String, HoodieRecord> allLogRecords =
getLogRecords(logFilePaths, dataTableMetaClient, finalWriterSchemaOpt, instantTime, engineType);
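The Javadoc in the hunk above defines the sets A (keys valid after merging the previous log files), B (keys valid after also merging the current log file) and C (keys deleted in the current log file), and the extracted helper returns the revived and deleted keys derived from them. Below is a self-contained sketch of that set arithmetic, written to agree with the expectations in the test changes further down; it is an illustration, not the exact Hudi implementation:

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class RevivedAndDeletedKeysSketch {

  static Map.Entry<Set<String>, Set<String>> computeRevivedAndDeletedKeys(
      Set<String> validKeysForPreviousLogs, Set<String> deletedKeysForPreviousLogs,
      Set<String> validKeysForAllLogs, Set<String> deletedKeysForAllLogs) {
    // Revived: deleted by the previous log files, but valid again once the current log file is merged.
    Set<String> revivedKeys = new HashSet<>(deletedKeysForPreviousLogs);
    revivedKeys.retainAll(validKeysForAllLogs);
    // Deleted: valid after the previous log files, but deleted once the current log file is merged.
    Set<String> deletedKeys = new HashSet<>(validKeysForPreviousLogs);
    deletedKeys.retainAll(deletedKeysForAllLogs);
    return new SimpleImmutableEntry<>(revivedKeys, deletedKeys);
  }

  public static void main(String[] args) {
    // Mirrors the first test case in the test diff below: K4 is revived, K1 is deleted.
    Map.Entry<Set<String>, Set<String>> result = computeRevivedAndDeletedKeys(
        new HashSet<>(Arrays.asList("K1", "K2", "K3")),   // valid after previous log files
        new HashSet<>(Arrays.asList("K4", "K5")),         // deleted after previous log files
        new HashSet<>(Arrays.asList("K2", "K4", "K6")),   // valid after all log files
        new HashSet<>(Arrays.asList("K1", "K5", "K7")));  // deleted after all log files
    System.out.println("revived=" + result.getKey() + ", deleted=" + result.getValue());
  }
}
```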
@@ -1036,62 +1041,6 @@ public static HoodieData<HoodieRecord> reduceByKeys(HoodieData<HoodieRecord> rec
}, parallelism).values();
}

@VisibleForTesting
public static HoodieData<String> getRecordKeysDeletedOrUpdated(HoodieEngineContext engineContext,
HoodieCommitMetadata commitMetadata,
HoodieMetadataConfig metadataConfig,
HoodieTableMetaClient dataTableMetaClient,
String instantTime) {
List<HoodieWriteStat> allWriteStats = commitMetadata.getPartitionToWriteStats().values().stream()
.flatMap(Collection::stream).collect(Collectors.toList());

if (allWriteStats.isEmpty()) {
return engineContext.emptyHoodieData();
}

try {
int parallelism = Math.max(Math.min(allWriteStats.size(), metadataConfig.getRecordIndexMaxParallelism()), 1);
String basePath = dataTableMetaClient.getBasePath().toString();
HoodieFileFormat baseFileFormat = dataTableMetaClient.getTableConfig().getBaseFileFormat();
// SI cannot support logs having inserts with current offering. So, lets validate that.
if (allWriteStats.stream().anyMatch(writeStat -> {
String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
return FSUtils.isLogFile(fileName) && writeStat.getNumInserts() > 0;
})) {
throw new HoodieIOException("Secondary index cannot support logs having inserts with current offering. Can you drop secondary index.");
}

// we might need to set some additional variables if we need to process log files.
boolean anyLogFiles = allWriteStats.stream().anyMatch(writeStat -> {
String fileName = FSUtils.getFileName(writeStat.getPath(), writeStat.getPartitionPath());
return FSUtils.isLogFile(fileName);
});
Option<Schema> writerSchemaOpt = Option.empty();
if (anyLogFiles) {
writerSchemaOpt = tryResolveSchemaForTable(dataTableMetaClient);
}
int maxBufferSize = metadataConfig.getMaxReaderBufferSize();
StorageConfiguration storageConfiguration = dataTableMetaClient.getStorageConf();
Option<Schema> finalWriterSchemaOpt = writerSchemaOpt;
return engineContext.parallelize(allWriteStats, parallelism)
.flatMap(writeStat -> {
HoodieStorage storage = HoodieStorageUtils.getStorage(new StoragePath(writeStat.getPath()), storageConfiguration);
StoragePath fullFilePath = new StoragePath(dataTableMetaClient.getBasePath(), writeStat.getPath());
// handle base files
if (writeStat.getPath().endsWith(baseFileFormat.getFileExtension())) {
return BaseFileRecordParsingUtils.getRecordKeysDeletedOrUpdated(basePath, writeStat, storage).iterator();
} else if (FSUtils.isLogFile(fullFilePath)) {
// for logs, every entry is either an update or a delete
return getRecordKeys(Collections.singletonList(fullFilePath.toString()), dataTableMetaClient, finalWriterSchemaOpt, maxBufferSize, instantTime, true, true).iterator();
} else {
throw new HoodieIOException("Found unsupported file type " + fullFilePath + ", while generating MDT records");
}
});
} catch (Exception e) {
throw new HoodieException("Failed to fetch deleted record keys while preparing MDT records", e);
}
}

@VisibleForTesting
public static Set<String> getRecordKeys(List<String> logFilePaths, HoodieTableMetaClient datasetMetaClient,
Option<Schema> writerSchemaOpt, int maxBufferSize,
@@ -2407,15 +2356,15 @@ public static HoodieData<HoodieRecord> convertWriteStatsToSecondaryIndexRecords(
recordKeyToSecondaryKeyForPreviousFileSlice = Collections.emptyMap();
} else {
StoragePath previousBaseFile = previousFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
List<String> logFiles = previousFileSliceForFileId.getLogFiles().map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
List<String> logFiles = previousFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
recordKeyToSecondaryKeyForPreviousFileSlice =
getRecordKeyToSecondaryKey(dataMetaClient, engineType, logFiles, tableSchema, partition, Option.ofNullable(previousBaseFile), indexDefinition, instantTime);
}
List<FileSlice> latestIncludingInflightFileSlices = getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(), partition);
FileSlice currentFileSliceForFileId = latestIncludingInflightFileSlices.stream().filter(fs -> fs.getFileId().equals(fileId)).findFirst()
.orElseThrow(() -> new HoodieException("Could not find any file slice for fileId " + fileId));
StoragePath currentBaseFile = currentFileSliceForFileId.getBaseFile().map(HoodieBaseFile::getStoragePath).orElse(null);
List<String> logFilesIncludingInflight = currentFileSliceForFileId.getLogFiles().map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
List<String> logFilesIncludingInflight = currentFileSliceForFileId.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).map(HoodieLogFile::getPath).map(StoragePath::toString).collect(Collectors.toList());
Map<String, String> recordKeyToSecondaryKeyForCurrentFileSlice =
getRecordKeyToSecondaryKey(dataMetaClient, engineType, logFilesIncludingInflight, tableSchema, partition, Option.ofNullable(currentBaseFile), indexDefinition, instantTime);
// Need to find what secondary index record should be deleted, and what should be inserted.
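In the secondary index hunk above, the commit sorts the log files of both the previous and the current file slice with HoodieLogFile.getLogFileComparator() so they are merged in a deterministic order; the two resulting recordKey-to-secondaryKey maps are then compared to decide which secondary index records to delete and which to insert. A rough sketch of that map diff follows; the composite key format and types are illustrative, not Hudi's actual record layout:

```java
import java.util.List;
import java.util.Map;

final class SecondaryIndexDiffSketch {

  /**
   * previous / current: recordKey -> secondaryKey maps for the previous and current file slice.
   * deletes / inserts receive illustrative "secondaryKey$recordKey" composite keys.
   */
  static void diff(Map<String, String> previous, Map<String, String> current,
                   List<String> deletes, List<String> inserts) {
    previous.forEach((recordKey, oldSecondaryKey) -> {
      // Mapping disappeared or changed: the old secondary index entry must be deleted.
      if (!oldSecondaryKey.equals(current.get(recordKey))) {
        deletes.add(oldSecondaryKey + "$" + recordKey);
      }
    });
    current.forEach((recordKey, newSecondaryKey) -> {
      // Mapping is new or changed: a new secondary index entry must be inserted.
      if (!newSecondaryKey.equals(previous.get(recordKey))) {
        inserts.add(newSecondaryKey + "$" + recordKey);
      }
    });
  }
}
```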
@@ -596,7 +596,7 @@ public void testComputeRevivedAndDeletedKeys() {
// Test Input Sets
Set<String> validKeysForPreviousLogs = new HashSet<>(Arrays.asList("K1", "K2", "K3"));
Set<String> deletedKeysForPreviousLogs = new HashSet<>(Arrays.asList("K4", "K5"));
Set<String> validKeysForAllLogs = new HashSet<>(Arrays.asList("K2", "K3", "K6"));
Set<String> validKeysForAllLogs = new HashSet<>(Arrays.asList("K2", "K4", "K6")); // revived: K4, deleted: K1
Set<String> deletedKeysForAllLogs = new HashSet<>(Arrays.asList("K1", "K5", "K7"));

// Expected Results
@@ -608,7 +608,7 @@ assertEquals(expectedRevivedKeys, result.getKey());
assertEquals(expectedRevivedKeys, result.getKey());
assertEquals(expectedDeletedKeys, result.getValue());

// Case 1: All keys remain valid
// Case 1: All keys remain valid, just updates, no deletes or revives
Set<String> allValidKeys = new HashSet<>(Arrays.asList("K1", "K2", "K3"));
Set<String> allEmpty = Collections.emptySet();
result = computeRevivedAndDeletedKeys(allValidKeys, allEmpty, allValidKeys, allEmpty);
@@ -620,10 +620,10 @@ assertEquals(Collections.emptySet(), result.getKey());
assertEquals(Collections.emptySet(), result.getKey());
assertEquals(allValidKeys, result.getValue());

// Case 3: New keys added in the current log file - K9
result = computeRevivedAndDeletedKeys(allValidKeys, allEmpty, new HashSet<>(Arrays.asList("K1", "K2", "K3", "K8")), new HashSet<>(Arrays.asList("K4", "K9")));
// Case 3: Delete K3
result = computeRevivedAndDeletedKeys(allValidKeys, allEmpty, new HashSet<>(Arrays.asList("K1", "K2")), new HashSet<>(Collections.singletonList("K3")));
assertEquals(Collections.emptySet(), result.getKey());
assertEquals(new HashSet<>(Arrays.asList("K4", "K9")), result.getValue());
assertEquals(new HashSet<>(Collections.singletonList("K3")), result.getValue());

// Case 4: Empty input sets
result = computeRevivedAndDeletedKeys(Collections.emptySet(), Collections.emptySet(), Collections.emptySet(), Collections.emptySet());
@@ -29,12 +29,9 @@
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.TableSchemaResolver;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -64,7 +61,6 @@

import static org.apache.hudi.metadata.HoodieTableMetadataUtil.convertMetadataToRecordIndexRecords;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRecordKeys;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.getRevivedAndDeletedKeysFromMergedLogs;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.reduceByKeys;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
@@ -264,19 +260,6 @@ public void testRecordGenerationAPIsForMOR() throws IOException {
assertNoWriteErrors(writeStatuses3);
assertRLIandSIRecordGenerationAPIs(inserts3, updates3, deletes3, writeStatuses3, finalCommitTime3, writeConfig);

// lets validate that if any log file contains inserts, fetching keys will fail.
HoodieWriteStat writeStat = writeStatuses3.get(1).getStat();
writeStat.setNumInserts(5);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(Collections.singletonList(writeStat), Collections.emptyMap(),
Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");

try {
getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3);
fail("Should not have reached here");
} catch (Exception e) {
// no op
}

// trigger compaction
Option<String> compactionInstantOpt = client.scheduleCompaction(Option.empty());
assertTrue(compactionInstantOpt.isPresent());
@@ -328,15 +311,6 @@ private void assertRLIandSIRecordGenerationAPIs(List<HoodieRecord> inserts3, Lis
assertListEquality(expectedRLIInserts, actualInserts);
assertListEquality(expectedRLIDeletes, actualDeletes);
assertListEquality(expectedUpatesAndDeletes, actualUpdatesAndDeletes);
HoodieCommitMetadata commitMetadata = CommitUtils.buildMetadata(writeStatuses3.stream().map(writeStatus -> writeStatus.getStat()).collect(Collectors.toList()), Collections.emptyMap(),
Option.empty(), WriteOperationType.UPSERT, writeConfig.getSchema(), "commit");

// validate HoodieTableMetadataUtil.getRecordKeysDeletedOrUpdated for entire CommitMetadata which is used in SI code path.
List<String> updatedOrDeletedKeys =
new ArrayList<>(getRecordKeysDeletedOrUpdated(context, commitMetadata, writeConfig.getMetadataConfig(), metaClient, finalCommitTime3).collectAsList());
List<String> expectedUpdatesOrDeletes = new ArrayList<>(expectedUpdates);
expectedUpdatesOrDeletes.addAll(expectedRLIDeletes);
assertListEquality(expectedUpatesAndDeletes, updatedOrDeletedKeys);
}

@Test
