From 5bcabc2ffb0efba03b833df9c7488044df07eac1 Mon Sep 17 00:00:00 2001 From: "Brian (Sunghoon) Cho" Date: Wed, 23 Sep 2015 17:33:10 +0900 Subject: [PATCH 1/5] Add EM plan and optimizer packages. - The plan package contains an API for defining and executing an EM configuration change. - The optimizer package contains an API for creating plans based on parameters given by the Evaluators' status. - Frameworks (e.g., Dolphin) can use both packages to optimize EM's memory storage layout (e.g., to improve job completion time). --- .../services/em/optimizer/api/DataInfo.java | 32 ++ .../em/optimizer/api/EvaluatorParameters.java | 33 ++ .../services/em/optimizer/api/Optimizer.java | 35 ++ .../em/optimizer/api/package-info.java | 20 ++ .../em/optimizer/impl/DataInfoImpl.java | 49 +++ .../impl/EvaluatorParametersImpl.java | 44 +++ .../em/optimizer/impl/RandomOptimizer.java | 303 ++++++++++++++++++ .../em/optimizer/impl/package-info.java | 20 ++ .../snu/cay/services/em/plan/api/Plan.java | 44 +++ .../services/em/plan/api/PlanExecutor.java | 32 ++ .../cay/services/em/plan/api/PlanResult.java | 23 ++ .../services/em/plan/api/TransferStep.java | 27 ++ .../services/em/plan/api/package-info.java | 19 ++ .../em/plan/impl/LoggingPlanExecutor.java | 67 ++++ .../cay/services/em/plan/impl/PlanImpl.java | 131 ++++++++ .../services/em/plan/impl/PlanResultImpl.java | 24 ++ .../em/plan/impl/TransferStepImpl.java | 58 ++++ .../services/em/plan/impl/package-info.java | 19 ++ .../optimizer/impl/RandomOptimizerTest.java | 195 +++++++++++ .../em/optimizer/impl/package-info.java | 19 ++ 20 files changed, 1194 insertions(+) create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/DataInfo.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/EvaluatorParameters.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/Optimizer.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/package-info.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/DataInfoImpl.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/EvaluatorParametersImpl.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/package-info.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/Plan.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/PlanExecutor.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/PlanResult.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/TransferStep.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/package-info.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/LoggingPlanExecutor.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanImpl.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanResultImpl.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/TransferStepImpl.java create mode 100644 services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/package-info.java create mode 100644 services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizerTest.java create mode 100644 services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/package-info.java diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/DataInfo.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/DataInfo.java new file mode 100644 index 000000000..a1fa06b7d --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/DataInfo.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.optimizer.api; + +/** + * Information about a dataType stored at an Evaluator. + */ +public interface DataInfo { + + /** + * @return the dataType + */ + String getDataType(); + + /** + * @return number of units stored with the dataType + */ + int getNumUnits(); +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/EvaluatorParameters.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/EvaluatorParameters.java new file mode 100644 index 000000000..6d01eed25 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/EvaluatorParameters.java @@ -0,0 +1,33 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.optimizer.api; + +import java.util.Collection; + +/** + * The current state of an evaluator, represented as a set of input parameters for an optimizer. + */ +public interface EvaluatorParameters { + /** + * @return the evaluator's context ID + */ + String getId(); + + /** + * @return information about all data on the evaluator + */ + Collection getDataInfos(); +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/Optimizer.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/Optimizer.java new file mode 100644 index 000000000..1a5c74487 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/Optimizer.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.optimizer.api; + +import edu.snu.cay.services.em.plan.api.Plan; + +import java.util.Collection; + +/** + * Given the current state of evaluators (as parameters) and available resources (as the number of available + * evaluators), the optimizer generates an optimized plan. + */ +public interface Optimizer { + /** + * @param activeEvaluators all currently active evaluators and their parameters + * @param availableEvaluators the total number of evaluators available for optimization. + * If availableEvaluators < activeEvaluators.size(), the optimized plan must delete evaluators. + * If availableEvaluators > activeEvaluators.size(), the optimized plan may add evaluators. + * @return the optimized plan + */ + Plan optimize(Collection activeEvaluators, int availableEvaluators); +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/package-info.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/package-info.java new file mode 100644 index 000000000..7578d6435 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/api/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * An optimizer that takes the current state of evaluators (as parameters) and available resources + * (as the number of available evaluators), and generates a plan {@link edu.snu.cay.services.em.plan.api}. + */ +package edu.snu.cay.services.em.optimizer.api; diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/DataInfoImpl.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/DataInfoImpl.java new file mode 100644 index 000000000..82463df52 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/DataInfoImpl.java @@ -0,0 +1,49 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.optimizer.impl; + +import edu.snu.cay.services.em.optimizer.api.DataInfo; + +/** + * A plain-old-data implementation of DataInfo. + */ +public final class DataInfoImpl implements DataInfo { + private final String dataType; + private final int numUnits; + + public DataInfoImpl(final String dataType, final int numUnits) { + this.dataType = dataType; + this.numUnits = numUnits; + } + + @Override + public String getDataType() { + return dataType; + } + + @Override + public int getNumUnits() { + return numUnits; + } + + @Override + public String toString() { + return "DataInfoImpl{" + + "dataType='" + dataType + '\'' + + ", numUnits=" + numUnits + + '}'; + } +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/EvaluatorParametersImpl.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/EvaluatorParametersImpl.java new file mode 100644 index 000000000..c6c9ab189 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/EvaluatorParametersImpl.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.optimizer.impl; + +import edu.snu.cay.services.em.optimizer.api.DataInfo; +import edu.snu.cay.services.em.optimizer.api.EvaluatorParameters; + +import java.util.Collection; + +/** + * A plain-old-data implementation of EvaluatorParameters. + */ +public final class EvaluatorParametersImpl implements EvaluatorParameters { + private final String id; + private final Collection dataInfos; + + public EvaluatorParametersImpl(final String id, final Collection dataInfos) { + this.id = id; + this.dataInfos = dataInfos; + } + + @Override + public String getId() { + return id; + } + + @Override + public Collection getDataInfos() { + return dataInfos; + } +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java new file mode 100644 index 000000000..5335f4235 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java @@ -0,0 +1,303 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.optimizer.impl; + +import edu.snu.cay.services.em.optimizer.api.DataInfo; +import edu.snu.cay.services.em.optimizer.api.EvaluatorParameters; +import edu.snu.cay.services.em.plan.api.Plan; +import edu.snu.cay.services.em.optimizer.api.Optimizer; +import edu.snu.cay.services.em.plan.impl.PlanImpl; +import edu.snu.cay.services.em.plan.api.TransferStep; +import edu.snu.cay.services.em.plan.impl.TransferStepImpl; +import org.apache.reef.tang.annotations.Name; +import org.apache.reef.tang.annotations.NamedParameter; +import org.apache.reef.tang.annotations.Parameter; + +import javax.inject.Inject; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Random; +import java.util.Set; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * An Optimizer implementation that creates a Random plan. + * + * The plan is generated by: + * 1. Choosing uniformly between [minEvaluatorsFraction * availableEvaluators, + * maxEvaluatorsFraction * availableEvaluators] evaluators to use in the plan. + * {@link #getEvaluatorsToUse} + * 2. For each dataType, requesting uniformly at random a fixed number of units (10) + * across the evaluators. + * {@link #uniformlyRandomDataRequestPerEvaluator} + * 3. For each dataType, creating transfer steps by greedily taking data needed to + * satisfy the request from an evaluator from it's right side (in a circular array). + * {@link #getTransferSteps} + */ +public final class RandomOptimizer implements Optimizer { + + @NamedParameter(short_name = "random_optimizer_min_fraction", + default_value = "0.5", doc = "The minimum fraction of available evaluators to use. Range [0, 1.0]") + public static final class MinEvaluatorsFraction implements Name { + } + + @NamedParameter(short_name = "random_optimizer_max_fraction", + default_value = "1.0", doc = "The maximum fraction of available evaluators to use. Range [0, 1.0]") + public static final class MaxEvaluatorsFraction implements Name { + } + + private static final Logger LOG = Logger.getLogger(RandomOptimizer.class.getName()); + + private final Random random = new Random(); + private final double minEvaluatorsFraction; + private final double maxEvaluatorsFraction; + + @Inject + private RandomOptimizer(@Parameter(MinEvaluatorsFraction.class) final double minEvaluatorsFraction, + @Parameter(MaxEvaluatorsFraction.class) final double maxEvaluatorsFraction) { + this.minEvaluatorsFraction = minEvaluatorsFraction; + this.maxEvaluatorsFraction = maxEvaluatorsFraction; + } + + @Override + public Plan optimize(final Collection activeEvaluators, final int availableEvaluators) { + final int numEvaluators = getEvaluatorsToUse(availableEvaluators); + + final List evaluatorsToAdd; + final List evaluatorsToDelete; + final List activeEvaluatorsList = new ArrayList<>(activeEvaluators); + if (numEvaluators > activeEvaluatorsList.size()) { + evaluatorsToDelete = new ArrayList<>(0); + evaluatorsToAdd = getNewEvaluators(numEvaluators - activeEvaluatorsList.size()); // Add to the tail + } else { + evaluatorsToAdd = new ArrayList<>(0); + evaluatorsToDelete = new ArrayList<>( + activeEvaluatorsList.subList(numEvaluators, activeEvaluatorsList.size())); // Delete from the tail + } + + final PlanImpl.Builder planBuilder = PlanImpl.newBuilder() + .addEvaluatorsToAdd(getIds(evaluatorsToAdd)) + .addEvaluatorsToDelete(getIds(evaluatorsToDelete)); + + activeEvaluatorsList.addAll(evaluatorsToAdd); + final Collection dataTypes = getDataTypes(activeEvaluatorsList); + for (final String dataType : dataTypes) { + + long sumData = 0; + for (final EvaluatorParameters evaluator : activeEvaluatorsList) { + for (final DataInfo dataInfo : evaluator.getDataInfos()) { + if (dataType.equals(dataInfo.getDataType())) { + sumData += dataInfo.getNumUnits(); + } + } + } + + final List evaluators = getWrappedEvaluators(dataType, activeEvaluatorsList); + uniformlyRandomDataRequestPerEvaluator(evaluators.subList(0, numEvaluators), sumData); + + for (final OptimizedEvaluator evaluator : evaluators) { + LOG.log(Level.FINE, evaluator.toString()); + } + + final List transferSteps = getTransferSteps(evaluators); + planBuilder.addTransferSteps(transferSteps); + + } + return planBuilder.build(); + } + + private int getEvaluatorsToUse(final int availableEvaluators) { + final int minEvaluators = (int) (availableEvaluators * minEvaluatorsFraction); + final int maxEvaluators = (int) (availableEvaluators * maxEvaluatorsFraction); + + if (maxEvaluators - minEvaluators == 0) { + return availableEvaluators; + } else { + return minEvaluators + random.nextInt(maxEvaluators - minEvaluators + 1); // + 1 makes this inclusive + } + } + + private List getNewEvaluators(final int numEvaluators) { + final List newEvaluators = new ArrayList<>(); + for (int i = 0; i < numEvaluators; i++) { + newEvaluators.add(getNewEvaluator(i)); + } + return newEvaluators; + } + + private EvaluatorParameters getNewEvaluator(final int index) { + return new EvaluatorParametersImpl("new-" + random.nextInt() + "-" + index, new ArrayList(0)); + } + + private static Collection getDataTypes(final Collection evaluators) { + final Set dataTypes = new HashSet<>(); + for (final EvaluatorParameters evaluator : evaluators) { + for (final DataInfo dataInfo : evaluator.getDataInfos()) { + dataTypes.add(dataInfo.getDataType()); + } + } + return dataTypes; + } + + private static List getWrappedEvaluators(final String dataType, + final Collection evaluators) { + final List wrappedEvaluators = new ArrayList<>(evaluators.size()); + for (final EvaluatorParameters parameters : evaluators) { + boolean dataTypeAdded = false; + for (final DataInfo dataInfo : parameters.getDataInfos()) { + if (dataType.equals(dataInfo.getDataType())) { + wrappedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataInfo)); + dataTypeAdded = true; + } + } + if (!dataTypeAdded) { + wrappedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataType)); + } + } + return wrappedEvaluators; + } + + private static List getIds(final Collection evaluators) { + final List ids = new ArrayList<>(evaluators.size()); + for (final EvaluatorParameters evaluator : evaluators) { + ids.add(evaluator.getId()); + } + return ids; + } + + private void uniformlyRandomDataRequestPerEvaluator(final List evaluators, + final long totalData) { + // Add each unit of data to a random evaluator. + final long unit = 10; + + // Do remainder first + final long remainder = totalData % unit; + evaluators.get(randomEvaluatorIndex(evaluators.size())).setDataRequested(remainder); + + // Do the rest in unit increments + for (long i = 0; i < totalData / unit; i++) { + final OptimizedEvaluator evaluator = evaluators.get(randomEvaluatorIndex(evaluators.size())); + evaluator.setDataRequested(evaluator.getDataRequested() + unit); + } + } + + /** + * Iterate through the evaluators. When a evaluator needs more data, take it from the right side (circular). + */ + private List getTransferSteps(final List evaluators) { + final List transferSteps = new ArrayList<>(); + + for (int i = 0; i < evaluators.size(); i++) { + final OptimizedEvaluator dstEvaluator = evaluators.get(i); + if (dstEvaluator.getDataRemaining() > 0) { + for (int j = 1; j < evaluators.size(); j++) { + final OptimizedEvaluator srcEvaluator = evaluators.get((i + j) % evaluators.size()); + if (srcEvaluator.getDataRemaining() < 0) { + final long dataToTransfer = Math.min(dstEvaluator.getDataRemaining(), 0 - srcEvaluator.getDataRemaining()); + srcEvaluator.sendData(dstEvaluator.getId(), dataToTransfer); + dstEvaluator.receiveData(srcEvaluator.getId(), dataToTransfer); + } + if (dstEvaluator.getDataRemaining() == 0) { + break; + } + } + } + if (dstEvaluator.getDataRemaining() > 0) { + for (final OptimizedEvaluator evaluator : evaluators) { + LOG.log(Level.SEVERE, evaluator.toString()); + } + + throw new RuntimeException("Unable to satisfy data request. Still need to receive " + + dstEvaluator.getDataRemaining()); + } + transferSteps.addAll(dstEvaluator.getDstTransferSteps()); + } + + return transferSteps; + } + + private int randomEvaluatorIndex(final int numEvaluators) { + return random.nextInt(numEvaluators); + } + + private static class OptimizedEvaluator { + private final String id; + private final String dataType; + private final List dstTransferSteps = new ArrayList<>(); + private long dataAllocated; + private long dataRequested; + + public OptimizedEvaluator(final String id, final DataInfo dataInfo) { + this.id = id; + this.dataType = dataInfo.getDataType(); + this.dataAllocated = dataInfo.getNumUnits(); + this.dataRequested = 0; + } + + public OptimizedEvaluator(final String id, final String dataType) { + this.id = id; + this.dataType = dataType; + this.dataAllocated = 0; + this.dataRequested = 0; + } + + public String getId() { + return id; + } + + public long getDataRequested() { + return dataRequested; + } + + public void setDataRequested(final long dataRequested) { + this.dataRequested = dataRequested; + } + + public long getDataAllocated() { + return dataAllocated; + } + + public long getDataRemaining() { + return dataRequested - dataAllocated; + } + + public void sendData(final String dst, final long data) { + dataAllocated -= data; + } + + public void receiveData(final String src, final long data) { + dataAllocated += data; + dstTransferSteps.add(new TransferStepImpl(src, id, new DataInfoImpl(dataType, (int) data))); + } + + public List getDstTransferSteps() { + return dstTransferSteps; + } + + @Override + public String toString() { + return "OptimizedEvaluator{" + + "id='" + id + '\'' + + ", dataAllocated=" + dataAllocated + + ", dataRequested=" + dataRequested + + ", dstTransferSteps=" + dstTransferSteps + + '}'; + } + } +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/package-info.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/package-info.java new file mode 100644 index 000000000..6a87fc23a --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/package-info.java @@ -0,0 +1,20 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Optimizer class implementations. + * Includes a (trivial) Random Optimizer that can be used for end-to-end testing. + */ +package edu.snu.cay.services.em.optimizer.impl; diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/Plan.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/Plan.java new file mode 100644 index 000000000..178ca207c --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/Plan.java @@ -0,0 +1,44 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.plan.api; + +import java.util.Collection; + +/** + * A plan to be executed. Plan execution must adhere to the following constraints: + * 1. Evaluators must be added before they participate in transfers. + * 2. Evaluators must finish all transfers which they are a part of before they are deleted. + */ +public interface Plan { + /** + * Evaluators to be added before transfer steps. + * @return IDs of evaluators to add. These IDs are referenced by the transfer steps in this plan. + * Different evaluator IDs will likely be assigned during plan execution. + */ + Collection getEvaluatorsToAdd(); + + /** + * Evaluators to be deleted after transfer steps. + * @return IDs of evaluators to delete + */ + Collection getEvaluatorsToDelete(); + + /** + * Transfer steps to be applied. + * @return src, dst, and information about data to be transferred + */ + Collection getTransferSteps(); +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/PlanExecutor.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/PlanExecutor.java new file mode 100644 index 000000000..777194693 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/PlanExecutor.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.plan.api; + +import java.util.concurrent.Future; + +/** + * A plan executor interface. + * Plan execution returns a Future. Executors should start new thread(s) to execute within and update the Future + * when complete. + */ +public interface PlanExecutor { + + /** + * @param plan to execute + * @return a Future that summarizes a plan execution when it has finished + */ + Future execute(Plan plan); +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/PlanResult.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/PlanResult.java new file mode 100644 index 000000000..f3980e0e2 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/PlanResult.java @@ -0,0 +1,23 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.plan.api; + +/** + * The result of the applied plan. + * This interface should be expanded to return a summary of plan execution's result. + */ +public interface PlanResult { +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/TransferStep.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/TransferStep.java new file mode 100644 index 000000000..ea5ca98f7 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/TransferStep.java @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.plan.api; + +import edu.snu.cay.services.em.optimizer.api.DataInfo; + +/** + * A single transfer step to execute as part of a Plan. + */ +public interface TransferStep { + String getSrcId(); + String getDstId(); + DataInfo getDataInfo(); +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/package-info.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/package-info.java new file mode 100644 index 000000000..829e337da --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/api/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * An API for EM plans. A plan is executed to modify the EM state. + */ +package edu.snu.cay.services.em.plan.api; diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/LoggingPlanExecutor.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/LoggingPlanExecutor.java new file mode 100644 index 000000000..cee0c281f --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/LoggingPlanExecutor.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.plan.impl; + +import edu.snu.cay.services.em.plan.api.TransferStep; +import edu.snu.cay.services.em.plan.api.Plan; +import edu.snu.cay.services.em.plan.api.PlanExecutor; +import edu.snu.cay.services.em.plan.api.PlanResult; + +import javax.inject.Inject; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A mock plan executor that executes plans by logging the planned actions within a new thread. + * A real single-threaded executor implementation could follow a similar pattern. + */ +public final class LoggingPlanExecutor implements PlanExecutor { + private static final Logger LOG = Logger.getLogger(LoggingPlanExecutor.class.getName()); + + private final ExecutorService executor = Executors.newSingleThreadExecutor(); + + @Inject + private LoggingPlanExecutor() { + } + + @Override + public Future execute(final Plan plan) { + return executor.submit(new Callable() { + @Override + public PlanResult call() throws Exception { + for (final String evaluatorToAdd : plan.getEvaluatorsToAdd()) { + LOG.log(Level.INFO, "Add evaluator: " + evaluatorToAdd); + } + + for (final TransferStep transferStep : plan.getTransferSteps()) { + LOG.log(Level.INFO, "Apply transfer step: " + transferStep); + } + + for (final String evaluatorToDelete : plan.getEvaluatorsToDelete()) { + LOG.log(Level.INFO, "Delete evaluator: " + evaluatorToDelete); + } + + LOG.log(Level.INFO, "Done."); + + return new PlanResultImpl(); + } + }); + } +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanImpl.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanImpl.java new file mode 100644 index 000000000..fdc951956 --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanImpl.java @@ -0,0 +1,131 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.plan.impl; + +import edu.snu.cay.services.em.plan.api.TransferStep; +import edu.snu.cay.services.em.plan.api.Plan; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * A plan implementation with builder. + * The builder checks for duplicate evaluators. + */ +public final class PlanImpl implements Plan { + private final Collection evaluatorsToAdd; + private final Collection evaluatorsToDelete; + private final Collection transferSteps; + + private PlanImpl(final Collection evaluatorsToAdd, + final Collection evaluatorsToDelete, + final Collection transferSteps) { + this.evaluatorsToAdd = evaluatorsToAdd; + this.evaluatorsToDelete = evaluatorsToDelete; + this.transferSteps = transferSteps; + } + + @Override + public Collection getEvaluatorsToAdd() { + return evaluatorsToAdd; + } + + @Override + public Collection getEvaluatorsToDelete() { + return evaluatorsToDelete; + } + + @Override + public Collection getTransferSteps() { + return transferSteps; + } + + @Override + public String toString() { + return "PlanImpl{" + + "evaluatorsToAdd=" + evaluatorsToAdd + + ", evaluatorsToDelete=" + evaluatorsToDelete + + ", transferSteps=" + transferSteps + + '}'; + } + + public static PlanImpl.Builder newBuilder() { + return new Builder(); + } + + public static final class Builder implements org.apache.reef.util.Builder { + private final Set evaluatorsToAdd = new HashSet<>(); + private final Set evaluatorsToDelete = new HashSet<>(); + private final List transferSteps = new ArrayList<>(); + + private Builder() { + } + + public Builder addEvaluatorToAdd(final String evaluatorId) { + evaluatorsToAdd.add(evaluatorId); + return this; + } + + public Builder addEvaluatorsToAdd(final Collection evaluatorIds) { + evaluatorsToAdd.addAll(evaluatorIds); + return this; + } + + public Builder addEvaluatorToDelete(final String evaluatorId) { + evaluatorsToDelete.add(evaluatorId); + return this; + } + + public Builder addEvaluatorsToDelete(final Collection evaluatorIds) { + evaluatorsToDelete.addAll(evaluatorIds); + return this; + } + + public Builder addTransferStep(final TransferStep transferStep) { + transferSteps.add(transferStep); + return this; + } + + public Builder addTransferSteps(final Collection newTransferSteps) { + for (final TransferStep transferStep : newTransferSteps) { + addTransferStep(transferStep); + } + return this; + } + + @Override + public PlanImpl build() { + for (final String evaluator : evaluatorsToAdd) { + if (evaluatorsToDelete.contains(evaluator)) { + throw new RuntimeException(evaluator + " is planned for addition and deletion."); + } + } + + for (final TransferStep transferStep : transferSteps) { + if (evaluatorsToDelete.contains(transferStep.getDstId())) { + throw new RuntimeException(transferStep.getDstId() + " is planned for deletion."); + } else if (evaluatorsToAdd.contains(transferStep.getSrcId())) { + throw new RuntimeException(transferStep.getSrcId() + " is planned for addition."); + } + } + + return new PlanImpl(evaluatorsToAdd, evaluatorsToDelete, transferSteps); + } + } +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanResultImpl.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanResultImpl.java new file mode 100644 index 000000000..177349e3f --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanResultImpl.java @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.plan.impl; + +import edu.snu.cay.services.em.plan.api.PlanResult; + +/** + * A plain-old-data implementation of PlanResult. + */ +public final class PlanResultImpl implements PlanResult { +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/TransferStepImpl.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/TransferStepImpl.java new file mode 100644 index 000000000..0083e010f --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/TransferStepImpl.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.plan.impl; + +import edu.snu.cay.services.em.optimizer.api.DataInfo; +import edu.snu.cay.services.em.plan.api.TransferStep; + +/** + * A plain-old-data implementation of TransferStep. + */ +public class TransferStepImpl implements TransferStep { + private final String srcId; + private final String dstId; + private final DataInfo dataInfo; + + public TransferStepImpl(final String srcId, final String dstId, final DataInfo dataInfo) { + this.srcId = srcId; + this.dstId = dstId; + this.dataInfo = dataInfo; + } + + @Override + public String getSrcId() { + return srcId; + } + + @Override + public String getDstId() { + return dstId; + } + + @Override + public DataInfo getDataInfo() { + return dataInfo; + } + + @Override + public String toString() { + return "TransferStepImpl{" + + "srcId='" + srcId + '\'' + + ", dstId='" + dstId + '\'' + + ", dataInfo=" + dataInfo + + '}'; + } +} diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/package-info.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/package-info.java new file mode 100644 index 000000000..28ab27f4a --- /dev/null +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Implementations of EM plans and plan executors. + */ +package edu.snu.cay.services.em.plan.impl; diff --git a/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizerTest.java b/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizerTest.java new file mode 100644 index 000000000..eccdef463 --- /dev/null +++ b/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizerTest.java @@ -0,0 +1,195 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package edu.snu.cay.services.em.optimizer.impl; + +import edu.snu.cay.services.em.optimizer.api.DataInfo; +import edu.snu.cay.services.em.optimizer.api.EvaluatorParameters; +import edu.snu.cay.services.em.plan.api.TransferStep; +import edu.snu.cay.services.em.plan.api.Plan; +import org.apache.reef.tang.Configuration; +import org.apache.reef.tang.Tang; +import org.apache.reef.tang.exceptions.InjectionException; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +/** + * Test that the Random Optimizer behaves correctly. + */ +public final class RandomOptimizerTest { + + /** + * Test that no evaluators are added or deleted when + * random optimizer is given a min, max of 1.0, 1.0 and + * the available evaluators is the same as the previous number of evaluators. + */ + @Test + public void testSameNumEvaluators() { + final RandomOptimizer randomOptimizer = getRandomOptimizer(1.0, 1.0); + final long dataPerEvaluator = 5000; + final int numEvaluators = 10; + final Collection evaluators = getUniformEvaluators(dataPerEvaluator, numEvaluators); + + final Plan plan = randomOptimizer.optimize(evaluators, numEvaluators); + + assertEquals(0, plan.getEvaluatorsToAdd().size()); + assertEquals(0, plan.getEvaluatorsToDelete().size()); + } + + /** + * Test that half the evaluators are deleted when + * random optimizer is given a min, max of 1.0, 1.0 and + * the available evaluators is half the previous number of evaluators. + */ + @Test + public void testHalfNumEvaluators() { + final RandomOptimizer randomOptimizer = getRandomOptimizer(1.0, 1.0); + final long dataPerEvaluator = 5000; + final int numEvaluators = 10; + final Collection evaluators = getUniformEvaluators(dataPerEvaluator, numEvaluators); + + final Plan plan = randomOptimizer.optimize(evaluators, numEvaluators / 2); + + assertEquals(0, plan.getEvaluatorsToAdd().size()); + assertEquals(numEvaluators / 2, plan.getEvaluatorsToDelete().size()); + } + + /** + * Test that evaluators are added to double the number of evaluators when + * random optimizer is given a min, max of 1.0, 1.0 and + * the available evaluators is twice the previous number of evaluators. + */ + @Test + public void testDoubleNumEvaluators() { + final RandomOptimizer randomOptimizer = getRandomOptimizer(1.0, 1.0); + final long dataPerEvaluator = 5000; + final int numEvaluators = 10; + final Collection evaluators = getUniformEvaluators(dataPerEvaluator, numEvaluators); + + final Plan plan = randomOptimizer.optimize(evaluators, numEvaluators * 2); + + assertEquals(numEvaluators, plan.getEvaluatorsToAdd().size()); + assertEquals(0, plan.getEvaluatorsToDelete().size()); + } + + /** + * Test that optimization runs without exception when + * random optimizer is given a min, max of 0.5, 1.0 and + * the available evaluators is 1.5 times the previous number of evaluators. + */ + @Test + public void testRandomNumEvaluators() { + final RandomOptimizer randomOptimizer = getRandomOptimizer(0.5, 1.0); + final long dataPerEvaluator = 5000; + final int numEvaluators = 10; + final Collection evaluators = getUniformEvaluators(dataPerEvaluator, numEvaluators); + + final Plan plan = randomOptimizer.optimize(evaluators, numEvaluators * 3 / 2); + + assertNotNull(plan.getEvaluatorsToAdd()); + assertNotNull(plan.getEvaluatorsToDelete()); + assertNotNull(plan.getTransferSteps()); + } + + /** + * Test that the optimizer runs correctly with multiple data types. + * In particular, + * (sameNumEvaluatorsPlan) no evaluators are added or deleted when + * random optimizer is given a min, max of 1.0, 1.0 and + * the available evaluators is the same as the previous number of evaluators. + * (singleEvaluatorPlan) given a single evaluator is deleted, the + * expected data types and number of units are planned for transfer. + * + */ + @Test + public void testMultipleDataTypes() { + final RandomOptimizer randomOptimizer = getRandomOptimizer(1.0, 1.0); + final List evaluators = new ArrayList<>(); + + final List dataInfo1 = new ArrayList<>(); + dataInfo1.add(new DataInfoImpl("dataTypeB", 300)); + evaluators.add(new EvaluatorParametersImpl("1", dataInfo1)); + + final List dataInfo2 = new ArrayList<>(); + dataInfo2.add(new DataInfoImpl("dataTypeA", 1000)); + dataInfo2.add(new DataInfoImpl("dataTypeB", 500)); + evaluators.add(new EvaluatorParametersImpl("2", dataInfo2)); + + final Plan sameNumEvaluatorsPlan = randomOptimizer.optimize(evaluators, evaluators.size()); + + assertEquals(0, sameNumEvaluatorsPlan.getEvaluatorsToAdd().size()); + assertEquals(0, sameNumEvaluatorsPlan.getEvaluatorsToDelete().size()); + + final Plan singleEvaluatorPlan = randomOptimizer.optimize(evaluators, 1); + + assertEquals(0, singleEvaluatorPlan.getEvaluatorsToAdd().size()); + assertEquals(1, singleEvaluatorPlan.getEvaluatorsToDelete().size()); + + final String evaluatorToDelete = singleEvaluatorPlan.getEvaluatorsToDelete().iterator().next(); + if ("1".equals(evaluatorToDelete)) { + + assertEquals(1, singleEvaluatorPlan.getTransferSteps().size()); + final TransferStep transferStep = singleEvaluatorPlan.getTransferSteps().iterator().next(); + assertEquals("1", transferStep.getSrcId()); + assertEquals("2", transferStep.getDstId()); + assertEquals("dataTypeB", transferStep.getDataInfo().getDataType()); + assertEquals(300, transferStep.getDataInfo().getNumUnits()); + + } else if ("2".equals(evaluatorToDelete)) { + + assertEquals(2, singleEvaluatorPlan.getTransferSteps().size()); + long sum = 0; + for (final TransferStep transferStep : singleEvaluatorPlan.getTransferSteps()) { + assertEquals("2", transferStep.getSrcId()); + assertEquals("1", transferStep.getDstId()); + sum += transferStep.getDataInfo().getNumUnits(); + } + assertEquals(1000 + 500, sum); + + } else { + fail("Unknown evaluatorToDelete " + evaluatorToDelete); + } + } + + private Collection getUniformEvaluators(final long dataPerEvaluator, final int numEvaluators) { + final List evaluators = new ArrayList<>(numEvaluators); + for (int i = 0; i < numEvaluators; i++) { + final List dataInfos = new ArrayList<>(1); + dataInfos.add(new DataInfoImpl("testType", (int) dataPerEvaluator)); + evaluators.add(new EvaluatorParametersImpl("test-" + i, dataInfos)); + } + return evaluators; + } + + private static RandomOptimizer getRandomOptimizer(final double minEvaluatorsFraction, + final double maxEvaluatorsFraction) { + final Configuration configuration = Tang.Factory.getTang().newConfigurationBuilder() + .bindNamedParameter(RandomOptimizer.MinEvaluatorsFraction.class, Double.toString(minEvaluatorsFraction)) + .bindNamedParameter(RandomOptimizer.MaxEvaluatorsFraction.class, Double.toString(maxEvaluatorsFraction)) + .build(); + try { + return Tang.Factory.getTang().newInjector(configuration).getInstance(RandomOptimizer.class); + } catch (final InjectionException e) { + throw new RuntimeException(e); + } + } +} diff --git a/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/package-info.java b/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/package-info.java new file mode 100644 index 000000000..0edddef25 --- /dev/null +++ b/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright (C) 2015 Seoul National University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * Tests for the (trivial) Random Optimizer. + */ +package edu.snu.cay.services.em.optimizer.impl; From 9baae4b08134fa5df991d09473b0e5de6d1ab8f7 Mon Sep 17 00:00:00 2001 From: "Brian (Sunghoon) Cho" Date: Thu, 24 Sep 2015 23:42:12 +0900 Subject: [PATCH 2/5] Address code review comments - Fix comments - Add some basic argument checking - Remove use of long for numUnits - Fix unit test --- .../em/optimizer/impl/RandomOptimizer.java | 45 ++++++++++++------- .../optimizer/impl/RandomOptimizerTest.java | 32 ++++--------- 2 files changed, 39 insertions(+), 38 deletions(-) diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java index 5335f4235..b8ed4b76e 100644 --- a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java @@ -43,8 +43,8 @@ * 1. Choosing uniformly between [minEvaluatorsFraction * availableEvaluators, * maxEvaluatorsFraction * availableEvaluators] evaluators to use in the plan. * {@link #getEvaluatorsToUse} - * 2. For each dataType, requesting uniformly at random a fixed number of units (10) - * across the evaluators. + * 2. For each dataType, randomly redistributing all units -- by batching units (of 10) and + * sending each batch to an evaluator selected uniformly at random. * {@link #uniformlyRandomDataRequestPerEvaluator} * 3. For each dataType, creating transfer steps by greedily taking data needed to * satisfy the request from an evaluator from it's right side (in a circular array). @@ -71,12 +71,24 @@ public static final class MaxEvaluatorsFraction implements Name { @Inject private RandomOptimizer(@Parameter(MinEvaluatorsFraction.class) final double minEvaluatorsFraction, @Parameter(MaxEvaluatorsFraction.class) final double maxEvaluatorsFraction) { + if (minEvaluatorsFraction < 0.0 || minEvaluatorsFraction > 1.0 + || maxEvaluatorsFraction < 0.0 || maxEvaluatorsFraction > 1.0) { + throw new IllegalArgumentException( + "minEvaluatorsFraction " + minEvaluatorsFraction + + " and maxEvaluatorsFraction " + maxEvaluatorsFraction + + " must be within range [0.0, 1.0]"); + } + this.minEvaluatorsFraction = minEvaluatorsFraction; this.maxEvaluatorsFraction = maxEvaluatorsFraction; } @Override public Plan optimize(final Collection activeEvaluators, final int availableEvaluators) { + if (availableEvaluators <= 0) { + throw new IllegalArgumentException("availableEvaluators " + availableEvaluators + " must be > 0"); + } + final int numEvaluators = getEvaluatorsToUse(availableEvaluators); final List evaluatorsToAdd; @@ -156,12 +168,15 @@ private static Collection getDataTypes(final Collection getWrappedEvaluators(final String dataType, - final Collection evaluators) { + final Collection evaluators) { final List wrappedEvaluators = new ArrayList<>(evaluators.size()); for (final EvaluatorParameters parameters : evaluators) { boolean dataTypeAdded = false; for (final DataInfo dataInfo : parameters.getDataInfos()) { if (dataType.equals(dataInfo.getDataType())) { + if (dataTypeAdded) { + throw new IllegalArgumentException("Cannot have multiple infos for " + dataType); + } wrappedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataInfo)); dataTypeAdded = true; } @@ -184,10 +199,10 @@ private static List getIds(final Collection evaluat private void uniformlyRandomDataRequestPerEvaluator(final List evaluators, final long totalData) { // Add each unit of data to a random evaluator. - final long unit = 10; + final int unit = 10; // Do remainder first - final long remainder = totalData % unit; + final int remainder = (int) totalData % unit; evaluators.get(randomEvaluatorIndex(evaluators.size())).setDataRequested(remainder); // Do the rest in unit increments @@ -209,7 +224,7 @@ private List getTransferSteps(final List evalu for (int j = 1; j < evaluators.size(); j++) { final OptimizedEvaluator srcEvaluator = evaluators.get((i + j) % evaluators.size()); if (srcEvaluator.getDataRemaining() < 0) { - final long dataToTransfer = Math.min(dstEvaluator.getDataRemaining(), 0 - srcEvaluator.getDataRemaining()); + final int dataToTransfer = Math.min(dstEvaluator.getDataRemaining(), 0 - srcEvaluator.getDataRemaining()); srcEvaluator.sendData(dstEvaluator.getId(), dataToTransfer); dstEvaluator.receiveData(srcEvaluator.getId(), dataToTransfer); } @@ -240,8 +255,8 @@ private static class OptimizedEvaluator { private final String id; private final String dataType; private final List dstTransferSteps = new ArrayList<>(); - private long dataAllocated; - private long dataRequested; + private int dataAllocated; + private int dataRequested; public OptimizedEvaluator(final String id, final DataInfo dataInfo) { this.id = id; @@ -261,29 +276,29 @@ public String getId() { return id; } - public long getDataRequested() { + public int getDataRequested() { return dataRequested; } - public void setDataRequested(final long dataRequested) { + public void setDataRequested(final int dataRequested) { this.dataRequested = dataRequested; } - public long getDataAllocated() { + public int getDataAllocated() { return dataAllocated; } - public long getDataRemaining() { + public int getDataRemaining() { return dataRequested - dataAllocated; } - public void sendData(final String dst, final long data) { + public void sendData(final String dst, final int data) { dataAllocated -= data; } - public void receiveData(final String src, final long data) { + public void receiveData(final String src, final int data) { dataAllocated += data; - dstTransferSteps.add(new TransferStepImpl(src, id, new DataInfoImpl(dataType, (int) data))); + dstTransferSteps.add(new TransferStepImpl(src, id, new DataInfoImpl(dataType, data))); } public List getDstTransferSteps() { diff --git a/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizerTest.java b/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizerTest.java index eccdef463..4527ece2a 100644 --- a/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizerTest.java +++ b/services/elastic-memory/src/test/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizerTest.java @@ -30,7 +30,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.fail; /** * Test that the Random Optimizer behaves correctly. @@ -145,29 +144,16 @@ public void testMultipleDataTypes() { assertEquals(1, singleEvaluatorPlan.getEvaluatorsToDelete().size()); final String evaluatorToDelete = singleEvaluatorPlan.getEvaluatorsToDelete().iterator().next(); - if ("1".equals(evaluatorToDelete)) { - - assertEquals(1, singleEvaluatorPlan.getTransferSteps().size()); - final TransferStep transferStep = singleEvaluatorPlan.getTransferSteps().iterator().next(); - assertEquals("1", transferStep.getSrcId()); - assertEquals("2", transferStep.getDstId()); - assertEquals("dataTypeB", transferStep.getDataInfo().getDataType()); - assertEquals(300, transferStep.getDataInfo().getNumUnits()); - - } else if ("2".equals(evaluatorToDelete)) { - - assertEquals(2, singleEvaluatorPlan.getTransferSteps().size()); - long sum = 0; - for (final TransferStep transferStep : singleEvaluatorPlan.getTransferSteps()) { - assertEquals("2", transferStep.getSrcId()); - assertEquals("1", transferStep.getDstId()); - sum += transferStep.getDataInfo().getNumUnits(); - } - assertEquals(1000 + 500, sum); - - } else { - fail("Unknown evaluatorToDelete " + evaluatorToDelete); + assertEquals("2", evaluatorToDelete); + assertEquals(2, singleEvaluatorPlan.getTransferSteps().size()); + + long sum = 0; + for (final TransferStep transferStep : singleEvaluatorPlan.getTransferSteps()) { + assertEquals("2", transferStep.getSrcId()); + assertEquals("1", transferStep.getDstId()); + sum += transferStep.getDataInfo().getNumUnits(); } + assertEquals(1000 + 500, sum); } private Collection getUniformEvaluators(final long dataPerEvaluator, final int numEvaluators) { From d08903d75ae00ef35fe086d544ee025ecae78d44 Mon Sep 17 00:00:00 2001 From: "Brian (Sunghoon) Cho" Date: Sun, 27 Sep 2015 22:22:03 +0900 Subject: [PATCH 3/5] Change toString to use StringBuilder (addressing comments). --- .../services/em/optimizer/impl/DataInfoImpl.java | 9 +++++---- .../em/optimizer/impl/RandomOptimizer.java | 14 ++++++++------ .../snu/cay/services/em/plan/impl/PlanImpl.java | 11 ++++++----- .../services/em/plan/impl/TransferStepImpl.java | 11 ++++++----- 4 files changed, 25 insertions(+), 20 deletions(-) diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/DataInfoImpl.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/DataInfoImpl.java index 82463df52..34875d2f3 100644 --- a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/DataInfoImpl.java +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/DataInfoImpl.java @@ -41,9 +41,10 @@ public int getNumUnits() { @Override public String toString() { - return "DataInfoImpl{" + - "dataType='" + dataType + '\'' + - ", numUnits=" + numUnits + - '}'; + final StringBuilder sb = new StringBuilder("DataInfoImpl{"); + sb.append("dataType='").append(dataType).append('\''); + sb.append(", numUnits=").append(numUnits); + sb.append('}'); + return sb.toString(); } } diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java index b8ed4b76e..41bf5fad4 100644 --- a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java @@ -307,12 +307,14 @@ public List getDstTransferSteps() { @Override public String toString() { - return "OptimizedEvaluator{" + - "id='" + id + '\'' + - ", dataAllocated=" + dataAllocated + - ", dataRequested=" + dataRequested + - ", dstTransferSteps=" + dstTransferSteps + - '}'; + final StringBuilder sb = new StringBuilder("OptimizedEvaluator{"); + sb.append("id='").append(id).append('\''); + sb.append(", dataType='").append(dataType).append('\''); + sb.append(", dstTransferSteps=").append(dstTransferSteps); + sb.append(", dataAllocated=").append(dataAllocated); + sb.append(", dataRequested=").append(dataRequested); + sb.append('}'); + return sb.toString(); } } } diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanImpl.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanImpl.java index fdc951956..2c7c59db6 100644 --- a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanImpl.java +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/PlanImpl.java @@ -58,11 +58,12 @@ public Collection getTransferSteps() { @Override public String toString() { - return "PlanImpl{" + - "evaluatorsToAdd=" + evaluatorsToAdd + - ", evaluatorsToDelete=" + evaluatorsToDelete + - ", transferSteps=" + transferSteps + - '}'; + final StringBuilder sb = new StringBuilder("PlanImpl{"); + sb.append("evaluatorsToAdd=").append(evaluatorsToAdd); + sb.append(", evaluatorsToDelete=").append(evaluatorsToDelete); + sb.append(", transferSteps=").append(transferSteps); + sb.append('}'); + return sb.toString(); } public static PlanImpl.Builder newBuilder() { diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/TransferStepImpl.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/TransferStepImpl.java index 0083e010f..25ab83baa 100644 --- a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/TransferStepImpl.java +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/plan/impl/TransferStepImpl.java @@ -49,10 +49,11 @@ public DataInfo getDataInfo() { @Override public String toString() { - return "TransferStepImpl{" + - "srcId='" + srcId + '\'' + - ", dstId='" + dstId + '\'' + - ", dataInfo=" + dataInfo + - '}'; + final StringBuilder sb = new StringBuilder("TransferStepImpl{"); + sb.append("srcId='").append(srcId).append('\''); + sb.append(", dstId='").append(dstId).append('\''); + sb.append(", dataInfo=").append(dataInfo); + sb.append('}'); + return sb.toString(); } } From 82508c5c3c12b9970641a6f1f462d973508f7e52 Mon Sep 17 00:00:00 2001 From: "Brian (Sunghoon) Cho" Date: Sun, 27 Sep 2015 22:25:54 +0900 Subject: [PATCH 4/5] Add explicit else case to make code clearer (addressing comments). --- .../snu/cay/services/em/optimizer/impl/RandomOptimizer.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java index 41bf5fad4..640d68ed2 100644 --- a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java @@ -97,10 +97,13 @@ public Plan optimize(final Collection activeEvaluators, fin if (numEvaluators > activeEvaluatorsList.size()) { evaluatorsToDelete = new ArrayList<>(0); evaluatorsToAdd = getNewEvaluators(numEvaluators - activeEvaluatorsList.size()); // Add to the tail - } else { + } else if (numEvaluators < activeEvaluatorsList.size()) { evaluatorsToAdd = new ArrayList<>(0); evaluatorsToDelete = new ArrayList<>( activeEvaluatorsList.subList(numEvaluators, activeEvaluatorsList.size())); // Delete from the tail + } else { + evaluatorsToAdd = new ArrayList<>(0); + evaluatorsToDelete = new ArrayList<>(0); } final PlanImpl.Builder planBuilder = PlanImpl.newBuilder() From 453c2b01ec3eaaca98fbd984bf2d0b1d942b3ab9 Mon Sep 17 00:00:00 2001 From: "Brian (Sunghoon) Cho" Date: Mon, 28 Sep 2015 00:01:24 +0900 Subject: [PATCH 5/5] Add comments and refactor RandomOptimizer for better readability (addressing comments). --- .../em/optimizer/impl/RandomOptimizer.java | 126 ++++++++++++------ 1 file changed, 85 insertions(+), 41 deletions(-) diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java index 640d68ed2..8b45e4f17 100644 --- a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -45,7 +46,7 @@ * {@link #getEvaluatorsToUse} * 2. For each dataType, randomly redistributing all units -- by batching units (of 10) and * sending each batch to an evaluator selected uniformly at random. - * {@link #uniformlyRandomDataRequestPerEvaluator} + * {@link #distributeDataAcrossEvaluators} * 3. For each dataType, creating transfer steps by greedily taking data needed to * satisfy the request from an evaluator from it's right side (in a circular array). * {@link #getTransferSteps} @@ -65,6 +66,8 @@ public static final class MaxEvaluatorsFraction implements Name { private static final Logger LOG = Logger.getLogger(RandomOptimizer.class.getName()); private final Random random = new Random(); + private final AtomicInteger newEvaluatorId = new AtomicInteger(0); + private final double minEvaluatorsFraction; private final double maxEvaluatorsFraction; @@ -84,7 +87,8 @@ private RandomOptimizer(@Parameter(MinEvaluatorsFraction.class) final double min } @Override - public Plan optimize(final Collection activeEvaluators, final int availableEvaluators) { + public Plan optimize(final Collection activeEvaluatorsCollection, + final int availableEvaluators) { if (availableEvaluators <= 0) { throw new IllegalArgumentException("availableEvaluators " + availableEvaluators + " must be > 0"); } @@ -93,14 +97,14 @@ public Plan optimize(final Collection activeEvaluators, fin final List evaluatorsToAdd; final List evaluatorsToDelete; - final List activeEvaluatorsList = new ArrayList<>(activeEvaluators); - if (numEvaluators > activeEvaluatorsList.size()) { + final List activeEvaluators = new ArrayList<>(activeEvaluatorsCollection); + if (numEvaluators > activeEvaluators.size()) { evaluatorsToDelete = new ArrayList<>(0); - evaluatorsToAdd = getNewEvaluators(numEvaluators - activeEvaluatorsList.size()); // Add to the tail - } else if (numEvaluators < activeEvaluatorsList.size()) { + evaluatorsToAdd = getNewEvaluators(numEvaluators - activeEvaluators.size()); // Add to the tail + } else if (numEvaluators < activeEvaluators.size()) { evaluatorsToAdd = new ArrayList<>(0); evaluatorsToDelete = new ArrayList<>( - activeEvaluatorsList.subList(numEvaluators, activeEvaluatorsList.size())); // Delete from the tail + activeEvaluators.subList(numEvaluators, activeEvaluators.size())); // Delete from the tail } else { evaluatorsToAdd = new ArrayList<>(0); evaluatorsToDelete = new ArrayList<>(0); @@ -110,31 +114,36 @@ public Plan optimize(final Collection activeEvaluators, fin .addEvaluatorsToAdd(getIds(evaluatorsToAdd)) .addEvaluatorsToDelete(getIds(evaluatorsToDelete)); - activeEvaluatorsList.addAll(evaluatorsToAdd); - final Collection dataTypes = getDataTypes(activeEvaluatorsList); + /* + * For each dataType: + * 1. Find the sum of the number of all units across the Evaluators. (getSumData) + * 2. Initialize an OptimizedEvaluator per Evaluator from the dataType's DataInfo. (initOptimizedEvaluators) + * An OptimizedEvaluator keeps track of data at an Evaluator while distributing data and scheduling transfers. + * 3. Randomly distribute all units across the non-deleted Evaluators. (distributeDataAcrossEvaluators) + * 4. Create transfer steps to satisfy the distribution of units. (getTransferSteps) + * Add these transfer steps to the plan. + */ + activeEvaluators.addAll(evaluatorsToAdd); + final Collection dataTypes = getDataTypes(activeEvaluators); for (final String dataType : dataTypes) { + final long sumData = getSumData(dataType, activeEvaluators); + final List evaluators = initOptimizedEvaluators(dataType, activeEvaluators); + distributeDataAcrossEvaluators(evaluators.subList(0, numEvaluators), sumData); - long sumData = 0; - for (final EvaluatorParameters evaluator : activeEvaluatorsList) { - for (final DataInfo dataInfo : evaluator.getDataInfos()) { - if (dataType.equals(dataInfo.getDataType())) { - sumData += dataInfo.getNumUnits(); - } + if (LOG.isLoggable(Level.FINE)) { + for (final OptimizedEvaluator evaluator : evaluators) { + LOG.log(Level.FINE, "RandomOptimizer data distribution: {0} {1}", + new Object[]{evaluator.id, evaluator.dataRequested}); } } - final List evaluators = getWrappedEvaluators(dataType, activeEvaluatorsList); - uniformlyRandomDataRequestPerEvaluator(evaluators.subList(0, numEvaluators), sumData); - - for (final OptimizedEvaluator evaluator : evaluators) { - LOG.log(Level.FINE, evaluator.toString()); - } - final List transferSteps = getTransferSteps(evaluators); planBuilder.addTransferSteps(transferSteps); - } - return planBuilder.build(); + + final Plan plan = planBuilder.build(); + LOG.log(Level.FINE, "RandomOptimizer Plan: {0}", plan); + return plan; } private int getEvaluatorsToUse(final int availableEvaluators) { @@ -151,13 +160,14 @@ private int getEvaluatorsToUse(final int availableEvaluators) { private List getNewEvaluators(final int numEvaluators) { final List newEvaluators = new ArrayList<>(); for (int i = 0; i < numEvaluators; i++) { - newEvaluators.add(getNewEvaluator(i)); + newEvaluators.add(getNewEvaluator()); } return newEvaluators; } - private EvaluatorParameters getNewEvaluator(final int index) { - return new EvaluatorParametersImpl("new-" + random.nextInt() + "-" + index, new ArrayList(0)); + private EvaluatorParameters getNewEvaluator() { + return new EvaluatorParametersImpl("newRandomOptimizerNode-" + newEvaluatorId.getAndIncrement(), + new ArrayList(0)); } private static Collection getDataTypes(final Collection evaluators) { @@ -170,9 +180,30 @@ private static Collection getDataTypes(final Collection getWrappedEvaluators(final String dataType, - final Collection evaluators) { - final List wrappedEvaluators = new ArrayList<>(evaluators.size()); + private static long getSumData(final String dataType, + final Collection evaluators) { + long sumData = 0; + for (final EvaluatorParameters evaluator : evaluators) { + for (final DataInfo dataInfo : evaluator.getDataInfos()) { + if (dataType.equals(dataInfo.getDataType())) { + sumData += dataInfo.getNumUnits(); + } + } + } + return sumData; + } + + /** + * Initialize an OptimizedEvaluator per Evaluator from the dataType's DataInfo. + * An OptimizedEvaluator keeps track of data at an Evaluator while distributing data and scheduling transfers. + * Checks for duplicate dataTypes. + * @param dataType + * @param evaluators EvaluatorParameters to create OptimizedEvaluators for + * @return list of OptimizedEvaluator's, one per EvaluatorParameter passed in + */ + private static List initOptimizedEvaluators(final String dataType, + final Collection evaluators) { + final List optimizedEvaluators = new ArrayList<>(evaluators.size()); for (final EvaluatorParameters parameters : evaluators) { boolean dataTypeAdded = false; for (final DataInfo dataInfo : parameters.getDataInfos()) { @@ -180,15 +211,15 @@ private static List getWrappedEvaluators(final String dataTy if (dataTypeAdded) { throw new IllegalArgumentException("Cannot have multiple infos for " + dataType); } - wrappedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataInfo)); + optimizedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataInfo)); dataTypeAdded = true; } } if (!dataTypeAdded) { - wrappedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataType)); + optimizedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataType)); } } - return wrappedEvaluators; + return optimizedEvaluators; } private static List getIds(final Collection evaluators) { @@ -199,24 +230,38 @@ private static List getIds(final Collection evaluat return ids; } - private void uniformlyRandomDataRequestPerEvaluator(final List evaluators, - final long totalData) { + /** + * Randomly redistribute all data units. + * The distribution is done by batching units (of 10) and + * sending each batch to an evaluator selected uniformly at random. + * @param evaluators Evaluators to distribute data to. Should *not* include to-be-deleted Evaluators. + * @param totalData the amount of data to distribute + */ + private void distributeDataAcrossEvaluators(final List evaluators, + final long totalData) { // Add each unit of data to a random evaluator. final int unit = 10; // Do remainder first final int remainder = (int) totalData % unit; - evaluators.get(randomEvaluatorIndex(evaluators.size())).setDataRequested(remainder); + evaluators.get(getRandomEvaluatorIndex(evaluators.size())).setDataRequested(remainder); // Do the rest in unit increments for (long i = 0; i < totalData / unit; i++) { - final OptimizedEvaluator evaluator = evaluators.get(randomEvaluatorIndex(evaluators.size())); + final OptimizedEvaluator evaluator = evaluators.get(getRandomEvaluatorIndex(evaluators.size())); evaluator.setDataRequested(evaluator.getDataRequested() + unit); } } + private int getRandomEvaluatorIndex(final int numEvaluators) { + return random.nextInt(numEvaluators); + } + /** + * Create transfer steps to satisfy redistributed data at the evaluators. * Iterate through the evaluators. When a evaluator needs more data, take it from the right side (circular). + * @param evaluators Evaluators that can participate in transfers. *Should* include to-be-deleted Evaluators. + * @return list of transfer steps */ private List getTransferSteps(final List evaluators) { final List transferSteps = new ArrayList<>(); @@ -224,9 +269,12 @@ private List getTransferSteps(final List evalu for (int i = 0; i < evaluators.size(); i++) { final OptimizedEvaluator dstEvaluator = evaluators.get(i); if (dstEvaluator.getDataRemaining() > 0) { + // Find srcEvaluator's with data units to transfer to dstEvaluator, + // starting from the next index and iterating through as a circular array. for (int j = 1; j < evaluators.size(); j++) { final OptimizedEvaluator srcEvaluator = evaluators.get((i + j) % evaluators.size()); if (srcEvaluator.getDataRemaining() < 0) { + // Mark all available units at srcEvaluator as transfer steps to be transferred to dstEvaluator. final int dataToTransfer = Math.min(dstEvaluator.getDataRemaining(), 0 - srcEvaluator.getDataRemaining()); srcEvaluator.sendData(dstEvaluator.getId(), dataToTransfer); dstEvaluator.receiveData(srcEvaluator.getId(), dataToTransfer); @@ -250,10 +298,6 @@ private List getTransferSteps(final List evalu return transferSteps; } - private int randomEvaluatorIndex(final int numEvaluators) { - return random.nextInt(numEvaluators); - } - private static class OptimizedEvaluator { private final String id; private final String dataType;