[SUPPORT] Problems using the Java client to write into Hudi #12485
Comments
What storage is the Hudi table backed by: HDFS or an object store?
On HDFS.
Are there any Parquet-related errors? I'm not sure why the input stream was closed.
Could you please share the full source code on GitHub? We will try to reproduce the issue internally.
I looked through all the error logs and found no Parquet-related error messages, but there were many rollback commits in '.hoodie', the Hudi table's metadata folder on HDFS. I have put my log on GitHub: https://github.com/notAprogrammer-0/HBaseToHudi/blob/main/nohup.log. Thank you for trying to help me with this.
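To put a number on "many rollback commits", one option is to count the distinct rollback instants in a listing of the timeline folder. This is a minimal sketch that assumes Hudi's timeline file naming `<instantTime>.<action>[.<state>]` (the log later in this thread shows names such as `20250102201639581.clean.requested`), so a single rollback can leave up to three files: `.rollback.requested`, `.rollback.inflight` and `.rollback`. It reads a saved listing, for example the output of `hdfs dfs -ls <table path>/.hoodie` redirected to a file, and keeps only the last path component of each line. `TimelineRollbackCheck` is a hypothetical helper, not a Hudi API.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.List;
import java.util.SortedSet;
import java.util.TreeSet;

/**
 * Hypothetical helper: counts distinct rollback instants in a listing of a
 * Hudi ".hoodie" folder. Assumes timeline files are named
 * "<instantTime>.<action>[.<state>]", e.g. "20250102201639581.rollback.inflight".
 */
final class TimelineRollbackCheck {

  private TimelineRollbackCheck() {}

  /** Returns the sorted, distinct instant times whose action is "rollback". */
  static SortedSet<String> rollbackInstants(Collection<String> listingLines) {
    SortedSet<String> instants = new TreeSet<>();
    for (String line : listingLines) {
      String trimmed = line.trim();
      // Keep only the last path component, so full paths (e.g. from `hdfs dfs -ls`) also work.
      String name = trimmed.substring(trimmed.lastIndexOf('/') + 1);
      String[] parts = name.split("\\.");
      // parts[0] = instant time, parts[1] = action, optional parts[2] = state.
      if (parts.length >= 2 && parts[0].matches("\\d+") && parts[1].equals("rollback")) {
        instants.add(parts[0]);
      }
    }
    return instants;
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("usage: TimelineRollbackCheck <file containing a listing of .hoodie>");
      return;
    }
    List<String> lines = Files.readAllLines(Paths.get(args[0]));
    SortedSet<String> rollbacks = rollbackInstants(lines);
    System.out.println(rollbacks.size() + " rollback instant(s): " + rollbacks);
  }
}
```

Counting distinct instant times rather than files avoids counting one rollback three times when its requested, inflight and completed files are all present.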
I am sharing sample code. Please use it to try to reproduce the issue.

```java
package com.ranga;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.stream.Collectors;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.client.HoodieJavaWriteClient;
import org.apache.hudi.client.common.HoodieJavaEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.*;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;

public class Test12485 {

  public static final String DEFAULT_FIRST_PARTITION_PATH = "2020/01/01";
  public static final String DEFAULT_SECOND_PARTITION_PATH = "2020/01/02";
  public static final String DEFAULT_THIRD_PARTITION_PATH = "2020/01/03";
  public static final String[] DEFAULT_PARTITION_PATHS =
      {DEFAULT_FIRST_PARTITION_PATH, DEFAULT_SECOND_PARTITION_PATH, DEFAULT_THIRD_PARTITION_PATH};

  // Avro schema of the sample "trip" records.
  public static String TRIP_EXAMPLE_SCHEMA = "{\"type\": \"record\",\"name\": \"triprec\",\"fields\": [ "
      + "{\"name\": \"ts\",\"type\": \"long\"},{\"name\": \"uuid\", \"type\": \"string\"},"
      + "{\"name\": \"rider\", \"type\": \"string\"},{\"name\": \"driver\", \"type\": \"string\"},"
      + "{\"name\": \"begin_lat\", \"type\": \"double\"},{\"name\": \"begin_lon\", \"type\": \"double\"},"
      + "{\"name\": \"end_lat\", \"type\": \"double\"},{\"name\": \"end_lon\", \"type\": \"double\"},"
      + "{\"name\":\"fare\",\"type\": \"double\"}]}";
  public static Schema avroSchema = new Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);

  private static final Random RAND = new Random(46474747);
  private static String tableType = HoodieTableType.COPY_ON_WRITE.name();

  public static void main(String[] args) throws Exception {
    List<HoodieRecord<HoodieAvroPayload>> records = processHBaseData();
    writeIntoHudi(records);
  }

  private static void writeIntoHudi(List<HoodieRecord<HoodieAvroPayload>> records) throws Exception {
    String tablePath = "file:///tmp/my_test/";
    String tableName = "test_table";
    Configuration hadoopConf = new Configuration();

    // Initialize the table (creates .hoodie) only if the base path does not exist yet.
    Path path = new Path(tablePath);
    FileSystem fs = FSUtils.getFs(tablePath, hadoopConf);
    if (!fs.exists(path)) {
      HoodieTableMetaClient.withPropertyBuilder()
          .setTableType(tableType)
          .setTableName(tableName)
          .setPayloadClassName(HoodieAvroPayload.class.getName())
          .initTable(hadoopConf, tablePath);
    }

    // Bloom index, parallelism 10, and archiving that keeps 40 to 50 commits on the active timeline.
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
        .withSchema(avroSchema.toString()).withParallelism(10, 10)
        .withDeleteParallelism(10).forTable(tableName)
        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
        .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(40, 50).build()).build();
    HoodieJavaWriteClient<HoodieAvroPayload> hoodieWriteClient =
        new HoodieJavaWriteClient<>(new HoodieJavaEngineContext(hadoopConf), cfg);
    try {
      // One upsert per run, under a freshly started commit.
      String newCommitTime = hoodieWriteClient.startCommit();
      List<HoodieRecord<HoodieAvroPayload>> recordsSoFar = new ArrayList<>(records);
      List<HoodieRecord<HoodieAvroPayload>> writeRecords =
          recordsSoFar.stream().map(r -> new HoodieAvroRecord<HoodieAvroPayload>(r)).collect(Collectors.toList());
      hoodieWriteClient.upsert(writeRecords, newCommitTime);
    } catch (Exception e) {
      e.printStackTrace();
    } finally {
      hoodieWriteClient.close();
    }
  }

  // Stand-in for reading from HBase: one random record per partition path.
  private static List<HoodieRecord<HoodieAvroPayload>> processHBaseData() {
    List<HoodieRecord<HoodieAvroPayload>> records = new ArrayList<>();
    for (String partitionPath : DEFAULT_PARTITION_PATHS) {
      HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), partitionPath);
      String commitTime = HoodieActiveTimeline.createNewInstantTime();
      HoodieRecord<HoodieAvroPayload> record = generateUpdateRecord(key, commitTime);
      records.add(record);
    }
    return records;
  }

  public static HoodieRecord<HoodieAvroPayload> generateUpdateRecord(HoodieKey key, String commitTime) {
    return new HoodieAvroRecord<>(key, generateRandomValue(key, commitTime));
  }

  public static HoodieAvroPayload generateRandomValue(HoodieKey key, String commitTime) {
    GenericRecord rec = generateGenericRecord(key.getRecordKey(), "rider-" + commitTime, "driver-" + commitTime, 0);
    return new HoodieAvroPayload(Option.of(rec));
  }

  public static GenericRecord generateGenericRecord(String rowKey, String riderName, String driverName,
                                                    long timestamp) {
    GenericRecord rec = new GenericData.Record(avroSchema);
    rec.put("uuid", rowKey);
    rec.put("ts", timestamp);
    rec.put("rider", riderName);
    rec.put("driver", driverName);
    rec.put("begin_lat", RAND.nextDouble());
    rec.put("begin_lon", RAND.nextDouble());
    rec.put("end_lat", RAND.nextDouble());
    rec.put("end_lon", RAND.nextDouble());
    rec.put("fare", RAND.nextDouble() * 100);
    return rec;
  }
}
```
I changed the table type to "mor" (MERGE_ON_READ) and wrapped the write in a while (true) loop to reproduce the error. After about 1.5 hours it failed; I have put the error message in a file called error.log in my GitHub repository. The modified program (with the same file name as the one you gave me) is also there. The GitHub link is: https://github.com/notAprogrammer-0/HBaseToHudi.
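A bounded version of that `while (true)` loop can record how many writes completed and how long the run lasted before the first failure, which makes runs against HDFS and against the local file system easier to compare. This is a sketch under stated assumptions: `SoakTest` and `Result` are hypothetical names, and the write is passed in as a `Callable`. One caveat about the sample program: `writeIntoHudi` catches every `Exception` and only prints it, so it would have to rethrow for a harness like this to notice a failure.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.Callable;

/**
 * Hypothetical soak-test harness: runs a write action up to maxIterations
 * times and stops at the first exception, recording how far it got.
 */
final class SoakTest {

  private SoakTest() {}

  /** Outcome of one soak run. */
  static final class Result {
    final int iterations;    // writes that completed before the run stopped
    final Duration elapsed;  // wall-clock duration of the run
    final Exception failure; // first exception thrown, or null if none

    Result(int iterations, Duration elapsed, Exception failure) {
      this.iterations = iterations;
      this.elapsed = elapsed;
      this.failure = failure;
    }
  }

  static Result run(Callable<?> writeAction, int maxIterations) {
    Instant start = Instant.now();
    int completed = 0;
    while (completed < maxIterations) {
      try {
        writeAction.call();
      } catch (Exception e) {
        // Stop at the first failure and report which write failed and when.
        Duration elapsed = Duration.between(start, Instant.now());
        System.err.println("write #" + (completed + 1) + " failed after " + elapsed + ": " + e);
        return new Result(completed, elapsed, e);
      }
      completed++;
    }
    return new Result(completed, Duration.between(start, Instant.now()), null);
  }
}
```

Inside the sample class this might be called as `SoakTest.run(() -> { writeIntoHudi(processHBaseData()); return null; }, 10_000)`, once `writeIntoHudi` rethrows instead of swallowing exceptions.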
The issue is not reproducible on the local file system, which leads me to suspect an HDFS-related problem. I suggest checking the HDFS logs for client disconnections or other relevant errors outside of Hudi itself.

```
Application start time: 2025-01-02 14:56:51
2025-01-02 14:56:51 WARN NativeCodeLoader:60 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2025-01-02 14:56:51 INFO EmbeddedTimelineService:67 - Starting Timeline service !!
2025-01-02 14:56:51 WARN EmbeddedTimelineService:104 - Unable to find driver bind address from spark config
2025-01-02 14:56:51 INFO FileSystemViewManager:232 - Creating View Manager with storage type :MEMORY
2025-01-02 14:56:51 INFO FileSystemViewManager:244 - Creating in-memory based Table View
2025-01-02 14:56:51 INFO log:193 - Logging initialized @1368ms to org.apache.hudi.org.apache.jetty.util.log.Slf4jLog
..................
2025-01-02 20:16:43 INFO HoodieActiveTimeline:556 - Checking for file exists ?file:/tmp/my_test/.hoodie/20250102201639581.clean.requested
2025-01-02 20:16:43 INFO HoodieActiveTimeline:564 - Create new file for toInstant ?file:/tmp/my_test/.hoodie/20250102201639581.clean.inflight
2025-01-02 20:16:43 INFO CleanActionExecutor:133 - Using cleanerParallelism: 3
2025-01-02 20:16:44 INFO HoodieActiveTimeline:129 - Loaded instants upto : Option{val=[==>20250102201639581__clean__INFLIGHT]}
2025-01-02 20:16:44 INFO HoodieActiveTimeline:556 - Checking for file exists ?file:/tmp/my_test/.hoodie/20250102201639581.clean.inflight
2025-01-02 20:16:44 INFO HoodieActiveTimeline:564 - Create new file for toInstant ?file:/tmp/my_test/.hoodie/20250102201639581.clean
2025-01-02 20:16:44 INFO CleanActionExecutor:226 - Marked clean started on 20250102201639581 as complete
```
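When comparing a local run with an HDFS run, it can help to pull the time span and the WARN/ERROR lines out of a log in the layout used by the application log in this thread (`yyyy-MM-dd HH:mm:ss LEVEL Class:line - message`). This is a minimal sketch assuming exactly that layout; other lines (banners, the `....` elision, stack-trace frames) are skipped unless they mention `Exception`. `LogSummary` is a hypothetical helper, not part of Hudi or Hadoop, and HDFS daemon logs may use a different timestamp format that this pattern would not match. Applied to the local-file-system log, the timestamps run from 14:56:51 to 20:16:44 (about 5 h 20 min) with no failure, compared with the roughly 1.5 hours after which the reporter's run failed.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Hypothetical helper: summarizes a log whose lines look like
 * "2025-01-02 14:56:51 WARN NativeCodeLoader:60 - message".
 */
final class LogSummary {

  private static final Pattern LINE =
      Pattern.compile("^(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2})\\s+(\\w+)\\s+(.*)$");
  private static final DateTimeFormatter TS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

  final LocalDateTime first;   // timestamp of the first structured line, or null
  final LocalDateTime last;    // timestamp of the last structured line, or null
  final List<String> problems; // WARN/ERROR lines plus exception header lines

  private LogSummary(LocalDateTime first, LocalDateTime last, List<String> problems) {
    this.first = first;
    this.last = last;
    this.problems = Collections.unmodifiableList(problems);
  }

  static LogSummary of(List<String> lines) {
    LocalDateTime first = null;
    LocalDateTime last = null;
    List<String> problems = new ArrayList<>();
    for (String raw : lines) {
      String line = raw.trim();
      Matcher m = LINE.matcher(line);
      if (!m.matches()) {
        // Unstructured line: keep exception headers, skip stack frames ("at ...").
        if (!line.startsWith("at ") && line.contains("Exception")) {
          problems.add(line);
        }
        continue;
      }
      LocalDateTime ts = LocalDateTime.parse(m.group(1), TS);
      if (first == null) {
        first = ts;
      }
      last = ts;
      String level = m.group(2);
      if (level.equals("WARN") || level.equals("ERROR")) {
        problems.add(line);
      }
    }
    return new LogSummary(first, last, problems);
  }

  /** Time between the first and last structured lines. */
  Duration span() {
    return first == null ? Duration.ZERO : Duration.between(first, last);
  }

  public static void main(String[] args) throws IOException {
    if (args.length != 1) {
      System.err.println("usage: LogSummary <log file>");
      return;
    }
    LogSummary summary = of(Files.readAllLines(Paths.get(args[0])));
    System.out.println("span: " + summary.span());
    summary.problems.forEach(System.out::println);
  }
}
```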
OK, I'll check my HDFS logs. Thanks for your help.
Tips before filing an issue
Have you gone through our FAQs? Yes
Describe the problem you faced
I want to use the Hudi Java client to read data from HBase and write it to Hudi. The code worked perfectly the first time, but when I sent a second write request, it threw the exception listed below.
To Reproduce
Steps to reproduce the behavior:
Expected behavior
Environment Description
Hudi version : 0.11.0
Spark version :
Hive version :
Hadoop version : 3.3.1
Storage (HDFS/S3/GCS..) :
Running on Docker? (yes/no) :
Additional context
Stacktrace