Skip to content

Commit

Permalink
[Improve] jdbc-datastream-connector filterFunction improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Jan 26, 2025
1 parent dbc31b8 commit 0ed5a0f
Show file tree
Hide file tree
Showing 13 changed files with 146 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@
*/
package org.apache.streampark.flink.quickstart.connector;

import org.apache.streampark.flink.connector.function.SQLQueryFunction;
import org.apache.streampark.flink.connector.function.SQLResultFunction;
import org.apache.streampark.flink.connector.function.QueryFunction;
import org.apache.streampark.flink.connector.function.ResultFunction;
import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
Expand All @@ -40,7 +40,7 @@ public static void main(String[] args) {
// 读取MySQL数据源
new JdbcJavaSource<>(context, Order.class)
.getDataStream(
(SQLQueryFunction<Order>)
(QueryFunction<Order>)
lastOne -> {
// 5秒抽取一次
Thread.sleep(3000);
Expand All @@ -52,7 +52,7 @@ public static void main(String[] args) {
+ "order by timestamp asc ",
lastOffset);
},
(SQLResultFunction<Order>)
(ResultFunction<Order>)
map -> {
List<Order> result = new ArrayList<>();
map.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@
import java.io.Serializable;

@FunctionalInterface
public interface FilterFunction<T> extends Serializable {

  /**
   * Decides whether a record extracted by the source should be emitted downstream.
   *
   * @param t the record to test
   * @return {@code true} to emit the record, {@code false} to drop it
   */
  Boolean filter(T t);
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.io.Serializable;

@FunctionalInterface
public interface SQLQueryFunction<T> extends Serializable {
public interface QueryFunction<T> extends Serializable {
/**
* Get the SQL to query
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.Map;

@FunctionalInterface
public interface SQLResultFunction<T> extends Serializable {
public interface ResultFunction<T> extends Serializable {
/**
* The result of the search is returned as a Map, and the user can convert it into an object.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.streampark.flink.connector.hbase.source;

import org.apache.streampark.common.util.ConfigUtils;
import org.apache.streampark.flink.connector.function.RunningFunction;
import org.apache.streampark.flink.connector.function.FilterFunction;
import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction;
import org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction;
import org.apache.streampark.flink.connector.hbase.internal.HBaseSourceFunction;
Expand Down Expand Up @@ -64,7 +64,7 @@ public DataStreamSource<T> getDataStream(
public DataStreamSource<T> getDataStream(
HBaseQueryFunction<T> queryFunction,
HBaseResultFunction<T> resultFunction,
RunningFunction runningFunc) {
FilterFunction<T> filterFunction) {

if (queryFunction == null) {
throw new NullPointerException("HBaseJavaSource error: query function cannot be null");
Expand All @@ -79,7 +79,7 @@ public DataStreamSource<T> getDataStream(

HBaseSourceFunction<T> sourceFunction =
new HBaseSourceFunction<>(
property, queryFunction, resultFunction, runningFunc, typeInformation);
property, queryFunction, resultFunction, filterFunction, typeInformation);
return context.getJavaEnv().addSource(sourceFunction);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.hbase.internal
import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.connector.function.RunningFunction
import org.apache.streampark.flink.connector.function.FilterFunction
import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
import org.apache.streampark.flink.connector.hbase.function.{HBaseQueryFunction, HBaseResultFunction}
import org.apache.streampark.flink.util.FlinkUtils
Expand All @@ -47,8 +47,12 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
with Logger {

  @volatile private[this] var running = true

  // Pass-through defaults: accept every record unless a user-supplied filter
  // is installed by one of the auxiliary constructors below.
  private[this] var scalaFilterFunc: R => Boolean = (_: R) => true
  private[this] var javaFilterFunc: FilterFunction[R] = new FilterFunction[R] {
    /** Default Java filter: keeps every record. */
    override def filter(t: R): lang.Boolean = true
  }

  @transient private[this] var table: Table = _

Expand All @@ -69,32 +73,29 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
// Auxiliary constructor for the Scala API.
// `filter` decides which extracted records are emitted; null keeps every record.
def this(
    prop: Properties,
    queryFunc: R => HBaseQuery,
    resultFunc: Result => R,
    filter: R => Boolean) = {

  this(ApiType.scala, prop)
  this.scalaQueryFunc = queryFunc
  this.scalaResultFunc = resultFunc
  // Keep the accept-all default when no filter is given.
  if (filter != null) {
    this.scalaFilterFunc = filter
  }
}

// Auxiliary constructor for the Java API.
// `filter` decides which extracted records are emitted; null keeps every record.
def this(
    prop: Properties,
    queryFunc: HBaseQueryFunction[R],
    resultFunc: HBaseResultFunction[R],
    filter: FilterFunction[R]) {

  this(ApiType.java, prop)
  this.javaQueryFunc = queryFunc
  this.javaResultFunc = resultFunc
  // Keep the accept-all default when no filter is given.
  if (filter != null) {
    this.javaFilterFunc = filter
  }
}

@throws[Exception]
Expand All @@ -106,40 +107,42 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
while (this.running) {
  apiType match {
    case ApiType.scala =>
      ctx.getCheckpointLock.synchronized {
        // Hand the last emitted (or checkpoint-restored) record back to the user,
        // who builds the conditions for the next query from it.
        query = scalaQueryFunc(last)
        require(
          query != null && query.getTable != null,
          "[StreamPark] HBaseSource query and query's param table must not be null ")
        table = query.getTable(prop)
        table
          .getScanner(query)
          .foreach(
            x => {
              val r = scalaResultFunc(x)
              // NOTE(review): records rejected by the filter do not advance `last`,
              // so the next query may scan them again — confirm this is intended.
              if (scalaFilterFunc(r)) {
                last = r
                ctx.collectWithTimestamp(r, System.currentTimeMillis())
              }
            })
      }
    case ApiType.java =>
      ctx.getCheckpointLock.synchronized {
        // Same polling protocol as the Scala branch, via the Java functional interfaces.
        query = javaQueryFunc.query(last)
        require(
          query != null && query.getTable != null,
          "[StreamPark] HBaseSource query and query's param table must not be null ")
        table = query.getTable(prop)
        table
          .getScanner(query)
          .foreach(
            x => {
              val r = javaResultFunc.result(x)
              if (javaFilterFunc.filter(r)) {
                last = r
                ctx.collectWithTimestamp(r, System.currentTimeMillis())
              }
            })
      }
  }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class HBaseSource(
def getDataStream[R: TypeInformation](
query: R => HBaseQuery,
func: Result => R,
running: Unit => Boolean): DataStream[R] = {
running: R => Boolean): DataStream[R] = {

if (query == null) {
throw new NullPointerException("getDataStream error, SQLQueryFunction must not be null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
package org.apache.streampark.flink.connector.jdbc.source;

import org.apache.streampark.common.util.ConfigUtils;
import org.apache.streampark.flink.connector.function.RunningFunction;
import org.apache.streampark.flink.connector.function.SQLQueryFunction;
import org.apache.streampark.flink.connector.function.SQLResultFunction;
import org.apache.streampark.flink.connector.function.FilterFunction;
import org.apache.streampark.flink.connector.function.QueryFunction;
import org.apache.streampark.flink.connector.function.ResultFunction;
import org.apache.streampark.flink.connector.jdbc.internal.JdbcSourceFunction;
import org.apache.streampark.flink.core.scala.StreamingContext;

Expand Down Expand Up @@ -59,30 +59,28 @@ public JdbcJavaSource<T> alias(String alias) {
}

public DataStreamSource<T> getDataStream(
SQLQueryFunction<T> queryFunction, SQLResultFunction<T> resultFunction) {
QueryFunction<T> queryFunction, ResultFunction<T> resultFunction) {
return getDataStream(queryFunction, resultFunction, null);
}

public DataStreamSource<T> getDataStream(
SQLQueryFunction<T> queryFunction,
SQLResultFunction<T> resultFunction,
RunningFunction runningFunc) {
QueryFunction<T> queryFunction, ResultFunction<T> resultFunction, FilterFunction<T> filter) {

if (queryFunction == null) {
throw new NullPointerException(
"JdbcJavaSource getDataStream error: SQLQueryFunction must not be null");
"JdbcJavaSource getDataStream error: QueryFunction must not be null");
}
if (resultFunction == null) {
throw new NullPointerException(
"JdbcJavaSource getDataStream error: SQLResultFunction must not be null");
"JdbcJavaSource getDataStream error: ResultFunction must not be null");
}

if (this.jdbc == null) {
this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias);
}

JdbcSourceFunction<T> sourceFunction =
new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction, runningFunc, typeInformation);
new JdbcSourceFunction<T>(jdbc, queryFunction, resultFunction, filter, typeInformation);
return context.getJavaEnv().addSource(sourceFunction);
}
}
Loading

0 comments on commit 0ed5a0f

Please sign in to comment.