Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix primary shard balance parameter #17352

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,8 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryThresholdWeight);
return params.weight(constraints);
}

public Constraint getConstraint(String constraint) {
return this.constraints.get(constraint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
*/
public class Constraint implements Predicate<Constraint.ConstraintParams> {

private boolean enable;
private Predicate<ConstraintParams> predicate;
private volatile boolean enable;
private final Predicate<ConstraintParams> predicate;

public Constraint(Predicate<ConstraintParams> constraintPredicate) {
this.predicate = constraintPredicate;
Expand All @@ -40,6 +40,10 @@ public void setEnable(boolean enable) {
this.enable = enable;
}

public boolean isEnable() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: isEnabled

return this.enable;
}

static class ConstraintParams {
private ShardsBalancer balancer;
private BalancedShardsAllocator.ModelNode node;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,8 @@ public long weight(ShardsBalancer balancer, BalancedShardsAllocator.ModelNode no
Constraint.ConstraintParams params = new Constraint.ConstraintParams(balancer, node, index, primaryConstraintThreshold);
return params.weight(constraints);
}

public Constraint getConstraint(String constraint) {
return this.constraints.get(constraint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -204,12 +204,8 @@ public class BalancedShardsAllocator implements ShardsAllocator {

private volatile boolean preferPrimaryShardBalance;
private volatile boolean preferPrimaryShardRebalance;
private volatile float preferPrimaryShardRebalanceBuffer;
private volatile float indexBalanceFactor;
private volatile float shardBalanceFactor;
private volatile WeightFunction weightFunction;
private final WeightFunction weightFunction;
private volatile float threshold;
private volatile long primaryConstraintThreshold;

private volatile boolean ignoreThrottleInRestore;
private volatile TimeValue allocatorTimeout;
Expand All @@ -222,11 +218,13 @@ public BalancedShardsAllocator(Settings settings) {

@Inject
public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) {
setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings));
setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings));
setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings));
this.weightFunction = new WeightFunction(
INDEX_BALANCE_FACTOR_SETTING.get(settings),
SHARD_BALANCE_FACTOR_SETTING.get(settings),
PRIMARY_SHARD_REBALANCE_BUFFER.get(settings),
PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings)
);
setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings));
updateWeightFunction();
setThreshold(THRESHOLD_SETTING.get(settings));
setPrimaryConstraintThresholdSetting(PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings));
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings));
Expand All @@ -236,9 +234,9 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst);
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::updateIndexBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::updateShardBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::setIndexBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::setShardBalanceFactor);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::setPreferPrimaryShardBalanceBuffer);
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance);
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold);
clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting);
Expand Down Expand Up @@ -278,54 +276,31 @@ private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrateg
}

private void setIndexBalanceFactor(float indexBalanceFactor) {
this.indexBalanceFactor = indexBalanceFactor;
this.weightFunction.setIndexBalance(indexBalanceFactor);
}

private void setShardBalanceFactor(float shardBalanceFactor) {
this.shardBalanceFactor = shardBalanceFactor;
}

private void setPreferPrimaryShardRebalanceBuffer(float preferPrimaryShardRebalanceBuffer) {
this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardRebalanceBuffer;
}

private void updateIndexBalanceFactor(float indexBalanceFactor) {
this.indexBalanceFactor = indexBalanceFactor;
updateWeightFunction();
this.weightFunction.setShardBalance(shardBalanceFactor);
}

private void updateShardBalanceFactor(float shardBalanceFactor) {
this.shardBalanceFactor = shardBalanceFactor;
updateWeightFunction();
}

private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalanceBuffer) {
this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardBalanceBuffer;
updateWeightFunction();
}

private void updateWeightFunction() {
weightFunction = new WeightFunction(
this.indexBalanceFactor,
this.shardBalanceFactor,
this.preferPrimaryShardRebalanceBuffer,
this.primaryConstraintThreshold
);
void setPreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalanceBuffer) {
this.weightFunction.setPreferPrimaryBalanceBuffer(preferPrimaryShardBalanceBuffer);
setPreferPrimaryShardRebalance(this.preferPrimaryShardRebalance);
}

/**
* When primary shards balance is desired, enable primary shard balancing constraints
* @param preferPrimaryShardBalance boolean to prefer balancing by primary shard
*/
private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) {
void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) {
this.preferPrimaryShardBalance = preferPrimaryShardBalance;
this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another concern, do we need to ensure that we update all the constraints in AllocationConstraints or RebalanceConstraints at the same time to avoid having only some of the constraints enabled during allocation or rebalance

this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance);
}

private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) {
void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) {
this.preferPrimaryShardRebalance = preferPrimaryShardRebalance;
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance);
this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance);
}

Expand All @@ -334,8 +309,7 @@ private void setThreshold(float threshold) {
}

private void setPrimaryConstraintThresholdSetting(long threshold) {
this.primaryConstraintThreshold = threshold;
this.weightFunction.updatePrimaryConstraintThreshold(threshold);
this.weightFunction.setPrimaryConstraintThreshold(threshold);
}

private void setAllocatorTimeout(TimeValue allocatorTimeout) {
Expand Down Expand Up @@ -461,14 +435,18 @@ public float getThreshold() {
* Returns the index related weight factor.
*/
public float getIndexBalance() {
return weightFunction.indexBalance;
return weightFunction.balanceFactor.indexBalance;
}

/**
* Returns the shard related weight factor.
*/
public float getShardBalance() {
return weightFunction.shardBalance;
return weightFunction.balanceFactor.shardBalance;
}

WeightFunction getWeightFunction() {
return weightFunction;
}

/**
Expand Down Expand Up @@ -505,35 +483,23 @@ public boolean getPreferPrimaryBalance() {
* package-private for testing
*/
static class WeightFunction {

private final float indexBalance;
private final float shardBalance;
private final float theta0;
private final float theta1;
private long primaryConstraintThreshold;
private AllocationConstraints constraints;
private RebalanceConstraints rebalanceConstraints;
private volatile BalanceFactor balanceFactor;
private volatile long primaryConstraintThreshold;
private final AllocationConstraints allocationConstraints;
private volatile RebalanceConstraints rebalanceConstraints;

WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer, long primaryConstraintThreshold) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
}
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
this.balanceFactor = new BalanceFactor(indexBalance, shardBalance);
this.primaryConstraintThreshold = primaryConstraintThreshold;
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
this.constraints = new AllocationConstraints();
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
this.allocationConstraints = new AllocationConstraints();
setPreferPrimaryBalanceBuffer(preferPrimaryBalanceBuffer);
// Enable index shard per node breach constraint
updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true);
}

public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) {
float balancerWeight = weight(balancer, node, index);
return balancerWeight + constraints.weight(balancer, node, index, primaryConstraintThreshold);
return balancerWeight + allocationConstraints.weight(balancer, node, index, primaryConstraintThreshold);
}

public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode node, String index) {
Expand All @@ -544,20 +510,59 @@ public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode n
float weight(ShardsBalancer balancer, ModelNode node, String index) {
final float weightShard = node.numShards() - balancer.avgShardsPerNode();
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index);
return theta0 * weightShard + theta1 * weightIndex;
return this.balanceFactor.theta0 * weightShard + this.balanceFactor.theta1 * weightIndex;
}

void updateAllocationConstraint(String constraint, boolean enable) {
this.constraints.updateAllocationConstraint(constraint, enable);
this.allocationConstraints.updateAllocationConstraint(constraint, enable);
}

void updateRebalanceConstraint(String constraint, boolean add) {
this.rebalanceConstraints.updateRebalanceConstraint(constraint, add);
}

void updatePrimaryConstraintThreshold(long primaryConstraintThreshold) {
void setPreferPrimaryBalanceBuffer(float preferPrimaryBalanceBuffer) {
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer);
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter);
}

void setIndexBalance(float indexBalance) {
this.balanceFactor = new BalanceFactor(indexBalance, this.balanceFactor.shardBalance);
}

void setShardBalance(float shardBalance) {
this.balanceFactor = new BalanceFactor(this.balanceFactor.indexBalance, shardBalance);
}

void setPrimaryConstraintThreshold(long primaryConstraintThreshold) {
this.primaryConstraintThreshold = primaryConstraintThreshold;
}

AllocationConstraints getAllocationConstraints() {
return allocationConstraints;
}

RebalanceConstraints getRebalanceConstraints() {
return rebalanceConstraints;
}

static class BalanceFactor {
final float indexBalance;
final float shardBalance;
final float theta0;
final float theta1;

BalanceFactor(float indexBalance, float shardBalance) {
float sum = indexBalance + shardBalance;
if (sum <= 0.0f) {
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum);
}
theta0 = shardBalance / sum;
theta1 = indexBalance / sum;
this.indexBalance = indexBalance;
this.shardBalance = shardBalance;
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import org.opensearch.cluster.routing.RoutingTable;
import org.opensearch.cluster.routing.ShardRouting;
import org.opensearch.cluster.routing.ShardRoutingState;
import org.opensearch.cluster.routing.allocation.AllocationConstraints;
import org.opensearch.cluster.routing.allocation.RebalanceConstraints;
import org.opensearch.cluster.routing.allocation.RoutingAllocation;
import org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.WeightFunction;
import org.opensearch.cluster.routing.allocation.decider.AllocationDecider;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
Expand All @@ -45,7 +48,12 @@

import static org.opensearch.cluster.routing.ShardRoutingState.INITIALIZING;
import static org.opensearch.cluster.routing.ShardRoutingState.STARTED;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.ConstraintTypes.INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.ALLOCATOR_TIMEOUT_SETTING;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.INDEX_BALANCE_FACTOR_SETTING;
import static org.opensearch.cluster.routing.allocation.allocator.BalancedShardsAllocator.PREFER_PRIMARY_SHARD_BALANCE;

public class TimeBoundBalancedShardsAllocatorTests extends OpenSearchAllocationTestCase {

Expand Down Expand Up @@ -561,6 +569,29 @@ public void testAllocatorTimeout() {
assertEquals(-1, ALLOCATOR_TIMEOUT_SETTING.get(build).getMillis());
}

public void testUpdateWeightFunction() {
Settings settings = Settings.builder().put(PREFER_PRIMARY_SHARD_BALANCE.getKey(), true).build();
BalancedShardsAllocator allocator = new BalancedShardsAllocator(settings);
assertEquals(INDEX_BALANCE_FACTOR_SETTING.get(Settings.EMPTY), allocator.getIndexBalance(), 0);
WeightFunction weightFunction = allocator.getWeightFunction();
AllocationConstraints constraints = weightFunction.getAllocationConstraints();
assertTrue(constraints.getConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID).isEnable());
assertTrue(constraints.getConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID).isEnable());
RebalanceConstraints rebalanceConstraints = weightFunction.getRebalanceConstraints();
assertFalse(rebalanceConstraints.getConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID).isEnable());
assertFalse(rebalanceConstraints.getConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID).isEnable());

allocator.setPreferPrimaryShardRebalance(true);
rebalanceConstraints = weightFunction.getRebalanceConstraints();
assertTrue(rebalanceConstraints.getConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID).isEnable());
assertTrue(rebalanceConstraints.getConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID).isEnable());

allocator.setPreferPrimaryShardBalanceBuffer(0.5f);
rebalanceConstraints = weightFunction.getRebalanceConstraints();
assertTrue(rebalanceConstraints.getConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID).isEnable());
assertTrue(rebalanceConstraints.getConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID).isEnable());
}

private RoutingTable buildRoutingTable(Metadata metadata) {
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
for (Map.Entry<String, IndexMetadata> entry : metadata.getIndices().entrySet()) {
Expand Down
Loading