Skip to content

MQE: include query planning and materialization in activity tracker #11212

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Apr 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 27 additions & 1 deletion pkg/streamingpromql/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"context"
"errors"
"fmt"
"math"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -54,11 +55,16 @@ func NewEngine(opts EngineOpts, limitsProvider QueryLimitsProvider, metrics *sta
return nil, errors.New("query planning enabled but no planner provided")
}

activeQueryTracker := opts.CommonOpts.ActiveQueryTracker
if activeQueryTracker == nil {
activeQueryTracker = &NoopQueryTracker{}
}

return &Engine{
lookbackDelta: lookbackDelta,
timeout: opts.CommonOpts.Timeout,
limitsProvider: limitsProvider,
activeQueryTracker: opts.CommonOpts.ActiveQueryTracker,
activeQueryTracker: activeQueryTracker,
noStepSubqueryIntervalFn: opts.CommonOpts.NoStepSubqueryIntervalFn,

logger: logger,
Expand Down Expand Up @@ -157,3 +163,23 @@ type staticQueryLimitsProvider struct {
// GetMaxEstimatedMemoryConsumptionPerQuery returns the fixed limit stored in the
// provider. The context argument is ignored and the returned error is always nil.
func (p staticQueryLimitsProvider) GetMaxEstimatedMemoryConsumptionPerQuery(context.Context) (uint64, error) {
	limit := p.maxEstimatedMemoryConsumptionPerQuery

	return limit, nil
}

// NoopQueryTracker is a query tracker that tracks nothing: every method returns
// immediately without recording any state. NewEngine and NewQueryPlanner fall
// back to it when no ActiveQueryTracker is configured, so the tracker can be
// called without a nil check.
type NoopQueryTracker struct{}

// GetMaxConcurrent returns math.MaxInt, the largest value an int can hold.
func (*NoopQueryTracker) GetMaxConcurrent() int {
	return math.MaxInt
}

// Insert ignores the context and the query text, and always returns index 0
// with a nil error.
func (*NoopQueryTracker) Insert(context.Context, string) (int, error) {
	return 0, nil
}

// Delete does nothing, whatever index it is given.
func (*NoopQueryTracker) Delete(int) {}

// Close has nothing to release and always returns nil.
func (*NoopQueryTracker) Close() error {
	return nil
}
152 changes: 103 additions & 49 deletions pkg/streamingpromql/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1742,69 +1742,119 @@ func getHistogram(t *testing.T, reg *prometheus.Registry, name string) *dto.Hist
return m[0].Histogram
}

func TestActiveQueryTracker(t *testing.T) {
for _, shouldSucceed := range []bool{true, false} {
t.Run(fmt.Sprintf("successful query = %v", shouldSucceed), func(t *testing.T) {
opts := NewTestEngineOpts()
tracker := &testQueryTracker{}
opts.CommonOpts.ActiveQueryTracker = tracker
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), nil, log.NewNopLogger())
require.NoError(t, err)
func TestActiveQueryTracker_SuccessfulQuery_WithoutQueryPlanner(t *testing.T) {
opts := NewTestEngineOpts()
tracker := &testQueryTracker{}
opts.CommonOpts.ActiveQueryTracker = tracker
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), nil, log.NewNopLogger())
require.NoError(t, err)

innerStorage := promqltest.LoadedStorage(t, "")
t.Cleanup(func() { require.NoError(t, innerStorage.Close()) })
testActiveQueryTracker(t, engine, tracker)
}

// Use a fake queryable as a way to check that the query is recorded as active while the query is in progress.
queryTrackingTestingQueryable := &activeQueryTrackerQueryable{
innerStorage: innerStorage,
tracker: tracker,
}
func TestActiveQueryTracker_SuccessfulQuery_WithQueryPlanner(t *testing.T) {
opts := NewTestEngineOpts()

if !shouldSucceed {
queryTrackingTestingQueryable.err = errors.New("something went wrong inside the query")
}
tracker := &testQueryTracker{}
opts.CommonOpts.ActiveQueryTracker = tracker

queryTypes := map[string]func(expr string) (promql.Query, error){
"range": func(expr string) (promql.Query, error) {
return engine.NewRangeQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0), timestamp.Time(0).Add(time.Hour), time.Minute)
},
"instant": func(expr string) (promql.Query, error) {
return engine.NewInstantQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0))
},
}
opts.UseQueryPlanning = true
planner := NewQueryPlanner(opts)

for queryType, createQuery := range queryTypes {
t.Run(queryType+" query", func(t *testing.T) {
expr := "test_" + queryType + "_query"
queryTrackingTestingQueryable.activeQueryAtQueryTime = trackedQuery{}
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner, log.NewNopLogger())
require.NoError(t, err)

q, err := createQuery(expr)
require.NoError(t, err)
defer q.Close()
testActiveQueryTracker(
t, engine, tracker,
trackedQuery{expr: "test_query # (planning)", deleted: true},
trackedQuery{expr: "test_query # (materialization)", deleted: true},
)
}

res := q.Exec(context.Background())
func testActiveQueryTracker(t *testing.T, engine *Engine, tracker *testQueryTracker, expectedCreationActivities ...trackedQuery) {
innerStorage := promqltest.LoadedStorage(t, "")
t.Cleanup(func() { require.NoError(t, innerStorage.Close()) })

if shouldSucceed {
require.NoError(t, res.Err)
} else {
require.EqualError(t, res.Err, "something went wrong inside the query")
}
// Use a fake queryable as a way to check that the query is recorded as active while the query is in progress.
queryTrackingTestingQueryable := &activeQueryTrackerQueryable{
innerStorage: innerStorage,
tracker: tracker,
}

// Check that the query was active in the query tracker while the query was executing.
require.Equal(t, expr, queryTrackingTestingQueryable.activeQueryAtQueryTime.expr)
require.False(t, queryTrackingTestingQueryable.activeQueryAtQueryTime.deleted)
queryTypes := map[string]func(expr string) (promql.Query, error){
"range": func(expr string) (promql.Query, error) {
return engine.NewRangeQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0), timestamp.Time(0).Add(time.Hour), time.Minute)
},
"instant": func(expr string) (promql.Query, error) {
return engine.NewInstantQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0))
},
}

// Check that the query has now been marked as deleted in the query tracker.
require.NotEmpty(t, tracker.queries)
trackedQuery := tracker.queries[len(tracker.queries)-1]
require.Equal(t, expr, trackedQuery.expr)
require.Equal(t, true, trackedQuery.deleted)
})
}
for queryType, createQuery := range queryTypes {
t.Run(queryType+" query", func(t *testing.T) {
expr := "test_query"
queryTrackingTestingQueryable.activeQueryAtQueryTime = trackedQuery{}
tracker.Clear()

q, err := createQuery(expr)
require.NoError(t, err)
defer q.Close()

require.Equal(t, expectedCreationActivities, tracker.queries)

res := q.Exec(context.Background())
require.NoError(t, res.Err)

// Check that the query was active in the query tracker while the query was executing.
require.Equal(t, expr, queryTrackingTestingQueryable.activeQueryAtQueryTime.expr)
require.False(t, queryTrackingTestingQueryable.activeQueryAtQueryTime.deleted)

// Check that the query has now been marked as deleted in the query tracker.
require.NotEmpty(t, tracker.queries)
trackedQuery := tracker.queries[len(tracker.queries)-1]
require.Equal(t, expr, trackedQuery.expr)
require.Equal(t, true, trackedQuery.deleted)
})
}
}

func TestActiveQueryTracker_FailedQuery(t *testing.T) {
opts := NewTestEngineOpts()
tracker := &testQueryTracker{}
opts.CommonOpts.ActiveQueryTracker = tracker
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), nil, log.NewNopLogger())
require.NoError(t, err)

innerStorage := promqltest.LoadedStorage(t, "")
t.Cleanup(func() { require.NoError(t, innerStorage.Close()) })

// Use a fake queryable as a way to check that the query is recorded as active while the query is in progress,
// and to inject an error that causes the query to fail.
queryTrackingTestingQueryable := &activeQueryTrackerQueryable{
innerStorage: innerStorage,
tracker: tracker,
err: errors.New("something went wrong inside the query"),
}

expr := "test_metric"
q, err := engine.NewInstantQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0))
require.NoError(t, err)
defer q.Close()

res := q.Exec(context.Background())
require.EqualError(t, res.Err, "something went wrong inside the query")

// Check that the query was active in the query tracker while the query was executing.
require.Equal(t, expr, queryTrackingTestingQueryable.activeQueryAtQueryTime.expr)
require.False(t, queryTrackingTestingQueryable.activeQueryAtQueryTime.deleted)

// Check that the query has now been marked as deleted in the query tracker.
require.NotEmpty(t, tracker.queries)
trackedQuery := tracker.queries[len(tracker.queries)-1]
require.Equal(t, expr, trackedQuery.expr)
require.Equal(t, true, trackedQuery.deleted)
}

// testQueryTracker is the query tracker the tests install through
// opts.CommonOpts.ActiveQueryTracker so they can inspect which queries were
// tracked and whether each has been marked as deleted.
type testQueryTracker struct {
// queries is the list of trackedQuery entries that the tests assert on.
// Clear resets it to nil.
queries []trackedQuery
}
Expand Down Expand Up @@ -1835,6 +1885,10 @@ func (qt *testQueryTracker) Close() error {
return nil
}

// Clear discards every recorded query so a subtest can start from an empty tracker.
func (qt *testQueryTracker) Clear() {
// Reset to nil rather than to an empty slice: testActiveQueryTracker compares
// tracker.queries against its variadic expectedCreationActivities, which is nil
// when no activities are expected.
qt.queries = nil
}

type activeQueryTrackerQueryable struct {
tracker *testQueryTracker

Expand Down
21 changes: 21 additions & 0 deletions pkg/streamingpromql/planning.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,21 @@ import (
var timeSince = time.Since

// QueryPlanner holds the state that NewQueryPlanner assembles from EngineOpts
// and that NewQueryPlan uses to build a query plan.
type QueryPlanner struct {
// activeQueryTracker receives the "<query> # (planning)" activity that
// NewQueryPlan inserts on entry and deletes when it returns.
activeQueryTracker promql.QueryTracker
// noStepSubqueryIntervalFn is copied from EngineOpts.CommonOpts.NoStepSubqueryIntervalFn.
noStepSubqueryIntervalFn func(rangeMillis int64) int64
// NOTE(review): presumably the optimization passes run over the AST and over the
// query plan respectively; where they are registered and applied is outside this
// view — confirm.
astOptimizationPasses []optimize.ASTOptimizationPass
planOptimizationPasses []optimize.QueryPlanOptimizationPass
// planStageLatency is the histogram vector registered as
// cortex_mimir_query_engine_plan_stage_latency_seconds.
planStageLatency *prometheus.HistogramVec
}

func NewQueryPlanner(opts EngineOpts) *QueryPlanner {
activeQueryTracker := opts.CommonOpts.ActiveQueryTracker
if activeQueryTracker == nil {
activeQueryTracker = &NoopQueryTracker{}
}

return &QueryPlanner{
activeQueryTracker: activeQueryTracker,
noStepSubqueryIntervalFn: opts.CommonOpts.NoStepSubqueryIntervalFn,
planStageLatency: promauto.With(opts.CommonOpts.Reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_mimir_query_engine_plan_stage_latency_seconds",
Expand Down Expand Up @@ -72,6 +79,13 @@ type PlanningObserver interface {
}

func (p *QueryPlanner) NewQueryPlan(ctx context.Context, qs string, timeRange types.QueryTimeRange, observer PlanningObserver) (*planning.QueryPlan, error) {
queryID, err := p.activeQueryTracker.Insert(ctx, qs+" # (planning)")
if err != nil {
return nil, err
}

defer p.activeQueryTracker.Delete(queryID)

expr, err := p.runASTStage("Parsing", observer, func() (parser.Expr, error) { return parser.ParseExpr(qs) })
if err != nil {
return nil, err
Expand Down Expand Up @@ -381,6 +395,13 @@ func (e *Engine) Materialize(ctx context.Context, plan *planning.QueryPlan, quer
opts = promql.NewPrometheusQueryOpts(false, 0)
}

queryID, err := e.activeQueryTracker.Insert(ctx, plan.OriginalExpression+" # (materialization)")
if err != nil {
return nil, err
}

defer e.activeQueryTracker.Delete(queryID)

q, err := e.newQuery(ctx, queryable, opts, plan.TimeRange)
if err != nil {
return nil, err
Expand Down
19 changes: 19 additions & 0 deletions pkg/streamingpromql/planning_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1052,6 +1052,25 @@ func TestPlanCreationEncodingAndDecoding(t *testing.T) {
}
}

func TestQueryPlanner_ActivityTracking(t *testing.T) {
opts := NewTestEngineOpts()
opts.UseQueryPlanning = true
tracker := &testQueryTracker{}
opts.CommonOpts.ActiveQueryTracker = tracker
planner := NewQueryPlanner(opts)

expr := "test"
timeRange := types.NewInstantQueryTimeRange(time.Now())
_, err := planner.NewQueryPlan(context.Background(), expr, timeRange, NoopPlanningObserver{})
require.NoError(t, err)

expectedPlanningActivities := []trackedQuery{
{expr: "test # (planning)", deleted: true},
}

require.Equal(t, expectedPlanningActivities, tracker.queries)
}

func TestAnalysisHandler(t *testing.T) {
originalTimeSince := timeSince
timeSince = func(_ time.Time) time.Duration { return 1234 * time.Millisecond }
Expand Down
12 changes: 5 additions & 7 deletions pkg/streamingpromql/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -567,15 +567,13 @@ func (q *Query) Exec(ctx context.Context) *promql.Result {
// (so that it runs before the cancellation of the context with timeout created above).
defer cancel(errQueryFinished)

if q.engine.activeQueryTracker != nil {
queryID, err := q.engine.activeQueryTracker.Insert(ctx, q.originalExpression)
if err != nil {
return &promql.Result{Err: err}
}

defer q.engine.activeQueryTracker.Delete(queryID)
queryID, err := q.engine.activeQueryTracker.Insert(ctx, q.originalExpression)
if err != nil {
return &promql.Result{Err: err}
}

defer q.engine.activeQueryTracker.Delete(queryID)

defer func() {
logger := spanlogger.FromContext(ctx, q.engine.logger)
msg := make([]interface{}, 0, 2*(3+4)) // 3 fields for all query types, plus worst case of 4 fields for range queries
Expand Down