Skip to content

Commit

Permalink
Add comments and refactor RandomOptimizer for better readability (add…
Browse files Browse the repository at this point in the history
…ressing comments).
  • Loading branch information
bchocho committed Sep 27, 2015
1 parent 82508c5 commit 453c2b0
Showing 1 changed file with 85 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand All @@ -45,7 +46,7 @@
* {@link #getEvaluatorsToUse}
* 2. For each dataType, randomly redistributing all units -- by batching units (of 10) and
* sending each batch to an evaluator selected uniformly at random.
* {@link #uniformlyRandomDataRequestPerEvaluator}
* {@link #distributeDataAcrossEvaluators}
* 3. For each dataType, creating transfer steps by greedily taking data needed to
* satisfy the request from an evaluator from its right side (in a circular array).
* {@link #getTransferSteps}
Expand All @@ -65,6 +66,8 @@ public static final class MaxEvaluatorsFraction implements Name<Double> {
private static final Logger LOG = Logger.getLogger(RandomOptimizer.class.getName());

private final Random random = new Random();
private final AtomicInteger newEvaluatorId = new AtomicInteger(0);

private final double minEvaluatorsFraction;
private final double maxEvaluatorsFraction;

Expand All @@ -84,7 +87,8 @@ private RandomOptimizer(@Parameter(MinEvaluatorsFraction.class) final double min
}

@Override
public Plan optimize(final Collection<EvaluatorParameters> activeEvaluators, final int availableEvaluators) {
public Plan optimize(final Collection<EvaluatorParameters> activeEvaluatorsCollection,
final int availableEvaluators) {
if (availableEvaluators <= 0) {
throw new IllegalArgumentException("availableEvaluators " + availableEvaluators + " must be > 0");
}
Expand All @@ -93,14 +97,14 @@ public Plan optimize(final Collection<EvaluatorParameters> activeEvaluators, fin

final List<EvaluatorParameters> evaluatorsToAdd;
final List<EvaluatorParameters> evaluatorsToDelete;
final List<EvaluatorParameters> activeEvaluatorsList = new ArrayList<>(activeEvaluators);
if (numEvaluators > activeEvaluatorsList.size()) {
final List<EvaluatorParameters> activeEvaluators = new ArrayList<>(activeEvaluatorsCollection);
if (numEvaluators > activeEvaluators.size()) {
evaluatorsToDelete = new ArrayList<>(0);
evaluatorsToAdd = getNewEvaluators(numEvaluators - activeEvaluatorsList.size()); // Add to the tail
} else if (numEvaluators < activeEvaluatorsList.size()) {
evaluatorsToAdd = getNewEvaluators(numEvaluators - activeEvaluators.size()); // Add to the tail
} else if (numEvaluators < activeEvaluators.size()) {
evaluatorsToAdd = new ArrayList<>(0);
evaluatorsToDelete = new ArrayList<>(
activeEvaluatorsList.subList(numEvaluators, activeEvaluatorsList.size())); // Delete from the tail
activeEvaluators.subList(numEvaluators, activeEvaluators.size())); // Delete from the tail
} else {
evaluatorsToAdd = new ArrayList<>(0);
evaluatorsToDelete = new ArrayList<>(0);
Expand All @@ -110,31 +114,36 @@ public Plan optimize(final Collection<EvaluatorParameters> activeEvaluators, fin
.addEvaluatorsToAdd(getIds(evaluatorsToAdd))
.addEvaluatorsToDelete(getIds(evaluatorsToDelete));

activeEvaluatorsList.addAll(evaluatorsToAdd);
final Collection<String> dataTypes = getDataTypes(activeEvaluatorsList);
/*
* For each dataType:
* 1. Find the sum of the number of all units across the Evaluators. (getSumData)
* 2. Initialize an OptimizedEvaluator per Evaluator from the dataType's DataInfo. (initOptimizedEvaluators)
* An OptimizedEvaluator keeps track of data at an Evaluator while distributing data and scheduling transfers.
* 3. Randomly distribute all units across the non-deleted Evaluators. (distributeDataAcrossEvaluators)
* 4. Create transfer steps to satisfy the distribution of units. (getTransferSteps)
* Add these transfer steps to the plan.
*/
activeEvaluators.addAll(evaluatorsToAdd);
final Collection<String> dataTypes = getDataTypes(activeEvaluators);
for (final String dataType : dataTypes) {
final long sumData = getSumData(dataType, activeEvaluators);
final List<OptimizedEvaluator> evaluators = initOptimizedEvaluators(dataType, activeEvaluators);
distributeDataAcrossEvaluators(evaluators.subList(0, numEvaluators), sumData);

long sumData = 0;
for (final EvaluatorParameters evaluator : activeEvaluatorsList) {
for (final DataInfo dataInfo : evaluator.getDataInfos()) {
if (dataType.equals(dataInfo.getDataType())) {
sumData += dataInfo.getNumUnits();
}
if (LOG.isLoggable(Level.FINE)) {
for (final OptimizedEvaluator evaluator : evaluators) {
LOG.log(Level.FINE, "RandomOptimizer data distribution: {0} {1}",
new Object[]{evaluator.id, evaluator.dataRequested});
}
}

final List<OptimizedEvaluator> evaluators = getWrappedEvaluators(dataType, activeEvaluatorsList);
uniformlyRandomDataRequestPerEvaluator(evaluators.subList(0, numEvaluators), sumData);

for (final OptimizedEvaluator evaluator : evaluators) {
LOG.log(Level.FINE, evaluator.toString());
}

final List<TransferStep> transferSteps = getTransferSteps(evaluators);
planBuilder.addTransferSteps(transferSteps);

}
return planBuilder.build();

final Plan plan = planBuilder.build();
LOG.log(Level.FINE, "RandomOptimizer Plan: {0}", plan);
return plan;
}

private int getEvaluatorsToUse(final int availableEvaluators) {
Expand All @@ -151,13 +160,14 @@ private int getEvaluatorsToUse(final int availableEvaluators) {
private List<EvaluatorParameters> getNewEvaluators(final int numEvaluators) {
final List<EvaluatorParameters> newEvaluators = new ArrayList<>();
for (int i = 0; i < numEvaluators; i++) {
newEvaluators.add(getNewEvaluator(i));
newEvaluators.add(getNewEvaluator());
}
return newEvaluators;
}

private EvaluatorParameters getNewEvaluator(final int index) {
return new EvaluatorParametersImpl("new-" + random.nextInt() + "-" + index, new ArrayList<DataInfo>(0));
private EvaluatorParameters getNewEvaluator() {
return new EvaluatorParametersImpl("newRandomOptimizerNode-" + newEvaluatorId.getAndIncrement(),
new ArrayList<DataInfo>(0));
}

private static Collection<String> getDataTypes(final Collection<EvaluatorParameters> evaluators) {
Expand All @@ -170,25 +180,46 @@ private static Collection<String> getDataTypes(final Collection<EvaluatorParamet
return dataTypes;
}

private static List<OptimizedEvaluator> getWrappedEvaluators(final String dataType,
final Collection<EvaluatorParameters> evaluators) {
final List<OptimizedEvaluator> wrappedEvaluators = new ArrayList<>(evaluators.size());
/**
 * Total the number of units of one dataType held across the given evaluators.
 * Every DataInfo whose data type equals {@code dataType} contributes its unit count;
 * DataInfos of other data types are ignored.
 * @param dataType the data type whose units are counted
 * @param evaluators the evaluators whose DataInfos are inspected
 * @return the sum of units of {@code dataType} over all given evaluators, 0 if none match
 */
private static long getSumData(final String dataType,
                               final Collection<EvaluatorParameters> evaluators) {
  long totalUnits = 0;
  for (final EvaluatorParameters parameters : evaluators) {
    for (final DataInfo info : parameters.getDataInfos()) {
      if (!dataType.equals(info.getDataType())) {
        // Belongs to a different data type; does not count towards this total.
        continue;
      }
      totalUnits += info.getNumUnits();
    }
  }
  return totalUnits;
}

/**
* Initialize an OptimizedEvaluator per Evaluator from the dataType's DataInfo.
* An OptimizedEvaluator keeps track of data at an Evaluator while distributing data and scheduling transfers.
* Checks for duplicate dataTypes.
* @param dataType
* @param evaluators EvaluatorParameters to create OptimizedEvaluators for
* @return list of OptimizedEvaluators, one per EvaluatorParameters passed in
*/
private static List<OptimizedEvaluator> initOptimizedEvaluators(final String dataType,
final Collection<EvaluatorParameters> evaluators) {
final List<OptimizedEvaluator> optimizedEvaluators = new ArrayList<>(evaluators.size());
for (final EvaluatorParameters parameters : evaluators) {
boolean dataTypeAdded = false;
for (final DataInfo dataInfo : parameters.getDataInfos()) {
if (dataType.equals(dataInfo.getDataType())) {
if (dataTypeAdded) {
throw new IllegalArgumentException("Cannot have multiple infos for " + dataType);
}
wrappedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataInfo));
optimizedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataInfo));
dataTypeAdded = true;
}
}
if (!dataTypeAdded) {
wrappedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataType));
optimizedEvaluators.add(new OptimizedEvaluator(parameters.getId(), dataType));
}
}
return wrappedEvaluators;
return optimizedEvaluators;
}

private static List<String> getIds(final Collection<EvaluatorParameters> evaluators) {
Expand All @@ -199,34 +230,51 @@ private static List<String> getIds(final Collection<EvaluatorParameters> evaluat
return ids;
}

private void uniformlyRandomDataRequestPerEvaluator(final List<OptimizedEvaluator> evaluators,
final long totalData) {
/**
 * Randomly redistribute all data units.
 * The distribution is done by batching units (of 10) and
 * sending each batch to an evaluator selected uniformly at random.
 * The remainder (totalData modulo the batch size) is assigned first, as one smaller batch.
 * @param evaluators Evaluators to distribute data to. Should *not* include to-be-deleted Evaluators.
 *                   Must be non-empty, because a random index into this list is always drawn.
 * @param totalData the amount of data to distribute
 */
private void distributeDataAcrossEvaluators(final List<OptimizedEvaluator> evaluators,
                                            final long totalData) {
  // Size of each batch of data units handed to a randomly chosen evaluator.
  final int unit = 10;

  // Do remainder first.
  // Take the modulo in long arithmetic and only then narrow to int: a cast binds tighter than %,
  // so "(int) totalData % unit" truncates totalData before the modulo and yields a wrong
  // (possibly negative) remainder once totalData exceeds Integer.MAX_VALUE.
  final int remainder = (int) (totalData % unit);
  // NOTE(review): this overwrites the chosen evaluator's requested amount instead of adding to it;
  // presumably every evaluator starts with nothing requested -- confirm against OptimizedEvaluator.
  evaluators.get(getRandomEvaluatorIndex(evaluators.size())).setDataRequested(remainder);

  // Do the rest in unit increments
  for (long i = 0; i < totalData / unit; i++) {
    final OptimizedEvaluator evaluator = evaluators.get(getRandomEvaluatorIndex(evaluators.size()));
    evaluator.setDataRequested(evaluator.getDataRequested() + unit);
  }
}

/**
 * Draw an index from this optimizer's random number generator.
 * @param numEvaluators exclusive upper bound for the index; passed straight to {@code Random.nextInt(int)}
 * @return the drawn index, in the range [0, numEvaluators)
 */
private int getRandomEvaluatorIndex(final int numEvaluators) {
  final int chosenIndex = random.nextInt(numEvaluators);
  return chosenIndex;
}

/**
* Create transfer steps to satisfy redistributed data at the evaluators.
* Iterate through the evaluators. When an evaluator needs more data, take it from the right side (circular).
* @param evaluators Evaluators that can participate in transfers. *Should* include to-be-deleted Evaluators.
* @return list of transfer steps
*/
private List<TransferStep> getTransferSteps(final List<OptimizedEvaluator> evaluators) {
final List<TransferStep> transferSteps = new ArrayList<>();

for (int i = 0; i < evaluators.size(); i++) {
final OptimizedEvaluator dstEvaluator = evaluators.get(i);
if (dstEvaluator.getDataRemaining() > 0) {
// Find srcEvaluator's with data units to transfer to dstEvaluator,
// starting from the next index and iterating through as a circular array.
for (int j = 1; j < evaluators.size(); j++) {
final OptimizedEvaluator srcEvaluator = evaluators.get((i + j) % evaluators.size());
if (srcEvaluator.getDataRemaining() < 0) {
// Mark all available units at srcEvaluator as transfer steps to be transferred to dstEvaluator.
final int dataToTransfer = Math.min(dstEvaluator.getDataRemaining(), 0 - srcEvaluator.getDataRemaining());
srcEvaluator.sendData(dstEvaluator.getId(), dataToTransfer);
dstEvaluator.receiveData(srcEvaluator.getId(), dataToTransfer);
Expand All @@ -250,10 +298,6 @@ private List<TransferStep> getTransferSteps(final List<OptimizedEvaluator> evalu
return transferSteps;
}

private int randomEvaluatorIndex(final int numEvaluators) {
return random.nextInt(numEvaluators);
}

private static class OptimizedEvaluator {
private final String id;
private final String dataType;
Expand Down

0 comments on commit 453c2b0

Please sign in to comment.