
Allow balancing weights to be set per tier #126091


Open
nicktindall wants to merge 34 commits into base: main

Conversation

@nicktindall (Contributor) commented Apr 2, 2025:

@henningandersen made a good point on my first attempt at this: that approach made some assumptions about the way the allocation deciders would govern shard movement. This iteration attempts to make those assumptions explicit by introducing actual partitioning of the nodes.

Approach

Instead of assuming the BalancedShardsAllocator applies to the entire cluster, I've added the concept of "partitions" to the balancing. The partitions must be mutually disjoint subsets of the shards and nodes, i.e. the shards in a partition are only ever allocated to the nodes in that same partition, as is the case in serverless. WeightFunctions and NodeSorters are scoped to partitions.

The status-quo behaviour is defined by the GlobalPartitionedClusterFactory, which produces a single global partition. The serverless version is called TieredPartitionedClusterFactory and defines a partition for each of the search and indexing tiers.

The serverless code is in here for now; it will be moved to serverless if we decide this is a sound approach. The sketch below shows the rough shape of the idea.
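Roughly (made-up types for illustration only, not the actual interfaces in this PR):

// Hypothetical sketch of the partitioning idea; these are stand-in types, not the PR's classes.
import java.util.List;

record Partition(List<String> nodeIds, List<String> shardIds) {}

interface PartitionedClusterFactory {
    // Partitions must be mutually disjoint: shards in a partition are only ever allocated to nodes in that partition.
    List<Partition> partitions(List<String> allNodeIds, List<String> allShardIds);
}

// Status-quo behaviour: a single global partition containing every node and shard.
final class GlobalPartitionSketch implements PartitionedClusterFactory {
    @Override
    public List<Partition> partitions(List<String> allNodeIds, List<String> allShardIds) {
        return List.of(new Partition(allNodeIds, allShardIds));
    }
}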

Alternatives

It would be nice if we could simply partition the RoutingAllocation and run a Balancer for each of the partitions (search and indexing), but the contents of e.g. Metadata and RoutingNodes are so heavily cross-referenced and aggregated that they might be tricky to pull apart.

A better approach might be to do the partitioning at the point where e.g. the RoutingNodes is generated from the GlobalRoutingTable, but that seems like a more impactful refactor.

) {
this.writeLoadForecaster = writeLoadForecaster;
this.allocation = allocation;
this.routingNodes = allocation.routingNodes();
this.metadata = allocation.metadata();
this.weightFunction = weightFunction;
this.threshold = threshold;
avgShardsPerNode = WeightFunction.avgShardPerNode(metadata, routingNodes);
avgWriteLoadPerNode = WeightFunction.avgWriteLoadPerNode(writeLoadForecaster, metadata, routingNodes);
avgDiskUsageInBytesPerNode = WeightFunction.avgDiskUsageInBytesPerNode(allocation.clusterInfo(), metadata, routingNodes);
@nicktindall (Contributor, Author) commented:

I left the averages calculated globally. I experimented with making them local to the partition, but I'm not sure of the benefit: it might be more costly to filter the nodes and shards by partition to calculate these numbers. It's definitely something we can pursue if we think it's worthwhile.
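For illustration, a per-partition version might look something like this (made-up names, just a sketch):

// Hypothetical sketch of a per-partition average; the PR keeps these averages global.
import java.util.Collection;

final class PartitionAverages {
    // Per-partition variant: divide the partition's shard count by the partition's node count,
    // instead of dividing global totals the way WeightFunction.avgShardPerNode does today.
    static double avgShardsPerNode(long shardsInPartition, Collection<String> nodeIdsInPartition) {
        return nodeIdsInPartition.isEmpty() ? 0.0 : (double) shardsInPartition / nodeIdsInPartition.size();
    }
}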

@nicktindall (Contributor, Author) commented:

I wonder if the split would be significant if we had substantially differently-sized search and indexing tiers.

A contributor commented:

Maybe this wouldn't be costly, just more complex, if we pre-calculated primary vs. replica counts when setting up ProjectMetadata#totalNumberOfShards? Similarly for the other values. But that is quite a bit of fiddly code to maintain.

I wonder, though, whether this code could already be doing weird things to our balance calculations in serverless. For example, hypothetically, if the search tier has 3x the number of shards -- say 3 replica copies per shard -- then the index tier nodes are going to increase the shard weight component (balancing-factor-constant x difference-from-average).

@nicktindall (Contributor, Author) commented Apr 14, 2025:

I think we would probably want to base the counting on ShardRole, because there might be a future where primaries live in the search tier for read-only indices (wild speculation, but you never know). That would keep the logic for deciding "which shards are search/indexing shards" in one place: the StatelessShardRoutingRoleStrategy. Something along the lines of the sketch below.
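(A sketch with stand-in types, not the real ShardRole or strategy classes:)

// Hypothetical sketch of counting shards per tier by role.
import java.util.List;

final class TierShardCounts {
    enum TierRole { INDEXING, SEARCH }                 // stand-in for whatever role the routing strategy assigns
    record ShardView(String shardId, TierRole role) {}

    // Counting by role keeps the "which tier does this shard belong to" decision in one place
    // (the routing-role strategy) rather than inferring it from primary/replica status.
    static long countForTier(List<ShardView> shards, TierRole tier) {
        return shards.stream().filter(s -> s.role() == tier).count();
    }
}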

I wonder, though, whether this code could already be doing weird things to our balance calculations in serverless.

Yes, it's a good point. Even if the indexing and search tiers are just different sizes, some weights could never be balanced: e.g. with 5 indexing nodes and 1 search node, and an index with 5 shards and 1 replica, the average shard count would be 10/6 ≈ 1.67, so the indexing nodes would all be under-weight and the search node massively overweight in terms of shard count (I think).
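To make that arithmetic concrete, a toy calculation (assuming, as I understand it, that the shard-count component of a node's weight scales with its shard count minus the cluster-wide average; not the real weight function):

final class ShardCountSkewExample {
    public static void main(String[] args) {
        int totalShards = 10;                                  // 5 primaries (indexing) + 5 replicas (search)
        int totalNodes = 6;                                    // 5 indexing nodes + 1 search node
        double globalAvg = (double) totalShards / totalNodes;  // 10 / 6 ≈ 1.67

        double indexingNodeDelta = 1 - globalAvg;              // each indexing node holds 1 shard: under-weight
        double searchNodeDelta = 5 - globalAvg;                // the single search node holds 5 shards: over-weight

        System.out.printf("avg=%.2f indexing=%.2f search=%.2f%n", globalAvg, indexingNodeDelta, searchNodeDelta);
        // avg=1.67 indexing=-0.67 search=3.33; neither delta can ever reach zero,
        // so the shard-count component of the weight stays permanently skewed.
    }
}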

// Balance each partition
for (NodeSorter nodeSorter : partitionedNodeSorter.allNodeSorters()) {
balanceByWeights(nodeSorter);
}
@nicktindall (Contributor, Author) commented:

We balance once for each NodeSorter (i.e. once for each partition).

A member commented:

I wonder if it would be more clear to use the node sorter in a "partition" class/holder so we're going through the partitions, rather than the sorter. It might also be a naming thing.

@nicktindall (Contributor, Author) commented:

Yeah, I know what you mean. I've now made NodeSorters an Iterable<NodeSorter>. The "partition" terminology has faded from the implementation somewhat, but I think it's an important concept.

I added some javadoc to NodeSorter in b9403bb to make it clear it's scoped to a partition. I did have the partition terminology more front-and-centre in earlier versions of the PR but it also didn't look quite right.

Also, we don't always need NodeSorters. In NodeAllocationStatsAndWeightsCalculator we use the BalancingWeightsFactory to create the BalancingWeights but never call createNodeSorters, so refactoring to return e.g. a PartitionedCluster containing Partitions that each hold a NodeSorter and a WeightFunction, though conceptually cleanest, wouldn't fit the way we use the API.
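For reference, the rough shape this implies is something like the sketch below (apart from createNodeSorters, the names are placeholders, not the actual signatures):

import java.util.Iterator;
import java.util.List;

interface NodeSorterSketch {}                                  // stands in for BalancedShardsAllocator.NodeSorter

// Iterable so the balancer can loop straight over the per-partition sorters without a separate Partition type.
record NodeSortersSketch(List<NodeSorterSketch> perPartition) implements Iterable<NodeSorterSketch> {
    @Override
    public Iterator<NodeSorterSketch> iterator() {
        return perPartition.iterator();
    }
}

interface BalancingWeightsSketch {}                            // per-partition weight functions live behind this

interface BalancingWeightsFactorySketch {
    BalancingWeightsSketch createBalancingWeights();           // all the stats calculator needs
    NodeSortersSketch createNodeSorters();                     // only needed when actually balancing
}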

@nicktindall nicktindall added :Distributed Coordination/Distributed A catch all label for anything in the Distributed Coordination area. Please avoid if you can. >enhancement :Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) and removed :Distributed Coordination/Distributed A catch all label for anything in the Distributed Coordination area. Please avoid if you can. labels Apr 8, 2025
@elasticsearchmachine (Collaborator) commented:
Hi @nicktindall, I've created a changelog YAML for you.

@nicktindall nicktindall marked this pull request as ready for review April 8, 2025 06:12
@nicktindall nicktindall requested a review from a team as a code owner April 8, 2025 06:12
@elasticsearchmachine (Collaborator) commented:
Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@elasticsearchmachine elasticsearchmachine added the Team:Distributed Coordination Meta label for Distributed Coordination team label Apr 8, 2025
@@ -1323,7 +1299,7 @@ public boolean containsShard(ShardRouting shard) {
         }
     }

-    static final class NodeSorter extends IntroSorter {
+    public static final class NodeSorter extends IntroSorter {
@nicktindall (Contributor, Author) commented:

Annoyingly, these need to become public so that the StatelessBalancingWeightsFactory can see/instantiate them.

# Conflicts:
#	server/src/main/java/org/elasticsearch/cluster/ClusterModule.java
#	server/src/main/java/org/elasticsearch/cluster/routing/allocation/NodeAllocationStatsAndWeightsCalculator.java
#	server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java
#	server/src/main/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancerSettings.java
#	server/src/test/java/org/elasticsearch/cluster/routing/allocation/AllocationStatsServiceTests.java
#	server/src/test/java/org/elasticsearch/cluster/routing/allocation/BalanceConfigurationTests.java
#	server/src/test/java/org/elasticsearch/cluster/routing/allocation/allocator/BalancedShardsAllocatorTests.java
#	test/framework/src/main/java/org/elasticsearch/cluster/ESAllocationTestCase.java
@pxsalehi (Member) left a comment:

LGTM! Great work!

@henningandersen (Contributor) left a comment:

LGTM.

Labels
:Distributed Coordination/Allocation All issues relating to the decision making around placing a shard (both master logic & on the nodes) >enhancement serverless-linked Added by automation, don't add manually Team:Distributed Coordination Meta label for Distributed Coordination team v9.1.0
6 participants