Skip to content

Commit

Permalink
[Improve] jdbc-connector filterFunction improvement
Browse files Browse the repository at this point in the history
  • Loading branch information
wolfboys committed Jan 26, 2025
1 parent 0ed5a0f commit 9c60db2
Show file tree
Hide file tree
Showing 6 changed files with 6 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ object HBaseSourceApp extends FlinkStreaming {
new HBaseQuery("person", new Scan())
}
},
r => new String(r.getRow),
null
r => new String(r.getRow)
)

HBaseRequest(id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ object MongoSourceApp extends FlinkStreaming {
.append("updateTime", new BasicDBObject("$gte", DateUtils.parse(offset)))
d.find(cond)
},
_.toList.map(_.toJson()),
null
_.toList.map(_.toJson())
)
.print()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,7 @@ object MySQLSourceApp extends FlinkStreaming {
val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else lastOne.timestamp
s"select * from t_order where timestamp > '$laseOffset' order by timestamp asc "
},
_.map(x => new Order(x("market_id").toString, x("timestamp").toString)),
null
_.map(x => new Order(x("market_id").toString, x("timestamp").toString))
)
.print()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class HBaseSource(
def getDataStream[R: TypeInformation](
query: R => HBaseQuery,
func: Result => R,
running: R => Boolean): DataStream[R] = {
running: R => Boolean = null): DataStream[R] = {

if (query == null) {
throw new NullPointerException("getDataStream error, SQLQueryFunction must not be null")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ class JdbcSource(
def getDataStream[R: TypeInformation](
sqlFun: R => String,
func: Iterable[Map[String, _]] => Iterable[R],
filter: R => Boolean): DataStream[R] = {
filter: R => Boolean = null): DataStream[R] = {
val jdbc = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias)
if (property != null) {
jdbc.putAll(property)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class MongoSource(
collection: String,
queryFun: (R, MongoCollection[Document]) => FindIterable[Document],
resultFun: MongoCursor[Document] => List[R],
filter: R => Boolean)(implicit prop: Properties = new Properties()): DataStream[R] = {
filter: R => Boolean = null)(implicit prop: Properties = new Properties()): DataStream[R] = {

Utils.copyProperties(property, prop)
val mongoFun = new MongoSourceFunction[R](collection, prop, queryFun, resultFun, filter)
Expand Down

0 comments on commit 9c60db2

Please sign in to comment.