Skip to content

Refactor scheduler to run plugins #677

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/epp/backend/metrics/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func (p *Pod) String() string {
}

func (p *Pod) Clone() *Pod {
if p == nil {
return nil
}
return &Pod{
NamespacedName: types.NamespacedName{
Name: p.NamespacedName.Name,
Expand Down Expand Up @@ -118,6 +121,9 @@ func (m *Metrics) String() string {
}

func (m *Metrics) Clone() *Metrics {
if m == nil {
return nil
}
cm := make(map[string]int, len(m.ActiveModels))
for k, v := range m.ActiveModels {
cm[k] = v
Expand Down
9 changes: 4 additions & 5 deletions pkg/epp/handlers/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ func (s *StreamingServer) HandleRequestBody(
ResolvedTargetModel: modelName,
Critical: modelObj.Spec.Criticality != nil && *modelObj.Spec.Criticality == v1alpha2.Critical,
}
logger.V(logutil.DEBUG).Info("LLM request assembled", "model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "critical", llmReq.Critical)
logger.V(logutil.DEBUG).Info("LLM request assembled", "request", llmReq)

var err error
// Update target models in the body.
Expand All @@ -81,11 +81,11 @@ func (s *StreamingServer) HandleRequestBody(
return reqCtx, errutil.Error{Code: errutil.Internal, Msg: fmt.Sprintf("error marshaling request body: %v", err)}
}

target, err := s.scheduler.Schedule(ctx, llmReq)
res, err := s.scheduler.Schedule(ctx, llmReq)
if err != nil {
return reqCtx, errutil.Error{Code: errutil.InferencePoolResourceExhausted, Msg: fmt.Errorf("failed to find target pod: %w", err).Error()}
}
targetPod := target.GetPod()
targetPod := res.TargetPod.GetPod()

// Insert target endpoint to instruct Envoy to route requests to the specified target pod.
// Attach the port number
Expand All @@ -96,8 +96,7 @@ func (s *StreamingServer) HandleRequestBody(
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))

logger.V(logutil.DEFAULT).Info("Request handled",
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod, "endpoint metrics",
fmt.Sprintf("%+v", target))
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logging the metrics of the picked endpoint is useful here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My concern is that the metric list may grow, making this log very long. I am not sure how useful it is, because the decision is made upon multiple pods, you will need to compare this pod to other pods to determine whether the decision is good or not (if this is the intention of having this log)


reqCtx.Model = llmReq.Model
reqCtx.ResolvedTargetModel = llmReq.ResolvedTargetModel
Expand Down
2 changes: 1 addition & 1 deletion pkg/epp/handlers/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ type StreamingServer struct {
}

// Scheduler picks a serving endpoint for an incoming LLM request.
type Scheduler interface {
	// Schedule selects a target pod for the given request and returns the
	// scheduling result; the caller reads the chosen endpoint via
	// result.TargetPod. A non-nil error indicates no suitable pod was found.
	Schedule(ctx context.Context, b *schedulingtypes.LLMRequest) (result *schedulingtypes.Result, err error)
}

// RequestContext stores context information during the life time of an HTTP request.
Expand Down
22 changes: 22 additions & 0 deletions pkg/epp/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
const (
InferenceModelComponent = "inference_model"
InferencePoolComponent = "inference_pool"
EPPComponent = "endpoint_picker"
)

var (
Expand Down Expand Up @@ -176,6 +177,20 @@ var (
},
[]string{"name"},
)

// Scheduler Plugin Metrics
SchedulerPluginProcessingLatencies = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Subsystem: EPPComponent,
Name: "scheduler_plugin_duration_seconds",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's great!

Help: "Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name.",
Buckets: []float64{
0.0001, 0.0002, 0.0005, 0.001, 0.002, 0.005, 0.01, 0.02, 0.05, 0.1,
},
StabilityLevel: compbasemetrics.ALPHA,
},
[]string{"plugin_type", "plugin_name"},
)
)

var registerMetrics sync.Once
Expand All @@ -196,6 +211,8 @@ func Register() {
legacyregistry.MustRegister(inferencePoolAvgKVCache)
legacyregistry.MustRegister(inferencePoolAvgQueueSize)
legacyregistry.MustRegister(inferencePoolReadyPods)

legacyregistry.MustRegister(SchedulerPluginProcessingLatencies)
})
}

Expand Down Expand Up @@ -293,3 +310,8 @@ func RecordInferencePoolAvgQueueSize(name string, queueSize float64) {
func RecordinferencePoolReadyPods(name string, runningPods float64) {
inferencePoolReadyPods.WithLabelValues(name).Set(runningPods)
}

// RecordSchedulerPluginProcessingLatency records the processing latency for a
// scheduler plugin, observed into the histogram labeled by plugin type and name.
func RecordSchedulerPluginProcessingLatency(pluginType, pluginName string, duration time.Duration) {
	observer := SchedulerPluginProcessingLatencies.WithLabelValues(pluginType, pluginName)
	observer.Observe(duration.Seconds())
}
64 changes: 64 additions & 0 deletions pkg/epp/metrics/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -556,3 +556,67 @@ func TestInferencePoolMetrics(t *testing.T) {
})
}
}

// TestSchedulerPluginProcessingLatencies records a set of plugin latencies and
// compares the gathered histogram against the golden testdata fixture.
func TestSchedulerPluginProcessingLatencies(t *testing.T) {
	type pluginLatency struct {
		pluginType string
		pluginName string
		duration   time.Duration
	}
	scenarios := []struct {
		name      string
		latencies []pluginLatency
	}{
		{
			name: "multiple plugins",
			latencies: []pluginLatency{
				{
					pluginType: "PreSchedule",
					pluginName: "PluginA",
					duration:   100 * time.Millisecond,
				},
				{
					pluginType: "PostSchedule",
					pluginName: "PluginB",
					duration:   200 * time.Millisecond,
				},
				{
					pluginType: "Filter",
					pluginName: "PluginC",
					duration:   50 * time.Millisecond,
				},
				{
					pluginType: "Scorer",
					pluginName: "PluginD",
					duration:   10 * time.Millisecond,
				},
				{
					pluginType: "Picker",
					pluginName: "PluginE",
					duration:   10 * time.Microsecond,
				},
			},
		},
	}
	Register()
	for _, scenario := range scenarios {
		t.Run(scenario.name, func(t *testing.T) {
			for _, latency := range scenario.latencies {
				RecordSchedulerPluginProcessingLatency(latency.pluginType, latency.pluginName, latency.duration)
			}

			// Check the Open error before deferring Close: the previous code
			// deferred Close on a possibly-nil *os.File, which would panic if
			// the fixture were missing instead of failing cleanly.
			wantPluginLatencies, err := os.Open("testdata/scheduler_plugin_processing_latencies_metric")
			if err != nil {
				t.Fatal(err)
			}
			defer func() {
				if err := wantPluginLatencies.Close(); err != nil {
					t.Error(err)
				}
			}()
			// Filter on the metric name actually registered
			// ("..._scheduler_plugin_duration_seconds"); the previous filter
			// ("..._scheduler_plugin_processing_latencies") matched nothing,
			// so the comparison passed vacuously without checking anything.
			if err := testutil.GatherAndCompare(legacyregistry.DefaultGatherer, wantPluginLatencies, "endpoint_picker_scheduler_plugin_duration_seconds"); err != nil {
				t.Error(err)
			}
		})
	}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# HELP endpoint_picker_scheduler_plugin_duration_seconds [ALPHA] Scheduler plugin processing latency distribution in seconds for each plugin type and plugin name.
# TYPE endpoint_picker_scheduler_plugin_duration_seconds histogram
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0001"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0002"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.0005"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.001"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.002"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.005"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.01"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.02"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.05"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="0.1"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginA",plugin_type="PreSchedule",le="+Inf"} 1
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginA",plugin_type="PreSchedule"} 0.1
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginA",plugin_type="PreSchedule"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0001"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0002"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.0005"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.001"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.002"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.005"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.01"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.02"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.05"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="0.1"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginB",plugin_type="PostSchedule",le="+Inf"} 1
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginB",plugin_type="PostSchedule"} 0.2
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginB",plugin_type="PostSchedule"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.0001"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.0002"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.0005"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.001"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.002"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.005"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.01"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.02"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.05"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="0.1"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginC",plugin_type="Filter",le="+Inf"} 1
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginC",plugin_type="Filter"} 0.05
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginC",plugin_type="Filter"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.0001"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.0002"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.0005"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.001"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.002"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.005"} 0
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.01"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.02"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.05"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="0.1"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginD",plugin_type="Scorer",le="+Inf"} 1
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginD",plugin_type="Scorer"} 0.01
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginD",plugin_type="Scorer"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.0001"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.0002"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.0005"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.001"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.002"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.005"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.01"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.02"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.05"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="0.1"} 1
endpoint_picker_scheduler_plugin_duration_seconds_bucket{plugin_name="PluginE",plugin_type="Picker",le="+Inf"} 1
endpoint_picker_scheduler_plugin_duration_seconds_sum{plugin_name="PluginE",plugin_type="Picker"} 1e-05
endpoint_picker_scheduler_plugin_duration_seconds_count{plugin_name="PluginE",plugin_type="Picker"} 1
58 changes: 58 additions & 0 deletions pkg/epp/scheduling/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
Copyright 2025 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package config

import (
"sigs.k8s.io/controller-runtime/pkg/log"
envutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/env"
logutil "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/util/logging"
)

// Config holds all the configuration values for the scheduler.
type Config struct {
	// KVCacheThreshold is loaded from KV_CACHE_THRESHOLD (default 0.8).
	// Presumably a KV-cache utilization cutoff — exact use is defined by the
	// consuming scheduler plugins, not visible here.
	KVCacheThreshold float64
	// QueueThresholdCritical is loaded from QUEUE_THRESHOLD_CRITICAL (default 5).
	QueueThresholdCritical int
	// QueueingThresholdLoRA is loaded from QUEUING_THRESHOLD_LORA (default 128).
	// NOTE(review): the env var spells "QUEUING" while the field spells
	// "Queueing" — kept as-is for compatibility.
	QueueingThresholdLoRA int
	// LoraAffinityThreshold is loaded from LORA_AFFINITY_THRESHOLD (default 0.999).
	LoraAffinityThreshold float64
}

// Default values to use if environment variables are not set.
const (
	defaultKVCacheThreshold       = 0.8
	defaultQueueThresholdCritical = 5
	defaultQueueingThresholdLoRA  = 128
	defaultLoraAffinityThreshold  = 0.999
)

// LoadConfig reads the scheduler configuration from environment variables,
// falling back to the package defaults for any variable that is unset.
func LoadConfig() Config {
	// Configuration is loaded before the main logger is wired up, so use the
	// controller-runtime base logger for these messages.
	logger := log.Log.WithName("scheduling-config")

	cfg := Config{
		KVCacheThreshold:       envutil.GetEnvFloat("KV_CACHE_THRESHOLD", defaultKVCacheThreshold, logger),
		QueueThresholdCritical: envutil.GetEnvInt("QUEUE_THRESHOLD_CRITICAL", defaultQueueThresholdCritical, logger),
		QueueingThresholdLoRA:  envutil.GetEnvInt("QUEUING_THRESHOLD_LORA", defaultQueueingThresholdLoRA, logger),
		LoraAffinityThreshold:  envutil.GetEnvFloat("LORA_AFFINITY_THRESHOLD", defaultLoraAffinityThreshold, logger),
	}

	logger.V(logutil.DEFAULT).Info("Scheduler configuration loaded", "config", cfg)

	return cfg
}

var Conf = LoadConfig()
Loading