Skip to content

Commit 3041797

Browse files
committed
MQE: include query planning and materialization in activity tracker
1 parent a3ab660 commit 3041797

File tree

3 files changed

+108
-50
lines changed

3 files changed

+108
-50
lines changed

Diff for: pkg/streamingpromql/engine_test.go

+69-50
Original file line numberDiff line numberDiff line change
@@ -1788,62 +1788,77 @@ func getHistogram(t *testing.T, reg *prometheus.Registry, name string) *dto.Hist
17881788
}
17891789

17901790
func TestActiveQueryTracker(t *testing.T) {
1791-
for _, shouldSucceed := range []bool{true, false} {
1792-
t.Run(fmt.Sprintf("successful query = %v", shouldSucceed), func(t *testing.T) {
1793-
opts := NewTestEngineOpts()
1794-
tracker := &testQueryTracker{}
1795-
opts.CommonOpts.ActiveQueryTracker = tracker
1796-
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), nil, log.NewNopLogger())
1797-
require.NoError(t, err)
1798-
1799-
innerStorage := promqltest.LoadedStorage(t, "")
1800-
t.Cleanup(func() { require.NoError(t, innerStorage.Close()) })
1801-
1802-
// Use a fake queryable as a way to check that the query is recorded as active while the query is in progress.
1803-
queryTrackingTestingQueryable := &activeQueryTrackerQueryable{
1804-
innerStorage: innerStorage,
1805-
tracker: tracker,
1806-
}
1807-
1808-
if !shouldSucceed {
1809-
queryTrackingTestingQueryable.err = errors.New("something went wrong inside the query")
1810-
}
1811-
1812-
queryTypes := map[string]func(expr string) (promql.Query, error){
1813-
"range": func(expr string) (promql.Query, error) {
1814-
return engine.NewRangeQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0), timestamp.Time(0).Add(time.Hour), time.Minute)
1815-
},
1816-
"instant": func(expr string) (promql.Query, error) {
1817-
return engine.NewInstantQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0))
1818-
},
1819-
}
1820-
1821-
for queryType, createQuery := range queryTypes {
1822-
t.Run(queryType+" query", func(t *testing.T) {
1823-
expr := "test_" + queryType + "_query"
1824-
queryTrackingTestingQueryable.activeQueryAtQueryTime = trackedQuery{}
1825-
1826-
q, err := createQuery(expr)
1791+
for _, usePlanner := range []bool{true, false} {
1792+
t.Run(fmt.Sprintf("use planner = %v", usePlanner), func(t *testing.T) {
1793+
for _, shouldSucceed := range []bool{true, false} {
1794+
t.Run(fmt.Sprintf("successful query = %v", shouldSucceed), func(t *testing.T) {
1795+
opts := NewTestEngineOpts()
1796+
opts.UseQueryPlanning = usePlanner
1797+
tracker := &testQueryTracker{}
1798+
opts.CommonOpts.ActiveQueryTracker = tracker
1799+
planner := NewQueryPlanner(opts)
1800+
engine, err := NewEngine(opts, NewStaticQueryLimitsProvider(0), stats.NewQueryMetrics(nil), planner, log.NewNopLogger())
18271801
require.NoError(t, err)
1828-
defer q.Close()
18291802

1830-
res := q.Exec(context.Background())
1803+
innerStorage := promqltest.LoadedStorage(t, "")
1804+
t.Cleanup(func() { require.NoError(t, innerStorage.Close()) })
18311805

1832-
if shouldSucceed {
1833-
require.NoError(t, res.Err)
1834-
} else {
1835-
require.EqualError(t, res.Err, "something went wrong inside the query")
1806+
// Use a fake queryable as a way to check that the query is recorded as active while the query is in progress.
1807+
queryTrackingTestingQueryable := &activeQueryTrackerQueryable{
1808+
innerStorage: innerStorage,
1809+
tracker: tracker,
18361810
}
18371811

1838-
// Check that the query was active in the query tracker while the query was executing.
1839-
require.Equal(t, expr, queryTrackingTestingQueryable.activeQueryAtQueryTime.expr)
1840-
require.False(t, queryTrackingTestingQueryable.activeQueryAtQueryTime.deleted)
1812+
if !shouldSucceed {
1813+
queryTrackingTestingQueryable.err = errors.New("something went wrong inside the query")
1814+
}
18411815

1842-
// Check that the query has now been marked as deleted in the query tracker.
1843-
require.NotEmpty(t, tracker.queries)
1844-
trackedQuery := tracker.queries[len(tracker.queries)-1]
1845-
require.Equal(t, expr, trackedQuery.expr)
1846-
require.Equal(t, true, trackedQuery.deleted)
1816+
queryTypes := map[string]func(expr string) (promql.Query, error){
1817+
"range": func(expr string) (promql.Query, error) {
1818+
return engine.NewRangeQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0), timestamp.Time(0).Add(time.Hour), time.Minute)
1819+
},
1820+
"instant": func(expr string) (promql.Query, error) {
1821+
return engine.NewInstantQuery(context.Background(), queryTrackingTestingQueryable, nil, expr, timestamp.Time(0))
1822+
},
1823+
}
1824+
1825+
for queryType, createQuery := range queryTypes {
1826+
t.Run(queryType+" query", func(t *testing.T) {
1827+
expr := "test_" + queryType + "_query"
1828+
queryTrackingTestingQueryable.activeQueryAtQueryTime = trackedQuery{}
1829+
tracker.Clear()
1830+
1831+
q, err := createQuery(expr)
1832+
require.NoError(t, err)
1833+
defer q.Close()
1834+
1835+
if usePlanner {
1836+
expectedPlanningActivities := []trackedQuery{
1837+
{expr: expr + " # (planning)", deleted: true},
1838+
{expr: expr + " # (materialization)", deleted: true},
1839+
}
1840+
require.Equal(t, expectedPlanningActivities, tracker.queries)
1841+
}
1842+
1843+
res := q.Exec(context.Background())
1844+
1845+
if shouldSucceed {
1846+
require.NoError(t, res.Err)
1847+
} else {
1848+
require.EqualError(t, res.Err, "something went wrong inside the query")
1849+
}
1850+
1851+
// Check that the query was active in the query tracker while the query was executing.
1852+
require.Equal(t, expr, queryTrackingTestingQueryable.activeQueryAtQueryTime.expr)
1853+
require.False(t, queryTrackingTestingQueryable.activeQueryAtQueryTime.deleted)
1854+
1855+
// Check that the query has now been marked as deleted in the query tracker.
1856+
require.NotEmpty(t, tracker.queries)
1857+
trackedQuery := tracker.queries[len(tracker.queries)-1]
1858+
require.Equal(t, expr, trackedQuery.expr)
1859+
require.Equal(t, true, trackedQuery.deleted)
1860+
})
1861+
}
18471862
})
18481863
}
18491864
})
@@ -1880,6 +1895,10 @@ func (qt *testQueryTracker) Close() error {
18801895
return nil
18811896
}
18821897

1898+
func (qt *testQueryTracker) Clear() {
1899+
qt.queries = nil
1900+
}
1901+
18831902
type activeQueryTrackerQueryable struct {
18841903
tracker *testQueryTracker
18851904

Diff for: pkg/streamingpromql/planning.go

+20
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ var timeSince = time.Since
3535

3636
type QueryPlanner struct {
3737
features Features
38+
activeQueryTracker promql.QueryTracker
3839
noStepSubqueryIntervalFn func(rangeMillis int64) int64
3940
astOptimizationPasses []optimize.ASTOptimizationPass
4041
planOptimizationPasses []optimize.QueryPlanOptimizationPass
@@ -44,6 +45,7 @@ type QueryPlanner struct {
4445
func NewQueryPlanner(opts EngineOpts) *QueryPlanner {
4546
return &QueryPlanner{
4647
features: opts.Features,
48+
activeQueryTracker: opts.CommonOpts.ActiveQueryTracker,
4749
noStepSubqueryIntervalFn: opts.CommonOpts.NoStepSubqueryIntervalFn,
4850
planStageLatency: promauto.With(opts.CommonOpts.Reg).NewHistogramVec(prometheus.HistogramOpts{
4951
Name: "cortex_mimir_query_engine_plan_stage_latency_seconds",
@@ -75,6 +77,15 @@ type PlanningObserver interface {
7577
}
7678

7779
func (p *QueryPlanner) NewQueryPlan(ctx context.Context, qs string, timeRange types.QueryTimeRange, observer PlanningObserver) (*planning.QueryPlan, error) {
80+
if p.activeQueryTracker != nil {
81+
queryID, err := p.activeQueryTracker.Insert(ctx, qs+" # (planning)")
82+
if err != nil {
83+
return nil, err
84+
}
85+
86+
defer p.activeQueryTracker.Delete(queryID)
87+
}
88+
7889
expr, err := p.runASTStage("Parsing", observer, func() (parser.Expr, error) { return parser.ParseExpr(qs) })
7990
if err != nil {
8091
return nil, err
@@ -390,6 +401,15 @@ func (e *Engine) Materialize(ctx context.Context, plan *planning.QueryPlan, quer
390401
opts = promql.NewPrometheusQueryOpts(false, 0)
391402
}
392403

404+
if e.activeQueryTracker != nil {
405+
queryID, err := e.activeQueryTracker.Insert(ctx, plan.OriginalExpression+" # (materialization)")
406+
if err != nil {
407+
return nil, err
408+
}
409+
410+
defer e.activeQueryTracker.Delete(queryID)
411+
}
412+
393413
q, err := e.newQuery(ctx, queryable, opts, plan.TimeRange)
394414
if err != nil {
395415
return nil, err

Diff for: pkg/streamingpromql/planning_test.go

+19
Original file line numberDiff line numberDiff line change
@@ -1052,6 +1052,25 @@ func TestPlanCreationEncodingAndDecoding(t *testing.T) {
10521052
}
10531053
}
10541054

1055+
func TestQueryPlanner_ActivityTracking(t *testing.T) {
1056+
opts := NewTestEngineOpts()
1057+
opts.UseQueryPlanning = true
1058+
tracker := &testQueryTracker{}
1059+
opts.CommonOpts.ActiveQueryTracker = tracker
1060+
planner := NewQueryPlanner(opts)
1061+
1062+
expr := "test"
1063+
timeRange := types.NewInstantQueryTimeRange(time.Now())
1064+
_, err := planner.NewQueryPlan(context.Background(), expr, timeRange, NoopPlanningObserver{})
1065+
require.NoError(t, err)
1066+
1067+
expectedPlanningActivities := []trackedQuery{
1068+
{expr: "test # (planning)", deleted: true},
1069+
}
1070+
1071+
require.Equal(t, expectedPlanningActivities, tracker.queries)
1072+
}
1073+
10551074
func TestAnalysisHandler(t *testing.T) {
10561075
originalTimeSince := timeSince
10571076
timeSince = func(_ time.Time) time.Duration { return 1234 * time.Millisecond }

0 commit comments

Comments
 (0)