
Fix DiskThresholdDecider average disk usage with huge filesystems #100599

Status: Open. Wants to merge 2 commits into main.
5 changes: 5 additions & 0 deletions docs/changelog/100599.yaml
@@ -0,0 +1,5 @@
pr: 100599
summary: Fix `DiskThresholdDecider` average disk usage with huge filesystems
area: Allocation
type: bug
issues: []
server/src/main/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDecider.java
@@ -193,6 +193,9 @@ public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, Routing
if (allocation.metadata().index(shardRouting.index()).ignoreDiskWatermarks()) {
return YES_DISK_WATERMARKS_IGNORED;
}
if (useAverageDiskUsage(node, usages) == false) {
return YES_AVERAGE_DISK_USAGE_UNAVAILABLE;
}

// subtractLeavingShards is passed as false here, because they still use disk space, and therefore we should be extra careful
// and take the size into account
@@ -353,6 +356,9 @@ public Decision canForceAllocateDuringReplace(ShardRouting shardRouting, Routing
if (allocation.metadata().index(shardRouting.index()).ignoreDiskWatermarks()) {
return YES_DISK_WATERMARKS_IGNORED;
}
if (useAverageDiskUsage(node, usages) == false) {
return YES_AVERAGE_DISK_USAGE_UNAVAILABLE;
}

final DiskUsageWithRelocations usage = getDiskUsage(node, allocation, usages, false);
final long shardSize = getExpectedShardSize(shardRouting, 0L, allocation);
@@ -393,6 +399,9 @@ public Decision canRemain(IndexMetadata indexMetadata, ShardRouting shardRouting
if (indexMetadata.ignoreDiskWatermarks()) {
return YES_DISK_WATERMARKS_IGNORED;
}
if (useAverageDiskUsage(node, usages) == false) {
return YES_AVERAGE_DISK_USAGE_UNAVAILABLE;
}
Contributor comment:
This check is added before every call to getDiskUsage that might compute the average.
Is there value in making getDiskUsage return null when the overflow happens, and using that as the signal to return the decision above? That way the addition of total and free would be performed only once (in getDiskUsage) rather than twice (in getDiskUsage and in the check above).
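
A rough sketch of that alternative (hypothetical code, not part of this PR: averageUsageOrNull is an invented name, and the final division mirrors what averageUsage is assumed to do). It computes the average and detects overflow in a single pass, using null as the "cannot compute" signal:

// Sketch only: one pass that both sums and detects overflow.
// Returns null when the summed totals wrap around, instead of a bogus average.
static DiskUsage averageUsageOrNull(RoutingNode node, Map<String, DiskUsage> usages) {
    long totalBytes = 0L;
    long freeBytes = 0L;
    for (DiskUsage du : usages.values()) {
        totalBytes += du.getTotalBytes();
        freeBytes += du.getFreeBytes();
        if (totalBytes < 0L || freeBytes < 0L) {
            return null; // long overflow: an average would be meaningless
        }
    }
    return new DiskUsage(node.nodeId(), node.node().getName(), "_na_", totalBytes / usages.size(), freeBytes / usages.size());
}

// Each caller (canAllocate, canRemain, canForceAllocateDuringReplace) would then do:
//     DiskUsage usage = usages.get(node.nodeId());
//     if (usage == null) {
//         usage = averageUsageOrNull(node, usages);
//         if (usage == null) {
//             return YES_AVERAGE_DISK_USAGE_UNAVAILABLE;
//         }
//     }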


// subtractLeavingShards is passed as true here, since this is only for shards remaining, we will *eventually* have enough disk
// since shards are moving away. No new shards will be incoming since in canAllocate we pass false for this check.
@@ -503,6 +512,32 @@ private static DiskUsageWithRelocations getDiskUsage(
return diskUsageWithRelocations;
}

/**
 * Returns whether an average disk usage can be computed from the disk usage map
 * without the summed totals overflowing a long.
 *
 * @param node   node whose disk usage would be estimated from the average
 * @param usages map of nodeId to DiskUsage for all known nodes
 * @return true if the average disk usage can be computed from the disk usage map
 */
static boolean useAverageDiskUsage(RoutingNode node, Map<String, DiskUsage> usages) {
assert usages.isEmpty() == false;
// An average is only needed when this node has not reported its own usage,
// so the overflow check applies only in that case.
if (usages.containsKey(node.nodeId()) == false) {
long total = 0L;
long free = 0L;
for (DiskUsage du : usages.values()) {
total += du.getTotalBytes();
if (total < 0L) {
// adding a non-negative long made the sum negative: overflow
return false;
}
free += du.getFreeBytes();
if (free < 0L) {
return false;
}
}
}
return true;
}
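
For context, an illustrative snippet (not from the PR): Java long addition overflows silently, so two filesystems that each report a total near Long.MAX_VALUE already push the sum negative, which is exactly what the total < 0L check catches. Checking after every addition is sound here because each individual DiskUsage value is non-negative:

long total = Long.MAX_VALUE;     // node_0 reports a huge filesystem (~8 EiB)
total += Long.MAX_VALUE;         // node_1 does too: the sum wraps silently to -2
System.out.println(total < 0L);  // true, so useAverageDiskUsage returns false

Math.addExact(total, du.getTotalBytes()) would throw ArithmeticException on the same overflow; the sign check avoids the exception at the cost of relying on the inputs being non-negative.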

/**
* Returns a {@link DiskUsage} for the {@link RoutingNode} using the
* average usage of other nodes in the disk usage map.
@@ -511,9 +546,8 @@ private static DiskUsageWithRelocations getDiskUsage(
* @return DiskUsage representing given node using the average disk usage
*/
static DiskUsage averageUsage(RoutingNode node, Map<String, DiskUsage> usages) {
-if (usages.size() == 0) {
-return new DiskUsage(node.nodeId(), node.node().getName(), "_na_", 0, 0);
-}
+assert usages.containsKey(node.nodeId()) == false;
+assert usages.isEmpty() == false : usages;
long totalBytes = 0;
long freeBytes = 0;
for (DiskUsage du : usages.values()) {
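The rest of averageUsage is elided by this hunk. Presumably (an assumption based on the visible accumulators and the javadoc; the actual body is not shown in this diff) it sums the per-node values and divides by the node count, which is also why the new assert usages.isEmpty() == false protects the division from a zero divisor:

// Assumed tail of averageUsage (illustrative reconstruction, not the actual source):
//     totalBytes += du.getTotalBytes();
//     freeBytes += du.getFreeBytes();
// }
// return new DiskUsage(node.nodeId(), node.node().getName(), "_na_",
//     totalBytes / usages.size(), freeBytes / usages.size());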
@@ -527,13 +561,19 @@ static DiskUsage averageUsage(RoutingNode node, Map<String, DiskUsage> usages) {

private static final Decision YES_USAGES_UNAVAILABLE = Decision.single(Decision.Type.YES, NAME, "disk usages are unavailable");

private static final Decision YES_AVERAGE_DISK_USAGE_UNAVAILABLE = Decision.single(
Decision.Type.YES,
NAME,
"average disk usage cannot be computed"
);

private Decision earlyTerminate(Map<String, DiskUsage> usages) {
// Always allow allocation if the decider is disabled
if (diskThresholdSettings.isEnabled() == false) {
return YES_DISABLED;
}

-// Fail open if there are no disk usages available
+// Allow allocation if there are no disk usages available
if (usages.isEmpty()) {
logger.trace("unable to determine disk usages for disk-aware allocation, allowing allocation");
return YES_USAGES_UNAVAILABLE;
server/src/test/java/org/elasticsearch/cluster/routing/allocation/decider/DiskThresholdDeciderUnitTests.java
@@ -130,6 +130,74 @@ public void testCanAllocateUsesMaxAvailableSpace() {
);
}

public void testAverageDiskUsage() {
var metadata = Metadata.builder()
.put(IndexMetadata.builder("index").settings(settings(IndexVersion.current())).numberOfShards(1).numberOfReplicas(0))
.build();
var clusterState = ClusterState.builder(ClusterName.DEFAULT)
.metadata(metadata)
.nodes(
DiscoveryNodes.builder()
.add(DiscoveryNodeUtils.builder("node_0").roles(new HashSet<>(DiscoveryNodeRole.roles())).build())
.add(DiscoveryNodeUtils.builder("node_1").roles(new HashSet<>(DiscoveryNodeRole.roles())).build())
.add(DiscoveryNodeUtils.builder("node_2").roles(new HashSet<>(DiscoveryNodeRole.roles())).build())
Contributor comment on lines +141 to +143:
NIT:

Suggested change
-.add(DiscoveryNodeUtils.builder("node_0").roles(new HashSet<>(DiscoveryNodeRole.roles())).build())
-.add(DiscoveryNodeUtils.builder("node_1").roles(new HashSet<>(DiscoveryNodeRole.roles())).build())
-.add(DiscoveryNodeUtils.builder("node_2").roles(new HashSet<>(DiscoveryNodeRole.roles())).build())
+.add(DiscoveryNodeUtils.create("node_0"))
+.add(DiscoveryNodeUtils.create("node_1"))
+.add(DiscoveryNodeUtils.create("node_2"))

I believe all roles are added by default, so there is no need to set them manually.

)
.routingTable(RoutingTable.builder(TestShardRoutingRoleStrategies.DEFAULT_ROLE_ONLY).addAsNew(metadata.index("index")).build())
.build();
final Map<String, DiskUsage> availableSpaceUsage = new HashMap<>();
if (randomBoolean()) {
availableSpaceUsage.put("node_0", new DiskUsage("node_0", "node_0", "_na_", Long.MAX_VALUE, randomNonNegativeLong()));
availableSpaceUsage.put("node_1", new DiskUsage("node_0", "node_0", "_na_", Long.MAX_VALUE, randomNonNegativeLong()));
}
var clusterInfo = new ClusterInfo(
availableSpaceUsage,
availableSpaceUsage,
Map.of("[index][0][p]", ByteSizeValue.ofGb(50L).getBytes()),
Map.of(),
Map.of(),
Map.of()
);
var decider = new DiskThresholdDecider(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
Contributor comment:
Nit:

Suggested change
-new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
+ClusterSettings.createBuiltInClusterSettings()

);
var allocation = new RoutingAllocation(
new AllocationDeciders(Collections.singleton(decider)),
clusterState,
clusterInfo,
null,
System.nanoTime()
);
allocation.debugDecision(true);

final var node = RoutingNodesHelper.routingNode("node_2", clusterState.nodes().resolveNode("node_2"));
var unassignedShard = ShardRouting.newUnassigned(
new ShardId(clusterState.metadata().index("index").getIndex(), 0),
true,
EmptyStoreRecoverySource.INSTANCE,
new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, "create index"),
ShardRouting.Role.DEFAULT
);
final Decision decision = switch (randomInt(2)) {
case 0 -> decider.canForceAllocateDuringReplace(unassignedShard, node, allocation);
case 1 -> decider.canAllocate(unassignedShard, node, allocation);
case 2 -> decider.canRemain(
clusterState.metadata().index("index"),
ShardRoutingHelper.moveToStarted(ShardRoutingHelper.initialize(unassignedShard, "node_2")),
node,
allocation
);
default -> throw new AssertionError();
};
assertEquals(Decision.Type.YES, decision.type());
assertThat(
decision.getExplanation(),
availableSpaceUsage.isEmpty()
? containsString("disk usages are unavailable")
: containsString("average disk usage cannot be computed")
);
}

private void doTestCannotAllocateDueToLackOfDiskResources(boolean testMaxHeadroom) {
ClusterSettings nss = new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
DiskThresholdDecider decider = new DiskThresholdDecider(Settings.EMPTY, nss);