Skip to content

LMDeploy Distserve #3304

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 56 commits into
base: main
Choose a base branch
from
Open

LMDeploy Distserve #3304

wants to merge 56 commits into from

Conversation

JimyMa
Copy link

@JimyMa JimyMa commented Mar 22, 2025

What the lmdeploy-distserve is included:

  1. A Simple Router for PD Scheduling
  2. A Transfer Engine for KVCache Migration
  3. A Migration Scheduler for Overlaping Migration and Decode

State of lmdeploy-distserve:

  1. pass the functional validation (in H800 with RoCE Supported).

Next Step

  • substitute the router to LMDeploy Proxy.
  • Support multi Prefill and multi Decode Engines and cross node LLM like deepseek v3.
  • We Learned a lot from https://github.com/bytedance/InfiniStore when build our minimum implementation. @thesues. Besides, we have strong motivation to adapte infinStore because of its pythonic and well python async support.
  • Some micro benchmark.

Initialization

image

The PD Consolidation process outlines the sequential steps for establishing peer-to-peer (P2P) connections between system components. The process begins with the Router , which acts as the central orchestrator. First, the Router initiates the connection setup by sending a p2p_initialize message to both the Prefill Server and the Decode Server . This ensures that all necessary components are prepared for the subsequent connection phase.

Once the initialization phase is complete for both the Prefill Server and the Decode Server , the Router proceeds to establish the actual P2P connections. It sends a p2p_connect message to the Prefill Server to finalize the connection, followed by another p2p_connect message to the Decode Server . This systematic approach ensures that all components are properly initialized before any connections are established, forming a cohesive network during the system's startup phase.

Control Plane

image

The diagram illustrates the workflow and interactions between various components involved in the system's prefill and decode processes. This process is designed to manage tasks efficiently, ensuring smooth operation and scalability.

Prefill Process:

The Prefill Server initiates the prefill process by sending a Prefill Message to the Prefill Engine .
The Prefill Engine processes the request and generates an EngineOutput, which includes details such as FirstToken and Cahce BlockIds.
The Prefill Scheduler receives the output from the Prefill Engine and manages task scheduling. Tasks are placed into a Waiting Queue with a status of Status.WAITING.
Once ready, the tasks are forwarded to the Forward Executor , which processes them with a status of Status.RUNNING. The status will be converted to Status.ToBeMigrated and will be free when decode engine migration done.

Decode Process:

The Decode Server sends requests to the Decode Engine , which processes the input and generates an EngineOutput. This output may include details like GenToken. The Decode Scheduler manages the decoded tasks and places them into a Migration Queue with a status of Status.WaitingMigration. The Migration Executor processes these tasks, transitioning their status to Status.Running. Completed tasks are then sent back to the Forward Executor for further processing (Prefill Engine cache_free).

Key Features

  • Task Management: Both the Prefill Scheduler and Decode Scheduler play crucial roles in managing task queues and ensuring efficient execution.
  • Executor Roles: The Forward Executor and Migration Executor handle the execution of tasks in different stages of the pipeline.
  • Scalability and Monitoring (Future work): The inclusion of load balancing, offline profiling, online monitoring, and scaling mechanisms ensures the system remains robust and adaptable under varying workloads.

This structured approach enables seamless coordination between components, facilitating efficient task execution and system control within the Control Plane .

Data Plane

image

The diagram illustrates the workflow and interactions between key components responsible for managing cache operations, migration, and load balancing. This process is designed to optimize data handling and ensure efficient resource utilization.

Prefill CacheEngine:

The Prefill CacheEngine handles caching operations for prefill tasks. It interacts with the MigrationBackend.Store to store cached data, which can be migrated or loaded as needed.

Decode CacheEngine:

The Decode CacheEngine manages caching operations for decode tasks. It interacts with the MigrationBackend.Load to retrieve cached data when required.

Optional Store Component:

An optional Store component is included, which can be utilized for additional storage needs.This component may interact with the MigrationBackend.Store to manage persistent storage or backup mechanisms.

Migration Operations:

Both the Prefill CacheEngine and Decode CacheEngine utilize the MigrationBackend.Migrate functionality to migrate cached data as necessary. This ensures that cached data can be efficiently moved between different storage locations or systems, maintaining data consistency and availability.

Key Features

  • Cache Management: The Prefill CacheEngine and Decode CacheEngine are responsible for managing cached data during prefill and decode operations, respectively.
  • Migration and Load Balancing: The MigrationBackend facilitates data migration and loading, ensuring that cached data is appropriately managed and accessible.
  • Scalability and Flexibility: The inclusion of an optional Store component allows for additional storage flexibility, while the Router (Load Balance) ensures efficient distribution of workloads.

This structured approach enables seamless coordination between components, facilitating efficient data handling and system control within the Data Plane .

How to build

pip install dlslime==0.0.1.post2
pip install -v -e .

How to Run

  • in Server Side
    Step 1. Start Prefill Engine
lmdeploy serve api_server Qwen/Qwen2.5-72B --server-port 23333 --role Prefill --tp 8 --cache-block-seq-len 256

Step 2. Start Decode Engine

lmdeploy serve api_server Qwen/Qwen2.5-72B --server-port 23333 --role Prefill --tp 8 --cache-block-seq-len 256

Step 3. Start Router

python -m lmdeploy.disagg.router --host 0.0.0.0 --port 5000 --prefill-endpoint http://10.130.8.139:23333 --decode-endpoint http://10.130.8.139:23334
  • in Client Side
curl -X POST "http://localhost:5000/v1/completions" \
-H "Content-Type: application/json" \
-d '{"model": "Qwen/Qwen2.5-72B", "prompt": "Shanghai is a city that", "max_tokens": 32, "stream": true}'

@lvhan028 lvhan028 added the enhancement New feature or request label Mar 24, 2025
@JimyMa JimyMa force-pushed the distserve branch 2 times, most recently from 636fb5b to 94eee2b Compare April 1, 2025 03:27
@@ -1,5 +1,5 @@
# Copyright (c) OpenMMLab. All rights reserved.

from lmdeploy.disagg.messages import EngineRole, MigrationBackend, MigrationTransportProtocol
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we can put this after line 307 to avoid unnecessary importing time

JimyMa and others added 2 commits April 14, 2025 07:31
1. [PD Connection more efficiently][High Priority] In DSV3 DP + EP condition, we need to concurrently construct prefill_dp_size (for exampe 32) * decode_dp_size(for example 144) links. We add a function `pd_consolidation_multi_thread` to do this. However, we need to check if the construction operation is thread safe.
2. [Combine with proxy] Maybe we should save conn_config to avoid repeatly reconnection of PD Link.
3. [PD Control Plane][High Priority]  For DP + EP, we need to reconstruct DisaggEngineConfig to record more information (e.g. dp_idx, tp_idx ...)
4. [Combine with router][Important] How to perform PD Load Balance in disaggregated LLM Serving.
5. [PD Data Plane] adapt to Open Source KVCache manager like Mooncake, infiniStore or NiXL and more transport media.
default='DLSlime',
choices=['DLSlime', 'Mooncake', 'InfiniStore'],
help='kvcache migration management backend when PD disaggregation')
parser.add_argument('--available-nics',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what "nics" refer to?

type=str,
choices=['Hybrid', 'DistServe'],
default='Hybrid',
help='the strategy to dispatch requests to nodes')
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May clarify the help info. It is the same as --routing-strategy.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The api_server is assigned a specific "role" in this PR. I propose updating the communication protocol between the api_server and the proxy_server to include this role information.

Benefits:

  • The proxy server can make decisions based on the api_server's role.
  • This change would allow us to remove the --serving-strategy option, simplifying the argument list.

Would this approach be feasible? I’d appreciate any feedback or suggestions.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. --serving-strategy can be removed and pd mode can be inferred from engine role in backend config.

# For Disaggregation
parser.add_argument('--migration-protocol',
type=str,
choices=['TCP', 'RDMA', 'NVLINK'],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we support all of them?

help='transport protocol of KV migration')
parser.add_argument('--link-type',
type=str,
choices=['Ethernet', 'IB'],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is RoCE a link type?

choices=['Ethernet', 'IB'],
default='Ethernet',
help='RDMA Link Type')
parser.add_argument('--disable-gdr',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is gdr short for GPU Direct Memory Access?
If open the option--disable-gdr, what --link-type should we choose?

@@ -309,7 +347,10 @@ def api_server(args):
quant_policy=args.quant_policy,
eager_mode=args.eager_mode,
max_prefill_token_num=args.max_prefill_token_num,
enable_microbatch=args.enable_microbatch)
enable_microbatch=args.enable_microbatch,
role=EngineRole.__members__[args.role],
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use EngineRole[args.role]?


``` shell
lmdeploy serve proxy
--server-name 10.130.8.139
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better not specify a real IP in the user guide.
The default 0.0.0.0 is used when this option is not provided.
Therefore, we can remove this option in this guide

Copy link
Collaborator

@lvhan028 lvhan028 Apr 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default server_port of api_server is 8000. I recommend using the default value here.
A little reminder, 5000 int he API Usage section should be changed to 8000, too

```

### Check NVSHMEM configuration:
Make sure to verify NVSHMEM installation.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you kindly provide the checking method or related url links?


@register_migration_backend(MigrationBackend.InfiniStore)
class InfiniStoreBackend(MigrationBackendImpl):
def p2p_initialize(self, init_request: DistServeInitRequest):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this backend is not supported, we can remove it from choices in cli arguments


@register_migration_backend(MigrationBackend.Mooncake)
class MooncakeBackend(MigrationBackendImpl):
def p2p_initialize(self, init_request: DistServeInitRequest):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if this backend is not supported, we can remove it from choices in cli arguments

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@RunningLeon Let's keep it. @JimyMa will work with mooncake team to support it.
@JimyMa Could we update the choices of --migration-backend in the CLI?
Once Mooncake or Infiniscore is added, we can adjust the choices accordingly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

Attributes:
Hybrid: Prefill and Decode workload are co-located in one engine.
DistServe: Prefill and Decode worload are assigned to different
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worload -> 'workload`



try:
logger.debug("Registering DLSlime Backend")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use the INFO log when trying to register the kv transfer engine backend.
If an exception occurs, it's better to log a WARNING or an ERROR message

from lmdeploy.disagg.config import MigrationBackend


MIGRATION_BACKENDS = {}
Copy link
Collaborator

@lvhan028 lvhan028 Apr 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can use the mmengine registry instead of making a new one, like we did in lmdeploy/model.py.

self.endpoint[connect_request.protocol].connect_to(connect_request.remote_endpoint_info)

async def p2p_migrate(self, assignment: MigrationAssignment):
max_batch = 4096 + 2048
Copy link
Collaborator

@lvhan028 lvhan028 Apr 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What the two magic numbers represent?


def find_best_rdma_device(rank):
devices = available_nic()
return devices[rank % len(devices)]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may fix lint in here

@@ -0,0 +1,2 @@
class MigrationScheduler:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we remove it since it is not used?

@@ -414,8 +418,9 @@ async def forward_async(self, inputs):
if self.dag is None:
self.dag = self._compile_dag()
inputs = ray.put(inputs)
self.dag.execute(inputs)
await self.dag.get_object_refs_from_last_execute()
# self.dag.execute(inputs)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we remove the commented lines?

@lvhan028
Copy link
Collaborator

@JimyMa May resolve the linting errors as follows:

pip install pre-commit==3.8.0
cd <the/path/of/lmdeploy/repo>
pre-commit install
pre-commit run --all-files

Note that the python version should not be greater than 3.10

@@ -89,6 +91,20 @@ async def forward_async(self, inputs):
async def get_output_async(self):
"""get output async."""
raise NotImplementedError('Not Implemented')

""" PD Disaggregation API Begin """
def p2p_initialize(self, remote_engine_id: int, remote_engine_config: DistServeEngineConfig):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the arguments of this method are not aligned with that in ray_executor

def p2p_initialize(self, init_request: DistServeInitRequest):

"""init rdma link."""
raise NotImplementedError('Not implemented')

def p2p_connect(self, remote_engine_id: int, remote_endpoint_info: List[str]):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the arguments are not aligend with thatin ray_executor

@@ -520,3 +525,16 @@ def _init_distributed_environment_by_device(self, device_str: str):
ray.get([worker.set_env.remote(envs) for worker in self.workers])
else:
raise ValueError(f'Unsupported device type: {device_str}')

""" PD Disaggregation API Begin """
Copy link
Collaborator

@RunningLeon RunningLeon Apr 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may need to implement for tp=1 with uni_executor in lmdeploy/pytorch/engine/executor/uni_executor.py

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants