Skip to content

[8.19] Allow partial results by default in ES|QL - Take 2 (#127351) #127474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 28, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions docs/changelog/127351.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
pr: 127351
summary: Allow partial results by default in ES|QL
area: ES|QL
type: breaking
issues: [122802]

breaking:
title: Allow partial results by default in ES|QL
area: ES|QL
details: >-
In earlier versions of {es}, ES|QL would fail the entire query if it encountered any error. ES|QL now returns partial results instead of failing when encountering errors.

impact: >-
Callers should check the `is_partial` flag returned in the response to determine if the result is partial or complete. If returning partial results is not desired, this option can be overridden per request via an `allow_partial_results` parameter in the query URL or globally via the cluster setting `esql.query.allow_partial_results`.

notable: true
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@ static ElasticsearchCluster buildCluster() {
.module("test-esql-heap-attack")
.setting("xpack.security.enabled", "false")
.setting("xpack.license.self_generated.type", "trial")
.setting("esql.query.allow_partial_results", "false")
.jvmArg("-Xmx512m");
String javaVersion = JvmInfo.jvmInfo().version();
if (javaVersion.equals("20") || javaVersion.equals("21")) {
2 changes: 2 additions & 0 deletions x-pack/plugin/build.gradle
Original file line number Diff line number Diff line change
@@ -223,8 +223,10 @@ tasks.named("yamlRestTestV7CompatTransform").configure({ task ->
task.skipTest("esql/40_tsdb/from index pattern unsupported counter", "TODO: support for subset of metric fields")
task.skipTest("esql/40_unsupported_types/unsupported", "TODO: support for subset of metric fields")
task.skipTest("esql/40_unsupported_types/unsupported with sort", "TODO: support for subset of metric fields")
task.skipTest("esql/63_enrich_int_range/Invalid age as double", "TODO: require disable allow_partial_results")
})


tasks.named('yamlRestTestV7CompatTest').configure {
systemProperty 'es.queryable_built_in_roles_enabled', 'false'
}
Original file line number Diff line number Diff line change
@@ -83,6 +83,6 @@ private RestClient remoteClusterClient() throws IOException {

@Before
public void skipTestOnOldVersions() {
assumeTrue("skip on old versions", Clusters.localClusterVersion().equals(Version.V_8_16_0));
assumeTrue("skip on old versions", Clusters.localClusterVersion().equals(Version.V_8_19_0));
}
}
Original file line number Diff line number Diff line change
@@ -30,6 +30,7 @@
import org.junit.rules.TestRule;

import java.io.IOException;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.test.MapMatcher.assertMap;
@@ -94,6 +95,12 @@ protected String from(String... indexName) {

@Override
public Map<String, Object> runEsql(RestEsqlTestCase.RequestObjectBuilder requestObject) throws IOException {
if (requestObject.allowPartialResults() != null) {
assumeTrue(
"require allow_partial_results on local cluster",
clusterHasCapability("POST", "/_query", List.of(), List.of("support_partial_results")).orElse(false)
);
}
requestObject.includeCCSMetadata(true);
return super.runEsql(requestObject);
}
Original file line number Diff line number Diff line change
@@ -110,7 +110,7 @@ public void testInvalidPragma() throws IOException {
request.setJsonEntity("{\"f\":" + i + "}");
assertOK(client().performRequest(request));
}
RequestObjectBuilder builder = requestObjectBuilder().query("from test-index | limit 1 | keep f");
RequestObjectBuilder builder = requestObjectBuilder().query("from test-index | limit 1 | keep f").allowPartialResults(false);
builder.pragmas(Settings.builder().put("data_partitioning", "invalid-option").build());
ResponseException re = expectThrows(ResponseException.class, () -> runEsqlSync(builder));
assertThat(EntityUtils.toString(re.getResponse().getEntity()), containsString("No enum constant"));
Original file line number Diff line number Diff line change
@@ -132,7 +132,7 @@ public static class RequestObjectBuilder {
private Boolean includeCCSMetadata = null;

private CheckedConsumer<XContentBuilder, IOException> filter;
private Boolean allPartialResults = null;
private Boolean allowPartialResults = null;

public RequestObjectBuilder() throws IOException {
this(randomFrom(XContentType.values()));
@@ -210,11 +210,15 @@ public RequestObjectBuilder filter(CheckedConsumer<XContentBuilder, IOException>
return this;
}

public RequestObjectBuilder allPartialResults(boolean allPartialResults) {
this.allPartialResults = allPartialResults;
public RequestObjectBuilder allowPartialResults(boolean allowPartialResults) {
this.allowPartialResults = allowPartialResults;
return this;
}

public Boolean allowPartialResults() {
return allowPartialResults;
}

public RequestObjectBuilder build() throws IOException {
if (isBuilt == false) {
if (tables != null) {
@@ -1369,8 +1373,8 @@ protected static Request prepareRequestWithOptions(RequestObjectBuilder requestO
requestObject.build();
Request request = prepareRequest(mode);
String mediaType = attachBody(requestObject, request);
if (requestObject.allPartialResults != null) {
request.addParameter("allow_partial_results", String.valueOf(requestObject.allPartialResults));
if (requestObject.allowPartialResults != null) {
request.addParameter("allow_partial_results", String.valueOf(requestObject.allowPartialResults));
}

RequestOptions.Builder options = request.getOptions().toBuilder();
Original file line number Diff line number Diff line change
@@ -25,6 +25,7 @@
import org.elasticsearch.transport.RemoteClusterAware;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.plugin.EsqlPlugin;
import org.junit.After;
import org.junit.Before;

@@ -76,6 +77,11 @@ protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
return plugins;
}

@Override
protected Settings nodeSettings() {
return Settings.builder().put(super.nodeSettings()).put(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey(), false).build();
}

public static class InternalExchangePlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
Original file line number Diff line number Diff line change
@@ -139,6 +139,14 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
return CollectionUtils.appendToCopy(super.nodePlugins(), EsqlPlugin.class);
}

@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(EsqlPlugin.QUERY_ALLOW_PARTIAL_RESULTS.getKey(), false)
.build();
}

protected void setRequestCircuitBreakerLimit(ByteSizeValue limit) {
if (limit != null) {
assertAcked(
Original file line number Diff line number Diff line change
@@ -14,25 +14,20 @@
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.compute.operator.DriverTaskRunner;
import org.elasticsearch.compute.operator.exchange.ExchangeService;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.tasks.TaskInfo;
import org.elasticsearch.test.AbstractMultiClustersTestCase;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xcontent.XContentBuilder;
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.ComputeService;
import org.junit.After;
import org.junit.Before;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;

@@ -44,44 +39,20 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;

public class CrossClusterCancellationIT extends AbstractMultiClustersTestCase {
public class CrossClusterCancellationIT extends AbstractCrossClusterTestCase {
private static final String REMOTE_CLUSTER = "cluster-a";

@Override
protected Collection<String> remoteClusterAlias() {
protected List<String> remoteClusterAlias() {
return List.of(REMOTE_CLUSTER);
}

@Override
protected Collection<Class<? extends Plugin>> nodePlugins(String clusterAlias) {
List<Class<? extends Plugin>> plugins = new ArrayList<>(super.nodePlugins(clusterAlias));
plugins.add(EsqlPluginWithEnterpriseOrTrialLicense.class);
plugins.add(InternalExchangePlugin.class);
plugins.add(SimplePauseFieldPlugin.class);
return plugins;
}

public static class InternalExchangePlugin extends Plugin {
@Override
public List<Setting<?>> getSettings() {
return List.of(
Setting.timeSetting(
ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING,
TimeValue.timeValueMillis(between(3000, 4000)),
Setting.Property.NodeScope
)
);
}
}

@Before
public void resetPlugin() {
SimplePauseFieldPlugin.resetPlugin();
}

@After
public void releasePlugin() {
SimplePauseFieldPlugin.release();
protected Settings nodeSettings() {
return Settings.builder()
.put(super.nodeSettings())
.put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(3000, 4000)))
.build();
}

@Override
Original file line number Diff line number Diff line change
@@ -69,6 +69,7 @@ protected Collection<Class<? extends Plugin>> nodePlugins() {
@Override
protected Settings nodeSettings(int nodeOrdinal, Settings otherSettings) {
return Settings.builder()
.put(super.nodeSettings(nodeOrdinal, otherSettings))
.put(ExchangeService.INACTIVE_SINKS_INTERVAL_SETTING, TimeValue.timeValueMillis(between(3000, 5000)))
.build();
}
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionListenerResponseHandler;
import org.elasticsearch.action.OriginalIndices;
@@ -17,7 +16,6 @@
import org.elasticsearch.compute.operator.exchange.ExchangeSourceHandler;
import org.elasticsearch.core.Releasable;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.tasks.CancellableTask;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.tasks.TaskCancelledException;
@@ -90,18 +88,12 @@ void startComputeOnRemoteCluster(
if (receivedResults == false && EsqlCCSUtils.shouldIgnoreRuntimeError(executionInfo, clusterAlias, e)) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.SKIPPED, e);
l.onResponse(List.of());
} else if (configuration.allowPartialResults()
&& (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(
executionInfo,
clusterAlias,
EsqlExecutionInfo.Cluster.Status.PARTIAL,
e
);
l.onResponse(List.of());
} else {
l.onFailure(e);
}
} else if (configuration.allowPartialResults() && EsqlCCSUtils.canAllowPartial(e)) {
EsqlCCSUtils.markClusterWithFinalStateAndNoShards(executionInfo, clusterAlias, EsqlExecutionInfo.Cluster.Status.PARTIAL, e);
l.onResponse(List.of());
} else {
l.onFailure(e);
}
});
ExchangeService.openExchange(
transportService,
Original file line number Diff line number Diff line change
@@ -7,7 +7,6 @@

package org.elasticsearch.xpack.esql.plugin;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
@@ -28,7 +27,6 @@
import org.elasticsearch.core.Releasables;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.logging.LogManager;
import org.elasticsearch.logging.Logger;
@@ -59,6 +57,7 @@
import org.elasticsearch.xpack.esql.planner.LocalExecutionPlanner;
import org.elasticsearch.xpack.esql.planner.PlannerUtils;
import org.elasticsearch.xpack.esql.session.Configuration;
import org.elasticsearch.xpack.esql.session.EsqlCCSUtils;
import org.elasticsearch.xpack.esql.session.Result;

import java.util.ArrayList;
@@ -276,8 +275,7 @@ public void execute(
);
dataNodesListener.onResponse(r.getProfiles());
}, e -> {
if (configuration.allowPartialResults()
&& (ExceptionsHelper.unwrapCause(e) instanceof IndexNotFoundException) == false) {
if (configuration.allowPartialResults() && EsqlCCSUtils.canAllowPartial(e)) {
execInfo.swapCluster(
LOCAL_CLUSTER,
(k, v) -> new EsqlExecutionInfo.Cluster.Builder(v).setStatus(
Original file line number Diff line number Diff line change
@@ -105,7 +105,7 @@ public class EsqlPlugin extends Plugin implements ActionPlugin {

public static final Setting<Boolean> QUERY_ALLOW_PARTIAL_RESULTS = Setting.boolSetting(
"esql.query.allow_partial_results",
false,
true,
Setting.Property.NodeScope,
Setting.Property.Dynamic
);
Original file line number Diff line number Diff line change
@@ -7,6 +7,7 @@

package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.OriginalIndices;
@@ -16,6 +17,7 @@
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.set.Sets;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.indices.IndicesExpressionGrouper;
import org.elasticsearch.license.XPackLicenseState;
@@ -367,4 +369,16 @@ public static boolean shouldIgnoreRuntimeError(EsqlExecutionInfo executionInfo,

return ExceptionsHelper.isRemoteUnavailableException(e);
}

/**
* Check whether this exception can be tolerated when partial results are on, or should be treated as fatal.
* @return true if the exception can be tolerated, false if it should be treated as fatal.
*/
public static boolean canAllowPartial(Exception e) {
Throwable unwrapped = ExceptionsHelper.unwrapCause(e);
if (unwrapped instanceof IndexNotFoundException || unwrapped instanceof ElasticsearchSecurityException) {
return false;
}
return true;
}
}
Original file line number Diff line number Diff line change
@@ -192,8 +192,17 @@ teardown:

---
"Invalid age as double":
- requires:
test_runner_features: [ capabilities ]
capabilities:
- method: POST
path: /_query
parameters: [ ]
capabilities: [ support_partial_results ]
reason: "disable partial_results"
- do:
catch: /ENRICH range and input types are incompatible. range\[INTEGER\], input\[DOUBLE\]/
esql.query:
allow_partial_results: false
body:
query: 'FROM employees | ENRICH ages-policy ON salary | STATS count=COUNT(*) BY description | SORT count DESC, description ASC'