[Bug] Scala TypeInformation type conversion bug fixed.
wolfboys committed Jan 26, 2025
1 parent 8ea7223 commit 1143f64
Showing 29 changed files with 459 additions and 360 deletions.
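Judging from the hunks below, the user-visible part of the fix shows up in the JDBC example: JdbcJavaSource is now given the element class explicitly and the stream keeps an explicit .returns(TypeInformation.of(Order.class)), rather than depending on a Scala implicit conversion to derive the type. As a rough, hypothetical illustration only (the TypeInfoSketch class and its Order POJO below are not part of this commit), explicit TypeInformation construction on the Java side usually looks like this:

import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;

import java.util.List;

public class TypeInfoSketch {

  // A plain POJO standing in for the quickstart's Order bean (hypothetical).
  public static class Order {
    public String orderId;
    public Long timestamp;
  }

  public static void main(String[] args) {
    // Non-generic types can be described directly from the Class object ...
    TypeInformation<Order> byClass = TypeInformation.of(Order.class);

    // ... while generic types need a TypeHint so the type parameter survives erasure.
    TypeInformation<List<Order>> byHint = TypeInformation.of(new TypeHint<List<Order>>() {});

    System.out.println(byClass + " / " + byHint);
  }
}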
@@ -93,7 +93,7 @@ public RestResponse alertConfigsList() {

  @DeleteMapping("/delete")
  public RestResponse deleteAlertConfig(
-      @RequestParam("id") @NotNull(message = "config id must be not null") Long id) {
+      @RequestParam("id") @NotNull(message = "config id must not be null") Long id) {
    boolean result = alertConfigService.deleteById(id);
    return RestResponse.success(result);
  }
@@ -25,7 +25,7 @@ flink:
    pipeline.name: streampark-quickstartApp
    yarn.application.queue:
    taskmanager.numberOfTaskSlots: 1
-    parallelism.default: 2
+    parallelism.default: 1
    jobmanager.memory:
      flink.size:
      heap.size:

@@ -51,7 +51,7 @@ flink:
  execution:
    checkpointing:
      mode: EXACTLY_ONCE
-      interval: 30s
+      interval: 10s
      timeout: 10min
      unaligned: false
      externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
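For readers who know Flink's DataStream API better than StreamPark's YAML, the checkpoint block above maps roughly onto Flink's programmatic CheckpointConfig. A minimal sketch with the same values, assuming a plain StreamExecutionEnvironment rather than StreamPark's StreamingContext:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {

  public static void main(String[] args) {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // parallelism.default: 1
    env.setParallelism(1);

    // execution.checkpointing.mode: EXACTLY_ONCE, interval: 10s
    env.enableCheckpointing(10_000L, CheckpointingMode.EXACTLY_ONCE);

    CheckpointConfig checkpointConfig = env.getCheckpointConfig();
    // timeout: 10min
    checkpointConfig.setCheckpointTimeout(10 * 60 * 1000L);
    // unaligned: false
    checkpointConfig.enableUnalignedCheckpoints(false);
    // externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
    checkpointConfig.setExternalizedCheckpointCleanup(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
  }
}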
@@ -20,32 +20,32 @@
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.quickstart.connector.bean.Entity;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

-/**
- * @author benjobs
- */
+/** @author benjobs */
public class ClickhouseJavaApp {

-    public static void main(String[] args) {
-        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-        StreamingContext context = new StreamingContext(envConfig);
-        DataStreamSource<Entity> source = context.getJavaEnv().addSource(new MyDataSource());
-
-        //2) async high-performance write
-        new ClickHouseSink(context).asyncSink(source, value ->
-            String.format("insert into test.orders(userId, siteId) values (%d,%d)", value.userId, value.siteId)
-        ).setParallelism(1);
-
-        //3) write via JDBC
-        /**
-         *
-         * new ClickHouseSink(context).jdbcSink(source, bean ->
-         * String.format("insert into test.orders(userId, siteId) values (%d,%d)", bean.userId, bean.siteId)
-         * ).setParallelism(1);
-         *
-         */
-        context.start();
-    }
+  public static void main(String[] args) {
+    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+    StreamingContext context = new StreamingContext(envConfig);
+    DataStreamSource<Entity> source = context.getJavaEnv().addSource(new MyDataSource());
+
+    // 2) async high-performance write
+    new ClickHouseSink(context)
+        .asyncSink(
+            source,
+            value ->
+                String.format(
+                    "insert into test.orders(userId, siteId) values (%d,%d)",
+                    value.userId, value.siteId))
+        .setParallelism(1);
+
+    // 3) write via JDBC
+    /**
+     * new ClickHouseSink(context).jdbcSink(source, bean -> String.format("insert into
+     * test.orders(userId, siteId) values (%d,%d)", bean.userId, bean.siteId) ).setParallelism(1);
+     */
+    context.start();
+  }
}
@@ -17,28 +17,28 @@
package org.apache.streampark.flink.quickstart.connector;

import org.apache.streampark.flink.connector.doris.sink.DorisSink;
-import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
+import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;

-/**
- * @author wudi
- **/
+/** @author wudi */
public class DorisJavaApp {

-    public static void main(String[] args) {
-        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-        StreamingContext context = new StreamingContext(envConfig);
-        DataStream<String> source = new KafkaJavaSource<String>(context)
-            .getDataStream()
-            .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value)
-            .returns(String.class);
+  public static void main(String[] args) {
+    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+    StreamingContext context = new StreamingContext(envConfig);
+    DataStream<String> source =
+        new KafkaJavaSource<String>(context)
+            .getDataStream()
+            .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value)
+            .returns(String.class);

-        new DorisSink<String>(context).sink(source);
+    new DorisSink<String>(context).sink(source);

-        context.start();
-    }
+    context.start();
+  }
}
@@ -20,18 +20,17 @@
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.quickstart.connector.bean.Entity;

import org.apache.flink.streaming.api.datastream.DataStreamSource;

-/**
- * @author wudi
- **/
+/** @author wudi */
public class HttpJavaApp {

-    public static void main(String[] args) {
-        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-        StreamingContext context = new StreamingContext(envConfig);
-        DataStreamSource<Entity> source = context.getJavaEnv().addSource(new MyDataSource());
-        new HttpSink(context).get(source.map(x -> String.format("http://www.qq.com?id=%d", x.userId)));
-        context.start();
-    }
+  public static void main(String[] args) {
+    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+    StreamingContext context = new StreamingContext(envConfig);
+    DataStreamSource<Entity> source = context.getJavaEnv().addSource(new MyDataSource());
+    new HttpSink(context).get(source.map(x -> String.format("http://www.qq.com?id=%d", x.userId)));
+    context.start();
+  }
}
@@ -17,42 +17,45 @@
package org.apache.streampark.flink.quickstart.connector;

import org.apache.streampark.common.util.JsonUtils;
+import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
import org.apache.streampark.flink.connector.kafka.sink.KafkaJavaSink;
import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
-import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.quickstart.connector.bean.Behavior;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;

-/**
- * @author benjobs
- */
+/** @author benjobs */
public class KafkaJavaApp {

-    public static void main(String[] args) {
-
-        StreamEnvConfig javaConfig = new StreamEnvConfig(args, (environment, parameterTool) -> {
-            //environment.getConfig().enableForceAvro();
-            System.out.println("environment argument set...");
-        });
-
-        StreamingContext context = new StreamingContext(javaConfig);
-
-        //1) read data from kafka
-        DataStream<Behavior> source = new KafkaJavaSource<String>(context)
-            .getDataStream()
-            .map((MapFunction<KafkaRecord<String>, Behavior>) value -> JsonUtils.read(value, Behavior.class));
-
-
-        // 2) write the data to another kafka topic
-        new KafkaJavaSink<Behavior>(context)
-            .serializer((SerializationSchema<Behavior>) element -> JsonUtils.write(element).getBytes())
-            .sink(source);
-
-        context.start();
-    }
-
+  public static void main(String[] args) {
+
+    StreamEnvConfig javaConfig =
+        new StreamEnvConfig(
+            args,
+            (environment, parameterTool) -> {
+              // environment.getConfig().enableForceAvro();
+              System.out.println("environment argument set...");
+            });
+
+    StreamingContext context = new StreamingContext(javaConfig);
+
+    // 1) read data from kafka
+    DataStream<Behavior> source =
+        new KafkaJavaSource<String>(context)
+            .getDataStream()
+            .map(
+                (MapFunction<KafkaRecord<String>, Behavior>)
+                    value -> JsonUtils.read(value, Behavior.class));
+
+    // 2) write the data to another kafka topic
+    new KafkaJavaSink<Behavior>(context)
+        .serializer((SerializationSchema<Behavior>) element -> JsonUtils.write(element).getBytes())
+        .sink(source);
+
+    context.start();
+  }
}
@@ -16,55 +16,58 @@
 */
package org.apache.streampark.flink.quickstart.connector;

-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.streampark.flink.connector.function.SQLQueryFunction;
import org.apache.streampark.flink.connector.function.SQLResultFunction;
import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.quickstart.connector.bean.Order;

+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class MySQLJavaApp {

-    public static void main(String[] args) {
-
-        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-
-        StreamingContext context = new StreamingContext(envConfig);
-
-        //read from the MySQL source
-        new JdbcJavaSource<Order>(context)
-            .getDataStream(
-                (SQLQueryFunction<Order>) lastOne -> {
-                    //poll once every 5 seconds
-                    Thread.sleep(3000);
-                    Serializable lastOffset = lastOne == null ? "2020-10-10 23:00:00" : lastOne.getTimestamp();
-                    return String.format(
-                        "select * from t_order " +
-                            "where timestamp > '%s' " +
-                            "order by timestamp asc ",
-                        lastOffset
-                    );
-                },
-                (SQLResultFunction<Order>) map -> {
-                    List<Order> result = new ArrayList<>();
-                    map.forEach(item -> {
-                        Order order = new Order();
-                        order.setOrderId(item.get("order_id").toString());
-                        order.setMarketId(item.get("market_id").toString());
-                        order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
-                        result.add(order);
-                    });
-                    return result;
-                })
-            .returns(TypeInformation.of(Order.class))
-            .print("jdbc source: >>>>>");
-
-        context.start();
-    }
+  public static void main(String[] args) {
+
+    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+
+    StreamingContext context = new StreamingContext(envConfig);
+
+    // read from the MySQL source
+    new JdbcJavaSource<Order>(context, Order.class)
+        .getDataStream(
+            (SQLQueryFunction<Order>)
+                lastOne -> {
+                  // poll once every 5 seconds
+                  Thread.sleep(3000);
+                  Serializable lastOffset =
+                      lastOne == null ? "2020-10-10 23:00:00" : lastOne.getTimestamp();
+                  return String.format(
+                      "select * from t_order "
+                          + "where timestamp > '%s' "
+                          + "order by timestamp asc ",
+                      lastOffset);
+                },
+            (SQLResultFunction<Order>)
+                map -> {
+                  List<Order> result = new ArrayList<>();
+                  map.forEach(
+                      item -> {
+                        Order order = new Order();
+                        order.setOrderId(item.get("order_id").toString());
+                        order.setMarketId(item.get("market_id").toString());
+                        order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
+                        result.add(order);
+                      });
+                  return result;
+                })
+        .returns(TypeInformation.of(Order.class))
+        .print("jdbc source: >>>>>");
+
+    context.start();
+  }
}
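The explicit .returns(TypeInformation.of(Order.class)) above is the standard Java-side answer to type erasure: with lambdas, Flink often cannot infer a transformation's output type on its own. In the Scala API the equivalent information normally arrives through implicit TypeInformation instances, which appears to be the conversion path the commit title refers to. A small standalone sketch of the same pattern (the ReturnsSketch class is hypothetical and unrelated to this commit's files):

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ReturnsSketch {

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    DataStream<String> words = env.fromElements("a", "b", "c");

    // The lambda's Integer output type is erased at compile time, so we hand
    // Flink the TypeInformation explicitly via returns(...), just as the JDBC
    // example above does with TypeInformation.of(Order.class).
    DataStream<Integer> lengths = words.map(s -> s.length()).returns(Types.INT);

    lengths.print();
    env.execute("returns-sketch");
  }
}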
@@ -18,14 +18,12 @@

import lombok.Data;

-/**
- * @author benjobs
- */
+/** @author benjobs */
@Data
public class Behavior {
-    private String user_id;
-    private Long item_id;
-    private Long category_id;
-    private String behavior;
-    private Long ts;
+  private String user_id;
+  private Long item_id;
+  private Long category_id;
+  private String behavior;
+  private Long ts;
}
@@ -16,20 +16,15 @@
 */
package org.apache.streampark.flink.quickstart.connector.bean;

-
-/**
- * @author benjobs
- */
-
+/** @author benjobs */
public class Entity {

-    public Long userId;
-    public Long orderId;
-    public Long siteId;
-    public Long cityId;
-    public Integer orderStatus;
-    public Double price;
-    public Integer quantity;
-    public Long timestamp;
-
+  public Long userId;
+  public Long orderId;
+  public Long siteId;
+  public Long cityId;
+  public Integer orderStatus;
+  public Double price;
+  public Integer quantity;
+  public Long timestamp;
}
@@ -22,9 +22,9 @@

@Data
public class LogBean implements Serializable {
-    private String platenum;
-    private String cardType;
-    private Long inTime;
-    private Long outTime;
-    private String controlid;
+  private String platenum;
+  private String cardType;
+  private Long inTime;
+  private Long outTime;
+  private String controlid;
}