Refactor scheduler to run plugins #677


Merged
merged 4 commits into kubernetes-sigs:main
Apr 22, 2025

Conversation

@liu-cong (Contributor) commented Apr 10, 2025:

Inspired by the kube scheduler framework, this PR adds the following scheduler "plugins" that run in the following order:

  • PreSchedule: A list of plugins that runs at the beginning of each scheduling request. This is a no-op in the current scheduler. In the prefix-caching follow-up PR, I will use this to pre-calculate data such as request block hashes for the prefix scorer to consume later on.
  • Filter: A list that filters down the list of available pods. This is the same as the current scheduler filter interface.
  • Score: A list of scorers to run for each pod; the final score is calculated as a weighted sum. In the follow-up PR, I will use this to score pods based on prefix matching, queue depth, and kv-cache.
  • Picker: A single plugin that picks the final pod. Currently this randomly picks a pod. We will have a "topK" picker that picks pods with top scores.
  • PostSchedule: A list that runs after a scheduling decision is made (a targetPod is picked). Currently a no-op. In the follow-up prefix-caching PR, I will use this to update the cache lookup table.

This is a pure refactor.

This should be an incremental step towards making the scheduler more pluggable and even dynamically configurable.
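
For orientation, here is a condensed sketch of the plugin surface described above. The Filter, Scorer, and Picker signatures match the ones visible in the diff below; the PreSchedule and PostSchedule signatures are only inferred from this description and may differ from the actual code.

// Plugin is the common interface; every plugin reports a name (used, e.g., for metrics).
type Plugin interface {
	Name() string
}

// PreSchedule runs at the beginning of each scheduling request (signature assumed).
type PreSchedule interface {
	Plugin
	PreSchedule(ctx *Context)
}

// Filter narrows down the list of candidate pods.
type Filter interface {
	Plugin
	Filter(ctx *Context, pods []Pod) ([]Pod, error)
}

// Scorer assigns a score to a single pod; the final score is a weighted sum across scorers.
type Scorer interface {
	Plugin
	Score(ctx *Context, pod Pod) (float64, error)
}

// Picker selects the final pod(s) from the filtered, scored candidates.
type Picker interface {
	Plugin
	Pick(ctx *Context, pods []Pod) (*Result, error)
}

// PostSchedule runs after a scheduling decision (a targetPod) is made (signature assumed).
type PostSchedule interface {
	Plugin
	PostSchedule(ctx *Context, res *Result)
}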

Benchmark

I ran a benchmark to make sure there are no regressions. In the benchmark run, the refactored EPP performed slightly better, but this could just be variance across benchmark runs. I don't expect performance changes, as this is a pure refactor.

Baseline:
[benchmark screenshots omitted]

Refactor:
[benchmark screenshots omitted]

@k8s-ci-robot k8s-ci-robot added the cncf-cla: yes Indicates the PR's author has signed the CNCF CLA. label Apr 10, 2025
@k8s-ci-robot k8s-ci-robot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Apr 10, 2025
netlify bot commented Apr 10, 2025

Deploy Preview for gateway-api-inference-extension ready!

🔨 Latest commit: 83589eb
🔍 Latest deploy log: https://app.netlify.com/sites/gateway-api-inference-extension/deploys/67fd56715fad4300087208b4
😎 Deploy Preview: https://deploy-preview-677--gateway-api-inference-extension.netlify.app

@liu-cong (Contributor Author):

This is moved from scheduler.go.

@@ -257,23 +254,46 @@ func loRASoftAffinityFilterFunc(ctx *types.Context, pods []*types.PodMetrics) ([
return filtered_available, nil
}

var HasCapacityFilter = &BasicFilter{
@liu-cong (Contributor Author):

These are moved from scheduler.go. It's cleaner to group all filters here.

}

func (rp *RandomPicker) Pick(ctx *types.Context, pods []types.Pod) (*types.Result, error) {
ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods))
@liu-cong (Contributor Author):

This was moved from the existing "random picking" behavior in scheduler.go.

Contributor:

As noted in the other comment, this seems to belong in the scheduler; I would put it back there.

Contributor:

RandomPicker is a scheduler plugin, so IMO it makes sense to have it here.

return "DefaultPlugin"
}

func (p *defaultPlugin) Filter(ctx *types.Context, pods []types.Pod) ([]types.Pod, error) {
@liu-cong (Contributor Author):

The defaultPlugin implements the existing scheduler filter and the random-picking behavior. The other plugin interfaces are all no-ops.

@kaushikmitr (Contributor) commented Apr 11, 2025:

Can we incorporate scoring semantics into the current filtering logic? That way we can integrate it with future scoring plugins if needed. To clarify, the kube documentation says: "The scheduler will call each scoring plugin for each node. There will be a well defined range of integers representing the minimum and maximum scores. After the NormalizeScore phase, the scheduler will combine node scores from all plugins according to the configured plugin weights." I believe the current default filter does not assign any scores?

@liu-cong (Contributor Author):

Can we incorporate scoring semantics into the current filtering logic?

I am not sure I get what you mean; if I get it wrong, please clarify! We have two different plugins: Filter, which removes "ineligible" pods, and Scorer, which gives each pod (analogous to the node in the kube scheduler) a score. Finally, we have a Picker, which can pick pods based on the scores.

They have different purposes. That said, you can use some "score" mechanism in a particular filter implementation. However, that is an implementation detail and not part of the Filter interface, if that is what you are asking.

The current filters are more of "soft filters". The idea is to move them to the Scorer interface where applicable. For example, the minKvCache filter can easily be converted to a scorer that simply ranks pods based on KV-cache utilization.
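
As a rough illustration of that conversion, here is a minimal sketch of a KV-cache scorer written against the Scorer interface. The GetMetrics() accessor and the KVCacheUsagePercent field are assumptions about the pod metrics API, not names confirmed by this PR.

// kvCacheScorer replaces a hard KV-cache filter with a ranking: pods with lower
// KV-cache utilization score higher. Assumes utilization is a fraction in [0, 1].
type kvCacheScorer struct{}

func (s *kvCacheScorer) Name() string { return "kv-cache-utilization" }

func (s *kvCacheScorer) Score(ctx *types.Context, pod types.Pod) (float64, error) {
	// Assumed accessor and metric field; the real code may expose these differently.
	return 1 - pod.GetMetrics().KVCacheUsagePercent, nil
}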

}
return pm
}

// Result captures the scheduler result.
type Result struct {
@liu-cong (Contributor Author):

Creating a Result struct so it will be easier to extend in the future (such as adding a fallback).

Contributor:

I get the idea of trying to build this for future changes, but I think at this point it is more confusing than helpful.
For example, when looking at the Picker interface, and more specifically at the func declaration:

Pick(ctx *Context, pods []Pod) (*Result, error)

I was expecting to see that a Pod is selected, not a *Result.
Basically, it's more or less the same comment as for pre/post schedule: as long as it's not used, it's not needed.

@liu-cong (Contributor Author):

Result gives us flexibility to add more, such as a "reason this pod was picked", or the "fallback pod" use case. I wanted to do this now because, as we add more plugin interfaces and implementations, changing the return type will become harder. Also, from a readability perspective, returning a Result is pretty readable IMO.

Contributor:

If we decide to keep the result, I would at least rename it to SchedulingResult or something like that, as Result is a very general term.

@ahg-g (Contributor) left a comment:

This is a good start. I think we need to define a configuration API for those plugins, but that can come as a follow-up.

@@ -96,8 +96,7 @@ func (s *StreamingServer) HandleRequestBody(
endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))

logger.V(logutil.DEFAULT).Info("Request handled",
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod, "endpoint metrics",
fmt.Sprintf("%+v", target))
"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod)
Contributor:

Logging the metrics of the picked endpoint is useful here.

@liu-cong (Contributor Author):

My concern is that the metric list may grow, making this log very long. I am not sure how useful it is, because the decision is made across multiple pods; you would need to compare this pod to the other pods to determine whether the decision was good or not (if that is the intention of having this log).

@@ -14,91 +14,88 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling
package plugins

import (
Contributor:

I was expecting to see a different structure.
I expected to see a filter dir (and package) under plugins,
then a file filter.go with the interface and some general definitions and types,
and then a different file for each filter.
It may end up with multiple files, but in terms of readability and maintainability I think it is much easier to maintain and understand.

@liu-cong (Contributor Author):

then a file filter.go with the interface and some general definitions and types.

I prefer defining all the plugin interfaces in one place (currently in interfaces.go). Open to feedback but I feel this is the most discoverable approach.

then for each filter I expect to see a different file.

I like this, and I think this is what it will look like eventually. However, I'd like to defer this to minimize the changes.
Currently we have plugins/filter.go, which is a pretty small file, and we may deprecate some of these filters in favor of scorers.

Contributor:

However I'd like to defer this to minimize the changes.

+1, this PR is already large enough.

SchedulerPluginProcessingLatencies = compbasemetrics.NewHistogramVec(
&compbasemetrics.HistogramOpts{
Subsystem: EPPComponent,
Name: "scheduler_plugin_duration_seconds",
Contributor:

that's great!

@@ -107,12 +104,12 @@ func (f *decisionTreeFilter) Filter(ctx *types.Context, pods []*types.PodMetrics
}

// filterFunc filters a set of input pods to a subset.
type filterFunc func(ctx *types.Context, pods []*types.PodMetrics) ([]*types.PodMetrics, error)
type filterFunc func(ctx *types.Context, pods []types.Pod) ([]types.Pod, error)
Contributor:

I noticed that the error is not used anywhere other than the following two places:

  • DropRequestFilter.
  • The toFilterFunc function, and only in case no pods are left after the filter is applied.

I think this is a wrong usage of the filter terminology, and returning an error here may be very confusing for the reader.

A filter, by definition, gets a set of pods, applies some conditional check to each, and returns the subset of pods that passed the check. DropRequest is not a filter by this definition.
toFilterFunc can return an empty slice of pods, and the return value should be checked in the caller (instead of checking for err, we should check if len(pods) == 0).

With that said, the filter decision tree becomes redundant, because there are no errors when applying filters; therefore, there should be no "NextOnFailure". The decision tree can become just a chain of filters, without an error in the return value and with the addition of a check on the length of the returned slice.
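
A minimal sketch of such an error-free filter chain, assuming the Filter signature is changed to drop the error (the FilterChain type and its Run method are illustrative, not part of this PR):

// Filter narrows a candidate list; it never fails, it only narrows.
type Filter interface {
	Name() string
	Filter(ctx *types.Context, pods []types.Pod) []types.Pod
}

// FilterChain applies filters in order and stops early once no pods remain.
type FilterChain []Filter

func (c FilterChain) Run(ctx *types.Context, pods []types.Pod) []types.Pod {
	for _, f := range c {
		pods = f.Filter(ctx, pods)
		if len(pods) == 0 {
			// Not an error: the chain simply produced no candidates;
			// the caller checks the length of the returned slice.
			return nil
		}
	}
	return pods
}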

Contributor:

Applying a filter that ends up with 0 pods is not an error :)
The filter succeeded.
The result may not be exactly what the caller wanted, but there is no error in the filter.

@liu-cong (Contributor Author):

I get your point. Again, I would like to defer this to minimize the changes in this PR. Would you like to do a follow-up PR, or just open an issue for this?

Contributor:

Let's do both. :)
Please create an issue with a pointer to this comment so we won't forget, and I'm happy to do a follow-up PR.

Contributor:

I checked out your code and played with the idea of removing the errors. It was quite straightforward, with no issues. I can create a PR against your branch if you'd like.

// NoopPlugin provides a default, no-operation implementation of the Plugin interface.
// It can be embedded in other plugin implementations to avoid boilerplate code for
// unused methods.
type NoopPlugin struct{}
Contributor:

Is this used somewhere?
I didn't find any usage.

In general, this doesn't seem to belong here. It is probably good for testing purposes and belongs in one of the test files (internally in the test setup).

// Picker picks the final pod(s) to send the request to.
type Picker interface {
Plugin
Pick(ctx *Context, pods []Pod) (*Result, error)
Contributor:

Why does Pick return an error?
There is no error I can think of in picking an entry from a slice.

Contributor:

@liu-cong does your KV cache code have a use case for returning an error?

Comment on lines +169 to +175
for _, pod := range pods {
	score, err := runScorersForPod(ctx, s.scorers, pod)
	if err != nil {
		return err
	}
	pod.SetScore(score)
}
@nirrozenbaum (Contributor) commented Apr 15, 2025:

This logic is missing the score normalization phase and the scorer weight.
Normalization: not all scorers necessarily use the same score range. We need to normalize the scores of all scorers to the same range and use some formula for calculating a weighted score, e.g., weightedScore = w1*s1 + w2*s2 + ..., where w1 is the weight of scorer1 and s1 is the NORMALIZED score of scorer1.
Weight: not all scorers necessarily have the same weight. We might want to define different weights for different scorers.
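
A minimal sketch of that calculation, assuming min-max normalization to [0, 1] and a per-scorer weight (the scorerWithWeight type and the assumed raw score range are illustrative only):

// scorerWithWeight pairs a Scorer with the weight its normalized score carries.
type scorerWithWeight struct {
	scorer types.Scorer
	weight float64
}

// weightedScore normalizes each scorer's raw score to [0, 1] and sums the
// weighted results: weightedScore = w1*s1 + w2*s2 + ...
func weightedScore(ctx *types.Context, scorers []scorerWithWeight, pod types.Pod) (float64, error) {
	total := 0.0
	for _, sw := range scorers {
		raw, err := sw.scorer.Score(ctx, pod)
		if err != nil {
			return 0, err
		}
		total += sw.weight * normalize(raw, 0, 100) // assumed raw range of 0–100 for illustration
	}
	return total, nil
}

// normalize maps raw from [min, max] to [0, 1].
func normalize(raw, min, max float64) float64 {
	if max == min {
		return 0
	}
	return (raw - min) / (max - min)
}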

postSchedulePlugins []types.PostSchedule
filters []types.Filter
scorers []types.Scorer
picker types.Picker
Contributor:

I'm reading this again, and the Picker interface seems redundant to me.
The scheduler itself IS the picker.
The scheduler runs filters to remove pods, scorers to score the filtered pods, and then picks a pod from the resulting list based on each pod's calculated score.
Maybe it would make sense to introduce something like a SchedulerStrategy. I can imagine two possible strategies (see the sketch below):

  • pick the highest score, fall back to the second, etc.
  • pick randomly, using the scores as probabilities.

In the current code the only option is random, so I wouldn't do the above in this PR, but only when (and if) needed.
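
A minimal sketch of the second strategy (random selection weighted by score). It assumes non-negative scores and a Score() accessor on the Pod interface; only SetScore is visible in this PR, so these names are illustrative. rand comes from math/rand.

// pickWeightedRandom selects a pod at random, with probability proportional to its score.
func pickWeightedRandom(pods []types.Pod) types.Pod {
	total := 0.0
	for _, p := range pods {
		total += p.Score() // assumed accessor for the accumulated score
	}
	if total == 0 {
		return pods[rand.Intn(len(pods))] // no score signal; fall back to uniform random
	}
	r := rand.Float64() * total
	for _, p := range pods {
		r -= p.Score()
		if r <= 0 {
			return p
		}
	}
	return pods[len(pods)-1] // guard against floating-point rounding
}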

Comment on lines +153 to +155
before := time.Now()
filteredPods, err := filter.Filter(ctx, pods)
metrics.RecordSchedulerPluginProcessingLatency(types.FilterPluginType, filter.Name(), time.Since(before))
Contributor:

This is not a measurement of each filter. Since the filter is defined as a decision tree, these lines measure the decision tree's time, i.e., all filters together.
This is another reason for transitioning to a filter chain rather than a decision tree (once the error is removed from the filter, as noted in the other comment): we should be able to see how much time each filter took.

}

// Iterate through each scorer in the chain and accumulate the scores.
func runScorersForPod(ctx *types.Context, scorers []types.Scorer, pod types.Pod) (float64, error) {
Contributor:

This should be one of the Scheduler's methods, e.g., func (s *Scheduler) runScorersForPod, and then we wouldn't need to pass the scorers as an argument.

// Scorer defines the interface for scoring pods based on context.
type Scorer interface {
Plugin
Score(ctx *Context, pod Pod) (float64, error)
Contributor:

Same as the comments on the filter error and the picker error: I don't think a scorer should ever return an error.
If there is no real reason for using an error in the returned values, we should remove it.
Adding error return values to all interfaces makes the code harder to follow.

Contributor:

I don't think scorer should ever return an error. If there is not real reason for using error in returned values we should remove those.

@liu-cong does your KV cache code have a use case for returning an error for the Score() method?

*backendmetrics.Pod
*backendmetrics.Metrics
}

func NewContext(ctx context.Context, req *LLMRequest, pods []*PodMetrics) *Context {
func NewContext(ctx context.Context, req *LLMRequest, pods []Pod) *Context {
@nirrozenbaum (Contributor) commented Apr 15, 2025:

Consider renaming this to SchedulerContext or something like that.
When first seeing Context it was confusing (I was expecting the Go context.Context).


func (rp *RandomPicker) Pick(ctx *types.Context, pods []types.Pod) (*types.Result, error) {
ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods))
i := rand.Intn(len(pods))
Contributor:

This is generic random logic without any use of the scores; it misses the whole point of scoring pods.

Contributor:

@liu-cong correct me if I'm wrong, but this PR's intent is to refactor for the scheduler plugin architecture. Can you confirm that a ScorerPicker will be introduced to pick a Pod based on the score set by runScorerPlugins() in a follow-on PR?

@nirrozenbaum (Contributor):

/assign

@danehans (Contributor) left a comment:

Other than a few nits that can be resolved in a follow-on PR, LGTM.


func ToSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []*PodMetrics {
pm := make([]*PodMetrics, 0, len(pods))
func ToSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []Pod {
pm := make([]Pod, 0, len(pods))
Contributor:

Since the return type is changed, consider renaming the var from pm to p or something similar to the return type.


@danehans (Contributor):

@liu-cong when you have a moment, can you resolve or respond to @nirrozenbaum's feedback?

@kfswain (Collaborator) commented Apr 21, 2025:

This PR seems to be in a workable state for further iterations.

/lgtm
/approve

@k8s-ci-robot k8s-ci-robot added the lgtm "Looks good to me", indicates that a PR is ready to be merged. label Apr 21, 2025
@k8s-ci-robot:

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: kfswain, liu-cong

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@k8s-ci-robot k8s-ci-robot added the approved Indicates a PR has been approved by an approver from all required OWNERS files. label Apr 21, 2025
@ahg-g (Contributor) commented Apr 21, 2025:

/hold

I would like to ensure we don't have a performance regression; holding to have a quick discussion on this.

@k8s-ci-robot k8s-ci-robot added the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Apr 21, 2025
@ahg-g (Contributor) commented Apr 22, 2025:

I ran the benchmark again; I think the regression is not related to the refactor PR, it is from an earlier change. I will try to find exactly which PR caused the regression, but I think we can move forward with either Nir's slimmed-down PR or this PR.

@kfswain (Collaborator) commented Apr 22, 2025:

/unhold

@k8s-ci-robot k8s-ci-robot removed the do-not-merge/hold Indicates that a PR should not merge because someone has issued a /hold command. label Apr 22, 2025
@k8s-ci-robot k8s-ci-robot merged commit 45209f6 into kubernetes-sigs:main Apr 22, 2025
8 checks passed