Skip to content

Commit

Permalink
[INLONG-8676][Manager] merge master and process json from es Api
Browse files Browse the repository at this point in the history
  • Loading branch information
haibo-duan committed Feb 20, 2024
2 parents 9ae8e68 + 9044af7 commit db74e56
Show file tree
Hide file tree
Showing 1,335 changed files with 76,581 additions and 15,457 deletions.
2 changes: 2 additions & 0 deletions .github/ISSUE_TEMPLATE/bug-report.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ body:
multiple: false
options:
- 'master'
- '1.11.0'
- '1.10.0'
- '1.9.0'
- '1.8.0'
- '1.7.0'
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/ci_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -129,3 +129,5 @@ jobs:
name: apache-inlong-${{ env.VERSION }}-sort-connectors-flink-v1.15.tar.gz
path: ./inlong-distribution/target/apache-inlong-${{ env.VERSION }}-sort-connectors-flink-v1.15.tar.gz

- name: Clean up build packages
run: mvn clean
3 changes: 3 additions & 0 deletions .github/workflows/ci_docker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -153,3 +153,6 @@ jobs:
env:
DOCKER_USER: ${{ secrets.DOCKER_USER }}
DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }}

- name: Clean up build packages
run: mvn clean
3 changes: 3 additions & 0 deletions .github/workflows/ci_ut.yml
Original file line number Diff line number Diff line change
Expand Up @@ -120,3 +120,6 @@ jobs:
name: failsafe-reports
path: ./**/target/failsafe-reports/
if-no-files-found: ignore

- name: Clean up build packages
run: mvn clean
7 changes: 5 additions & 2 deletions .github/workflows/ci_ut_flink15.yml
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ jobs:
CI: false

- name: Unit test for Flink 1.15 with Maven
run: mvn --update-snapshots -e -V test -pl :sort-core,:sort-end-to-end-tests-v1.15 -am -Pv1.15
run: mvn --update-snapshots -e -V verify -pl :sort-core,:sort-end-to-end-tests-v1.15 -am -Pv1.15
env:
CI: false

Expand All @@ -78,4 +78,7 @@ jobs:
with:
name: failsafe-reports
path: ./**/target/failsafe-reports/
if-no-files-found: ignore
if-no-files-found: ignore

- name: Clean up build packages
run: mvn clean
3 changes: 3 additions & 0 deletions .github/workflows/codeql_analysis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,6 @@ jobs:
# Perform CodeQL Analysis
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@v2

- name: Clean up build packages
run: mvn clean
336 changes: 98 additions & 238 deletions CHANGES.md

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions bin/init-config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@ init_inlong_agent() {
cd $INLONG_HOME/inlong-agent/conf
$SED_COMMAND 's/agent.local.ip=.*/'''agent.local.ip=${local_ip}'''/g' agent.properties
$SED_COMMAND 's/agent.http.port=.*/'''agent.http.port=${agent_port}'''/g' agent.properties
$SED_COMMAND 's/agent.manager.vip.http.host=.*/'''agent.manager.vip.http.host=${manager_server_hostname}'''/g' agent.properties
$SED_COMMAND 's/agent.manager.vip.http.port=.*/'''agent.manager.vip.http.port=${manager_server_port}'''/g' agent.properties
$SED_COMMAND 's/agent.manager.addr=.*/'''agent.manager.addr=http://${manager_server_hostname}:${manager_server_port}'''/g' agent.properties
$SED_COMMAND "s/audit.enable=.*$/audit.enable=true/g" agent.properties
$SED_COMMAND 's/audit.proxys=.*/'''audit.proxys=${audit_proxys_ip}:${audit_proxys_port}'''/g' agent.properties
}
Expand Down
2 changes: 1 addition & 1 deletion inlong-agent/agent-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
<parent>
<groupId>org.apache.inlong</groupId>
<artifactId>inlong-agent</artifactId>
<version>1.10.0-SNAPSHOT</version>
<version>1.12.0-SNAPSHOT</version>
</parent>

<artifactId>agent-common</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class AgentThreadFactory implements ThreadFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(AgentThreadFactory.class);

public static final String NAMED_THREAD_PLACEHOLDER = "running-thread";
public static final String NAMED_THREAD_PLACEHOLDER = "agent-thread-factory";

private final AtomicInteger mThreadNum = new AtomicInteger(1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public int getInt(String key, int defaultValue) {
public int getInt(String key) {
JsonElement value = configStorage.get(key);
if (value == null) {
LOGGER.error("null value for key " + key);
throw new NullPointerException("null value for key " + key);
}
return value.getAsInt();
Expand Down Expand Up @@ -231,6 +232,7 @@ public String get(String key, String defaultValue) {
public String get(String key) {
JsonElement value = configStorage.get(key);
if (value == null) {
LOGGER.error("null value for key " + key);
throw new NullPointerException("null value for key " + key);
}
return value.getAsString();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.agent.conf;

import org.apache.inlong.agent.constant.TaskConstants;
import org.apache.inlong.agent.utils.file.FileUtils;
import org.apache.inlong.common.enums.InstanceStateEnum;
import org.apache.inlong.common.pojo.dataproxy.DataProxyTopicInfo;
import org.apache.inlong.common.pojo.dataproxy.MQClusterInfo;

import com.google.common.collect.ComparisonChain;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.DEFAULT_PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_GROUP_ID;
import static org.apache.inlong.agent.constant.CommonConstants.PROXY_INLONG_STREAM_ID;
import static org.apache.inlong.agent.constant.TaskConstants.INSTANCE_STATE;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_ClUSTERS;
import static org.apache.inlong.agent.constant.TaskConstants.JOB_MQ_TOPIC;
import static org.apache.inlong.agent.constant.TaskConstants.TASK_RETRY;

/**
* job profile which contains details describing properties of one job.
*/
public class InstanceProfile extends AbstractConfiguration implements Comparable<InstanceProfile> {

private static final Logger LOGGER = LoggerFactory.getLogger(InstanceProfile.class);
private static final Gson GSON = new Gson();

/**
* parse json string to configuration instance.
*
* @return job configuration
*/
public static InstanceProfile parseJsonStr(String jsonStr) {
InstanceProfile conf = new InstanceProfile();
conf.loadJsonStrResource(jsonStr);
return conf;
}

public String toJsonStr() {
return GSON.toJson(getConfigStorage());
}

public void setInstanceClass(String className) {
set(TaskConstants.INSTANCE_CLASS, className);
}

public String getInstanceClass() {
return get(TaskConstants.INSTANCE_CLASS);
}

public String getTaskId() {
return get(TaskConstants.TASK_ID);
}

public String getInstanceId() {
return get(TaskConstants.INSTANCE_ID);
}

public String getSourceClass() {
return get(TaskConstants.TASK_SOURCE);
}

public String getSinkClass() {
return get(TaskConstants.TASK_SINK);
}

public InstanceStateEnum getState() {
int value = getInt(INSTANCE_STATE, InstanceStateEnum.DEFAULT.ordinal());
return InstanceStateEnum.getTaskState(value);
}

public void setState(InstanceStateEnum state) {
setInt(INSTANCE_STATE, state.ordinal());
}

public long getFileUpdateTime() {
return getLong(TaskConstants.FILE_UPDATE_TIME, 0);
}

public void setFileUpdateTime(long lastUpdateTime) {
setLong(TaskConstants.FILE_UPDATE_TIME, lastUpdateTime);
}

public String getPredefineFields() {
return get(TaskConstants.PREDEFINE_FIELDS, "");
}

public String getInlongGroupId() {
return get(PROXY_INLONG_GROUP_ID, DEFAULT_PROXY_INLONG_GROUP_ID);
}

public String getInlongStreamId() {
return get(PROXY_INLONG_STREAM_ID, DEFAULT_PROXY_INLONG_STREAM_ID);
}

@Override
public boolean allRequiredKeyExist() {
return hasKey(TaskConstants.FILE_UPDATE_TIME);
}

/**
* get MQClusterInfo list from config
*/
public List<MQClusterInfo> getMqClusters() {
List<MQClusterInfo> result = null;
String mqClusterStr = get(JOB_MQ_ClUSTERS);
if (StringUtils.isNotBlank(mqClusterStr)) {
result = GSON.fromJson(mqClusterStr, new TypeToken<List<MQClusterInfo>>() {
}.getType());
}
return result;
}

/**
* get mqTopic from config
*/
public DataProxyTopicInfo getMqTopic() {
DataProxyTopicInfo result = null;
String topicStr = get(JOB_MQ_TOPIC);
if (StringUtils.isNotBlank(topicStr)) {
result = GSON.fromJson(topicStr, DataProxyTopicInfo.class);
}
return result;
}

public void setCreateTime(Long time) {
setLong(TaskConstants.INSTANCE_CREATE_TIME, time);
}

public Long getCreateTime() {
return getLong(TaskConstants.INSTANCE_CREATE_TIME, 0);
}

public void setModifyTime(Long time) {
setLong(TaskConstants.INSTANCE_MODIFY_TIME, time);
}

public Long getModifyTime() {
return getLong(TaskConstants.INSTANCE_MODIFY_TIME, 0);
}

public void setInstanceId(String instanceId) {
set(TaskConstants.INSTANCE_ID, instanceId);
}

public void setSourceDataTime(String dataTime) {
set(TaskConstants.SOURCE_DATA_TIME, dataTime);
}

public String getSourceDataTime() {
return get(TaskConstants.SOURCE_DATA_TIME);
}

public void setSinkDataTime(Long dataTime) {
setLong(TaskConstants.SINK_DATA_TIME, dataTime);
}

public Long getSinkDataTime() {
return getLong(TaskConstants.SINK_DATA_TIME, 0);
}

@Override
public int compareTo(InstanceProfile object) {
int ret = ComparisonChain.start()
.compare(getSourceDataTime(), object.getSourceDataTime())
.compare(FileUtils.getFileCreationTime(getInstanceId()),
FileUtils.getFileCreationTime(object.getInstanceId()))
.compare(FileUtils.getFileLastModifyTime(getInstanceId()),
FileUtils.getFileLastModifyTime(object.getInstanceId()))
.result();
return ret;
}

public boolean isRetry() {
return getBoolean(TASK_RETRY, false);
}
}
Loading

0 comments on commit db74e56

Please sign in to comment.