Skip to content

Commit 45209f6

Browse files
authored
Refactor scheduler to run plugins (#677)
* Refactor scheduler to run plugins
* Add scheduler plugin latency metric
* Address comments
* Address comments
1 parent 9114b35 commit 45209f6

File tree

15 files changed

+969
-246
lines changed

15 files changed

+969
-246
lines changed

pkg/epp/backend/metrics/types.go

+6
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,9 @@ func (p *Pod) String() string {
7979
}
8080

8181
func (p *Pod) Clone() *Pod {
82+
if p == nil {
83+
return nil
84+
}
8285
return &Pod{
8386
NamespacedName: types.NamespacedName{
8487
Name: p.NamespacedName.Name,
@@ -118,6 +121,9 @@ func (m *Metrics) String() string {
118121
}
119122

120123
func (m *Metrics) Clone() *Metrics {
124+
if m == nil {
125+
return nil
126+
}
121127
cm := make(map[string]int, len(m.ActiveModels))
122128
for k, v := range m.ActiveModels {
123129
cm[k] = v

pkg/epp/handlers/request.go

+4-5
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (s *StreamingServer) HandleRequestBody(
6767
ResolvedTargetModel: modelName,
6868
Critical: modelObj.Spec.Criticality != nil && *modelObj.Spec.Criticality == v1alpha2.Critical,
6969
}
70-
logger.V(logutil.DEBUG).Info("LLM request assembled", "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "critical", llmReq.Critical)
70+
logger.V(logutil.DEBUG).Info("LLM request assembled", "request", llmReq)
7171

7272
var err error
7373
// Update target models in the body.
@@ -81,11 +81,11 @@ func (s *StreamingServer) HandleRequestBody(
8181
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
8282
}
8383

84-
target, err := s.scheduler.Schedule(ctx, llmReq)
84+
res, err := s.scheduler.Schedule(ctx, llmReq)
8585
if err != nil {
8686
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
8787
}
88-
targetPod := target.GetPod()
88+
targetPod := res.TargetPod.GetPod()
8989

9090
// Insert target endpoint to instruct Envoy to route requests to the specified target pod.
9191
// Attach the port number
@@ -96,8 +96,7 @@ func (s *StreamingServer) HandleRequestBody(
9696
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))
9797

9898
logger.V(logutil.DEFAULT).Info("Request handled",
99-
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod, "endpoint metrics",
100-
fmt.Sprintf("%+v", target))
99+
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod)
101100

102101
reqCtx.Model = llmReq.Model
103102
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel

pkg/epp/handlers/server.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ type StreamingServer struct {
6565
}
6666

6767
type Scheduler interface {
68-
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (targetPod schedulingtypes.Pod, err error)
68+
Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.Result, err error)
6969
}
7070

7171
// RequestContext stores context information during the life time of an HTTP request.

pkg/epp/metrics/metrics.go

+22
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
const (
3131
InferenceModelComponent = "inference_model"
3232
InferencePoolComponent = "inference_pool"
33+
EPPComponent = "endpoint_picker"
3334
)
3435

3536
var (
@@ -176,6 +177,20 @@ var (
176177
},
177178
[]string{"name"},
178179
)
180+
181+
// Scheduler Plugin Metrics
182+
SchedulerPluginProcessingLatencies = compbasemetrics.NewHistogramVec(
183+
&compbasemetrics.HistogramOpts{
184+
Subsystem: EPPComponent,
185+
Name: "scheduler_plugin_duration_seconds",
186+
Help: "Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name.",
187+
Buckets: []float64{
188+
0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
189+
},
190+
StabilityLevel: compbasemetrics.ALPHA,
191+
},
192+
[]string{"plugin_type", "plugin_name"},
193+
)
179194
)
180195

181196
var registerMetrics sync.Once
@@ -196,6 +211,8 @@ func Register() {
196211
legacyregistry.MustRegister(inferencePoolAvgKVCache)
197212
legacyregistry.MustRegister(inferencePoolAvgQueueSize)
198213
legacyregistry.MustRegister(inferencePoolReadyPods)
214+
215+
legacyregistry.MustRegister(SchedulerPluginProcessingLatencies)
199216
})
200217
}
201218

@@ -293,3 +310,8 @@ func RecordInferencePoolAvgQueueSize(name string, queueSize float64) {
293310
func RecordinferencePoolReadyPods(name string, runningPods float64) {
294311
inferencePoolReadyPods.WithLabelValues(name).Set(runningPods)
295312
}
313+
314+
// RecordSchedulerPluginProcessingLatency records the processing latency for a scheduler plugin.
315+
func RecordSchedulerPluginProcessingLatency(pluginType, pluginName string, duration time.Duration) {
316+
SchedulerPluginProcessingLatencies.WithLabelValues(pluginType, pluginName).Observe(duration.Seconds())
317+
}

pkg/epp/metrics/metrics_test.go

+64
Original file line numberDiff line numberDiff line change
@@ -556,3 +556,67 @@ func TestInferencePoolMetrics(t *testing.T) {
556556
})
557557
}
558558
}
559+
560+
func TestSchedulerPluginProcessingLatencies(t *testing.T) {
561+
type pluginLatency struct {
562+
pluginType string
563+
pluginName string
564+
duration time.Duration
565+
}
566+
scenarios := []struct {
567+
name string
568+
latencies []pluginLatency
569+
}{
570+
{
571+
name: "multiple plugins",
572+
latencies: []pluginLatency{
573+
{
574+
pluginType: "PreSchedule",
575+
pluginName: "PluginA",
576+
duration: 100 * time.Millisecond,
577+
},
578+
{
579+
pluginType: "PostSchedule",
580+
pluginName: "PluginB",
581+
duration: 200 * time.Millisecond,
582+
},
583+
{
584+
pluginType: "Filter",
585+
pluginName: "PluginC",
586+
duration: 50 * time.Millisecond,
587+
},
588+
{
589+
pluginType: "Scorer",
590+
pluginName: "PluginD",
591+
duration: 10 * time.Millisecond,
592+
},
593+
{
594+
pluginType: "Picker",
595+
pluginName: "PluginE",
596+
duration: 10 * time.Microsecond,
597+
},
598+
},
599+
},
600+
}
601+
Register()
602+
for _, scenario := range scenarios {
603+
t.Run(scenario.name, func(t *testing.T) {
604+
for _, latency := range scenario.latencies {
605+
RecordSchedulerPluginProcessingLatency(latency.pluginType, latency.pluginName, latency.duration)
606+
}
607+
608+
wantPluginLatencies, err := os.Open("testdata/scheduler_plugin_processing_latencies_metric")
609+
defer func() {
610+
if err := wantPluginLatencies.Close(); err != nil {
611+
t.Error(err)
612+
}
613+
}()
614+
if err != nil {
615+
t.Fatal(err)
616+
}
617+
if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantPluginLatencies, "endpoint_picker_scheduler_plugin_processing_latencies"); err != nil {
618+
t.Error(err)
619+
}
620+
})
621+
}
622+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# HELP endpoint_picker_scheduler_plugin_duration_seconds [ALPHA] Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name.
2+
# TYPE endpoint_picker_scheduler_plugin_duration_seconds histogram
3+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0001"} 0
4+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0002"} 0
5+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0005"} 0
6+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.001"} 0
7+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.002"} 0
8+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.005"} 0
9+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.01"} 0
10+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.02"} 0
11+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.05"} 0
12+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.1"} 1
13+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="+Inf"} 1
14+
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginA",plugin_type="PreSchedule"} 0.1
15+
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginA",plugin_type="PreSchedule"} 1
16+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0001"} 0
17+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0002"} 0
18+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0005"} 0
19+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.001"} 0
20+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.002"} 0
21+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.005"} 0
22+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.01"} 0
23+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.02"} 0
24+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.05"} 0
25+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.1"} 0
26+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="+Inf"} 1
27+
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginB",plugin_type="PostSchedule"} 0.2
28+
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginB",plugin_type="PostSchedule"} 1
29+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.0001"} 0
30+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.0002"} 0
31+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.0005"} 0
32+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.001"} 0
33+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.002"} 0
34+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.005"} 0
35+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.01"} 0
36+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.02"} 0
37+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.05"} 1
38+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.1"} 1
39+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="+Inf"} 1
40+
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginC",plugin_type="Filter"} 0.05
41+
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginC",plugin_type="Filter"} 1
42+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.0001"} 0
43+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.0002"} 0
44+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.0005"} 0
45+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.001"} 0
46+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.002"} 0
47+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.005"} 0
48+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.01"} 1
49+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.02"} 1
50+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.05"} 1
51+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.1"} 1
52+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="+Inf"} 1
53+
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginD",plugin_type="Scorer"} 0.01
54+
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginD",plugin_type="Scorer"} 1
55+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.0001"} 1
56+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.0002"} 1
57+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.0005"} 1
58+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.001"} 1
59+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.002"} 1
60+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.005"} 1
61+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.01"} 1
62+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.02"} 1
63+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.05"} 1
64+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.1"} 1
65+
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="+Inf"} 1
66+
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginE",plugin_type="Picker"} 1e-05
67+
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginE",plugin_type="Picker"} 1

pkg/epp/scheduling/config/config.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
Copyright 2025 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package config
18+
19+
import (
20+
"sigs.k8s.io/controller-runtime/pkg/log"
21+
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
22+
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
23+
)
24+
25+
// Config groups the tunable thresholds used by the scheduler. LoadConfig
// fills every field from an environment variable, falling back to a
// package-level default.
type Config struct {
	// KVCacheThreshold is read from KV_CACHE_THRESHOLD (default 0.8).
	// NOTE(review): presumably a KV-cache utilization fraction; confirm
	// against the code that consumes it.
	KVCacheThreshold float64
	// QueueThresholdCritical is read from QUEUE_THRESHOLD_CRITICAL (default 5).
	QueueThresholdCritical int
	// QueueingThresholdLoRA is read from QUEUING_THRESHOLD_LORA (default 128).
	QueueingThresholdLoRA int
	// LoraAffinityThreshold is read from LORA_AFFINITY_THRESHOLD (default 0.999).
	LoraAffinityThreshold float64
}
32+
33+
// Fallback values handed to the environment helpers in LoadConfig; each one
// is used when the corresponding environment variable does not supply a value.
const (
	// defaultKVCacheThreshold is the fallback for KV_CACHE_THRESHOLD.
	defaultKVCacheThreshold = 0.8
	// defaultQueueThresholdCritical is the fallback for QUEUE_THRESHOLD_CRITICAL.
	defaultQueueThresholdCritical = 5
	// defaultQueueingThresholdLoRA is the fallback for QUEUING_THRESHOLD_LORA.
	defaultQueueingThresholdLoRA = 128
	// defaultLoraAffinityThreshold is the fallback for LORA_AFFINITY_THRESHOLD.
	defaultLoraAffinityThreshold = 0.999
)
40+
41+
// LoadConfig loads configuration from environment variables
42+
func LoadConfig() Config {
43+
// Use a default logger for initial configuration loading
44+
baseLogger := log.Log.WithName("scheduling-config")
45+
46+
config := Config{
47+
KVCacheThreshold: envutil.GetEnvFloat("KV_CACHE_THRESHOLD", defaultKVCacheThreshold, baseLogger),
48+
QueueThresholdCritical: envutil.GetEnvInt("QUEUE_THRESHOLD_CRITICAL", defaultQueueThresholdCritical, baseLogger),
49+
QueueingThresholdLoRA: envutil.GetEnvInt("QUEUING_THRESHOLD_LORA", defaultQueueingThresholdLoRA, baseLogger),
50+
LoraAffinityThreshold: envutil.GetEnvFloat("LORA_AFFINITY_THRESHOLD", defaultLoraAffinityThreshold, baseLogger),
51+
}
52+
53+
baseLogger.V(logutil.DEFAULT).Info("Scheduler configuration loaded", "config", config)
54+
55+
return config
56+
}
57+
58+
var Conf = LoadConfig()

0 commit comments

Comments
 (0)