Skip to content

Commit

Permalink
[Improve] doris datastream-connector code style improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Jan 26, 2025
1 parent ecb01a1 commit dbc31b8
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,25 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Map;
import java.util.Properties;

/** DorisSinkFunction */
public class DorisSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {

private static final Logger LOGGER = LoggerFactory.getLogger(DorisSinkFunction.class);
private final Properties properties;
private final DorisSinkWriter dorisSinkWriter;
private final DorisConfig dorisConfig;
// state only works with `EXACTLY_ONCE`
private transient ListState<Map<String, DorisSinkBufferEntry>> checkpointedState;
private transient ListState<Map<String, DorisSinkBufferEntry>> checkpointState;
private transient Counter totalInvokeRowsTime;
private transient Counter totalInvokeRows;
private static final String COUNTER_INVOKE_ROWS_COST_TIME = "totalInvokeRowsTimeNs";
private static final String COUNTER_INVOKE_ROWS = "totalInvokeRows";

/**
 * Creates the sink function from the streaming context.
 *
 * <p>The properties returned by {@code context.parameter().getProperties()} are wrapped in a
 * {@code DorisConfig}, and that config is then used to construct the {@code DorisSinkWriter}.
 *
 * @param context streaming context that supplies the connector properties
 */
public DorisSinkFunction(StreamingContext context) {
this.properties = context.parameter().getProperties();
Properties properties = context.parameter().getProperties();
this.dorisConfig = new DorisConfig(properties);
this.dorisSinkWriter = new DorisSinkWriter(dorisConfig);
}
Expand All @@ -83,8 +83,8 @@ public void invoke(T value, SinkFunction.Context context) throws Exception {
|| null == data.getDataRows()) {
LOGGER.warn(
String.format(
" row data not fullfilled. {database: %s, table: %s, dataRows: %s}",
data.getDatabase(), data.getTable(), data.getDataRows()));
" row data not fulfilled. {database: %s, table: %s, dataRows: %s}",
data.getDatabase(), data.getTable(), Arrays.toString(data.getDataRows())));
return;
}
dorisSinkWriter.writeRecords(data.getDatabase(), data.getTable(), data.getDataRows());
Expand All @@ -110,7 +110,7 @@ public void close() throws Exception {
/**
 * Checkpoint hook: stores the writer's currently buffered batches in operator state and then
 * flushes them.
 *
 * <p>Does nothing unless the configured semantic resolves to {@code Semantic.EXACTLY_ONCE}.
 *
 * @param context snapshot context supplied by the caller (not read by this method)
 * @throws Exception if adding to the state or flushing fails
 */
public void snapshotState(FunctionSnapshotContext context) throws Exception {
if (Semantic.EXACTLY_ONCE.equals(Semantic.of(dorisConfig.semantic()))) {
// save state
checkpointedState.add(dorisSinkWriter.getBufferedBatchMap());
checkpointState.add(dorisSinkWriter.getBufferedBatchMap());
// push every stored batch map through the writer, then clear the list state
flushPreviousState();
}
}
Expand All @@ -122,16 +122,16 @@ public void initializeState(FunctionInitializationContext context) throws Except
new ListStateDescriptor<>(
"buffered-rows",
TypeInformation.of(new TypeHint<Map<String, DorisSinkBufferEntry>>() {}));
checkpointedState = context.getOperatorStateStore().getListState(descriptor);
checkpointState = context.getOperatorStateStore().getListState(descriptor);
}
}

/**
 * Replays every batch map held in the list state through the writer and then empties the state.
 *
 * <p>For each stored map, the writer's buffer is replaced with that map and
 * {@code flush(null, true)} is called.
 * NOTE(review): the meaning of the {@code null} / {@code true} arguments is defined by
 * {@code DorisSinkWriter.flush}, which is not visible here — confirm against that method.
 *
 * @throws Exception if reading the state or flushing a batch fails
 */
private void flushPreviousState() throws Exception {
// flush the batch saved at the previous checkpoint
for (Map<String, DorisSinkBufferEntry> state : checkpointedState.get()) {
for (Map<String, DorisSinkBufferEntry> state : checkpointState.get()) {
dorisSinkWriter.setBufferedBatchMap(state);
dorisSinkWriter.flush(null, true);
}
// the stored batches have been handed to the writer, so drop them from state
checkpointedState.clear();
checkpointState.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public class DorisSinkWriter implements Serializable {
private transient Counter totalFlushFailedTimes;

private final Map<String, DorisSinkBufferEntry> bufferMap = new ConcurrentHashMap<>();
private final Long timeout = 3000L;
private volatile boolean closed = false;
private volatile boolean flushThreadAlive = false;
private volatile Throwable flushException;
Expand Down Expand Up @@ -239,7 +238,8 @@ private void offer(DorisSinkBufferEntry bufferEntity) throws InterruptedExceptio
}

private boolean asyncFlush() throws Exception {
final DorisSinkBufferEntry flushData = flushQueue.poll(timeout, TimeUnit.MILLISECONDS);
long timeOut = 3000L;
final DorisSinkBufferEntry flushData = flushQueue.poll(timeOut, TimeUnit.MILLISECONDS);
if (flushData == null || flushData.getBatchCount() == 0) {
return true;
}
Expand Down Expand Up @@ -311,13 +311,13 @@ public synchronized void close() throws Exception {
private void checkFlushException() {
if (flushException != null) {
StackTraceElement[] stack = Thread.currentThread().getStackTrace();
for (int i = 0; i < stack.length; i++) {
for (StackTraceElement stackTraceElement : stack) {
LOG.info(
stack[i].getClassName()
stackTraceElement.getClassName()
+ "."
+ stack[i].getMethodName()
+ stackTraceElement.getMethodName()
+ " line:"
+ stack[i].getLineNumber());
+ stackTraceElement.getLineNumber());
}
throw new RuntimeException("Writing records to doris failed.", flushException);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,22 @@ public class DorisDelimiterParser {
private static final String HEX_STRING = "0123456789ABCDEF";

/**
 * Resolves a delimiter specification into the literal separator string.
 *
 * <p>A value that does not start with {@code \x} (compared case-insensitively via
 * {@code toUpperCase()}) is returned unchanged. Otherwise the text after the prefix is
 * extracted and validated by {@code getString}, decoded by {@code hexStrToBytes}, and each
 * resulting byte is appended as one character.
 *
 * @param sp delimiter specification, either a literal or a {@code \x}-prefixed hex string
 * @return the separator string
 * @throws RuntimeException if {@code sp} is null or empty; hex-format failures are raised by
 *     {@code getString}
 */
public static String parse(String sp) throws RuntimeException {
if (sp == null || sp.length() == 0) {
if (sp == null || sp.isEmpty()) {
throw new RuntimeException("Delimiter can't be empty");
}
// not a hex escape: use the text itself as the delimiter
if (!sp.toUpperCase().startsWith("\\X")) {
return sp;
}
String hexStr = getString(sp);
// transform to separator
StringWriter writer = new StringWriter();
for (byte b : hexStrToBytes(hexStr)) {
// NOTE(review): a byte >= 0x80 is negative in Java and sign-extends when cast to char —
// confirm whether delimiters above 0x7F are expected here
writer.append((char) b);
}
return writer.toString();
}

private static String getString(String sp) {
String hexStr = sp.substring(2);
// check hex str
if (hexStr.isEmpty()) {
Expand All @@ -43,12 +53,7 @@ public static String parse(String sp) throws RuntimeException {
throw new RuntimeException("Failed to parse delimiter: `Hex str format error`");
}
}
// transform to separator
StringWriter writer = new StringWriter();
for (byte b : hexStrToBytes(hexStr)) {
writer.append((char) b);
}
return writer.toString();
return hexStr;
}

private static byte[] hexStrToBytes(String hexStr) {
Expand Down

0 comments on commit dbc31b8

Please sign in to comment.