LMDeploy Distserve #3304
base: main
Conversation
Force-pushed from 636fb5b to 94eee2b: …pus to ray.init for run in dlc
lmdeploy/cli/serve.py
Outdated
@@ -1,5 +1,5 @@
# Copyright (c) OpenMMLab. All rights reserved.

from lmdeploy.disagg.messages import EngineRole, MigrationBackend, MigrationTransportProtocol
We can move this import after line 307 to avoid unnecessary import time.
1. [PD Connection more efficiently][High Priority] In the DSV3 DP + EP setting, we need to concurrently construct prefill_dp_size (for example 32) * decode_dp_size (for example 144) links. We add a function `pd_consolidation_multi_thread` to do this. However, we need to check whether the construction operation is thread safe.
2. [Combine with proxy] Maybe we should save conn_config to avoid repeated reconnection of the PD link.
3. [PD Control Plane][High Priority] For DP + EP, we need to reconstruct DisaggEngineConfig to record more information (e.g. dp_idx, tp_idx, ...).
4. [Combine with router][Important] How to perform PD load balancing in disaggregated LLM serving.
5. [PD Data Plane] Adapt to open-source KVCache managers such as Mooncake, infiniStore, or NiXL, and more transport media.
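Point 1 above can be sketched as a thread pool fanning out over every prefill/decode pair. This is only a minimal illustration of the concurrency pattern: `connect_link` is a hypothetical stand-in for the real per-pair link setup, and the pattern is correct only if that setup is thread safe, which is exactly the open question raised above.

```python
# Sketch of concurrent PD link construction (prefill_dp_size * decode_dp_size pairs).
# `connect_link` is a placeholder for the real RDMA/TCP link setup, which must be
# thread safe for this pattern to work.
from concurrent.futures import ThreadPoolExecutor
from itertools import product


def connect_link(pair):
    prefill_idx, decode_idx = pair
    # Real code would establish the link between these two ranks here.
    return (prefill_idx, decode_idx)


def pd_consolidation_multi_thread(prefill_dp_size, decode_dp_size, max_workers=32):
    # One task per (prefill, decode) pair, executed concurrently.
    pairs = list(product(range(prefill_dp_size), range(decode_dp_size)))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(connect_link, pairs))


links = pd_consolidation_multi_thread(4, 6)
print(len(links))  # 24 links for a 4x6 topology
```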
lmdeploy/cli/serve.py
Outdated
default='DLSlime',
choices=['DLSlime', 'Mooncake', 'InfiniStore'],
help='kvcache migration management backend when PD disaggregation')
parser.add_argument('--available-nics',
What does "nics" refer to?
lmdeploy/cli/serve.py
Outdated
type=str,
choices=['Hybrid', 'DistServe'],
default='Hybrid',
help='the strategy to dispatch requests to nodes')
May clarify the help info: it reads the same as `--routing-strategy`.
The api_server is assigned a specific "role" in this PR. I propose updating the communication protocol between the api_server and the proxy_server to include this role information.
Benefits:
- The proxy server can make decisions based on the api_server's role.
- This change would allow us to remove the `--serving-strategy` option, simplifying the argument list.
Would this approach be feasible? I’d appreciate any feedback or suggestions.
Agree. `--serving-strategy` can be removed, and the PD mode can be inferred from the engine role in the backend config.
# For Disaggregation
parser.add_argument('--migration-protocol',
type=str,
choices=['TCP', 'RDMA', 'NVLINK'],
Do we support all of them?
lmdeploy/cli/serve.py
Outdated
help='transport protocol of KV migration')
parser.add_argument('--link-type',
type=str,
choices=['Ethernet', 'IB'],
Is `RoCE` a link type?
lmdeploy/cli/serve.py
Outdated
choices=['Ethernet', 'IB'],
default='Ethernet',
help='RDMA Link Type')
parser.add_argument('--disable-gdr',
Is gdr short for GPU Direct Memory Access? If the `--disable-gdr` option is enabled, which `--link-type` should we choose?
lmdeploy/cli/serve.py
Outdated
@@ -309,7 +347,10 @@ def api_server(args):
quant_policy=args.quant_policy,
eager_mode=args.eager_mode,
max_prefill_token_num=args.max_prefill_token_num,
enable_microbatch=args.enable_microbatch)
enable_microbatch=args.enable_microbatch,
role=EngineRole.__members__[args.role],
Can we use `EngineRole[args.role]`?
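The suggestion works because `Enum.__getitem__` performs the same name-based lookup as indexing `__members__`. A minimal sketch (the member names here are assumptions, not the actual lmdeploy definition):

```python
# Assumed stand-in for lmdeploy's EngineRole; member names are illustrative.
from enum import Enum


class EngineRole(Enum):
    Hybrid = 0
    Prefill = 1
    Decode = 2


# Enum name lookup: EngineRole[name] is equivalent to EngineRole.__members__[name].
assert EngineRole['Prefill'] is EngineRole.__members__['Prefill']
print(EngineRole['Decode'])
```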
lmdeploy/disagg/README.md
Outdated
``` shell
lmdeploy serve proxy \
    --server-name 10.130.8.139
We'd better not specify a real IP in the user guide. The default `0.0.0.0` is used when this option is not provided, so we can remove this option from the guide.
The default `server_port` of api_server is 8000. I recommend using the default value here. A little reminder: the 5000 in the API Usage section should be changed to 8000, too.
lmdeploy/disagg/README.md
Outdated
```

### Check NVSHMEM configuration:
Make sure to verify NVSHMEM installation. |
Could you kindly provide the checking method or related url links?

@register_migration_backend(MigrationBackend.InfiniStore)
class InfiniStoreBackend(MigrationBackendImpl):
    def p2p_initialize(self, init_request: DistServeInitRequest):
If this backend is not supported, we can remove it from the choices in the CLI arguments.

@register_migration_backend(MigrationBackend.Mooncake)
class MooncakeBackend(MigrationBackendImpl):
    def p2p_initialize(self, init_request: DistServeInitRequest):
If this backend is not supported, we can remove it from the choices in the CLI arguments.
@RunningLeon Let's keep it. @JimyMa will work with the mooncake team to support it.
@JimyMa Could we update the choices of `--migration-backend` in the CLI? Once Mooncake or InfiniStore is added, we can adjust the choices accordingly.
lgtm
lmdeploy/disagg/config.py
Outdated
Attributes:
    Hybrid: Prefill and Decode workload are co-located in one engine.
    DistServe: Prefill and Decode worload are assigned to different
`worload` -> `workload`
lmdeploy/disagg/backend/__init__.py
Outdated

try:
    logger.debug("Registering DLSlime Backend")
I think we can use an INFO log when trying to register the kv transfer engine backend. If an exception occurs, it's better to log a WARNING or ERROR message.
lmdeploy/disagg/backend/backend.py
Outdated
from lmdeploy.disagg.config import MigrationBackend


MIGRATION_BACKENDS = {} |
I think we can use the `mmengine` registry instead of making a new one, like we did in lmdeploy/model.py.
self.endpoint[connect_request.protocol].connect_to(connect_request.remote_endpoint_info)

async def p2p_migrate(self, assignment: MigrationAssignment):
    max_batch = 4096 + 2048
What do the two magic numbers represent?
lmdeploy/disagg/utils.py
Outdated

def find_best_rdma_device(rank):
    devices = available_nic()
    return devices[rank % len(devices)]
May fix the lint error here.
lmdeploy/disagg/scheduler.py
Outdated
@@ -0,0 +1,2 @@
class MigrationScheduler: |
Should we remove it since it is not used?
@@ -414,8 +418,9 @@ async def forward_async(self, inputs):
if self.dag is None:
    self.dag = self._compile_dag()
inputs = ray.put(inputs)
self.dag.execute(inputs)
await self.dag.get_object_refs_from_last_execute()
# self.dag.execute(inputs) |
Should we remove the commented lines?
@JimyMa May resolve the linting errors as follows:
Note that the Python version should not be greater than 3.10.
@@ -89,6 +91,20 @@ async def forward_async(self, inputs):
async def get_output_async(self):
    """get output async."""
    raise NotImplementedError('Not Implemented')

""" PD Disaggregation API Begin """
def p2p_initialize(self, remote_engine_id: int, remote_engine_config: DistServeEngineConfig): |
The arguments of this method are not aligned with those in ray_executor:
def p2p_initialize(self, init_request: DistServeInitRequest):
"""init rdma link."""
raise NotImplementedError('Not implemented')

def p2p_connect(self, remote_engine_id: int, remote_endpoint_info: List[str]):
The arguments are not aligned with those in ray_executor.
@@ -520,3 +525,16 @@ def _init_distributed_environment_by_device(self, device_str: str):
    ray.get([worker.set_env.remote(envs) for worker in self.workers])
else:
    raise ValueError(f'Unsupported device type: {device_str}')

""" PD Disaggregation API Begin """ |
May need to implement this for tp=1 with uni_executor in lmdeploy/pytorch/engine/executor/uni_executor.py.
What lmdeploy-distserve includes:
State of lmdeploy-distserve:
Next Step
Initialization
The PD Consolidation process outlines the sequential steps for establishing peer-to-peer (P2P) connections between system components. The process begins with the Router, which acts as the central orchestrator. First, the Router initiates the connection setup by sending a p2p_initialize message to both the Prefill Server and the Decode Server. This ensures that all necessary components are prepared for the subsequent connection phase.
Once the initialization phase is complete for both the Prefill Server and the Decode Server, the Router proceeds to establish the actual P2P connections. It sends a p2p_connect message to the Prefill Server to finalize the connection, followed by another p2p_connect message to the Decode Server. This systematic approach ensures that all components are properly initialized before any connections are established, forming a cohesive network during the system's startup phase.
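The two-phase sequence above can be sketched as follows. Only the message names (p2p_initialize, p2p_connect) come from the description; the `Server` and `Router` internals are hypothetical stand-ins that just enforce the ordering constraint:

```python
# Sketch of the PD Consolidation sequence: initialize both ends, then connect.
class Server:
    def __init__(self, name):
        self.name = name
        self.log = []

    def p2p_initialize(self):
        self.log.append('initialized')

    def p2p_connect(self):
        # Connecting before initialization would be a protocol violation.
        assert self.log == ['initialized'], 'connect before initialize'
        self.log.append('connected')


class Router:
    def consolidate(self, prefill, decode):
        # Phase 1: both servers must be initialized first.
        prefill.p2p_initialize()
        decode.p2p_initialize()
        # Phase 2: only then are the actual P2P connections established.
        prefill.p2p_connect()
        decode.p2p_connect()


prefill, decode = Server('prefill'), Server('decode')
Router().consolidate(prefill, decode)
print(prefill.log, decode.log)
```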
Control Plane
The diagram illustrates the workflow and interactions between various components involved in the system's prefill and decode processes. This process is designed to manage tasks efficiently, ensuring smooth operation and scalability.
Prefill Process:
- The Prefill Server initiates the prefill process by sending a Prefill Message to the Prefill Engine.
- The Prefill Engine processes the request and generates an `EngineOutput`, which includes details such as `FirstToken` and `CacheBlockIds`.
- The Prefill Scheduler receives the output from the Prefill Engine and manages task scheduling. Tasks are placed into a Waiting Queue with a status of `Status.WAITING`.
- Once ready, the tasks are forwarded to the Forward Executor, which processes them with a status of `Status.RUNNING`. The status is then converted to `Status.ToBeMigrated`, and the task is freed once the decode engine's migration is done.

Decode Process:
- The Decode Server sends requests to the Decode Engine, which processes the input and generates an `EngineOutput`. This output may include details like `GenToken`.
- The Decode Scheduler manages the decoded tasks and places them into a Migration Queue with a status of `Status.WaitingMigration`.
- The Migration Executor processes these tasks, transitioning their status to `Status.Running`. Completed tasks are then sent back to the Forward Executor for further processing (Prefill Engine `cache_free`).

Key Features
This structured approach enables seamless coordination between components, facilitating efficient task execution and system control within the Control Plane.
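The status lifecycle described above can be sketched as a small enum. The member names follow the statuses mentioned in the text; the actual lmdeploy enum and its values may differ:

```python
# Illustrative request-status lifecycle for PD disaggregation (names from the
# description above; not the real lmdeploy definition).
from enum import Enum, auto


class Status(Enum):
    WAITING = auto()           # prefill task queued in the Waiting Queue
    RUNNING = auto()           # prefill forward in progress
    ToBeMigrated = auto()      # prefill done, KV cache awaiting migration
    WaitingMigration = auto()  # decode side, queued in the Migration Queue
    Running = auto()           # decode side, migration in progress


# Prefill side: WAITING -> RUNNING -> ToBeMigrated (freed after migration).
PREFILL_FLOW = [Status.WAITING, Status.RUNNING, Status.ToBeMigrated]
# Decode side: WaitingMigration -> Running.
DECODE_FLOW = [Status.WaitingMigration, Status.Running]

print([s.name for s in PREFILL_FLOW + DECODE_FLOW])
```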
Data Plane
The diagram illustrates the workflow and interactions between key components responsible for managing cache operations, migration, and load balancing. This process is designed to optimize data handling and ensure efficient resource utilization.
Prefill CacheEngine:
The Prefill CacheEngine handles caching operations for prefill tasks. It interacts with `MigrationBackend.Store` to store cached data, which can be migrated or loaded as needed.

Decode CacheEngine:
The Decode CacheEngine manages caching operations for decode tasks. It interacts with `MigrationBackend.Load` to retrieve cached data when required.

Optional Store Component:
An optional Store component is included, which can be utilized for additional storage needs. This component may interact with `MigrationBackend.Store` to manage persistent storage or backup mechanisms.

Migration Operations:
Both the Prefill CacheEngine and Decode CacheEngine utilize the `MigrationBackend.Migrate` functionality to migrate cached data as necessary. This ensures that cached data can be efficiently moved between different storage locations or systems, maintaining data consistency and availability.

Key Features
This structured approach enables seamless coordination between components, facilitating efficient data handling and system control within the Data Plane.
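The Store / Load / Migrate operations above can be sketched as an abstract backend interface. This is a toy in-memory model of the data plane, not the real DLSlime/Mooncake/InfiniStore API; all class and method names here are illustrative:

```python
# Toy model of the data-plane operations: Store, Load, and Migrate.
from abc import ABC, abstractmethod


class MigrationBackendSketch(ABC):
    @abstractmethod
    def store(self, block_id, data):
        """Store a KV cache block under an id."""

    @abstractmethod
    def load(self, block_id):
        """Load a previously stored KV cache block."""

    def migrate(self, other, block_id):
        # Migrate = load a block here and store it on the other backend.
        other.store(block_id, self.load(block_id))


class InMemoryBackend(MigrationBackendSketch):
    def __init__(self):
        self.blocks = {}

    def store(self, block_id, data):
        self.blocks[block_id] = data

    def load(self, block_id):
        return self.blocks[block_id]


prefill, decode = InMemoryBackend(), InMemoryBackend()
prefill.store(0, b'kv-cache-block')  # Prefill CacheEngine stores a block
prefill.migrate(decode, 0)           # block migrates to the decode side
print(decode.load(0))                # Decode CacheEngine loads it
```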
How to build
``` shell
pip install dlslime==0.0.1.post2
pip install -v -e .
```
How to Run
Step 1. Start Prefill Engine
Step 2. Start Decode Engine
Step 3. Start Router