-
Notifications
You must be signed in to change notification settings - Fork 1.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix primary shard balance parameter #17352
Open
bugmakerrrrrr
wants to merge
1
commit into
opensearch-project:main
Choose a base branch
from
bugmakerrrrrr:balance_param
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+120
−72
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -204,12 +204,8 @@ public class BalancedShardsAllocator implements ShardsAllocator { | |
|
||
private volatile boolean preferPrimaryShardBalance; | ||
private volatile boolean preferPrimaryShardRebalance; | ||
private volatile float preferPrimaryShardRebalanceBuffer; | ||
private volatile float indexBalanceFactor; | ||
private volatile float shardBalanceFactor; | ||
private volatile WeightFunction weightFunction; | ||
private final WeightFunction weightFunction; | ||
private volatile float threshold; | ||
private volatile long primaryConstraintThreshold; | ||
|
||
private volatile boolean ignoreThrottleInRestore; | ||
private volatile TimeValue allocatorTimeout; | ||
|
@@ -222,11 +218,13 @@ public BalancedShardsAllocator(Settings settings) { | |
|
||
@Inject | ||
public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSettings) { | ||
setShardBalanceFactor(SHARD_BALANCE_FACTOR_SETTING.get(settings)); | ||
setIndexBalanceFactor(INDEX_BALANCE_FACTOR_SETTING.get(settings)); | ||
setPreferPrimaryShardRebalanceBuffer(PRIMARY_SHARD_REBALANCE_BUFFER.get(settings)); | ||
this.weightFunction = new WeightFunction( | ||
INDEX_BALANCE_FACTOR_SETTING.get(settings), | ||
SHARD_BALANCE_FACTOR_SETTING.get(settings), | ||
PRIMARY_SHARD_REBALANCE_BUFFER.get(settings), | ||
PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings) | ||
); | ||
setIgnoreThrottleInRestore(IGNORE_THROTTLE_FOR_REMOTE_RESTORE.get(settings)); | ||
updateWeightFunction(); | ||
setThreshold(THRESHOLD_SETTING.get(settings)); | ||
setPrimaryConstraintThresholdSetting(PRIMARY_CONSTRAINT_THRESHOLD_SETTING.get(settings)); | ||
setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); | ||
|
@@ -236,9 +234,9 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting | |
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); | ||
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); | ||
clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); | ||
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::updateIndexBalanceFactor); | ||
clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::updateShardBalanceFactor); | ||
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); | ||
clusterSettings.addSettingsUpdateConsumer(INDEX_BALANCE_FACTOR_SETTING, this::setIndexBalanceFactor); | ||
clusterSettings.addSettingsUpdateConsumer(SHARD_BALANCE_FACTOR_SETTING, this::setShardBalanceFactor); | ||
clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::setPreferPrimaryShardBalanceBuffer); | ||
clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); | ||
clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); | ||
clusterSettings.addSettingsUpdateConsumer(PRIMARY_CONSTRAINT_THRESHOLD_SETTING, this::setPrimaryConstraintThresholdSetting); | ||
|
@@ -278,54 +276,31 @@ private void setShardMovementStrategy(ShardMovementStrategy shardMovementStrateg | |
} | ||
|
||
private void setIndexBalanceFactor(float indexBalanceFactor) { | ||
this.indexBalanceFactor = indexBalanceFactor; | ||
this.weightFunction.setIndexBalance(indexBalanceFactor); | ||
} | ||
|
||
private void setShardBalanceFactor(float shardBalanceFactor) { | ||
this.shardBalanceFactor = shardBalanceFactor; | ||
} | ||
|
||
private void setPreferPrimaryShardRebalanceBuffer(float preferPrimaryShardRebalanceBuffer) { | ||
this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardRebalanceBuffer; | ||
} | ||
|
||
private void updateIndexBalanceFactor(float indexBalanceFactor) { | ||
this.indexBalanceFactor = indexBalanceFactor; | ||
updateWeightFunction(); | ||
this.weightFunction.setShardBalance(shardBalanceFactor); | ||
} | ||
|
||
private void updateShardBalanceFactor(float shardBalanceFactor) { | ||
this.shardBalanceFactor = shardBalanceFactor; | ||
updateWeightFunction(); | ||
} | ||
|
||
private void updatePreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalanceBuffer) { | ||
this.preferPrimaryShardRebalanceBuffer = preferPrimaryShardBalanceBuffer; | ||
updateWeightFunction(); | ||
} | ||
|
||
private void updateWeightFunction() { | ||
weightFunction = new WeightFunction( | ||
this.indexBalanceFactor, | ||
this.shardBalanceFactor, | ||
this.preferPrimaryShardRebalanceBuffer, | ||
this.primaryConstraintThreshold | ||
); | ||
void setPreferPrimaryShardBalanceBuffer(float preferPrimaryShardBalanceBuffer) { | ||
this.weightFunction.setPreferPrimaryBalanceBuffer(preferPrimaryShardBalanceBuffer); | ||
setPreferPrimaryShardRebalance(this.preferPrimaryShardRebalance); | ||
} | ||
|
||
/** | ||
* When primary shards balance is desired, enable primary shard balancing constraints | ||
* @param preferPrimaryShardBalance boolean to prefer balancing by primary shard | ||
*/ | ||
private void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { | ||
void setPreferPrimaryShardBalance(boolean preferPrimaryShardBalance) { | ||
this.preferPrimaryShardBalance = preferPrimaryShardBalance; | ||
this.weightFunction.updateAllocationConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. another concern, do we need to ensure that we update all the constraints in |
||
this.weightFunction.updateAllocationConstraint(CLUSTER_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); | ||
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardBalance); | ||
} | ||
|
||
private void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) { | ||
void setPreferPrimaryShardRebalance(boolean preferPrimaryShardRebalance) { | ||
this.preferPrimaryShardRebalance = preferPrimaryShardRebalance; | ||
this.weightFunction.updateRebalanceConstraint(INDEX_PRIMARY_SHARD_BALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance); | ||
this.weightFunction.updateRebalanceConstraint(CLUSTER_PRIMARY_SHARD_REBALANCE_CONSTRAINT_ID, preferPrimaryShardRebalance); | ||
} | ||
|
||
|
@@ -334,8 +309,7 @@ private void setThreshold(float threshold) { | |
} | ||
|
||
private void setPrimaryConstraintThresholdSetting(long threshold) { | ||
this.primaryConstraintThreshold = threshold; | ||
this.weightFunction.updatePrimaryConstraintThreshold(threshold); | ||
this.weightFunction.setPrimaryConstraintThreshold(threshold); | ||
} | ||
|
||
private void setAllocatorTimeout(TimeValue allocatorTimeout) { | ||
|
@@ -461,14 +435,18 @@ public float getThreshold() { | |
* Returns the index related weight factor. | ||
*/ | ||
public float getIndexBalance() { | ||
return weightFunction.indexBalance; | ||
return weightFunction.balanceFactor.indexBalance; | ||
} | ||
|
||
/** | ||
* Returns the shard related weight factor. | ||
*/ | ||
public float getShardBalance() { | ||
return weightFunction.shardBalance; | ||
return weightFunction.balanceFactor.shardBalance; | ||
} | ||
|
||
WeightFunction getWeightFunction() { | ||
return weightFunction; | ||
} | ||
|
||
/** | ||
|
@@ -505,35 +483,23 @@ public boolean getPreferPrimaryBalance() { | |
* package-private for testing | ||
*/ | ||
static class WeightFunction { | ||
|
||
private final float indexBalance; | ||
private final float shardBalance; | ||
private final float theta0; | ||
private final float theta1; | ||
private long primaryConstraintThreshold; | ||
private AllocationConstraints constraints; | ||
private RebalanceConstraints rebalanceConstraints; | ||
private volatile BalanceFactor balanceFactor; | ||
private volatile long primaryConstraintThreshold; | ||
private final AllocationConstraints allocationConstraints; | ||
private volatile RebalanceConstraints rebalanceConstraints; | ||
|
||
WeightFunction(float indexBalance, float shardBalance, float preferPrimaryBalanceBuffer, long primaryConstraintThreshold) { | ||
float sum = indexBalance + shardBalance; | ||
if (sum <= 0.0f) { | ||
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); | ||
} | ||
theta0 = shardBalance / sum; | ||
theta1 = indexBalance / sum; | ||
this.indexBalance = indexBalance; | ||
this.shardBalance = shardBalance; | ||
this.balanceFactor = new BalanceFactor(indexBalance, shardBalance); | ||
this.primaryConstraintThreshold = primaryConstraintThreshold; | ||
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); | ||
this.constraints = new AllocationConstraints(); | ||
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); | ||
this.allocationConstraints = new AllocationConstraints(); | ||
setPreferPrimaryBalanceBuffer(preferPrimaryBalanceBuffer); | ||
// Enable index shard per node breach constraint | ||
updateAllocationConstraint(INDEX_SHARD_PER_NODE_BREACH_CONSTRAINT_ID, true); | ||
} | ||
|
||
public float weightWithAllocationConstraints(ShardsBalancer balancer, ModelNode node, String index) { | ||
float balancerWeight = weight(balancer, node, index); | ||
return balancerWeight + constraints.weight(balancer, node, index, primaryConstraintThreshold); | ||
return balancerWeight + allocationConstraints.weight(balancer, node, index, primaryConstraintThreshold); | ||
} | ||
|
||
public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode node, String index) { | ||
|
@@ -544,20 +510,59 @@ public float weightWithRebalanceConstraints(ShardsBalancer balancer, ModelNode n | |
float weight(ShardsBalancer balancer, ModelNode node, String index) { | ||
final float weightShard = node.numShards() - balancer.avgShardsPerNode(); | ||
final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); | ||
return theta0 * weightShard + theta1 * weightIndex; | ||
return this.balanceFactor.theta0 * weightShard + this.balanceFactor.theta1 * weightIndex; | ||
} | ||
|
||
void updateAllocationConstraint(String constraint, boolean enable) { | ||
this.constraints.updateAllocationConstraint(constraint, enable); | ||
this.allocationConstraints.updateAllocationConstraint(constraint, enable); | ||
} | ||
|
||
void updateRebalanceConstraint(String constraint, boolean add) { | ||
this.rebalanceConstraints.updateRebalanceConstraint(constraint, add); | ||
} | ||
|
||
void updatePrimaryConstraintThreshold(long primaryConstraintThreshold) { | ||
void setPreferPrimaryBalanceBuffer(float preferPrimaryBalanceBuffer) { | ||
RebalanceParameter rebalanceParameter = new RebalanceParameter(preferPrimaryBalanceBuffer); | ||
this.rebalanceConstraints = new RebalanceConstraints(rebalanceParameter); | ||
} | ||
|
||
void setIndexBalance(float indexBalance) { | ||
this.balanceFactor = new BalanceFactor(indexBalance, this.balanceFactor.shardBalance); | ||
} | ||
|
||
void setShardBalance(float shardBalance) { | ||
this.balanceFactor = new BalanceFactor(this.balanceFactor.indexBalance, shardBalance); | ||
} | ||
|
||
void setPrimaryConstraintThreshold(long primaryConstraintThreshold) { | ||
this.primaryConstraintThreshold = primaryConstraintThreshold; | ||
} | ||
|
||
AllocationConstraints getAllocationConstraints() { | ||
return allocationConstraints; | ||
} | ||
|
||
RebalanceConstraints getRebalanceConstraints() { | ||
return rebalanceConstraints; | ||
} | ||
|
||
static class BalanceFactor { | ||
final float indexBalance; | ||
final float shardBalance; | ||
final float theta0; | ||
final float theta1; | ||
|
||
BalanceFactor(float indexBalance, float shardBalance) { | ||
float sum = indexBalance + shardBalance; | ||
if (sum <= 0.0f) { | ||
throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); | ||
} | ||
theta0 = shardBalance / sum; | ||
theta1 = indexBalance / sum; | ||
this.indexBalance = indexBalance; | ||
this.shardBalance = shardBalance; | ||
} | ||
} | ||
} | ||
|
||
/** | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
isEnabled