[SUPPORT] Duplicate records when delete checkpoint in Flink+Hudi table after multiple successful writes #12523

Open
xiearthur opened this issue Dec 19, 2024 · 12 comments
Labels
data-duplication · feature-enquiry · flink

Comments

@xiearthur


Describe the problem you faced

After several successful Flink writes to a Hudi MOR table, deleting the Flink checkpoint (or resetting the Kafka consumer offset) and restarting the job produces duplicate records in the table.

To Reproduce

Steps to reproduce the behavior:

  1. We are running integration tests with Flink 1.13.2 and Hudi 0.14 to verify behavior under extreme conditions such as duplicate Kafka consumption and Flink checkpoint loss
  2. Flink was restarted normally several times and duplicate data was pushed into Kafka multiple times, yet no duplicates appeared in Hudi
  3. On the fourth run, when we simulated a Kafka offset reset and checkpoint deletion, duplicate data appeared

We have tried adjusting the timeline server, index, write mode, parallelism, merge, and cleanup settings, but none of these resolved the issue.

Summary:
The first three writes were completely normal:
First write: 102823 records
Second write: 198218 records
Third write: 198218 records
The fourth run (after deleting the checkpoint / resetting the Kafka offset) produced duplicates:
Actual data: 198218 records
Write result: 348918 records (150700 extra duplicate records)
Key log analysis:
The fourth write produced two parallel file groups:
fileId1: 65e2e9f7-ed93-49e6-9067-163e02c89f63 (original)
fileId2: e1ebc3c2-65d9-406a-9f9b-0d803c07e25a (new)
Write process:
First batch: 338957 records (69072 updates) -> written to the new fileId
Second batch: 198218 records (all updates) -> written to the old fileId

In addition, records with the same primary key now exist in two different file groups.
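
As a minimal sketch (an illustration using Spark's Java API and Hudi's standard metadata columns, not output from the actual job), this is how we check which record keys span more than one file group:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.*;

public class DuplicateKeyCheck {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder().appName("dup-check").getOrCreate();
        // Read the Hudi table; the path is the one from the HDFS listings below.
        Dataset<Row> df = spark.read().format("hudi")
                .load("hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx");
        // Keys that appear in more than one distinct _hoodie_file_name are the
        // duplicates that span two file groups.
        df.groupBy("_hoodie_record_key")
          .agg(countDistinct("_hoodie_file_name").alias("file_groups"),
               count(lit(1)).alias("copies"))
          .filter("copies > 1")
          .show(20, false);
    }
}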

Our Flink write configuration is shown below:

public static void writeHudiDataStream(ParameterTool params, DataStream<RowData> dataStream, Map<String, String> schema, Logger logger) {
    // Get basic parameters
    String basePath = params.get("basepath");
    String tableName = params.get("tablename");
    String primaryKey = params.get("primarykey");
    String hoodieTableType = params.get("hoodie_table_type");
    String precombing = params.get("precombing");
    String partition = params.get("partition");

    // Add Hudi table options
    Map<String, String> options = new HashMap<>();
    options.put(FlinkOptions.PATH.key(), basePath + tableName);
    String name = hoodieTableType.equals("cow") ? 
        HoodieTableType.COPY_ON_WRITE.name() : HoodieTableType.MERGE_ON_READ.name();
    options.put(FlinkOptions.TABLE_TYPE.key(), name);
    
    // Write options
    options.put("hoodie.upsert.shuffle.parallelism", "200");
    options.put("hoodie.insert.shuffle.parallelism", "200");
    options.put(FlinkOptions.OPERATION.key(), WriteOperationType.UPSERT.value());
    options.put(FlinkOptions.PRECOMBINE_FIELD.key(), precombing);
    options.put(FlinkOptions.PRE_COMBINE.key(), "true");

    // Index options
    options.put(FlinkOptions.INDEX_GLOBAL_ENABLED.key(), "true");
    options.put("index.type", "GLOBAL_BLOOM");

    // Clean strategy
    options.put("hoodie.clean.automatic", "true");
    options.put("hoodie.cleaner.policy", "KEEP_LATEST_COMMITS");
    options.put("hoodie.cleaner.commits.retained", "10");
    options.put("hoodie.clean.async", "true");
    options.put("hoodie.clean.parallelism", "200");

    // Archive strategy
    options.put("hoodie.archive.min.commits", "20");
    options.put("hoodie.archive.max.commits", "30");
    options.put("hoodie.archive.parallelism", "200");

    // Compaction options
    options.put("hoodie.compact.inline", "false");
    options.put("hoodie.compact.inline.max.delta.commits", "1");
    options.put("hoodie.compact.schedule.inline", "true");
    options.put("hoodie.compact.max.delta.commits", "1");
    options.put(FlinkOptions.COMPACTION_ASYNC_ENABLED.key(), "true");
    options.put(FlinkOptions.COMPACTION_TRIGGER_STRATEGY.key(), "num_commits");
    options.put(FlinkOptions.COMPACTION_DELTA_COMMITS.key(), "5");
    options.put(FlinkOptions.COMPACTION_MAX_MEMORY.key(), "1024");

    // Concurrency control
    options.put("hoodie.write.concurrency.mode", "SINGLE_WRITER");
    options.put("hoodie.write.lock.provider", "org.apache.hudi.client.transaction.lock.InProcessLockProvider");
    options.put("hoodie.merge.allow.duplicate.on.inserts", "false");
    options.put("hoodie.combine.before.insert", "true");
    options.put("hoodie.combine.before.upsert", "true");

    // Write performance tuning
    options.put("hoodie.write.markers.type", "DIRECT");
    options.put("hoodie.write.status.storage.level", "MEMORY_AND_DISK_SER");
    options.put(FlinkOptions.WRITE_BATCH_SIZE.key(), "50000");
    options.put(FlinkOptions.WRITE_TASK_MAX_SIZE.key(), "1024");
    options.put(FlinkOptions.WRITE_RATE_LIMIT.key(), "10000");
    options.put(FlinkOptions.WRITE_COMMIT_ACK_TIMEOUT.key(), "120s");

    // Timeline service
    options.put("hoodie.embed.timeline.server", "true");
    options.put("hoodie.filesystem.view.type", "EMBEDDED_KV_STORE");
    options.put("hoodie.filesystem.view.remote.timeout.secs", "600");
    options.put("hoodie.filesystem.view.incr.timeline.sync.enable", "true");
    options.put("hoodie.filesystem.view.secondary.type", "SPILLABLE_DISK");
    options.put("hoodie.filesystem.view.spillable.mem", "409715200");

    // Build and execute
    HoodiePipeline.Builder builder = HoodiePipeline.builder(tableName);
    schema.forEach((key, value) -> builder.column(key + " " + value));
    builder.pk(primaryKey)
           .options(options)
           .sink(dataStream, false);
}

hoodie.properties:
#Updated at 2024-12-18T09:32:46.185Z
#Wed Dec 18 17:32:46 CST 2024
hoodie.table.keygenerator.class=org.apache.hudi.keygen.NonpartitionedAvroKeyGenerator
hoodie.table.precombine.field=process_time
hoodie.table.version=6
hoodie.database.name=default_database
hoodie.datasource.write.hive_style_partitioning=false
hoodie.table.metadata.partitions.inflight=
hoodie.table.checksum=219834916
hoodie.table.cdc.enabled=false
hoodie.archivelog.folder=archived
hoodie.table.name=irce_credit_info_mor_test_03
hoodie.compaction.payload.class=org.apache.hudi.common.model.EventTimeAvroPayload
hoodie.compaction.record.merger.strategy=eeb8d96f-b1e4-49fd-bbf8-28ac514178e5
hoodie.table.type=MERGE_ON_READ
hoodie.datasource.write.partitionpath.urlencode=false
hoodie.datasource.write.drop.partition.columns=false
hoodie.table.metadata.partitions=
hoodie.timeline.layout.version=1
hoodie.table.recordkey.fields=id

hudi-cli output:

##The first time
Ran the Flink job
Spark count(*) query shows 198218 records
Hive query on the table returns 102823 records

commits show
╔═══════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠═══════════════════╪═════════════════════╪═══════════════════╪═════════════════════╪══════════════════════════╪═══════════════════════╪══════════════════════════════╪══════════════╣
║ 20241218161120812 │ 142.9 MB │ 0 │ 1 │ 1 │ 95395 │ 95395 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161120239 │ 22.8 MB │ 1 │ 0 │ 1 │ 102823 │ 0 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161024176 │ 150.8 MB │ 0 │ 1 │ 1 │ 102823 │ 0 │ 0 ║
╚═══════════════════╧═════════════════════╧═══════════════════╧═════════════════════╧══════════════════════════╧═══════════════════════╧══════════════════════════════╧══════════════╝
cleans show
╔═══════════╤═════════════════════════╤═════════════════════╤══════════════════╗
║ CleanTime │ EarliestCommandRetained │ Total Files Deleted │ Total Time Taken ║
╠═══════════╧═════════════════════════╧═════════════════════╧══════════════════╣
║ (empty) ║
╚══════════════════════════════════════════════════════════════════════════════╝

compactions show all
╔═════════════════════════╤═══════════╤═══════════════════════════════╗
║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║
╠═════════════════════════╪═══════════╪═══════════════════════════════╣
║ 20241218161120239 │ COMPLETED │ 1 ║
╚═════════════════════════╧═══════════╧═══════════════════════════════╝

##The second time

Around 16:11
Wrote the .txt data to the Kafka topic again
Hive query returns 198218 records
hudi-cli output:

commits show
╔═══════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠═══════════════════╪═════════════════════╪═══════════════════╪═════════════════════╪══════════════════════════╪═══════════════════════╪══════════════════════════════╪══════════════╣
║ 20241218163616318 │ 43.6 MB │ 0 │ 1 │ 1 │ 198218 │ 179543 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163514630 │ 17.8 MB │ 0 │ 1 │ 1 │ 12182 │ 12182 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163404017 │ 249.2 MB │ 0 │ 1 │ 1 │ 167361 │ 167361 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163403100 │ 43.5 MB │ 0 │ 1 │ 1 │ 198218 │ 18675 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161227103 │ 26.7 MB │ 0 │ 1 │ 1 │ 18675 │ 18675 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161120812 │ 142.9 MB │ 0 │ 1 │ 1 │ 95395 │ 95395 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161120239 │ 22.8 MB │ 1 │ 0 │ 1 │ 102823 │ 0 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161024176 │ 150.8 MB │ 0 │ 1 │ 1 │ 102823 │ 0 │ 0 ║
╚═══════════════════╧═════════════════════╧═══════════════════╧═════════════════════╧══════════════════════════╧═══════════════════════╧══════════════════════════════╧══════════════╝

cleans show
╔═══════════╤═════════════════════════╤═════════════════════╤══════════════════╗
║ CleanTime │ EarliestCommandRetained │ Total Files Deleted │ Total Time Taken ║
╠═══════════╧═════════════════════════╧═════════════════════╧══════════════════╣
║ (empty) ║
╚══════════════════════════════════════════════════════════════════════════════╝

compactions show all
╔═════════════════════════╤═══════════╤═══════════════════════════════╗
║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║
╠═════════════════════════╪═══════════╪═══════════════════════════════╣
║ 20241218163616318 │ COMPLETED │ 1 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218163403100 │ COMPLETED │ 1 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218161120239 │ COMPLETED │ 1 ║
╚═════════════════════════╧═══════════╧═══════════════════════════════╝

##The third time
Around 16:50

[root@bobcfc-sdcdh-un01 ~]# hdfs dfs -ls hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx
Found 11 items
-rw-r--r-- 3 hive hive 158089330 2024-12-18 16:11 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.65e2e9f7-ed93-49e6-9067-163e02c89f63_20241218161024176.log.1_0-1-0
-rw-r--r-- 3 hive hive 177831541 2024-12-18 16:34 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.65e2e9f7-ed93-49e6-9067-163e02c89f63_20241218161120239.log.1_0-1-0
-rw-r--r-- 3 hive hive 280037870 2024-12-18 16:36 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.65e2e9f7-ed93-49e6-9067-163e02c89f63_20241218163403100.log.1_0-1-0
-rw-r--r-- 3 hive hive 264809860 2024-12-18 16:53 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.65e2e9f7-ed93-49e6-9067-163e02c89f63_20241218163616318.log.1_0-1-0
-rw-r--r-- 3 hive hive 43221873 2024-12-18 16:54 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.65e2e9f7-ed93-49e6-9067-163e02c89f63_20241218165336103.log.1_0-1-0
drwxr-xr-x - hive hive 0 2024-12-18 16:55 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.hoodie
-rw-r--r-- 3 hive hive 96 2024-12-18 16:11 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.hoodie_partition_metadata
-rw-r--r-- 3 hive hive 23933255 2024-12-18 16:12 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/65e2e9f7-ed93-49e6-9067-163e02c89f63_0-1-0_20241218161120239.parquet
-rw-r--r-- 3 hive hive 45645067 2024-12-18 16:35 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/65e2e9f7-ed93-49e6-9067-163e02c89f63_0-1-0_20241218163403100.parquet
-rw-r--r-- 3 hive hive 45676402 2024-12-18 16:37 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/65e2e9f7-ed93-49e6-9067-163e02c89f63_0-1-0_20241218163616318.parquet
-rw-r--r-- 3 hive hive 45654029 2024-12-18 16:55 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/65e2e9f7-ed93-49e6-9067-163e02c89f63_0-1-0_20241218165336103.parquet

commits show
╔═══════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠═══════════════════╪═════════════════════╪═══════════════════╪═════════════════════╪══════════════════════════╪═══════════════════════╪══════════════════════════════╪══════════════╣
║ 20241218165336300 │ 41.2 MB │ 0 │ 1 │ 1 │ 27052 │ 27052 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218165336103 │ 43.5 MB │ 0 │ 1 │ 1 │ 198218 │ 171166 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163616516 │ 252.5 MB │ 0 │ 1 │ 1 │ 171166 │ 171166 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163616318 │ 43.6 MB │ 0 │ 1 │ 1 │ 198218 │ 179543 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163514630 │ 17.8 MB │ 0 │ 1 │ 1 │ 12182 │ 12182 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163404017 │ 249.2 MB │ 0 │ 1 │ 1 │ 167361 │ 167361 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163403100 │ 43.5 MB │ 0 │ 1 │ 1 │ 198218 │ 18675 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161227103 │ 26.7 MB │ 0 │ 1 │ 1 │ 18675 │ 18675 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161120812 │ 142.9 MB │ 0 │ 1 │ 1 │ 95395 │ 95395 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161120239 │ 22.8 MB │ 1 │ 0 │ 1 │ 102823 │ 0 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161024176 │ 150.8 MB │ 0 │ 1 │ 1 │ 102823 │ 0 │ 0 ║
╚═══════════════════╧═════════════════════╧═══════════════════╧═════════════════════╧══════════════════════════╧═══════════════════════╧══════════════════════════════╧══════════════╝

cleans show
╔═══════════╤═════════════════════════╤═════════════════════╤══════════════════╗
║ CleanTime │ EarliestCommandRetained │ Total Files Deleted │ Total Time Taken ║
╠═══════════╧═════════════════════════╧═════════════════════╧══════════════════╣
║ (empty) ║
╚══════════════════════════════════════════════════════════════════════════════╝

compactions show all
╔═════════════════════════╤═══════════╤═══════════════════════════════╗
║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║
╠═════════════════════════╪═══════════╪═══════════════════════════════╣
║ 20241218165336103 │ COMPLETED │ 1 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218163616318 │ COMPLETED │ 1 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218163403100 │ COMPLETED │ 1 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218161120239 │ COMPLETED │ 1 ║
╚═════════════════════════╧═══════════╧═══════════════════════════════╝

##The fourth time (duplicate data observed), around 17:11

Deleted the checkpoint / changed the Kafka group ID and restarted the Flink job
Hive query returns 348918 records, including 150700 duplicates
hudi-cli output:

commits show

╔═══════════════════╤═════════════════════╤═══════════════════╤═════════════════════╤══════════════════════════╤═══════════════════════╤══════════════════════════════╤══════════════╗
║ CommitTime │ Total Bytes Written │ Total Files Added │ Total Files Updated │ Total Partitions Written │ Total Records Written │ Total Update Records Written │ Total Errors ║
╠═══════════════════╪═════════════════════╪═══════════════════╪═════════════════════╪══════════════════════════╪═══════════════════════╪══════════════════════════════╪══════════════╣
║ 20241218171546059 │ 77.4 MB │ 0 │ 2 │ 1 │ 348918 │ 188257 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218171437259 │ 231.4 MB │ 0 │ 2 │ 1 │ 155032 │ 155032 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218171328561 │ 293.8 MB │ 0 │ 2 │ 1 │ 198218 │ 198218 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218171327532 │ 75.3 MB │ 1 │ 1 │ 1 │ 338957 │ 69072 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218171219664 │ 271.2 MB │ 0 │ 2 │ 1 │ 182759 │ 42020 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218165336300 │ 41.2 MB │ 0 │ 1 │ 1 │ 27052 │ 27052 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218165336103 │ 43.5 MB │ 0 │ 1 │ 1 │ 198218 │ 171166 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163616516 │ 252.5 MB │ 0 │ 1 │ 1 │ 171166 │ 171166 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163616318 │ 43.6 MB │ 0 │ 1 │ 1 │ 198218 │ 179543 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163514630 │ 17.8 MB │ 0 │ 1 │ 1 │ 12182 │ 12182 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163404017 │ 249.2 MB │ 0 │ 1 │ 1 │ 167361 │ 167361 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218163403100 │ 43.5 MB │ 0 │ 1 │ 1 │ 198218 │ 18675 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161227103 │ 26.7 MB │ 0 │ 1 │ 1 │ 18675 │ 18675 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161120812 │ 142.9 MB │ 0 │ 1 │ 1 │ 95395 │ 95395 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161120239 │ 22.8 MB │ 1 │ 0 │ 1 │ 102823 │ 0 │ 0 ║
╟───────────────────┼─────────────────────┼───────────────────┼─────────────────────┼──────────────────────────┼───────────────────────┼──────────────────────────────┼──────────────╢
║ 20241218161024176 │ 150.8 MB │ 0 │ 1 │ 1 │ 102823 │ 0 │ 0 ║
╚═══════════════════╧═════════════════════╧═══════════════════╧═════════════════════╧══════════════════════════╧═══════════════════════╧══════════════════════════════╧══════════════╝

cleans show
╔═══════════════════╤═════════════════════════╤═════════════════════╤══════════════════╗
║ CleanTime │ EarliestCommandRetained │ Total Files Deleted │ Total Time Taken ║
╠═══════════════════╪═════════════════════════╪═════════════════════╪══════════════════╣
║ 20241218171645948 │ 20241218163404017 │ 2 │ 109 ║
╟───────────────────┼─────────────────────────┼─────────────────────┼──────────────────╢
║ 20241218171426737 │ 20241218161120812 │ 1 │ 74 ║
╚═══════════════════╧═════════════════════════╧═════════════════════╧══════════════════╝

compactions show all
╔═════════════════════════╤═══════════╤═══════════════════════════════╗
║ Compaction Instant Time │ State │ Total FileIds to be Compacted ║
╠═════════════════════════╪═══════════╪═══════════════════════════════╣
║ 20241218171546059 │ COMPLETED │ 2 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218171327532 │ COMPLETED │ 2 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218165336103 │ COMPLETED │ 1 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218163616318 │ COMPLETED │ 1 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218163403100 │ COMPLETED │ 1 ║
╟─────────────────────────┼───────────┼───────────────────────────────╢
║ 20241218161120239 │ COMPLETED │ 1 ║
╚═════════════════════════╧═══════════╧═══════════════════════════════╝

All HDFS files:

[root@bobcfc-sdcdh-un01 ~]# hdfs dfs -du -s -h hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/*
267.1 M 801.2 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.65e2e9f7-ed93-49e6-9067-163e02c89f63_20241218163403100.log.1_0-1-0
252.5 M 757.6 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.65e2e9f7-ed93-49e6-9067-163e02c89f63_20241218163616318.log.1_0-1-0
102.0 M 305.9 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.65e2e9f7-ed93-49e6-9067-163e02c89f63_20241218165336103.log.1_0-1-0
76.8 M 230.5 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.65e2e9f7-ed93-49e6-9067-163e02c89f63_20241218171327532.log.1_0-1-0
210.5 M 631.5 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.e1ebc3c2-65d9-406a-9f9b-0d803c07e25a_20241218171219664.log.1_0-1-0
448.3 M 1.3 G hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.e1ebc3c2-65d9-406a-9f9b-0d803c07e25a_20241218171327532.log.1_0-1-0
168.8 K 506.3 K hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.hoodie
96 288 hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/.hoodie_partition_metadata
43.5 M 130.6 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/65e2e9f7-ed93-49e6-9067-163e02c89f63_0-1-0_20241218163403100.parquet
43.6 M 130.7 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/65e2e9f7-ed93-49e6-9067-163e02c89f63_0-1-0_20241218163616318.parquet
43.5 M 130.6 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/65e2e9f7-ed93-49e6-9067-163e02c89f63_0-1-0_20241218165336103.parquet
43.5 M 130.6 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/65e2e9f7-ed93-49e6-9067-163e02c89f63_0-1-0_20241218171327532.parquet
43.5 M 130.6 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/65e2e9f7-ed93-49e6-9067-163e02c89f63_0-1-0_20241218171546059.parquet
31.8 M 95.4 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/e1ebc3c2-65d9-406a-9f9b-0d803c07e25a_0-1-0_20241218171327532.parquet
33.8 M 101.5 M hdfs://nameservice1/user/hive/warehouse/ods.db/xxxxx/e1ebc3c2-65d9-406a-9f9b-0d803c07e25a_0-1-0_20241218171546059.parquet

Expected behavior

No duplicate records should appear in the Hudi table after the restart, since the job writes with UPSERT, a precombine field, and a global index.

Environment Description

  • Hudi version : 0.14

  • Flink version : 1.13.2

  • Hive version : 3.1.1

  • Hadoop version : 3.1

  • Storage (HDFS/S3/GCS..) : HDFS

  • Running on Docker? (yes/no) : no


@xiearthur xiearthur changed the title Duplicate records when delete checkpoint in Flink+Hudi table after multiple successful writes [SUPPORT] Duplicate records when delete checkpoint in Flink+Hudi table after multiple successful writes Dec 19, 2024
@danny0405
Contributor

Thanks for the feedback @xiearthur. It looks like you are using the FLINK_STATE index, where the checkpoint is required for storing the index entries. Once the index is removed, an UPSERT message can be mistakenly treated as an INSERT, so duplication occurs.

We have another index type, BUCKET, which does not need this index storage; it uses a hashing algorithm to map records to a fixed number of buckets. Maybe you should try that one.
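
A minimal sketch of what that switch could look like with the write options above (the bucket count is just a placeholder, and these option keys are the same ones quoted later in this thread):

// Drop the GLOBAL_BLOOM settings used above and switch to the bucket index.
// For the simple bucket engine the bucket count is fixed once the table has
// data and must stay the same across all writers.
options.remove(FlinkOptions.INDEX_GLOBAL_ENABLED.key());
options.put("index.type", "BUCKET");                   // bucket index instead of GLOBAL_BLOOM
options.put("hoodie.bucket.index.num.buckets", "10");  // placeholder bucket count
options.put("hoodie.index.bucket.engine", "SIMPLE");   // simple (fixed) bucketing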

@xiearthur
Author

In fact, we have used the bucket index before. But because we had a large amount of historical data that was migrated with Spark (bulk_insert), we ran into a bucket overlap problem: Spark wrote ten buckets during the historical migration, but when Flink took over the real-time writes it generated another ten buckets. Both writers used the simple bucket engine.
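
To make the overlap concrete, a rough sketch of the alignment the two writers would need (standard Hudi bucket-index keys; the values are placeholders, not our production settings):

// Spark bulk_insert of the historical data
sparkWriteOptions.put("hoodie.index.type", "BUCKET");
sparkWriteOptions.put("hoodie.bucket.index.num.buckets", "10");   // placeholder value

// Flink streaming job taking over the same table: same index type and the
// SAME bucket count, otherwise a second set of file groups gets created.
flinkOptions.put("index.type", "BUCKET");
flinkOptions.put("hoodie.bucket.index.num.buckets", "10");
// The bucket hash field defaults to the record key; if it is set explicitly
// on one side, it has to match on the other side as well.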

@danny0405
Contributor

Spark wrote ten buckets during the historical migration (bulk_insert), but when Flink took over the real-time writes it generated another ten buckets

There are probably some mis-configured options; let's figure that out first.

@xiearthur
Author

Title: Flink-Hudi COW Table Write Fails with Bucket Index While MOR Works Fine
Description:
We encountered an issue with Flink-Hudi when writing to COW tables using bucket index. The write operation fails during checkpoint while the same configuration works perfectly with MOR tables.
Issue Details:

The Flink job generates parquet files in buckets but fails to commit them
Files under .hoodie directory show rollback operations
Unable to read the data written by Flink
Generated parquet files disappear after job restart

Error Log:
IOException: Could not perform checkpoint 2 for operator Bucket_write: default_database.irce_credit_info_mor_test -> Sink: clean_commits (1/1)
Stack Trace:
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Configuration Used:
options.put("index.type", "BUCKET");
options.put("hoodie.bucket.index.num.buckets", "10");
options.put("hoodie.index.bucket.engine", "SIMPLE");
Observed Behavior:

The job initiates and creates parquet files in the configured buckets
During checkpoint (specifically checkpoint 2), the operation fails
A rollback operation is triggered
All previously written data becomes inaccessible
After job restart, the generated parquet files are removed

Expected Behavior:
The Flink job should successfully write and commit data to COW table using bucket index, similar to how it works with MOR tables.
Key Points:

This issue only occurs with COW tables
The same configuration works correctly with MOR tables
The failure happens consistently during checkpoint operations


Additional Information:

No data loss is observed in MOR tables with identical configuration
The issue appears to be specific to the interaction between COW tables and bucket index during checkpoint phase

Questions:

Is this a known limitation of COW tables with bucket index?
Are there any workarounds or alternative configurations recommended for COW tables?
Are there specific checkpoint configurations that might help resolve this issue?



@danny0405
Contributor

Is this a known limitation of COW tables with bucket index?

No. I guess there may be some parquet jar conflicts inducing the error; let's collect more clues here.

@xiearthur
Author

Currently, with the same configuration, the MOR table behaves normally.

@danny0405
Contributor

Currently, with the same configuration, the MOR table behaves normally

There might be issues once compaction triggers.

@xiearthur
Author

What information do you need? We also tried without the Spark batch migration, writing into an empty Hudi COW table with the bucket index, and writes went through without any problem. But as soon as Flink writes follow Spark's batch migration, the table can no longer be written. The message in the Flink log is:
Error Log:
IOException: Could not perform checkpoint 2 for operator Bucket_write: default_database.irce_credit_info_mor_test -> Sink: clean_commits (1/1)
Stack Trace:
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)

@danny0405
Contributor

Did you have concurrent writers? Can you paste the "Caused by" part of the error stack trace?

@xiearthur
Author

2024-12-25 16:02:51,393 INFO org.apache.hudi.client.transaction.TransactionManager [] - Transaction manager closed
2024-12-25 16:02:51,399 WARN org.apache.flink.runtime.taskmanager.Task [] - bucket_write: default_database.irce_credit_info_test -> Sink: clean_commits (1/1)#1 (207da83f04d3cb1bdff010184f72ffb8) switched from RUNNING to FAILED with failure cause: java.io.IOException: Could not perform checkpoint 5 for operator bucket_write: default_database.irce_credit_info_test -> Sink: clean_commits (1/1)#1.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1045)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:135)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:250)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:61)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:431)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:61)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:227)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:180)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:158)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:423)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:204)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:681)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeInvoke(StreamTask.java:636)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:647)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:620)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:779)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 5 for operator bucket_write: default_database.irce_credit_info_test -> Sink: clean_commits (1/1)#1. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:264)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:169)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:371)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:706)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:627)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:590)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:312)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$8(StreamTask.java:1089)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1073)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1029)
... 19 more
Caused by: org.apache.hudi.exception.HoodieException: Timeout(301000ms) while waiting for instant initialize
at org.apache.hudi.sink.utils.TimeWait.waitFor(TimeWait.java:57)
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.instantToWrite(AbstractStreamWriteFunction.java:269)
at org.apache.hudi.sink.StreamWriteFunction.flushRemaining(StreamWriteFunction.java:452)
at org.apache.hudi.sink.StreamWriteFunction.snapshotState(StreamWriteFunction.java:137)
at org.apache.hudi.sink.bucket.BucketStreamWriteFunction.snapshotState(BucketStreamWriteFunction.java:101)
at org.apache.hudi.sink.common.AbstractStreamWriteFunction.snapshotState(AbstractStreamWriteFunction.java:167)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:89)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:218)
... 29 more

@danny0405
Contributor

Timeout(301000ms) while waiting for instant initialize

Do you have multiple sub-pipelines in one workload?

@xiearthur
Author

Thank you for your input. Regarding the sub-pipeline question: upon review, we confirmed that we do not use multiple sub-pipelines in our data processing flow. To address the checkpoint timeouts and failures in our Flink job, we set the checkpoint timeout to 10 minutes and increased the write parallelism to 10. With these changes, data is now written successfully.
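
A minimal sketch of the adjustment described above (our own illustration: the checkpoint interval is a placeholder, and we assume the "write parallelism" maps to Hudi's write.tasks option):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Lengthen the checkpoint timeout so the Hudi flush/merge work triggered at
// checkpoint time can finish, and raise the Hudi write task parallelism.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60_000L);                                 // checkpoint interval (placeholder)
env.getCheckpointConfig().setCheckpointTimeout(10 * 60 * 1000L);  // 10-minute checkpoint timeout

options.put("write.tasks", "10");                                 // assumed mapping of "write parallelism"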

We identified that the previous timeouts were due to the incomplete merging of Parquet files within a single checkpoint interval. We are currently using a Simple Bucketing strategy with a Copy-on-Write (COW) table.

Our current bucket size ranges from 300MB to 800MB. It appears that the number of buckets might be insufficient, leading to oversized individual buckets, which could be impacting the efficiency of the merge operation.

As for the optimal bucket size, it typically depends on the data characteristics and system resources. A general guideline is to keep individual bucket sizes between 100MB to 500MB to balance processing efficiency and resource usage. We will further adjust the number of buckets to optimize our data processing.
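
As a rough illustration of that guideline (the sizes below are placeholders, not measurements from this table):

// Derive a bucket count from an estimated total data size and a target
// per-bucket size in the 100 MB - 500 MB range.
long totalSizeMb = 4_000L;     // estimated table size in MB (placeholder)
long targetBucketMb = 300L;    // target bucket size (placeholder)
int numBuckets = (int) Math.ceil((double) totalSizeMb / targetBucketMb);   // -> 14 buckets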

We will continue to monitor system performance and make necessary adjustments as needed. If you have any further suggestions or questions, please feel free to reach out.
