Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/apache/inlong
Browse files Browse the repository at this point in the history
  • Loading branch information
haibo-duan committed Mar 15, 2024
2 parents 9044af7 + 58b2fc9 commit edc58b5
Show file tree
Hide file tree
Showing 259 changed files with 5,700 additions and 1,824 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void loadJsonResource(String fileName) {
loadResource(fileName, true);
}

void loadPropertiesResource(String fileName) {
public void loadPropertiesResource(String fileName) {
loadResource(fileName, false);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public String getTimeOffset() {
}

public String getTimeZone() {
return get(TaskConstants.TASK_FILE_TIME_ZONE);
return get(TaskConstants.TASK_TIME_ZONE);
}

public TaskStateEnum getState() {
Expand Down Expand Up @@ -121,9 +121,7 @@ public static TaskProfile parseJsonStr(String jsonStr) {
public boolean allRequiredKeyExist() {
return hasKey(TaskConstants.TASK_ID) && hasKey(TaskConstants.TASK_SOURCE)
&& hasKey(TaskConstants.TASK_SINK) && hasKey(TaskConstants.TASK_CHANNEL)
&& hasKey(TaskConstants.TASK_GROUP_ID) && hasKey(TaskConstants.TASK_STREAM_ID)
&& hasKey(TaskConstants.TASK_CYCLE_UNIT)
&& hasKey(TaskConstants.TASK_FILE_TIME_ZONE);
&& hasKey(TaskConstants.TASK_GROUP_ID) && hasKey(TaskConstants.TASK_STREAM_ID);
}

public String toJsonStr() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class CommonConstants {

public static final String PROXY_SENDER_MAX_TIMEOUT = "proxy.sender.maxTimeout";
// max timeout in seconds.
public static final int DEFAULT_PROXY_SENDER_MAX_TIMEOUT = 20;
public static final int DEFAULT_PROXY_SENDER_MAX_TIMEOUT = 60;

public static final String PROXY_SENDER_MAX_RETRY = "proxy.sender.maxRetry";
public static final int DEFAULT_PROXY_SENDER_MAX_RETRY = 5;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,41 +23,25 @@
public class FetcherConstants {

public static final String AGENT_FETCHER_INTERVAL = "agent.fetcher.interval";
public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 10;
public static final int DEFAULT_AGENT_FETCHER_INTERVAL = 60;

public static final String AGENT_HEARTBEAT_INTERVAL = "agent.heartbeat.interval";
public static final int DEFAULT_AGENT_HEARTBEAT_INTERVAL = 10;
public static final String AGENT_MANAGER_REQUEST_TIMEOUT = "agent.manager.request.timeout";
// default is 30s
public static final int DEFAULT_AGENT_MANAGER_REQUEST_TIMEOUT = 30;

// enable https
public static final String ENABLE_HTTPS = "enable.https";
public static final boolean DEFAULT_ENABLE_HTTPS = false;

// required config
public static final String AGENT_MANAGER_ADDR = "agent.manager.addr";

public static final String AGENT_MANAGER_VIP_HTTP_PATH = "agent.manager.vip.http.managerIp.path";
public static final String DEFAULT_AGENT_TDM_VIP_HTTP_PATH = "/agent/getManagerIpList";

public static final String AGENT_MANAGER_VIP_HTTP_PREFIX_PATH = "agent.manager.vip.http.prefix.path";
public static final String DEFAULT_AGENT_MANAGER_VIP_HTTP_PREFIX_PATH = "/inlong/manager/openapi";

public static final String AGENT_MANAGER_TASK_HTTP_PATH = "agent.manager.task.http.path";
public static final String DEFAULT_AGENT_MANAGER_TASK_HTTP_PATH = "/agent/reportAndGetTask";
public static final String DEFAULT_AGENT_MANAGER_CONFIG_HTTP_PATH = "/agent/getExistTaskConfig";

public static final String AGENT_MANAGER_IP_CHECK_HTTP_PATH = "agent.manager.vip.http.checkIP.path";
public static final String DEFAULT_AGENT_TDM_IP_CHECK_HTTP_PATH = "/fileAgent/confirmAgentIp";

public static final String AGENT_MANAGER_DBCOLLECT_GETTASK_HTTP_PATH = "agent.manager.dbcollect.gettask.http.path";
public static final String DEFAULT_AGENT_MANAGER_DBCOLLECTOR_GETTASK_HTTP_PATH = "/dbCollector/getTask";

public static final String AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH = "agent.manager.reportsnapshot.http.path";
public static final String DEFAULT_AGENT_MANAGER_REPORTSNAPSHOT_HTTP_PATH = "/agent/reportSnapshot";

public static final String AGENT_MANAGER_HEARTBEAT_HTTP_PATH = "agent.manager.heartbeat.http.path";
public static final String AGENT_MANAGER_HEARTBEAT_HTTP_PATH = "heartbeat.http.path";
public static final String DEFAULT_AGENT_MANAGER_HEARTBEAT_HTTP_PATH = "/heartbeat/report";

public static final String AGENT_HTTP_APPLICATION_JSON = "application/json";
Expand All @@ -67,20 +51,15 @@ public class FetcherConstants {
public static final String AGENT_MANAGER_RETURN_PARAM_IP = "ip";
public static final String AGENT_MANAGER_RETURN_PARAM_DATA = "data";

public static final String VERSION = "1.0";

public static final String AGENT_MANAGER_AUTH_SECRET_ID = "agent.manager.auth.secretId";
public static final String AGENT_MANAGER_AUTH_SECRET_KEY = "agent.manager.auth.secretKey";

public static final String AGENT_GLOBAL_READER_SOURCE_PERMIT = "agent.global.reader.source.permit";
public static final int DEFAULT_AGENT_GLOBAL_READER_SOURCE_PERMIT = 16 * 1000 * 1000;

public static final String AGENT_GLOBAL_READER_QUEUE_PERMIT = "agent.global.reader.queue.permit";
public static final int DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT = 16 * 1000 * 1000;

public static final String AGENT_GLOBAL_CHANNEL_PERMIT = "agent.global.channel.permit";
public static final int DEFAULT_AGENT_GLOBAL_CHANNEL_PERMIT = 16 * 1000 * 1000;
public static final int DEFAULT_AGENT_GLOBAL_READER_QUEUE_PERMIT = 128 * 1000 * 1000;

public static final String AGENT_GLOBAL_WRITER_PERMIT = "agent.global.writer.permit";
public static final int DEFAULT_AGENT_GLOBAL_WRITER_PERMIT = 96 * 1000 * 1000;
public static final int DEFAULT_AGENT_GLOBAL_WRITER_PERMIT = 128 * 1000 * 1000;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public class TaskConstants extends CommonConstants {
public static final String JOB_UUID = "job.uuid";
public static final String TASK_GROUP_ID = "task.groupId";
public static final String TASK_STREAM_ID = "task.streamId";
public static final String RESTORE_FROM_DB = "task.restoreFromDB";

public static final String TASK_SOURCE = "task.source";
public static final String JOB_SOURCE_TYPE = "job.sourceType";
Expand Down Expand Up @@ -64,9 +65,10 @@ public class TaskConstants extends CommonConstants {
public static final String TASK_DIR_FILTER_PATTERN = "task.fileTask.dir.pattern"; // deprecated
public static final String FILE_DIR_FILTER_PATTERNS = "task.fileTask.dir.patterns";
public static final String TASK_FILE_TIME_OFFSET = "task.fileTask.timeOffset";
public static final String TASK_FILE_TIME_ZONE = "task.fileTask.timeZone";
public static final String TASK_TIME_ZONE = "task.timeZone";
public static final String TASK_FILE_MAX_WAIT = "task.fileTask.file.max.wait";
public static final String TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
public static final String TASK_CYCLE_UNIT = "task.cycleUnit";
public static final String FILE_TASK_CYCLE_UNIT = "task.fileTask.cycleUnit";
public static final String TASK_FILE_TRIGGER_TYPE = "task.fileTask.collectType";
public static final String JOB_FILE_LINE_END_PATTERN = "job.fileTask.line.endPattern";
public static final String JOB_FILE_CONTENT_COLLECT_TYPE = "job.fileTask.contentCollectType";
Expand Down Expand Up @@ -106,14 +108,24 @@ public class TaskConstants extends CommonConstants {
public static final String JOB_DATABASE_PORT = "job.binlogJob.port";

// Kafka job
public static final String JOB_KAFKA_TOPIC = "job.kafkaJob.topic";
public static final String JOB_KAFKA_BOOTSTRAP_SERVERS = "job.kafkaJob.bootstrap.servers";
public static final String JOB_KAFKA_GROUP_ID = "job.kafkaJob.group.id";
public static final String TASK_KAFKA_TOPIC = "task.kafkaJob.topic";
public static final String TASK_KAFKA_BOOTSTRAP_SERVERS = "task.kafkaJob.bootstrap.servers";
public static final String TASK_KAFKA_GROUP_ID = "task.kafkaJob.group.id";
public static final String JOB_KAFKA_RECORD_SPEED_LIMIT = "job.kafkaJob.recordSpeed.limit";
public static final String JOB_KAFKA_BYTE_SPEED_LIMIT = "job.kafkaJob.byteSpeed.limit";
public static final String JOB_KAFKA_OFFSET = "job.kafkaJob.partition.offset";
public static final String TASK_KAFKA_OFFSET = "task.kafkaJob.partition.offset";
public static final String JOB_KAFKA_READ_TIMEOUT = "job.kafkaJob.read.timeout";
public static final String JOB_KAFKA_AUTO_COMMIT_OFFSET_RESET = "job.kafkaJob.autoOffsetReset";
public static final String TASK_KAFKA_AUTO_COMMIT_OFFSET_RESET = "task.kafkaJob.autoOffsetReset";

// Pulsar task
public static final String TASK_PULSAR_TENANT = "task.pulsarTask.tenant";
public static final String TASK_PULSAR_NAMESPACE = "task.pulsarTask.namespace";
public static final String TASK_PULSAR_TOPIC = "task.pulsarTask.topic";
public static final String TASK_PULSAR_SUBSCRIPTION = "task.pulsarTask.subscription";
public static final String TASK_PULSAR_SUBSCRIPTION_TYPE = "task.pulsarTask.subscriptionType";
public static final String TASK_PULSAR_SERVICE_URL = "task.pulsarTask.serviceUrl";
public static final String TASK_PULSAR_SUBSCRIPTION_POSITION = "task.pulsarTask.subscriptionPosition";
public static final String TASK_PULSAR_RESET_TIME = "task.pulsarTask.resetTime";

public static final String JOB_MONGO_HOSTS = "job.mongoJob.hosts";
public static final String JOB_MONGO_USER = "job.mongoJob.user";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CommonConstants;
import org.apache.inlong.agent.constant.TaskConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -63,7 +62,7 @@ public void storeTask(TaskProfile task) {
if (task.allRequiredKeyExist()) {
String keyName = getKeyByTaskId(task.getTaskId());
KeyValueEntity entity = new KeyValueEntity(keyName,
task.toJsonStr(), task.get(TaskConstants.FILE_DIR_FILTER_PATTERNS));
task.toJsonStr(), "");
db.put(entity);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,14 @@ public class AuditUtils {
public static final int AUDIT_ID_AGENT_INSTANCE_MGR_HEARTBEAT = 30009;
public static final int AUDIT_ID_AGENT_INSTANCE_HEARTBEAT = 30010;
public static final int AUDIT_ID_AGENT_SEND_FAILED_REAL_TIME = 30011;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30013;
public static final int AUDIT_ID_AGENT_DEL_INSTANCE_MEM_UNUSUAL = 30014;
public static final int AUDIT_ID_AGENT_ADD_INSTANCE_MEM_FAILED = 30015;
public static final int AUDIT_ID_AGENT_TRY_SEND = 30020;
public static final int AUDIT_ID_AGENT_TRY_SEND_REAL_TIME = 30021;
public static final int AUDIT_ID_AGENT_SEND_EXCEPTION = 30022;
public static final int AUDIT_ID_AGENT_SEND_EXCEPTION_REAL_TIME = 30023;
public static final int AUDIT_ID_AGENT_RESEND = 30024;
public static final int AUDIT_ID_AGENT_RESEND_REAL_TIME = 30025;

private static boolean IS_AUDIT = true;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,6 @@ public static class FileTaskConfig {
// '1d' means one day after, '-1d' means one day before
// Null means from current timestamp
private String timeOffset;
// Asia/Shanghai
private String timeZone;
// For example: a=b&c=b&e=f
private String additionalAttr;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class KafkaJob {
private RecordSpeed recordSpeed;
private ByteSpeed byteSpeed;
private String autoOffsetReset;
private String partitionOffsets;

@Data
public static class Group {
Expand Down Expand Up @@ -70,5 +71,6 @@ public static class KafkaJobTaskConfig {
private String recordSpeedLimit;
private String byteSpeedLimit;
private String autoOffsetReset;
private String partitionOffsets;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.pojo;

import lombok.Data;

@Data
public class PulsarTask {

private String tenant;
private String namespace;
private String topic;
private String subscription;
private String subscriptionType;
private String serviceUrl;
private String subscriptionPosition;
private String resetTime;

@Data
public static class PulsarTaskConfig {

private String pulsarTenant;
private String namespace;
private String topic;
private String subscription;
private String subscriptionType;
private String serviceUrl;
private String subscriptionPosition;
private String resetTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@

import org.apache.inlong.agent.conf.AgentConfiguration;
import org.apache.inlong.agent.conf.TaskProfile;
import org.apache.inlong.agent.constant.CycleUnitType;
import org.apache.inlong.agent.pojo.FileTask.FileTaskConfig;
import org.apache.inlong.agent.pojo.FileTask.Line;
import org.apache.inlong.agent.pojo.PulsarTask.PulsarTaskConfig;
import org.apache.inlong.common.constant.MQType;
import org.apache.inlong.common.enums.TaskTypeEnum;
import org.apache.inlong.common.pojo.agent.DataConfig;
Expand All @@ -37,6 +39,8 @@
public class TaskProfileDto {

public static final String DEFAULT_FILE_TASK = "org.apache.inlong.agent.plugin.task.file.LogFileTask";
public static final String DEFAULT_KAFKA_TASK = "org.apache.inlong.agent.plugin.task.KafkaTask";
public static final String DEFAULT_PULSAR_TASK = "org.apache.inlong.agent.plugin.task.PulsarTask";
public static final String DEFAULT_CHANNEL = "org.apache.inlong.agent.plugin.channel.MemoryChannel";
public static final String MANAGER_JOB = "MANAGER_JOB";
public static final String DEFAULT_DATA_PROXY_SINK = "org.apache.inlong.agent.plugin.sinks.ProxySink";
Expand All @@ -55,6 +59,8 @@ public class TaskProfileDto {
* kafka source
*/
public static final String KAFKA_SOURCE = "org.apache.inlong.agent.plugin.sources.KafkaSource";
// pulsar source
public static final String PULSAR_SOURCE = "org.apache.inlong.agent.plugin.sources.PulsarSource";
/**
* PostgreSQL source
*/
Expand Down Expand Up @@ -146,9 +152,6 @@ private static FileTask getFileJob(DataConfig dataConfig) {
if (taskConfig.getTimeOffset() != null) {
fileTask.setTimeOffset(taskConfig.getTimeOffset());
}
if (taskConfig.getTimeZone() != null) {
fileTask.setTimeZone(taskConfig.getTimeZone());
}

if (taskConfig.getAdditionalAttr() != null) {
fileTask.setAddictiveString(taskConfig.getAdditionalAttr());
Expand Down Expand Up @@ -192,7 +195,7 @@ private static KafkaJob getKafkaJob(DataConfig dataConfigs) {
bootstrap.setServers(kafkaJobTaskConfig.getBootstrapServers());
kafkaJob.setBootstrap(bootstrap);
KafkaJob.Partition partition = new KafkaJob.Partition();
partition.setOffset(dataConfigs.getSnapshot());
partition.setOffset(kafkaJobTaskConfig.getPartitionOffsets());
kafkaJob.setPartition(partition);
KafkaJob.Group group = new KafkaJob.Group();
group.setId(kafkaJobTaskConfig.getGroupId());
Expand All @@ -210,6 +213,23 @@ private static KafkaJob getKafkaJob(DataConfig dataConfigs) {
return kafkaJob;
}

private static PulsarTask getPulsarTask(DataConfig dataConfig) {
PulsarTaskConfig pulsarTaskConfig = GSON.fromJson(dataConfig.getExtParams(),
PulsarTaskConfig.class);
PulsarTask pulsarTask = new PulsarTask();

pulsarTask.setTenant(pulsarTaskConfig.getPulsarTenant());
pulsarTask.setNamespace(pulsarTaskConfig.getNamespace());
pulsarTask.setTopic(pulsarTaskConfig.getTopic());
pulsarTask.setSubscription(pulsarTaskConfig.getSubscription());
pulsarTask.setSubscriptionType(pulsarTaskConfig.getSubscriptionType());
pulsarTask.setServiceUrl(pulsarTaskConfig.getServiceUrl());
pulsarTask.setSubscriptionPosition(pulsarTaskConfig.getSubscriptionPosition());
pulsarTask.setResetTime(pulsarTaskConfig.getResetTime());

return pulsarTask;
}

private static PostgreSQLJob getPostgresJob(DataConfig dataConfigs) {
PostgreSQLJob.PostgreSQLJobConfig config = GSON.fromJson(dataConfigs.getExtParams(),
PostgreSQLJob.PostgreSQLJobConfig.class);
Expand Down Expand Up @@ -411,6 +431,8 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
task.setVersion(dataConfig.getVersion());
task.setState(dataConfig.getState());
task.setPredefinedFields(dataConfig.getPredefinedFields());
task.setCycleUnit(CycleUnitType.REAL_TIME);
task.setTimeZone(dataConfig.getTimeZone());

// set sink type
if (dataConfig.getDataReportType() == NORMAL_SEND_TO_DATAPROXY.ordinal()) {
Expand Down Expand Up @@ -443,16 +465,25 @@ public static TaskProfile convertToTaskProfile(DataConfig dataConfig) {
case FILE:
task.setTaskClass(DEFAULT_FILE_TASK);
FileTask fileTask = getFileJob(dataConfig);
task.setCycleUnit(fileTask.getCycleUnit());
task.setFileTask(fileTask);
task.setSource(DEFAULT_SOURCE);
profileDto.setTask(task);
break;
case KAFKA:
task.setTaskClass(DEFAULT_KAFKA_TASK);
KafkaJob kafkaJob = getKafkaJob(dataConfig);
task.setKafkaJob(kafkaJob);
task.setSource(KAFKA_SOURCE);
profileDto.setTask(task);
break;
case PULSAR:
task.setTaskClass(DEFAULT_PULSAR_TASK);
PulsarTask pulsarTask = getPulsarTask(dataConfig);
task.setPulsarTask(pulsarTask);
task.setSource(PULSAR_SOURCE);
profileDto.setTask(task);
break;
case POSTGRES:
PostgreSQLJob postgreSQLJob = getPostgresJob(dataConfig);
task.setPostgreSQLJob(postgreSQLJob);
Expand Down Expand Up @@ -519,10 +550,13 @@ public static class Task {
private String taskClass;
private String predefinedFields;
private Integer state;
private String cycleUnit;
private String timeZone;

private FileTask fileTask;
private BinlogJob binlogJob;
private KafkaJob kafkaJob;
private PulsarTask pulsarTask;
private PostgreSQLJob postgreSQLJob;
private OracleJob oracleJob;
private MongoJob mongoJob;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.inlong.agent.constant.TaskConstants.RESTORE_FROM_DB;

/**
* handle the instance created by task, including add, delete, update etc.
* the instance info is store in both db and memory.
Expand Down Expand Up @@ -339,6 +341,7 @@ private void restoreFromDb() {
if (state == InstanceStateEnum.DEFAULT) {
LOGGER.info("instance restoreFromDb addToMem state {} taskId {} instanceId {}", state, taskId,
profile.getInstanceId());
profile.setBoolean(RESTORE_FROM_DB, true);
addToMemory(profile);
} else {
LOGGER.info("instance restoreFromDb ignore state {} taskId {} instanceId {}", state, taskId,
Expand Down
Loading

0 comments on commit edc58b5

Please sign in to comment.