Refactor scheduler to run plugins #677

Conversation
pkg/epp/scheduling/config/config.go
Outdated
This is moved from scheduler.go.
pkg/epp/scheduling/plugins/filter.go
Outdated
@@ -257,23 +254,46 @@ func loRASoftAffinityFilterFunc(ctx *types.Context, pods []*types.PodMetrics) ([
	return filtered_available, nil
}

var HasCapacityFilter = &BasicFilter{
These are moved from scheduler.go. It's cleaner to group all filters here.
}

func (rp *RandomPicker) Pick(ctx *types.Context, pods []types.Pod) (*types.Result, error) {
	ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods))
This is moved from the existing "random picking" behavior in scheduler.go.
As noted in the other comment, this seems to belong in the scheduler. I would put it back there.
RandomPicker is a scheduler plugin, so IMO it makes sense to have it here.
return "DefaultPlugin" | ||
} | ||
|
||
func (p *defaultPlugin) Filter(ctx *types.Context, pods []types.Pod) ([]types.Pod, error) { |
The defaultPlugin implements the existing scheduler filter and the random picking behavior. The other plugin interfaces are all no-ops.
Can we incorporate scoring semantics into the current filtering logic? That way we can integrate it with future scoring plugins if needed. To clarify, the kube documentation says: "The scheduler will call each scoring plugin for each node. There will be a well defined range of integers representing the minimum and maximum scores. After the NormalizeScore phase, the scheduler will combine node scores from all plugins according to the configured plugin weights." I believe the current default filter does not assign any scores?
Can we incorporate scoring semantics into the current filtering logic?

I am not sure I get what you mean; if I got it wrong, please clarify! We have 2 different plugins: Filter, which removes "ineligible" pods, and Scorer, which gives each pod (analogous to the node in the kube scheduler) a score. And finally we have a Picker, which can pick pods based on the scores.
They have different purposes. That said, you can use some "score" mechanism in a particular filter implementation. However, that's an implementation detail and not part of the Filter interface, if that's what you are asking.
The current filters are more like "soft filters". The idea is to move them to the Scorer interface where applicable. For example, the minKvCache filter can easily be converted to a scorer which just ranks pods based on kv-cache utilization.
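For illustration, a minimal sketch of what that conversion could look like; the kvCacheScorer type and the pod metrics accessor (GetMetrics().KVCacheUsagePercent) are assumptions for this sketch, not part of this PR:

```go
// Hypothetical scorer that replaces the kv-cache filter with a score.
type kvCacheScorer struct{}

func (s *kvCacheScorer) Name() string { return "kvCacheScorer" }

// Score ranks pods by KV cache headroom: lower utilization yields a higher score.
func (s *kvCacheScorer) Score(ctx *types.Context, pod types.Pod) (float64, error) {
	return 1 - pod.GetMetrics().KVCacheUsagePercent, nil
}
```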
	}
	return pm
}

// Result captures the scheduler result.
type Result struct {
Creating a Result struct so it will be easier to extend in the future (such as adding fallback)
I get the idea of trying to build it for future changes. I think at this point this is more confusing than helpful.
For example, when looking at the Picker interface, and more specifically at the func declaration:
Pick(ctx *Context, pods []Pod) (*Result, error)
I was expecting to see that a Pod is selected, not a *Result.
Basically it's more or less the same comment as for pre/post schedule: as long as it's not used, it's not needed.
Result gives us flexibility to add more, such as the "reason this pod was picked", or the "fallback pod" use case. I wanted to do this now because as we add more plugin interfaces and implementations, changing the return type will become harder. Also from a readability perspective, returning a Result is pretty readable IMO.
If we decide to keep the result, I would at least rename it to SchedulingResult or something like that, as Result is a very general term.
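For reference, a minimal sketch of the kind of extensibility being discussed; the field names here (TargetPod, Reason, FallbackPod) are assumptions for illustration, not the PR's final shape:

```go
// Hypothetical sketch of an extensible scheduling result.
type Result struct {
	TargetPod Pod
	// Possible future additions discussed above:
	// Reason      string // why this pod was picked
	// FallbackPod Pod    // backup target if the primary becomes unavailable
}
```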
This is a good start. I think we need to define a configuration API for those plugins, but that can come as a follow-up.
@@ -96,8 +96,7 @@ func (s *StreamingServer) HandleRequestBody(
	endpoint := targetPod.Address + ":" + strconv.Itoa(int(pool.Spec.TargetPortNumber))

	logger.V(logutil.DEFAULT).Info("Request handled",
		"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod, "endpoint metrics",
		fmt.Sprintf("%+v", target))
		"model", llmReq.Model, "targetModel", llmReq.ResolvedTargetModel, "endpoint", targetPod)
Logging the metrics of the picked endpoint is useful here.
My concern is that the metric list may grow, making this log very long. I am also not sure how useful it is: the decision is made across multiple pods, so you would need to compare this pod to the other pods to determine whether the decision was good or not (if that is the intention of having this log).
@@ -14,91 +14,88 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduling
package plugins

import (
I was expecting to see a different structure. I expected to see a filter dir (and package) under plugins, then a file filter.go with the interface and some general definitions and types, and then a different file for each filter.
It may end up with multiple files, but in terms of readability and maintainability I think it's much easier to maintain and understand.
then a file filter.go with the interface and some general definitions and types.

I prefer defining all the plugin interfaces in one place (currently in interfaces.go). Open to feedback, but I feel this is the most discoverable approach.

then for each filter I expect to see a different file.

I like this. I think this is what it will look like eventually. However, I'd like to defer this to minimize the changes.
Currently we have plugins/filter.go, which is a pretty small file, and we may deprecate some of these filters in favor of scorers.
However I'd like to defer this to minimize the changes.
+1, this PR is already large enough.
SchedulerPluginProcessingLatencies = compbasemetrics.NewHistogramVec(
	&compbasemetrics.HistogramOpts{
		Subsystem: EPPComponent,
		Name:      "scheduler_plugin_duration_seconds",
that's great!
@@ -107,12 +104,12 @@ func (f *decisionTreeFilter) Filter(ctx *types.Context, pods []*types.PodMetrics
}

// filterFunc filters a set of input pods to a subset.
type filterFunc func(ctx *types.Context, pods []*types.PodMetrics) ([]*types.PodMetrics, error)
type filterFunc func(ctx *types.Context, pods []types.Pod) ([]types.Pod, error)
I noticed that error is not used anywhere other than the following two places:
- DropRequestFilter.
- the toFilterFunc function, and only in the case that no pods are left after the filter is applied.
I think this is a wrong usage of the filter terminology, and returning an error here may be very confusing for the reader.
A filter by definition gets a set of pods, applies some conditional check to each, and returns the subset of pods for which the check returned true. DropRequest is not a filter by this definition.
toFilterFunc can return an empty slice of pods, and that return value should be checked in the caller (instead of checking for err, we should check if len(pods) == 0).
With that said, the filter decision tree becomes redundant, because there are no errors when applying filters; therefore there should be no "NextOnFailure". The decision tree can become just a chain of filters, without error in the return value and with a check on the length of the returned slice, as sketched below.
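A minimal sketch of that error-free filter chain, assuming the types package from this PR; the filterChain type itself is hypothetical:

```go
// Hypothetical error-free filter function and chain.
type filterFunc func(ctx *types.Context, pods []types.Pod) []types.Pod

type filterChain []filterFunc

// Filter applies each filter in order; an empty result is not an error,
// the caller simply checks the length of the returned slice.
func (fc filterChain) Filter(ctx *types.Context, pods []types.Pod) []types.Pod {
	for _, f := range fc {
		pods = f(ctx, pods)
		if len(pods) == 0 {
			return nil
		}
	}
	return pods
}
```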
Applying a filter that ends up with 0 pods is not an error :) The filter succeeded.
The result may not be exactly what the caller wanted, but there is no error in the filter.
I get your point. Again, I would like to defer such changes to minimize the diff. Would you like to do a follow-up PR or just open an issue for this?
Let's do both :)
Please create an issue with a pointer to this comment so we won't forget, and I'm happy to do a follow-up PR.
I checked out your code and played with the idea of removing the errors. It was quite straightforward, with no issues. I can create a PR to your branch if you'd like.
// NoopPlugin provides a default, no-operation implementation of the Plugin interface.
// It can be embedded in other plugin implementations to avoid boilerplate code for
// unused methods.
type NoopPlugin struct{}
Is this used somewhere? I didn't find any usage.
In general, this doesn't seem to belong here; it's probably good for testing purposes and belongs under one of the test files (internally in the test setup).
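For context, the intended embedding pattern would look roughly like this (a sketch written as if in the same package as the interfaces; the prefixScorer name and the NoopPlugin method set are assumptions):

```go
// Hypothetical plugin that only implements scoring; the embedded NoopPlugin
// is meant to satisfy the remaining plugin methods it does not care about.
type prefixScorer struct {
	NoopPlugin
}

func (p *prefixScorer) Name() string { return "prefixScorer" }

func (p *prefixScorer) Score(ctx *Context, pod Pod) (float64, error) {
	return 0, nil // real scoring logic would go here
}
```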
// Picker picks the final pod(s) to send the request to.
type Picker interface {
	Plugin
	Pick(ctx *Context, pods []Pod) (*Result, error)
Why does Pick return an error? There is no error I can think of in picking an entry from a slice.
@liu-cong does your KV cache code have a use case for returning an error?
for _, pod := range pods {
	score, err := runScorersForPod(ctx, s.scorers, pod)
	if err != nil {
		return err
	}
	pod.SetScore(score)
}
This logic is missing the score normalization phase and the scorer weight.
Normalization: not all scorers necessarily use the same score range. We need to normalize the scores of all scorers to the same range and use some formula for calculating a weighted score, e.g., weightedScore = w1*s1 + w2*s2 + ..., where w1 is the weight of scorer 1 and s1 is the NORMALIZED score of scorer 1.
Weight: not all scorers necessarily have the same weight. We might want to define different weights for different scorers.
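A minimal sketch of what that could look like, assuming min-max normalization per scorer; the weightedScorer type and the choice of normalization are assumptions, not part of this PR:

```go
import "math"

// Hypothetical pairing of a scorer with its weight.
type weightedScorer struct {
	scorer types.Scorer
	weight float64
}

// weightedScores normalizes each scorer's raw scores to [0, 1] (min-max) and
// accumulates weight * normalizedScore per pod: w1*s1 + w2*s2 + ...
func weightedScores(ctx *types.Context, scorers []weightedScorer, pods []types.Pod) []float64 {
	total := make([]float64, len(pods))
	for _, ws := range scorers {
		raw := make([]float64, len(pods))
		lo, hi := math.Inf(1), math.Inf(-1)
		for i, pod := range pods {
			s, _ := ws.scorer.Score(ctx, pod) // error handling elided in this sketch
			raw[i] = s
			lo, hi = math.Min(lo, s), math.Max(hi, s)
		}
		for i := range pods {
			norm := 0.0
			if hi > lo {
				norm = (raw[i] - lo) / (hi - lo)
			}
			total[i] += ws.weight * norm
		}
	}
	return total
}
```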
postSchedulePlugins []types.PostSchedule
filters             []types.Filter
scorers             []types.Scorer
picker              types.Picker
I'm reading this again, and the Picker interface seems redundant to me.
The scheduler itself IS the picker: the scheduler runs filters to remove pods, runs scorers to score the filtered pods, and then picks a pod from the result list based on the calculated score of each.
Maybe it would make sense to put in something like a SchedulerStrategy. I can imagine two possible strategies (sketched below):
- pick the highest, fall back to the second, etc.
- pick randomly, using the scores as probabilities.
In the current code the only option is random, and therefore I wouldn't make the above change in this PR, but only when (and if) needed.
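A rough sketch of the second strategy (random pick weighted by score); the scoreWeightedPicker type, the pod.Score() accessor and the Result field name are assumptions for illustration, and math/rand is assumed to be imported:

```go
// Hypothetical picker that treats scores as sampling weights.
type scoreWeightedPicker struct{}

func (p *scoreWeightedPicker) Name() string { return "scoreWeightedPicker" }

func (p *scoreWeightedPicker) Pick(ctx *types.Context, pods []types.Pod) (*types.Result, error) {
	total := 0.0
	for _, pod := range pods {
		total += pod.Score()
	}
	if total <= 0 {
		// All scores are zero: fall back to a uniform random pick.
		return &types.Result{TargetPod: pods[rand.Intn(len(pods))]}, nil
	}
	r := rand.Float64() * total
	for _, pod := range pods {
		r -= pod.Score()
		if r <= 0 {
			return &types.Result{TargetPod: pod}, nil
		}
	}
	return &types.Result{TargetPod: pods[len(pods)-1]}, nil
}
```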
before := time.Now()
filteredPods, err := filter.Filter(ctx, pods)
metrics.RecordSchedulerPluginProcessingLatency(types.FilterPluginType, filter.Name(), time.Since(before))
This is not a measurement of each filter. Since the filter is defined as a decision tree, these lines measure the decision tree time, meaning they cover all filters.
This is another reason for transitioning to a filter chain instead of a decision tree (when removing error from the filter, as noted in the other comment): we should be able to see how much time each filter took.
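For example, with a flat chain (and the error-free filters proposed earlier), per-filter latency could be recorded roughly like this (a sketch, not the PR's code):

```go
for _, filter := range s.filters {
	before := time.Now()
	pods = filter.Filter(ctx, pods) // assuming the error-free Filter signature
	metrics.RecordSchedulerPluginProcessingLatency(types.FilterPluginType, filter.Name(), time.Since(before))
	if len(pods) == 0 {
		break
	}
}
```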
}

// Iterate through each scorer in the chain and accumulate the scores.
func runScorersForPod(ctx *types.Context, scorers []types.Scorer, pod types.Pod) (float64, error) {
This should be one of the Scheduler's methods, e.g., func (s *Scheduler) runScorersForPod, and then we wouldn't need to pass the scorers as an argument.
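A sketch of the suggested signature, keeping the current accumulation logic (hypothetical, not the PR's code):

```go
func (s *Scheduler) runScorersForPod(ctx *types.Context, pod types.Pod) (float64, error) {
	score := 0.0
	for _, scorer := range s.scorers {
		next, err := scorer.Score(ctx, pod)
		if err != nil {
			return 0, err
		}
		score += next
	}
	return score, nil
}
```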
// Scorer defines the interface for scoring pods based on context.
type Scorer interface {
	Plugin
	Score(ctx *Context, pod Pod) (float64, error)
Same as the comments on the filter error and the picker error: I don't think a scorer should ever return an error.
If there is no real reason for using error in the return values, we should remove them.
Adding error return values to all the interfaces makes the code harder to follow.
I don't think a scorer should ever return an error. If there is no real reason for using error in the return values, we should remove them.

@liu-cong does your KV cache code have a use case for returning an error from the Score() method?
	*backendmetrics.Pod
	*backendmetrics.Metrics
}

func NewContext(ctx context.Context, req *LLMRequest, pods []*PodMetrics) *Context {
func NewContext(ctx context.Context, req *LLMRequest, pods []Pod) *Context {
Consider renaming to SchedulerContext or something like that. When first seeing Context it was confusing (I was expecting the golang context.Context).
func (rp *RandomPicker) Pick(ctx *types.Context, pods []types.Pod) (*types.Result, error) {
	ctx.Logger.V(logutil.DEBUG).Info(fmt.Sprintf("Selecting a random pod from %d candidates: %+v", len(pods), pods))
	i := rand.Intn(len(pods))
This is generic random logic without any usage of the scores; it misses the whole point of scoring pods.
@liu-cong correct me if I'm wrong, but this PR's intent is to refactor for the scheduler plugin architecture. Can you confirm that a ScorerPicker will be introduced to pick a Pod based on the score set by runScorerPlugins() in a follow-on PR?
/assign
Other than a few nits that can be resolved in a follow-on PR, LGTM.
func ToSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []*PodMetrics {
	pm := make([]*PodMetrics, 0, len(pods))
func ToSchedulerPodMetrics(pods []backendmetrics.PodMetrics) []Pod {
	pm := make([]Pod, 0, len(pods))
Since the return type is changed, consider renaming the var from pm to p or something similar to the return type.
@liu-cong when you have a moment, can you resolve or respond to @nirrozenbaum's feedback?
This PR seems to be in a workable state for further iterations. /lgtm
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: kfswain, liu-cong.
/hold I would like to ensure we don't have a performance regression; holding to have a quick discussion on this.
I ran the benchmark again; I think the regression is not related to the refactor PR, it is an earlier change. I will try to find which exact PR caused the regression, but I think we can move forward with either Nir's slimmed-down PR or this PR.
/unhold
Inspired by the kube scheduler framework, this PR adds the following scheduler "plugins", which run in the following order:
- PreSchedule: a list of plugins that runs at the beginning of each scheduling request. This is a noop in the current scheduler. In the prefix caching follow-up PR, I will use this to pre-calculate data such as request block hashes for the prefix scorer to consume later on.
- Filter: a list that filters down the list of available pods. This is the same as the current scheduler filter interface.
- Score: a list of scorers that run for each pod; the final score is calculated as a weighted sum. In the follow-up PR, I will use this to score pods based on prefix matching, queue depth and kv-cache.
- Picker: a single plugin that picks the final pod. Currently this randomly picks a pod. We will have a "topK" picker that picks the pods with the top scores.
- PostSchedule: a list that runs after a scheduling decision is made (a targetPod is picked). Currently a noop. In the follow-up prefix caching PR, I will use this to update the cache lookup table.

This is a pure refactor.
This should be an incremental step towards making the scheduler more pluggable and even dynamically configurable.
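For reference, a condensed sketch of the plugin interfaces just described; the Filter, Scorer and Picker signatures appear in the diff, while the PreSchedule and PostSchedule signatures here are assumptions for illustration:

```go
type Plugin interface {
	Name() string
}

type PreSchedule interface {
	Plugin
	PreSchedule(ctx *Context)
}

type Filter interface {
	Plugin
	Filter(ctx *Context, pods []Pod) ([]Pod, error)
}

type Scorer interface {
	Plugin
	Score(ctx *Context, pod Pod) (float64, error)
}

type Picker interface {
	Plugin
	Pick(ctx *Context, pods []Pod) (*Result, error)
}

type PostSchedule interface {
	Plugin
	PostSchedule(ctx *Context, res *Result)
}
```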
benchmark
I ran a benchmark to make sure there are no regressions. In the benchmark run, the refactored EPP performed slightly better, but this could just be variance between benchmark runs. I don't expect changes in performance as this is a pure refactor.
Baseline:
[benchmark screenshots]
Refactor:
[benchmark screenshots]