LMDeploy Distserve #3304

Open

wants to merge 59 commits into base: main

Commits (59)
97d6d5d
sync main
JimyMa Apr 1, 2025
3241c1a
typo correct
JimyMa Apr 2, 2025
1788a28
1. typo 2. add migration event
JimyMa Apr 2, 2025
03b363f
1. move slime to 'https://github.com/JimyMa/DLSlime.git' and init rea…
JimyMa Apr 3, 2025
aabb72b
Update disagg README
JimyMa Apr 3, 2025
3ba605f
mute slime when disable distserve
JimyMa Apr 3, 2025
2e6ee7a
remove build_migration.sh
JimyMa Apr 3, 2025
cdf55c1
revert debug code
JimyMa Apr 3, 2025
ace6ece
1. identify interface. 2. add multi backend registry
JimyMa Apr 6, 2025
481052e
add dlslime max transfer batch
JimyMa Apr 6, 2025
f9b7409
add an infinistore interface
JimyMa Apr 6, 2025
60032b6
add load/store
JimyMa Apr 7, 2025
aa43faa
conditional register of Multi Migration Backend
JimyMa Apr 8, 2025
97e4430
merge router to proxy
JimyMa Apr 11, 2025
1e6c4da
remove redandunt print
JimyMa Apr 11, 2025
290e606
Merge branch 'main' of github.com:JimyMa/lmdeploy into distserve-update
JimyMa Apr 11, 2025
b530384
1. remove redandunt print 2. revert safe_run
JimyMa Apr 11, 2025
efcb72c
dsv3 kvtransfer support (bypass v cache)
JimyMa Apr 12, 2025
a3d973b
dsv3 debug, 1. change log info to log debug of log resp. 2. add num_c…
JimyMa Apr 12, 2025
31fd9f3
DSV3 Debug, known issue:
JimyMa Apr 14, 2025
48d791a
revert match to if,else
JimyMa Apr 14, 2025
2f02e05
[bugfix] rename typo
JimyMa Apr 14, 2025
ae959a0
[refactor] refactor pd_conn
JimyMa Apr 14, 2025
11d9961
1. format code. 2. add engine_role for passing ut test
JimyMa Apr 14, 2025
18da0fb
1. format code 2. parse dp, ep, and dp rank to DisaggEngineConfig
JimyMa Apr 14, 2025
a478c77
1. add pd conn timeout, 2. add default EngineRole to Hybrid, 3. fix d…
JimyMa Apr 15, 2025
c490de4
1. refactor PDConnection Pool
JimyMa Apr 17, 2025
df3f9ef
refactor debug
JimyMa Apr 18, 2025
61ad2a7
fix migration loop bug
JimyMa Apr 18, 2025
ad27c3a
add proxy arguments about distserve
JimyMa Apr 18, 2025
1c3b20c
bugfix
JimyMa Apr 18, 2025
119059f
debug interface
JimyMa Apr 18, 2025
1f220d4
remove unnesessary EngineRole Check.
JimyMa Apr 18, 2025
0a58979
add v1/chat/completions support
JimyMa Apr 18, 2025
83838d8
remove redundent print
JimyMa Apr 18, 2025
b108752
async free cache
JimyMa Apr 18, 2025
74d9256
async free cache
JimyMa Apr 18, 2025
39b2c4f
Merge branch 'main' of github.com:JimyMa/lmdeploy into distserve-micr…
JimyMa Apr 19, 2025
65ba59f
1. add some comments.
JimyMa Apr 19, 2025
3af751b
1. bugfix
JimyMa Apr 21, 2025
6028ec2
[proxy] add connection_warmup api
JimyMa Apr 21, 2025
3047e7b
1. bugfix (warmup_connection_typo and wrong args) 2. preserve cache b…
JimyMa Apr 21, 2025
649b51e
[disagg] update readme, 1. fault tolerance and 2. replace router to p…
JimyMa Apr 21, 2025
531524a
bugfix
JimyMa Apr 21, 2025
ce660ca
fix decode back pressure bug
JimyMa Apr 21, 2025
957bd68
1. add migration_request to chat/completions for correctly cache free
JimyMa Apr 21, 2025
f6de868
2. free cache bugfix
JimyMa Apr 22, 2025
7437bfa
1. fix lock running bug
JimyMa Apr 22, 2025
b0a8f1f
1. fix dist.broadcast deadlock
JimyMa Apr 23, 2025
a7bb7c4
[lint] 1. fix lint
JimyMa Apr 24, 2025
d488d87
rename Ethernet to RoCE
JimyMa Apr 24, 2025
b626d9e
change emun.Enum.__members__[elem] to enum.Enum[elem] directly
JimyMa Apr 24, 2025
2d6f8c1
update readme
JimyMa Apr 24, 2025
fec61ba
update migration-backend
JimyMa Apr 24, 2025
2637091
1. update readme 2. move module to string for conditional import
JimyMa Apr 24, 2025
3dedc69
1. update readme
JimyMa Apr 24, 2025
c09a06b
1. remove migic number and handle long assignments in dlslime. 2. add…
JimyMa Apr 25, 2025
160cb3c
fix error migration in dummy situation
JimyMa Apr 25, 2025
e97a486
1. bugfix when token is not a decodable utf-8 (in test)
JimyMa Apr 25, 2025
4 changes: 2 additions & 2 deletions benchmark/profile_generation.py
@@ -178,7 +178,7 @@ async def _gather_tasks(tasks):

out_token_throughput = np.round(token_latency_stats.size / elapsed_time, 2)
total_token_throughput = np.round(concurrency * test_round * (input_seqlen + output_seqlen) / elapsed_time, 2)
print(f'\n{"-" * 50}\ntotal time: {elapsed_time:.2f}s\n'
print(f'\n{" - " * 50}\ntotal time: {elapsed_time:.2f}s\n'
f'concurrency: {concurrency}, test_round: {test_round}\n'
f'input_tokens: {input_seqlen}, output_tokens: {output_seqlen}\n'
f'first_token latency(min, max, ave): '
@@ -188,7 +188,7 @@ async def _gather_tasks(tasks):
f'{token_latency_ave}s\n'
f'token_latency percentiles(50%,75%,95%,99%)(s): {percentiles}\n'
f'throughput(output): {out_token_throughput} token/s\n'
f'throughput(total): {total_token_throughput} token/s\n{"-" * 50}')
f'throughput(total): {total_token_throughput} token/s\n{" - " * 50}')
return model_path, \
[first_token_latency_min, first_token_latency_max,
first_token_latency_ave], \
38 changes: 35 additions & 3 deletions lmdeploy/cli/serve.py
@@ -1,5 +1,5 @@
# Copyright (c) OpenMMLab. All rights reserved.

from lmdeploy.disagg.config import EngineRole, MigrationBackend
from lmdeploy.utils import get_max_batch_size

from .cli import CLI
@@ -125,6 +125,20 @@ def add_parser_api_server():
'engine’s tasks once the maximum number of concurrent requests is '
'reached, regardless of any additional requests sent by clients '
'concurrently during that time. Default to None.')
parser.add_argument('--role',
type=str,
default='Hybrid',
choices=['Hybrid', 'Prefill', 'Decode'],
help='Hybrid for Non-Disaggregated Engine; '
'Prefill for Disaggregated Prefill Engine; '
'Decode for Disaggregated Decode Engine')
parser.add_argument('--migration-backend',
type=str,
default='DLSlime',
choices=['DLSlime'],
help='kvcache migration backend used in PD disaggregation; '
'Mooncake and InfiniStore will be supported in the future')
parser.add_argument('--available-nics', type=str, nargs='+', default=None, help='NICs available for KV migration (default: auto-detect)')
# common args
ArgumentHelper.backend(parser)
ArgumentHelper.log_level(parser)
@@ -216,7 +230,13 @@ def add_parser_proxy():
parser.set_defaults(run=SubCliServe.proxy)
parser.add_argument('--server-name', type=str, default='0.0.0.0', help='Host ip for proxy serving')
parser.add_argument('--server-port', type=int, default=8000, help='Server port of the proxy')
parser.add_argument('--strategy',
parser.add_argument('--serving-strategy',
type=str,
choices=['Hybrid', 'DistServe'],
default='Hybrid',
help='the serving strategy, Hybrid for colocating Prefill and Decode '
'workloads into the same engine, DistServe for Prefill-Decode Disaggregation')
parser.add_argument('--routing-strategy',
type=str,
choices=['random', 'min_expected_latency', 'min_observed_latency'],
default='min_expected_latency',
@@ -226,6 +246,15 @@ def add_parser_proxy():
help='Whether to disable cache status of the '
'proxy. If set, the proxy will forget the status '
'of the previous time')
# For Disaggregation
parser.add_argument('--migration-protocol',
type=str,
choices=['TCP', 'RDMA', 'NVLINK'],
Collaborator: Do we support all of them?

default='RDMA',
help='transport protocol of KV migration')
parser.add_argument('--link-type', type=str, choices=['RoCE', 'IB'], default='RoCE', help='RDMA Link Type')
parser.add_argument('--disable-gdr', action='store_true', help='disable GPU Direct RDMA for KV migration')

ArgumentHelper.api_keys(parser)
ArgumentHelper.ssl(parser)
ArgumentHelper.log_level(parser)
@@ -309,7 +338,10 @@ def api_server(args):
quant_policy=args.quant_policy,
eager_mode=args.eager_mode,
max_prefill_token_num=args.max_prefill_token_num,
enable_microbatch=args.enable_microbatch)
enable_microbatch=args.enable_microbatch,
role=EngineRole[args.role],
migration_backend=MigrationBackend[args.migration_backend],
available_nics=args.available_nics)
else:
from lmdeploy.messages import TurbomindEngineConfig
backend_config = TurbomindEngineConfig(dtype=args.dtype,
98 changes: 98 additions & 0 deletions lmdeploy/disagg/README.md
@@ -0,0 +1,98 @@
# LMDeploy-DistServe

## Key Components

1. **Router Service**: Coordinates between prefill/decode engines
2. **Migration Manager**: Facilitates high-performance memory sharing

## Installation

```shell
# Inference Engine
pip install "lmdeploy[all]>=0.7.0"

# Transfer Engine

pip install dlslime==0.0.1.post3
```

Collaborator: Just curious. Is it a private Python package for transferring? I couldn't find any information about this project. Thanks.

## Quick Start

### 1. Configure Endpoints

First deploy your prefill and decode engines.

```shell
# Prefill Engine
CUDA_VISIBLE_DEVICES=0,1 lmdeploy serve api_server internlm/internlm2_5-7b-chat --server-port 23333 --role Prefill --tp 2
# Decode Engine
CUDA_VISIBLE_DEVICES=2,3 lmdeploy serve api_server internlm/internlm2_5-7b-chat --server-port 23334 --role Decode --tp 2
```

### 2. Launch Router Service

```shell
lmdeploy serve proxy \
    --server-name 0.0.0.0 \
    --server-port 8000 \
    --routing-strategy "min_expected_latency" \
    --serving-strategy DistServe \
    --log-level INFO
```

## API Usage

```shell
# API Invoke
curl -X POST "http://localhost:8000/v1/completions" \
-H "Content-Type: application/json" \
-d '{"model": "internlm/internlm2_5-7b-chat", "temperature":0, "prompt": "Shanghai is a city that ", "max_tokens": 16, "stream": false}'
# Output
{
  "id": "2",
  "object": "text_completion",
  "created": 1743662400,
  "model": "internlm/internlm2_5-7b-chat",
  "choices": [
    {
      "index": 0,
      "text": " is very famous for its skyscrapers. It is also a city",
      "logprobs": null,
      "finish_reason": "length"
    }
  ],
  "usage": {
    "prompt_tokens": 7,
    "total_tokens": 23,
    "completion_tokens": 16
  }
}
```

## Troubleshooting

### RDMA Connection Failed:

Make sure ibverbs is correctly installed:

```shell
# on Ubuntu
sudo apt install libibverbs-dev
# on CentOS
sudo yum install ibverbs-devel
```

```bash
ibstatus # Verify IB device status
ibv_devinfo # Check device capabilities
```

### Check GPU Direct RDMA:

Currently, lmdeploy-distserve uses GPU Direct RDMA to perform KV transfer. Make sure the GPUDirect RDMA driver is loaded into the kernel.

```bash
lsmod | grep nv_peer_mem   # or nvidia_peermem with newer drivers
# Module info is printed if GPUDirect RDMA is correctly loaded.
```

### ConnectionPool Issue

Currently, if the proxy disconnects, the connection pool must be warmed up again. A future enhancement could be a dedicated connection pool management server (e.g., using Raft-based tools like etcd, as mentioned in Mooncake) to improve connection discovery and avoid repeated warmups.
1 change: 1 addition & 0 deletions lmdeploy/disagg/__init__.py
@@ -0,0 +1 @@
# Copyright (c) OpenMMLab. All rights reserved.
24 changes: 24 additions & 0 deletions lmdeploy/disagg/backend/__init__.py
@@ -0,0 +1,24 @@
# Copyright (c) OpenMMLab. All rights reserved.
from lmdeploy.logger import get_logger

logger = get_logger('lmdeploy')

try:
logger.debug('Registering DLSlime Backend')
from .dlslime import DLSlimeBackend
except ImportError:
logger.warning('Disable DLSlime Backend')

try:
logger.debug('Registering Mooncake Backend')
from .mooncake import MooncakeBackend
except ImportError:
logger.warning('Disable Mooncake Backend')

try:
logger.debug('Registering InfiniStoreBackend Backend')
from .infinistore import InfiniStoreBackend
except ImportError:
logger.warning('Disable InfiniStoreBackend Backend')

__all__ = ['DLSlimeBackend', 'MooncakeBackend', 'InfiniStoreBackend']
4 changes: 4 additions & 0 deletions lmdeploy/disagg/backend/backend.py
@@ -0,0 +1,4 @@
# Copyright (c) OpenMMLab. All rights reserved.
from mmengine.registry import Registry

MIGRATION_BACKENDS = Registry('migration_backend', locations=['lmdeploy.disagg.backend.backend'])
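The mmengine `Registry` above maps a backend name to its class, so `MIGRATION_BACKENDS.register_module(...)` decorators can make backends discoverable by name. A minimal pure-Python sketch of that pattern (illustrative only, not the mmengine implementation; `FakeDLSlimeBackend` is a stand-in):

```python
from typing import Callable, Dict, Type


class Registry:
    """Minimal registry: decorate a class to register it under a name."""

    def __init__(self, name: str):
        self.name = name
        self._modules: Dict[str, Type] = {}

    def register_module(self, key: str) -> Callable[[Type], Type]:
        def _register(cls: Type) -> Type:
            self._modules[key] = cls  # record name -> class
            return cls
        return _register

    def get(self, key: str) -> Type:
        return self._modules[key]


MIGRATION_BACKENDS = Registry('migration_backend')


@MIGRATION_BACKENDS.register_module('DLSlime')
class FakeDLSlimeBackend:  # stand-in for the real backend class
    pass


# Look up the backend class by its registered name.
backend_cls = MIGRATION_BACKENDS.get('DLSlime')
print(backend_cls.__name__)
```

This is why the conditional imports in `__init__.py` matter: a backend is only registered if its import succeeds.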
37 changes: 37 additions & 0 deletions lmdeploy/disagg/backend/base.py
@@ -0,0 +1,37 @@
# Copyright (c) OpenMMLab. All rights reserved.
from abc import abstractmethod

from lmdeploy.disagg.config import MigrationProtocol
from lmdeploy.disagg.messages import DistServeRegisterMRMessage, MigrationAssignment
from lmdeploy.disagg.request import DistServeConnectionRequest, DistServeInitRequest


class MigrationBackendImpl:

@abstractmethod
def p2p_initialize(self, init_request: DistServeInitRequest):
raise NotImplementedError

@abstractmethod
def register_memory_region(self, register_mr_request: DistServeRegisterMRMessage):
raise NotImplementedError

@abstractmethod
def endpoint_info(self, remote_engine_id: int, protocol: MigrationProtocol):
raise NotImplementedError

@abstractmethod
def p2p_connect(self, conn_req: DistServeConnectionRequest):
raise NotImplementedError

@abstractmethod
async def p2p_migrate(self, assignment: MigrationAssignment):
raise NotImplementedError

@abstractmethod
async def store(self, assignment: MigrationAssignment):
raise NotImplementedError

@abstractmethod
async def load(self, assignment: MigrationAssignment):
raise NotImplementedError
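A concrete transfer backend subclasses this interface and fills in the hooks. A hypothetical loopback sketch with a stand-in request type (the real methods take `DistServeInitRequest` and friends; nothing here is the actual LMDeploy implementation):

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Dict


@dataclass
class FakeInitRequest:
    """Stand-in for DistServeInitRequest: only the remote engine id."""
    remote_engine_id: int


class MigrationBackendSketch(ABC):

    @abstractmethod
    def p2p_initialize(self, init_request: FakeInitRequest) -> None:
        raise NotImplementedError


class LoopbackBackend(MigrationBackendSketch):
    """Records connections without moving any data (illustration only)."""

    def __init__(self) -> None:
        self.links: Dict[int, FakeInitRequest] = {}

    def p2p_initialize(self, init_request: FakeInitRequest) -> None:
        # One link per remote engine, keyed by its id.
        self.links[init_request.remote_engine_id] = init_request


backend = LoopbackBackend()
backend.p2p_initialize(FakeInitRequest(remote_engine_id=7))
print(sorted(backend.links))
```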
77 changes: 77 additions & 0 deletions lmdeploy/disagg/backend/dlslime.py
@@ -0,0 +1,77 @@
# Copyright (c) OpenMMLab. All rights reserved.
from typing import Dict

from dlslime import RDMAEndpoint, available_nic

from lmdeploy.disagg.backend.backend import MIGRATION_BACKENDS
from lmdeploy.disagg.backend.base import MigrationBackendImpl
from lmdeploy.disagg.config import DistServeEngineConfig, MigrationBackend, MigrationProtocol
from lmdeploy.disagg.messages import DistServeRegisterMRMessage, MigrationAssignment
from lmdeploy.disagg.request import DistServeConnectionRequest, DistServeInitRequest
from lmdeploy.logger import get_logger

logger = get_logger('lmdeploy')


class DLSlimeMigrationManagement:

def __init__(self, init_request: DistServeInitRequest):
self.rank = init_request.rank
self.local_engine_config: DistServeEngineConfig = init_request.local_engine_config
self.remote_engine_config: DistServeEngineConfig = init_request.remote_engine_config
self.endpoint: Dict[MigrationProtocol, RDMAEndpoint] = {
MigrationProtocol.TCP: None,
MigrationProtocol.RDMA: None,
MigrationProtocol.NVLINK: None,
}
if init_request.rdma_config:
nics = self.local_engine_config.available_nics or available_nic()
device_name = nics[self.rank % len(nics)]
logger.info(f'use device {device_name} for kv migration')
self.endpoint[MigrationProtocol.RDMA] = RDMAEndpoint(device_name=device_name,
ib_port=1,
link_type=init_request.rdma_config.link_type.name)
if init_request.nvlink_init_request:
raise NotImplementedError
if init_request.tcp_init_request:
raise NotImplementedError

def register_memory_region(self, register_mr_request: DistServeRegisterMRMessage):
self.endpoint[register_mr_request.protocol].register_memory_region(register_mr_request.mr_key,
register_mr_request.addr,
register_mr_request.length)

def connect_to(self, connect_request: DistServeConnectionRequest):
self.endpoint[connect_request.protocol].connect_to(connect_request.remote_endpoint_info)

async def p2p_migrate(self, assignment: MigrationAssignment):
await self.endpoint[assignment.protocol].read_batch_async(assignment.mr_key, assignment.target_offset,
assignment.source_offset, assignment.length)


@MIGRATION_BACKENDS.register_module(MigrationBackend.DLSlime.name)
class DLSlimeBackend(MigrationBackendImpl):

def __init__(self):
self.links: Dict[int, DLSlimeMigrationManagement] = {}

def p2p_initialize(self, init_request: DistServeInitRequest):
self.links[init_request.remote_engine_id] = DLSlimeMigrationManagement(init_request)

def register_memory_region(self, register_mr_request: DistServeRegisterMRMessage):
self.links[register_mr_request.remote_engine_id].register_memory_region(register_mr_request)

def endpoint_info(self, remote_engine_id: int, protocol: MigrationProtocol):
return self.links[remote_engine_id].endpoint[protocol].local_endpoint_info

def p2p_connect(self, conn_req: DistServeConnectionRequest):
self.links[conn_req.remote_engine_id].connect_to(conn_req)

async def p2p_migrate(self, assignment: MigrationAssignment):
await self.links[assignment.remote_engine_id].p2p_migrate(assignment)

async def store(self, assignment: MigrationAssignment):
raise NotImplementedError

async def load(self, assignment: MigrationAssignment):
raise NotImplementedError
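End to end, the DLSlime backend drives a handshake: `p2p_initialize`, `register_memory_region`, exchange of `endpoint_info`, `p2p_connect`, then async `p2p_migrate` (an RDMA read of remote KV blocks). A sketch of that sequence with an in-memory stand-in for `RDMAEndpoint` (all transport is faked; only the call order mirrors the real flow):

```python
import asyncio
from typing import Dict, Optional


class FakeEndpoint:
    """In-memory stand-in for dlslime.RDMAEndpoint."""

    def __init__(self) -> None:
        self.regions: Dict[str, bytearray] = {}
        self.peer: Optional['FakeEndpoint'] = None

    def register_memory_region(self, mr_key: str, buf: bytearray) -> None:
        self.regions[mr_key] = buf

    def connect_to(self, peer: 'FakeEndpoint') -> None:
        # Real code exchanges serialized endpoint info; here we link directly.
        self.peer = peer

    async def read_batch_async(self, mr_key: str, dst_off: int,
                               src_off: int, length: int) -> None:
        # "RDMA read": copy bytes from the peer's region into ours.
        src = self.peer.regions[mr_key]
        self.regions[mr_key][dst_off:dst_off + length] = src[src_off:src_off + length]


async def main() -> bytes:
    prefill, decode = FakeEndpoint(), FakeEndpoint()
    prefill.register_memory_region('kv', bytearray(b'KV-BLOCK'))
    decode.register_memory_region('kv', bytearray(8))
    decode.connect_to(prefill)                    # p2p_connect
    await decode.read_batch_async('kv', 0, 0, 8)  # p2p_migrate
    return bytes(decode.regions['kv'])


print(asyncio.run(main()))
```

Note the direction: the decode engine *pulls* KV blocks from the prefill engine, matching `read_batch_async` above.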
31 changes: 31 additions & 0 deletions lmdeploy/disagg/backend/infinistore.py
@@ -0,0 +1,31 @@
# Copyright (c) OpenMMLab. All rights reserved.
from lmdeploy.disagg.backend.backend import MIGRATION_BACKENDS
from lmdeploy.disagg.backend.base import MigrationBackendImpl
from lmdeploy.disagg.config import MigrationBackend, MigrationProtocol
from lmdeploy.disagg.messages import DistServeRegisterMRMessage, MigrationAssignment
from lmdeploy.disagg.request import DistServeConnectionRequest, DistServeInitRequest


@MIGRATION_BACKENDS.register_module(MigrationBackend.InfiniStore.name)
class InfiniStoreBackend(MigrationBackendImpl):

def p2p_initialize(self, init_request: DistServeInitRequest):
Collaborator: if this backend is not supported, we can remove it from choices in cli arguments
raise NotImplementedError

def register_memory_region(self, register_mr_request: DistServeRegisterMRMessage):
raise NotImplementedError

def endpoint_info(self, remote_engine_id: int, protocol: MigrationProtocol):
raise NotImplementedError

def p2p_connect(self, conn_req: DistServeConnectionRequest):
raise NotImplementedError

async def p2p_migrate(self, assignment: MigrationAssignment):
raise NotImplementedError

async def store(self, assignment: MigrationAssignment):
raise NotImplementedError

async def load(self, assignment: MigrationAssignment):
raise NotImplementedError
31 changes: 31 additions & 0 deletions lmdeploy/disagg/backend/mooncake.py
@@ -0,0 +1,31 @@
# Copyright (c) OpenMMLab. All rights reserved.
from lmdeploy.disagg.backend.backend import MIGRATION_BACKENDS
from lmdeploy.disagg.backend.base import MigrationBackendImpl
from lmdeploy.disagg.config import MigrationBackend, MigrationProtocol
from lmdeploy.disagg.messages import DistServeRegisterMRMessage, MigrationAssignment
from lmdeploy.disagg.request import DistServeConnectionRequest, DistServeInitRequest


@MIGRATION_BACKENDS.register_module(MigrationBackend.Mooncake.name)
class MooncakeBackend(MigrationBackendImpl):

def p2p_initialize(self, init_request: DistServeInitRequest):
Collaborator: if this backend is not supported, we can remove it from choices in cli arguments

Collaborator: @RunningLeon Let's keep it. @JimyMa will work with mooncake team to support it.
@JimyMa Could we update the choices of --migration-backend in the CLI?
Once Mooncake or InfiniStore is added, we can adjust the choices accordingly.

Collaborator: lgtm
raise NotImplementedError

def register_memory_region(self, register_mr_request: DistServeRegisterMRMessage):
raise NotImplementedError

def endpoint_info(self, remote_engine_id: int, protocol: MigrationProtocol):
raise NotImplementedError

def p2p_connect(self, connect_request: DistServeConnectionRequest):
raise NotImplementedError

async def p2p_migrate(self, assignment: MigrationAssignment):
raise NotImplementedError

async def store(self, assignment: MigrationAssignment):
raise NotImplementedError

async def load(self, assignment: MigrationAssignment):
raise NotImplementedError