From 453c2b01ec3eaaca98fbd984bf2d0b1d942b3ab9 Mon Sep 17 00:00:00 2001 From: "Brian (Sunghoon) Cho" Date: Mon, 28 Sep 2015 00:01:24 +0900 Subject: [PATCH] Add comments and refactor RandomOptimizer for better readability (addressing comments). --- .../em/optimizer/impl/RandomOptimizer.java | 126 ++++++++++++------ 1 file changed, 85 insertions(+), 41 deletions(-) diff --git a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java index 640d68ed2..8b45e4f17 100644 --- a/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java +++ b/services/elastic-memory/src/main/java/edu/snu/cay/services/em/optimizer/impl/RandomOptimizer.java @@ -33,6 +33,7 @@ import java.util.List; import java.util.Random; import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; @@ -45,7 +46,7 @@ * {@link #getEvaluatorsToUse} * 2. For each dataType, randomly redistributing all units -- by batching units (of 10) and * sending each batch to an evaluator selected uniformly at random. - * {@link #uniformlyRandomDataRequestPerEvaluator} + * {@link #distributeDataAcrossEvaluators} * 3. For each dataType, creating transfer steps by greedily taking data needed to * satisfy the request from an evaluator from it's right side (in a circular array). 
* {@link #getTransferSteps} @@ -65,6 +66,8 @@ public static final class MaxEvaluatorsFraction implements Name { private static final Logger LOG = Logger.getLogger(RandomOptimizer.class.getName()); private final Random random = new Random(); + private final AtomicInteger newEvaluatorId = new AtomicInteger(0); + private final double minEvaluatorsFraction; private final double maxEvaluatorsFraction; @@ -84,7 +87,8 @@ private RandomOptimizer(@Parameter(MinEvaluatorsFraction.class) final double min } @Override - public Plan optimize(final Collection activeEvaluators, final int availableEvaluators) { + public Plan optimize(final Collection activeEvaluatorsCollection, + final int availableEvaluators) { if (availableEvaluators <= 0) { throw new IllegalArgumentException("availableEvaluators " + availableEvaluators + " must be > 0"); } @@ -93,14 +97,14 @@ public Plan optimize(final Collection activeEvaluators, fin final List evaluatorsToAdd; final List evaluatorsToDelete; - final List activeEvaluatorsList = new ArrayList<>(activeEvaluators); - if (numEvaluators > activeEvaluatorsList.size()) { + final List activeEvaluators = new ArrayList<>(activeEvaluatorsCollection); + if (numEvaluators > activeEvaluators.size()) { evaluatorsToDelete = new ArrayList<>(0); - evaluatorsToAdd = getNewEvaluators(numEvaluators - activeEvaluatorsList.size()); // Add to the tail - } else if (numEvaluators < activeEvaluatorsList.size()) { + evaluatorsToAdd = getNewEvaluators(numEvaluators - activeEvaluators.size()); // Add to the tail + } else if (numEvaluators < activeEvaluators.size()) { evaluatorsToAdd = new ArrayList<>(0); evaluatorsToDelete = new ArrayList<>( - activeEvaluatorsList.subList(numEvaluators, activeEvaluatorsList.size())); // Delete from the tail + activeEvaluators.subList(numEvaluators, activeEvaluators.size())); // Delete from the tail } else { evaluatorsToAdd = new ArrayList<>(0); evaluatorsToDelete = new ArrayList<>(0); @@ -110,31 +114,36 @@ public Plan optimize(final 
Collection activeEvaluators, fin .addEvaluatorsToAdd(getIds(evaluatorsToAdd)) .addEvaluatorsToDelete(getIds(evaluatorsToDelete)); - activeEvaluatorsList.addAll(evaluatorsToAdd); - final Collection dataTypes = getDataTypes(activeEvaluatorsList); + /* + * For each dataType: + * 1. Find the sum of the number of all units across the Evaluators. (getSumData) + * 2. Initialize an OptimizedEvaluator per Evaluator from the dataType's DataInfo. (initOptimizedEvaluators) + * An OptimizedEvaluator keeps track of data at an Evaluator while distributing data and scheduling transfers. + * 3. Randomly distribute all units across the non-deleted Evaluators. (distributeDataAcrossEvaluators) + * 4. Create transfer steps to satisfy the distribution of units. (getTransferSteps) + * Add these transfer steps to the plan. + */ + activeEvaluators.addAll(evaluatorsToAdd); + final Collection dataTypes = getDataTypes(activeEvaluators); for (final String dataType : dataTypes) { + final long sumData = getSumData(dataType, activeEvaluators); + final List evaluators = initOptimizedEvaluators(dataType, activeEvaluators); + distributeDataAcrossEvaluators(evaluators.subList(0, numEvaluators), sumData); - long sumData = 0; - for (final EvaluatorParameters evaluator : activeEvaluatorsList) { - for (final DataInfo dataInfo : evaluator.getDataInfos()) { - if (dataType.equals(dataInfo.getDataType())) { - sumData += dataInfo.getNumUnits(); - } + if (LOG.isLoggable(Level.FINE)) { + for (final OptimizedEvaluator evaluator : evaluators) { + LOG.log(Level.FINE, "RandomOptimizer data distribution: {0} {1}", + new Object[]{evaluator.id, evaluator.dataRequested}); } } - final List evaluators = getWrappedEvaluators(dataType, activeEvaluatorsList); - uniformlyRandomDataRequestPerEvaluator(evaluators.subList(0, numEvaluators), sumData); - - for (final OptimizedEvaluator evaluator : evaluators) { - LOG.log(Level.FINE, evaluator.toString()); - } - final List transferSteps = getTransferSteps(evaluators); 
planBuilder.addTransferSteps(transferSteps); - } - return planBuilder.build(); + + final Plan plan = planBuilder.build(); + LOG.log(Level.FINE, "RandomOptimizer Plan: {0}", plan); + return plan; } private int getEvaluatorsToUse(final int availableEvaluators) { @@ -151,13 +160,14 @@ private int getEvaluatorsToUse(final int availableEvaluators) { private List getNewEvaluators(final int numEvaluators) { final List newEvaluators = new ArrayList<>(); for (int i = 0; i < numEvaluators; i++) { - newEvaluators.add(getNewEvaluator(i)); + newEvaluators.add(getNewEvaluator()); } return newEvaluators; } - private EvaluatorParameters getNewEvaluator(final int index) { - return new EvaluatorParametersImpl("new-" + random.nextInt() + "-" + index, new ArrayList(0)); + private EvaluatorParameters getNewEvaluator() { + return new EvaluatorParametersImpl("newRandomOptimizerNode-" + newEvaluatorId.getAndIncrement(), + new ArrayList(0)); } private static Collection getDataTypes(final Collection evaluators) { @@ -170,9 +180,30 @@ private static Collection getDataTypes(final Collection getWrappedEvaluators(final String dataType, - final Collection evaluators) { - final List wrappedEvaluators = new ArrayList<>(evaluators.size()); + private static long getSumData(final String dataType, + final Collection evaluators) { + long sumData = 0; + for (final EvaluatorParameters evaluator : evaluators) { + for (final DataInfo dataInfo : evaluator.getDataInfos()) { + if (dataType.equals(dataInfo.getDataType())) { + sumData += dataInfo.getNumUnits(); + } + } + } + return sumData; + } + + /** + * Initialize an OptimizedEvaluator per Evaluator from the dataType's DataInfo. + * An OptimizedEvaluator keeps track of data at an Evaluator while distributing data and scheduling transfers. + * Checks for duplicate dataTypes. 
+ * @param dataType the dataType to initialize OptimizedEvaluators for + * @param evaluators EvaluatorParameters to create OptimizedEvaluators for + * @return list of OptimizedEvaluators, one per EvaluatorParameters passed in + */ + private static List initOptimizedEvaluators(final String dataType, + final Collection evaluators) { + final List optimizedEvaluators = new ArrayList<>(evaluators.size()); for (final EvaluatorParameters parameters : evaluators) { boolean dataTypeAdded = false; for (final DataInfo dataInfo : parameters.getDataInfos()) { @@ -180,15 +211,15 @@ private static List getWrappedEvaluators(final String dataTy if (dataTypeAdded) { throw new IllegalArgumentException("Cannot have multiple infos for " + dataType); } - wrappedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataInfo)); + optimizedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataInfo)); dataTypeAdded = true; } } if (!dataTypeAdded) { - wrappedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataType)); + optimizedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataType)); } } - return wrappedEvaluators; + return optimizedEvaluators; } private static List getIds(final Collection evaluators) { @@ -199,24 +230,38 @@ private static List getIds(final Collection evaluat return ids; } - private void uniformlyRandomDataRequestPerEvaluator(final List evaluators, - final long totalData) { + /** + * Randomly redistribute all data units. + * The distribution is done by batching units (of 10) and + * sending each batch to an evaluator selected uniformly at random. + * @param evaluators Evaluators to distribute data to. Should *not* include to-be-deleted Evaluators. + * @param totalData the amount of data to distribute + */ + private void distributeDataAcrossEvaluators(final List evaluators, + final long totalData) { // Add each unit of data to a random evaluator. 
final int unit = 10; // Do remainder first final int remainder = (int) totalData % unit; - evaluators.get(randomEvaluatorIndex(evaluators.size())).setDataRequested(remainder); + evaluators.get(getRandomEvaluatorIndex(evaluators.size())).setDataRequested(remainder); // Do the rest in unit increments for (long i = 0; i < totalData / unit; i++) { - final OptimizedEvaluator evaluator = evaluators.get(randomEvaluatorIndex(evaluators.size())); + final OptimizedEvaluator evaluator = evaluators.get(getRandomEvaluatorIndex(evaluators.size())); evaluator.setDataRequested(evaluator.getDataRequested() + unit); } } + private int getRandomEvaluatorIndex(final int numEvaluators) { + return random.nextInt(numEvaluators); + } + /** + * Create transfer steps to satisfy redistributed data at the evaluators. * Iterate through the evaluators. When a evaluator needs more data, take it from the right side (circular). + * @param evaluators Evaluators that can participate in transfers. *Should* include to-be-deleted Evaluators. + * @return list of transfer steps */ private List getTransferSteps(final List evaluators) { final List transferSteps = new ArrayList<>(); @@ -224,9 +269,12 @@ private List getTransferSteps(final List evalu for (int i = 0; i < evaluators.size(); i++) { final OptimizedEvaluator dstEvaluator = evaluators.get(i); if (dstEvaluator.getDataRemaining() > 0) { + // Find srcEvaluators with data units to transfer to dstEvaluator, + // starting from the next index and iterating through as a circular array. for (int j = 1; j < evaluators.size(); j++) { final OptimizedEvaluator srcEvaluator = evaluators.get((i + j) % evaluators.size()); if (srcEvaluator.getDataRemaining() < 0) { + // Mark all available units at srcEvaluator as transfer steps to be transferred to dstEvaluator. 
final int dataToTransfer = Math.min(dstEvaluator.getDataRemaining(), 0 - srcEvaluator.getDataRemaining()); srcEvaluator.sendData(dstEvaluator.getId(), dataToTransfer); dstEvaluator.receiveData(srcEvaluator.getId(), dataToTransfer); @@ -250,10 +298,6 @@ private List getTransferSteps(final List evalu return transferSteps; } - private int randomEvaluatorIndex(final int numEvaluators) { - return random.nextInt(numEvaluators); - } - private static class OptimizedEvaluator { private final String id; private final String dataType;