Skip to content

Commit

Permalink
Introduce AccessSupportRssChecker to reject the un-support applicatio…
Browse files Browse the repository at this point in the history
…n earlier
  • Loading branch information
maobaolong committed Dec 8, 2024
1 parent 4ce1aa8 commit 3d2c4df
Show file tree
Hide file tree
Showing 4 changed files with 170 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.uniffle.common.config.ConfigOptions;
import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.config.RssBaseConf;
import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker;
import org.apache.uniffle.coordinator.conf.ClientConfParser;
import org.apache.uniffle.coordinator.strategy.assignment.AbstractAssignmentStrategy;
import org.apache.uniffle.coordinator.strategy.assignment.AssignmentStrategyFactory;
Expand Down Expand Up @@ -92,7 +93,8 @@ public class CoordinatorConf extends RssBaseConf {
.asList()
.defaultValues(
"org.apache.uniffle.coordinator.access.checker.AccessClusterLoadChecker",
"org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker")
"org.apache.uniffle.coordinator.access.checker.AccessQuotaChecker",
AccessSupportRssChecker.class.getCanonicalName())
.withDescription("Access checkers");
public static final ConfigOption<Integer> COORDINATOR_ACCESS_CANDIDATES_UPDATE_INTERVAL_SEC =
ConfigOptions.key("rss.coordinator.access.candidates.updateIntervalSec")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.access.checker;

import java.io.IOException;

import org.apache.hadoop.io.serializer.JavaSerialization;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.access.AccessCheckResult;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;

/**
* AccessSupportRssChecker checks whether the extra properties support rss, for example, the
* serializer is java, rss is not supported.
*/
public class AccessSupportRssChecker extends AbstractAccessChecker {
private static final Logger LOG = LoggerFactory.getLogger(AccessSupportRssChecker.class);

public AccessSupportRssChecker(AccessManager accessManager) throws Exception {
super(accessManager);
}

@Override
public AccessCheckResult check(AccessInfo accessInfo) {
String serializer = accessInfo.getExtraProperties().get("serializer");
if (JavaSerialization.class.getName().equals(serializer)) {
String msg = String.format("Denied by AccessSupportRssChecker, accessInfo[%s].", accessInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("serializer is {}, {}", serializer, msg);
}
CoordinatorMetrics.counterTotalSupportRssDeniedRequest.inc();
return new AccessCheckResult(false, msg);
}

return new AccessCheckResult(true, Constants.COMMON_SUCCESS_MESSAGE);
}

@Override
public void close() throws IOException {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class CoordinatorMetrics {
private static final String TOTAL_CANDIDATES_DENIED_REQUEST = "total_candidates_denied_request";
private static final String TOTAL_LOAD_DENIED_REQUEST = "total_load_denied_request";
private static final String TOTAL_QUOTA_DENIED_REQUEST = "total_quota_denied_request";
private static final String TOTAL_SUPPORT_RSS_DENIED_REQUEST = "total_support_rss_denied_request";
public static final String REMOTE_STORAGE_IN_USED_PREFIX = "remote_storage_in_used_";
public static final String APP_NUM_TO_USER = "app_num";
public static final String USER_LABEL = "user_name";
Expand All @@ -57,6 +58,7 @@ public class CoordinatorMetrics {
public static Counter counterTotalCandidatesDeniedRequest;
public static Counter counterTotalQuotaDeniedRequest;
public static Counter counterTotalLoadDeniedRequest;
public static Counter counterTotalSupportRssDeniedRequest;
public static final Map<String, Gauge> GAUGE_USED_REMOTE_STORAGE = JavaUtils.newConcurrentMap();

private static MetricsManager metricsManager;
Expand Down Expand Up @@ -118,5 +120,7 @@ private static void setUpMetrics() {
metricsManager.addCounter(TOTAL_CANDIDATES_DENIED_REQUEST);
counterTotalQuotaDeniedRequest = metricsManager.addCounter(TOTAL_QUOTA_DENIED_REQUEST);
counterTotalLoadDeniedRequest = metricsManager.addCounter(TOTAL_LOAD_DENIED_REQUEST);
counterTotalSupportRssDeniedRequest =
metricsManager.addCounter(TOTAL_SUPPORT_RSS_DENIED_REQUEST);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.uniffle.coordinator.checker;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.serializer.JavaSerialization;
import org.apache.hadoop.io.serializer.WritableSerialization;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import org.apache.uniffle.coordinator.AccessManager;
import org.apache.uniffle.coordinator.ApplicationManager;
import org.apache.uniffle.coordinator.ClusterManager;
import org.apache.uniffle.coordinator.CoordinatorConf;
import org.apache.uniffle.coordinator.SimpleClusterManager;
import org.apache.uniffle.coordinator.access.AccessInfo;
import org.apache.uniffle.coordinator.access.checker.AccessSupportRssChecker;
import org.apache.uniffle.coordinator.metric.CoordinatorMetrics;

import static org.apache.uniffle.coordinator.CoordinatorConf.COORDINATOR_ACCESS_CHECKERS;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;

public class AccessSupportRssCheckerTest {

@BeforeEach
public void setUp() {
CoordinatorMetrics.register();
}

@AfterEach
public void clear() {
CoordinatorMetrics.clear();
}

@Test
public void test() throws Exception {
ClusterManager clusterManager = mock(SimpleClusterManager.class);

CoordinatorConf conf = new CoordinatorConf();
conf.set(
COORDINATOR_ACCESS_CHECKERS,
Collections.singletonList(AccessSupportRssChecker.class.getName()));
Map<String, String> properties = new HashMap<>();

/** case1: check success when the serializer config is empty. */
try (ApplicationManager applicationManager = new ApplicationManager(conf)) {
AccessManager accessManager =
new AccessManager(
conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
AccessSupportRssChecker checker =
(AccessSupportRssChecker) accessManager.getAccessCheckers().get(0);
AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertTrue(checker.check(accessInfo).isSuccess());
}

/** case2: check failed when the serializer config is JavaSerialization. */
properties.put("serializer", JavaSerialization.class.getCanonicalName());
try (ApplicationManager applicationManager = new ApplicationManager(conf)) {
AccessManager accessManager =
new AccessManager(
conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
AccessSupportRssChecker checker =
(AccessSupportRssChecker) accessManager.getAccessCheckers().get(0);
AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertFalse(checker.check(accessInfo).isSuccess());
}

/** case3: check success when the serializer config is other than JavaSerialization. */
properties.put("serializer", WritableSerialization.class.getCanonicalName());
try (ApplicationManager applicationManager = new ApplicationManager(conf)) {
AccessManager accessManager =
new AccessManager(
conf, clusterManager, applicationManager.getQuotaManager(), new Configuration());
AccessSupportRssChecker checker =
(AccessSupportRssChecker) accessManager.getAccessCheckers().get(0);
AccessInfo accessInfo = new AccessInfo("test", new HashSet<>(), properties, "user");
assertTrue(checker.check(accessInfo).isSuccess());
}
}
}

0 comments on commit 3d2c4df

Please sign in to comment.