Allow balancing weights to be set per tier #126091
Conversation
) {
    this.writeLoadForecaster = writeLoadForecaster;
    this.allocation = allocation;
    this.routingNodes = allocation.routingNodes();
    this.metadata = allocation.metadata();
    this.weightFunction = weightFunction;
    this.threshold = threshold;
    avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
    avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
    avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes);
I left the averages calculated globally. I did experiment with making them local to the partition, but I'm not sure of the benefit: I think it might be more costly to filter the nodes and shards by partition to calculate these numbers. Definitely something we can pursue if we think it's worthwhile.
I wonder if the split would be significant if we had substantially differently-sized search and indexing tiers.
Maybe this wouldn't be costly -- just more complex -- if we pre-calculated primary vs replica counts when setting up ProjectMetadata#totalNumberOfShards? Similarly with the other values. But... that is quite a bit of fiddly code to maintain.
I wonder, though, whether this code could already be doing weird things to our balance calculations in serverless. For example, hypothetically, if the search tier has 3x the number of shards -- say 3 replica copies per shard -- then the index tier nodes are going to increase the shard weight component (balancing-factor-constant x difference-from-average).
I think we would probably want to base the counting on ShardRole, because perhaps there might be a future where primaries live in the search tier for read-only indices? (wild speculation there, but you never know). That would keep the logic for deciding "which shards are search/indexing shards" in one place, in the StatelessShardRoutingRoleStrategy.
I wonder, though, whether this code could already be doing weird things to our balance calculations in serverless.
Yes, it's a good point. Even just having differently sized indexing and search tiers means some weights could never be balanced. E.g. with 5 indexing nodes and 1 search node, and an index with 5 shards and 1 replica, the average shard count would be 10/6 ≈ 1.67, so the indexing nodes would all be under-weight and the search node massively overweight in terms of shard count (I think).
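To make that arithmetic concrete, here's a minimal sketch (illustrative only, not code from this PR, and assuming primaries land on the indexing tier and replicas on the search tier):

```java
// Illustrative only -- not code from this PR. Node/shard counts follow the
// example above; the per-partition averages are what a tier-scoped
// WeightFunction would see instead of the single global average.
public class TierAverageExample {
    public static void main(String[] args) {
        int indexingNodes = 5, searchNodes = 1;
        int primaries = 5, replicasPerPrimary = 1;

        int indexingShards = primaries;                    // primaries on the indexing tier
        int searchShards = primaries * replicasPerPrimary; // replicas on the search tier

        double globalAvg = (double) (indexingShards + searchShards) / (indexingNodes + searchNodes);
        double indexingAvg = (double) indexingShards / indexingNodes;
        double searchAvg = (double) searchShards / searchNodes;

        System.out.printf("global=%.2f indexing=%.2f search=%.2f%n", globalAvg, indexingAvg, searchAvg);
        // prints: global=1.67 indexing=1.00 search=5.00
        // => measured against the global average, every indexing node looks
        //    under-weight and the lone search node massively over-weight.
    }
}
```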
// Balance each partition
for (NodeSorter nodeSorter : partitionedNodeSorter.allNodeSorters()) {
    balanceByWeights(nodeSorter);
}
We balance once for each NodeSorter (i.e. once for each partition).
I wonder if it would be more clear to use the node sorter in a "partition" class/holder so we're going through the partitions, rather than the sorter. It might also be a naming thing.
Yeah, I know what you mean. I've made it now so NodeSorters is an Iterable<NodeSorter>. The "partition" terminology has faded from the implementation somewhat, but I think it's an important concept. I added some javadoc to NodeSorter in b9403bb to make it clear it's scoped to a partition. I did have the partition terminology more front-and-centre in earlier versions of the PR, but it also didn't look quite right.
Also, we don't always need NodeSorters. See NodeAllocationStatsAndWeightsCalculator: we use the BalancingWeightsFactory to create the BalancingWeights, but we don't ever call createNodeSorters. So refactoring to just return e.g. a PartitionedCluster that contains Partitions, each holding a NodeSorter and a WeightFunction, though conceptually cleanest, won't fit the way we use the API.
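For what it's worth, here is a hypothetical sketch of the shape being discussed, purely to illustrate why an Iterable NodeSorters suits both callers. The names come from this thread, but the interfaces and signatures below are invented for illustration rather than taken from the PR:

```java
// Invented placeholder types; they stand in for the real Elasticsearch classes.
interface WeightFunction {}   // per-partition weight function
interface NodeSorter {}       // per-partition sorter (BalancedShardsAllocator.NodeSorter)

// Iterable, so the Balancer can simply run one balancing pass per partition:
//   for (NodeSorter sorter : balancingWeights.createNodeSorters()) { balanceByWeights(sorter); }
interface NodeSorters extends Iterable<NodeSorter> {}

interface BalancingWeights {
    // Weight lookup is partition-scoped, keyed here by node id for illustration.
    WeightFunction weightFunctionForNode(String nodeId);

    // Only balancing callers need sorters; a stats-only caller such as
    // NodeAllocationStatsAndWeightsCalculator uses the weights alone and never
    // calls this, which is why a PartitionedCluster-of-Partitions return type
    // wouldn't fit every caller.
    NodeSorters createNodeSorters();
}

interface BalancingWeightsFactory {
    BalancingWeights create();
}
```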
Hi @nicktindall, I've created a changelog YAML for you.
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)
@@ -1323,7 +1299,7 @@ public boolean containsShard(ShardRouting shard) {
        }
    }

-    static final class NodeSorter extends IntroSorter {
+    public static final class NodeSorter extends IntroSorter {
Annoyingly these need to become public so the StatelessBalancingWeightsFactory can see/instantiate them.
# Conflicts:
#	server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
#	server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java
#	server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
#	server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java
#	server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java
#	server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
#	server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java
#	test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java
LGTM! Great work!
// Balance each partition
for (NodeSorter nodeSorter : partitionedNodeSorter.allNodeSorters()) {
    balanceByWeights(nodeSorter);
}
I wonder if it would be more clear to use the node sorter in a "partition" class/holder so we're going through the partitions, rather than the sorter. It might also be a naming thing.
LGTM.
@henningandersen made a good point on my first attempt at this: that approach made some assumptions about the way the allocation deciders would govern shard movement. This iteration is an attempt to make those assumptions more explicit by introducing actual partitioning of the nodes.
Approach
Instead of assuming the BalancedShardsAllocator applies to the entire cluster, I've added the concept of "partitions" into the balancing. The partitions must be mutually disjoint subsets of the shards and nodes - i.e. the set of shards in a partition is only ever allocated to the set of nodes in the same partition, as is the case in serverless. WeightFunctions and NodeSorters are scoped to partitions.
The status quo behaviour is defined by the GlobalPartitionedClusterFactory, which produces a single global partition. The serverless version is called TieredPartitionedClusterFactory and will define a partition for each of the search and indexing tiers.
The serverless code is in here for now, but it will be moved to serverless if we think this is a sound approach.
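To illustrate the two factory behaviours just described, here is a hypothetical sketch. The class names come from the description above, but the interfaces, records and signatures are invented placeholders rather than the PR's actual API:

```java
import java.util.List;
import java.util.function.Predicate;

// Invented placeholder for a cluster node; only the attribute needed here.
record Node(String id, boolean isSearchNode) {}

// A "partition": a disjoint subset of nodes (and the shards allocated to them)
// that is balanced independently, with its own weight function and node sorter.
record Partition(String name, List<Node> nodes) {}

interface PartitionedClusterFactory {
    List<Partition> partitions(List<Node> allNodes);
}

// Status quo: everything lives in one global partition.
class GlobalPartitionedClusterFactory implements PartitionedClusterFactory {
    @Override
    public List<Partition> partitions(List<Node> allNodes) {
        return List.of(new Partition("global", allNodes));
    }
}

// Serverless: one partition per tier, so search shards only balance across
// search nodes and indexing shards only balance across indexing nodes.
class TieredPartitionedClusterFactory implements PartitionedClusterFactory {
    @Override
    public List<Partition> partitions(List<Node> allNodes) {
        Predicate<Node> isSearch = Node::isSearchNode;
        return List.of(
            new Partition("search", allNodes.stream().filter(isSearch).toList()),
            new Partition("indexing", allNodes.stream().filter(isSearch.negate()).toList())
        );
    }
}
```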
Alternatives
It would be nice if we could just do the partitioning on the RoutingAllocation and run a Balancer for each of the partitions (search and indexing), but the contents of e.g. Metadata and RoutingNodes are so heavily cross-referenced and aggregated that it might be tricky to pull apart.
A better approach might be to do the partitioning at the point where the RoutingNodes is being generated from the GlobalRoutingTable. But that seems like a more impactful refactor.