
Add query limiter middleware for instant queries #11097


Merged

merged 15 commits on Apr 22, 2025
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -39,6 +39,7 @@
* [ENHANCEMENT] Ingester/Distributor: Add `cortex_cost_attribution_*` metrics to observe the state of the cost-attribution trackers. #11112
* [ENHANCEMENT] gRPC/HTTP servers: Add `cortex_server_invalid_cluster_validation_label_requests_total` metric, that is increased for every request with an invalid cluster validation label. #11241 #11277
* [ENHANCEMENT] OTLP: Add support for converting OTel explicit bucket histograms to Prometheus native histograms with custom buckets using the `distributor.otel-convert-histograms-to-nhcb` flag. #11077
* [ENHANCEMENT] Add configurable per-tenant `limited_queries`, which limits how frequently the listed queries may be run. #11097
* [BUGFIX] OTLP: Fix response body and Content-Type header to align with spec. #10852
* [BUGFIX] Compactor: fix issue where block becomes permanently stuck when the Compactor's block cleanup job partially deletes a block. #10888
* [BUGFIX] Storage: fix intermittent failures in S3 upload retries. #10952
10 changes: 10 additions & 0 deletions cmd/mimir/config-descriptor.json
@@ -4865,6 +4865,16 @@
"fieldType": "blocked_queries_config...",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "limited_queries",
"required": false,
"desc": "List of queries to limit and duration to limit them for.",
"fieldValue": null,
"fieldDefaultValue": [],
"fieldType": "list of query (string) and allowed_frequency (duration)",
"fieldCategory": "experimental"
},
{
"kind": "field",
"name": "blocked_requests",
@@ -3676,6 +3676,15 @@ The `limits` block configures default and per-tenant limits imposed by component
# (experimental) List of queries to block.
[blocked_queries: <blocked_queries_config...> | default = ]

# (experimental) List of queries to limit and duration to limit them for.
# Example:
# The following configuration limits the query "rate(metric_counter[5m])" to
# running, at most, every minute.
# limited_queries:
# - allowed_frequency: 1m
# query: rate(metric_counter[5m])
[limited_queries: <list of query (string) and allowed_frequency (duration)> | default = ]

# (experimental) List of http requests to block.
[blocked_requests: <blocked_requests_config...> | default = ]

15 changes: 14 additions & 1 deletion docs/sources/mimir/manage/mimir-runbooks/_index.md
@@ -2688,7 +2688,20 @@ How it **works**:

How to **fix** it:

This error only occurs when an administrator has explicitly define a blocked list for a given tenant. After assessing whether or not the reason for blocking one or multiple queries you can update the tenant's limits and remove the pattern.
This error only occurs when an administrator has explicitly defined a blocked list for a given tenant. After assessing the reason for blocking one or multiple queries, you can update the tenant's limits and remove the pattern.

### err-mimir-query-limited

This error occurs when a query-frontend blocks a read request because the query matches at least one of the rules defined in the limits and the query is being run too frequently.

How it **works**:

- The query-frontend implements a middleware that checks whether the query matches one of the tenant's limited queries and whether it has already been run within the configured allowed frequency.
- To configure the limit, set the `limited_queries` block under the tenant's `limits`.

How to **fix** it:

Consider running this query less frequently. This error only occurs when an administrator has explicitly defined a limited queries list for a given tenant. After assessing the reason for limiting one or multiple queries, you can update the tenant's limits and remove the pattern.
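
For reference, a per-tenant runtime override using the example from the configuration reference might look like the following sketch; `tenant-a` is a placeholder tenant ID, and removing the entry lifts the limit:

```yaml
# Hypothetical runtime overrides sketch: limits the example query for one tenant.
overrides:
  tenant-a:
    limited_queries:
      - query: rate(metric_counter[5m])
        allowed_frequency: 1m
```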

### err-mimir-request-blocked

7 changes: 7 additions & 0 deletions pkg/frontend/querymiddleware/errors.go
@@ -28,3 +28,10 @@ func newMaxQueryExpressionSizeBytesError(actualSizeBytes, maxQuerySizeBytes int)
func newQueryBlockedError() error {
return apierror.New(apierror.TypeBadData, globalerror.QueryBlocked.Message("the request has been blocked by the cluster administrator"))
}

func newQueryLimitedError(allowedFrequency time.Duration, tenantID string) error {
return apierror.New(
apierror.TypeTooManyRequests, globalerror.QueryLimited.Message(
fmt.Sprintf("the query has been limited by the cluster administrator, and is being run more frequently than the allowed frequency %s against tenant %s", allowedFrequency, tenantID),
))
}
3 changes: 3 additions & 0 deletions pkg/frontend/querymiddleware/limits.go
@@ -109,6 +109,9 @@ type Limits interface {
// BlockedRequests returns the blocked http requests.
BlockedRequests(userID string) []*validation.BlockedRequest

// LimitedQueries returns the limited queries.
LimitedQueries(userID string) []*validation.LimitedQuery

// AlignQueriesWithStep returns if queries should be adjusted to be step-aligned
AlignQueriesWithStep(userID string) bool

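The `validation.LimitedQuery` type itself is not part of this diff. Based on the config descriptor above (`query` string, `allowed_frequency` duration) and the field names used by the limiter middleware below, a minimal sketch of the entry could look like this; the YAML tags and exact types are assumptions:

```go
package validation

import "time"

// LimitedQuery is a sketch of a per-tenant limited-query entry; the real
// definition lives in pkg/util/validation and is not shown in this diff.
type LimitedQuery struct {
	Query            string        `yaml:"query" json:"query"`
	AllowedFrequency time.Duration `yaml:"allowed_frequency" json:"allowed_frequency"`
}
```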
9 changes: 9 additions & 0 deletions pkg/frontend/querymiddleware/limits_test.go
@@ -676,6 +676,10 @@ func (m multiTenantMockLimits) BlockedQueries(userID string) []*validation.Block
return m.byTenant[userID].blockedQueries
}

func (m multiTenantMockLimits) LimitedQueries(userID string) []*validation.LimitedQuery {
return m.byTenant[userID].limitedQueries
}

func (m multiTenantMockLimits) CreationGracePeriod(userID string) time.Duration {
return m.byTenant[userID].creationGracePeriod
}
@@ -729,6 +733,7 @@ type mockLimits struct {
enabledPromQLExperimentalFunctions []string
prom2RangeCompat bool
blockedQueries []*validation.BlockedQuery
limitedQueries []*validation.LimitedQuery
blockedRequests []*validation.BlockedRequest
alignQueriesWithStep bool
queryIngestersWithin time.Duration
@@ -810,6 +815,10 @@ func (m mockLimits) BlockedQueries(string) []*validation.BlockedQuery {
return m.blockedQueries
}

func (m mockLimits) LimitedQueries(userID string) []*validation.LimitedQuery {
return m.limitedQueries
}

func (m mockLimits) ResultsCacheTTLForLabelsQuery(string) time.Duration {
return m.resultsCacheTTLForLabelsQuery
}
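A hypothetical test snippet (not part of this PR) showing how the new `limitedQueries` field on `mockLimits` could be exercised; it assumes the imports already present in the test file (`testing`, `time`, `require`, `validation`), and the query and frequency values mirror the documentation example:

```go
// Hypothetical example test; mockLimits and validation.LimitedQuery come from
// the surrounding package, and the values are taken from the docs example.
func TestMockLimitsLimitedQueries(t *testing.T) {
	limits := mockLimits{
		limitedQueries: []*validation.LimitedQuery{
			{Query: "rate(metric_counter[5m])", AllowedFrequency: time.Minute},
		},
	}
	require.Len(t, limits.LimitedQueries("test"), 1)
	require.Equal(t, time.Minute, limits.LimitedQueries("test")[0].AllowedFrequency)
}
```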
7 changes: 1 addition & 6 deletions pkg/frontend/querymiddleware/query_blocker.go
@@ -10,7 +10,6 @@ import (
"github.com/go-kit/log/level"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/labels"
)

@@ -24,12 +23,8 @@ type queryBlockerMiddleware struct {
func newQueryBlockerMiddleware(
limits Limits,
logger log.Logger,
registerer prometheus.Registerer,
blockedQueriesCounter *prometheus.CounterVec,
) MetricsQueryMiddleware {
blockedQueriesCounter := promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_rejected_queries_total",
Help: "Number of queries that were rejected by the cluster administrator.",
}, []string{"user", "reason"})
return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler {
return &queryBlockerMiddleware{
next: next,
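The refactor moves the `cortex_query_frontend_rejected_queries_total` counter out of the blocker middleware so it can be shared with the new limiter. A sketch of that wiring, assuming a hypothetical helper name, the package's existing imports (`prometheus`, `promauto`, `cache`, `log`), and that the real setup lives in the query-frontend middleware construction elsewhere in this PR:

```go
// Hypothetical wiring sketch: the counter is registered once and handed to both
// middlewares, so blocked and limited queries share the same metric and differ
// only in their "reason" label value.
func newRejectedQueryMiddlewares(
	reg prometheus.Registerer,
	c cache.Cache,
	keyGen CacheKeyGenerator,
	limits Limits,
	logger log.Logger,
) (blocker, limiter MetricsQueryMiddleware) {
	rejectedQueriesCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_query_frontend_rejected_queries_total",
		Help: "Number of queries that were rejected by the cluster administrator.",
	}, []string{"user", "reason"})

	blocker = newQueryBlockerMiddleware(limits, logger, rejectedQueriesCounter)
	limiter = newQueryLimiterMiddleware(c, keyGen, limits, logger, rejectedQueriesCounter)
	return blocker, limiter
}
```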
15 changes: 12 additions & 3 deletions pkg/frontend/querymiddleware/query_blocker_test.go
@@ -10,6 +10,7 @@ import (
"github.com/go-kit/log"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
promtest "github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
@@ -119,16 +120,20 @@ func TestQueryBlockerMiddleware_RangeAndInstantQuery(t *testing.T) {
"range query": &PrometheusRangeQueryRequest{
queryExpr: parseQuery(t, tt.query),
},
"instant query": &PrometheusRangeQueryRequest{
"instant query": &PrometheusInstantQueryRequest{
queryExpr: parseQuery(t, tt.query),
},
}

for reqType, req := range reqs {
t.Run(reqType, func(t *testing.T) {
reg := prometheus.NewPedanticRegistry()
blockedQueriesCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_rejected_queries_total",
Help: "Number of queries that were rejected by the cluster administrator.",
}, []string{"user", "reason"})
logger := log.NewNopLogger()
mw := newQueryBlockerMiddleware(tt.limits, logger, reg)
mw := newQueryBlockerMiddleware(tt.limits, logger, blockedQueriesCounter)
_, err := mw.Wrap(&mockNextHandler{t: t, shouldContinue: !tt.expectedBlocked}).Do(user.InjectOrgID(context.Background(), "test"), req)

if tt.expectedBlocked {
@@ -226,8 +231,12 @@ func TestQueryBlockerMiddleware_RemoteRead(t *testing.T) {
require.NoError(t, err)

reg := prometheus.NewPedanticRegistry()
blockedQueriesCounter := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_query_frontend_rejected_queries_total",
Help: "Number of queries that were rejected by the cluster administrator.",
}, []string{"user", "reason"})
logger := log.NewNopLogger()
mw := newQueryBlockerMiddleware(tt.limits, logger, reg)
mw := newQueryBlockerMiddleware(tt.limits, logger, blockedQueriesCounter)
_, err = mw.Wrap(&mockNextHandler{t: t, shouldContinue: !tt.expectedBlocked}).Do(user.InjectOrgID(context.Background(), "test"), req)

if tt.expectedBlocked {
103 changes: 103 additions & 0 deletions pkg/frontend/querymiddleware/query_limiter.go
@@ -0,0 +1,103 @@
// SPDX-License-Identifier: AGPL-3.0-only

package querymiddleware

import (
"context"
"encoding/base64"
"errors"
"strings"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/cache"
"github.com/grafana/dskit/tenant"
"github.com/prometheus/client_golang/prometheus"
"golang.org/x/crypto/blake2b"

"github.com/grafana/mimir/pkg/util/spanlogger"
"github.com/grafana/mimir/pkg/util/validation"
)

// queryLimiterMiddleware blocks a query that should not be executed more frequently than a configured interval,
// and allows it if enough time has passed since the last time the query was allowed.
// When blocked, the request is rejected with a "too many requests" error.
// The query limiter currently only matches exact query strings, and limits based on when the query is executed, not on the time range it selects.
type queryLimiterMiddleware struct {
next MetricsQueryHandler
cache cache.Cache
keyGen CacheKeyGenerator
limits Limits
logger log.Logger
blockedQueriesCounter *prometheus.CounterVec
}

func newQueryLimiterMiddleware(
cache cache.Cache,
cacheKeyGen CacheKeyGenerator,
limits Limits,
logger log.Logger,
blockedQueriesCounter *prometheus.CounterVec,
) MetricsQueryMiddleware {
return MetricsQueryMiddlewareFunc(func(next MetricsQueryHandler) MetricsQueryHandler {
return &queryLimiterMiddleware{
next: next,
cache: cache,
keyGen: cacheKeyGen,
limits: limits,
logger: logger,
blockedQueriesCounter: blockedQueriesCounter,
}
})
}

func (ql *queryLimiterMiddleware) Do(ctx context.Context, req MetricsQueryRequest) (Response, error) {
spanLog := spanlogger.FromContext(ctx, ql.logger)
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return ql.next.Do(ctx, req)
}

key := ql.keyGen.QueryRequestLimiter(ctx, tenant.JoinTenantIDs(tenantIDs), req)
hashedKey := maybeHashCacheKey(key)
limitedQueryToEnforce := validation.LimitedQuery{}
query := strings.TrimSpace(req.GetQuery())
var tenantMinAllowedFrequency string

for _, tenantID := range tenantIDs {
for _, limitedQuery := range ql.limits.LimitedQueries(tenantID) {
if strings.TrimSpace(limitedQuery.Query) == query {
level.Debug(spanLog).Log("msg", "query limiter matched exact query", "query", query, "user", tenantID)
if limitedQueryToEnforce.Query == "" {
limitedQueryToEnforce.Query = query
}
if limitedQuery.AllowedFrequency > limitedQueryToEnforce.AllowedFrequency {
limitedQueryToEnforce.AllowedFrequency = limitedQuery.AllowedFrequency
tenantMinAllowedFrequency = tenantID
}
}
}
}
// If we found any matching limited query, we should try to cache it
if limitedQueryToEnforce.Query != "" {
if err := ql.cache.Add(ctx, hashedKey, []byte{}, limitedQueryToEnforce.AllowedFrequency); err != nil {
// If we receive ErrNotStored, the entry is still in the cache and the query should be blocked
if errors.Is(err, cache.ErrNotStored) {
ql.blockedQueriesCounter.WithLabelValues(tenantMinAllowedFrequency, "limited").Inc()
return nil, newQueryLimitedError(limitedQueryToEnforce.AllowedFrequency, tenantMinAllowedFrequency)
}
level.Warn(ql.logger).Log("msg", "error while adding to query limiter cache", "err", err)
}
}

return ql.next.Do(ctx, req)
}

func maybeHashCacheKey(key string) string {
if len(key) <= base64.RawURLEncoding.EncodedLen(blake2b.Size256) {
return key
}

sum := blake2b.Sum256([]byte(key))
return base64.RawURLEncoding.EncodeToString(sum[:blake2b.Size256])
}
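
The limiter leans on `cache.Add` semantics: the add only succeeds when the key is absent, and `cache.ErrNotStored` signals that an entry written within the allowed frequency is still live, so the query is rejected. A minimal, self-contained sketch of that add-if-absent pattern, using a toy in-memory cache rather than dskit's `cache.Cache`:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotStored stands in for dskit's cache.ErrNotStored in this toy example.
var errNotStored = errors.New("not stored")

// toyCache mimics the add-if-absent semantics the query limiter relies on.
type toyCache struct {
	expiry map[string]time.Time
}

// add stores the key only if it is absent or expired; otherwise it reports errNotStored.
func (c *toyCache) add(key string, ttl time.Duration) error {
	if exp, ok := c.expiry[key]; ok && time.Now().Before(exp) {
		return errNotStored
	}
	c.expiry[key] = time.Now().Add(ttl)
	return nil
}

func main() {
	c := &toyCache{expiry: map[string]time.Time{}}
	allowedFrequency := time.Minute

	// The first attempt stores the key and is allowed; the second, issued
	// immediately afterwards, finds a live entry and is limited.
	for i := 0; i < 2; i++ {
		if err := c.add("tenant-1:rate(metric_counter[5m])", allowedFrequency); errors.Is(err, errNotStored) {
			fmt.Println("query limited: ran more recently than", allowedFrequency)
			continue
		}
		fmt.Println("query allowed")
	}
}
```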