diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java index 5018d24515..0d273d65e2 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkFunction.java @@ -39,6 +39,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; import java.util.Map; import java.util.Properties; @@ -46,18 +47,17 @@ public class DorisSinkFunction extends RichSinkFunction implements CheckpointedFunction { private static final Logger LOGGER = LoggerFactory.getLogger(DorisSinkFunction.class); - private final Properties properties; private final DorisSinkWriter dorisSinkWriter; private final DorisConfig dorisConfig; // state only works with `EXACTLY_ONCE` - private transient ListState> checkpointedState; + private transient ListState> checkpointState; private transient Counter totalInvokeRowsTime; private transient Counter totalInvokeRows; private static final String COUNTER_INVOKE_ROWS_COST_TIME = "totalInvokeRowsTimeNs"; private static final String COUNTER_INVOKE_ROWS = "totalInvokeRows"; public DorisSinkFunction(StreamingContext context) { - this.properties = context.parameter().getProperties(); + Properties properties = context.parameter().getProperties(); this.dorisConfig = new DorisConfig(properties); this.dorisSinkWriter = new DorisSinkWriter(dorisConfig); } @@ -83,8 +83,8 @@ public void invoke(T value, SinkFunction.Context context) throws Exception { || null == data.getDataRows()) { LOGGER.warn( String.format( - " row data not fullfilled. {database: %s, table: %s, dataRows: %s}", - data.getDatabase(), data.getTable(), data.getDataRows())); + " row data not fulfilled. {database: %s, table: %s, dataRows: %s}", + data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows()))); return; } dorisSinkWriter.writeRecords(data.getDatabase(), data.getTable(), data.getDataRows()); @@ -110,7 +110,7 @@ public void close() throws Exception { public void snapshotState(FunctionSnapshotContext context) throws Exception { if (Semantic.EXACTLY_ONCE.equals(Semantic.of(dorisConfig.semantic()))) { // save state - checkpointedState.add(dorisSinkWriter.getBufferedBatchMap()); + checkpointState.add(dorisSinkWriter.getBufferedBatchMap()); flushPreviousState(); } } @@ -122,16 +122,16 @@ public void initializeState(FunctionInitializationContext context) throws Except new ListStateDescriptor<>( "buffered-rows", TypeInformation.of(new TypeHint>() {})); - checkpointedState = context.getOperatorStateStore().getListState(descriptor); + checkpointState = context.getOperatorStateStore().getListState(descriptor); } } private void flushPreviousState() throws Exception { // flush the batch saved at the previous checkpoint - for (Map state : checkpointedState.get()) { + for (Map state : checkpointState.get()) { dorisSinkWriter.setBufferedBatchMap(state); dorisSinkWriter.flush(null, true); } - checkpointedState.clear(); + checkpointState.clear(); } } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java index 160b320478..aa063bf1bf 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/internal/DorisSinkWriter.java @@ -59,7 +59,6 @@ public class DorisSinkWriter implements Serializable { private transient Counter totalFlushFailedTimes; private final Map bufferMap = new ConcurrentHashMap<>(); - private final Long timeout = 3000L; private volatile boolean closed = false; private volatile boolean flushThreadAlive = false; private volatile Throwable flushException; @@ -239,7 +238,8 @@ private void offer(DorisSinkBufferEntry bufferEntity) throws InterruptedExceptio } private boolean asyncFlush() throws Exception { - final DorisSinkBufferEntry flushData = flushQueue.poll(timeout, TimeUnit.MILLISECONDS); + long timeOut = 3000L; + final DorisSinkBufferEntry flushData = flushQueue.poll(timeOut, TimeUnit.MILLISECONDS); if (flushData == null || flushData.getBatchCount() == 0) { return true; } @@ -311,13 +311,13 @@ public synchronized void close() throws Exception { private void checkFlushException() { if (flushException != null) { StackTraceElement[] stack = Thread.currentThread().getStackTrace(); - for (int i = 0; i < stack.length; i++) { + for (StackTraceElement stackTraceElement : stack) { LOG.info( - stack[i].getClassName() + stackTraceElement.getClassName() + "." - + stack[i].getMethodName() + + stackTraceElement.getMethodName() + " line:" - + stack[i].getLineNumber()); + + stackTraceElement.getLineNumber()); } throw new RuntimeException("Writing records to doris failed.", flushException); } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java index 1c804b7ae4..50e9e4a664 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/java/org/apache/streampark/flink/connector/doris/util/DorisDelimiterParser.java @@ -24,12 +24,22 @@ public class DorisDelimiterParser { private static final String HEX_STRING = "0123456789ABCDEF"; public static String parse(String sp) throws RuntimeException { - if (sp == null || sp.length() == 0) { + if (sp == null || sp.isEmpty()) { throw new RuntimeException("Delimiter can't be empty"); } if (!sp.toUpperCase().startsWith("\\X")) { return sp; } + String hexStr = getString(sp); + // transform to separator + StringWriter writer = new StringWriter(); + for (byte b : hexStrToBytes(hexStr)) { + writer.append((char) b); + } + return writer.toString(); + } + + private static String getString(String sp) { String hexStr = sp.substring(2); // check hex str if (hexStr.isEmpty()) { @@ -43,12 +53,7 @@ public static String parse(String sp) throws RuntimeException { throw new RuntimeException("Failed to parse delimiter: `Hex str format error`"); } } - // transform to separator - StringWriter writer = new StringWriter(); - for (byte b : hexStrToBytes(hexStr)) { - writer.append((char) b); - } - return writer.toString(); + return hexStr; } private static byte[] hexStrToBytes(String hexStr) {