1 change: 1 addition & 0 deletions docs/source/getting-started/example/index.md
@@ -4,6 +4,7 @@
:maxdepth: 2
nfs_conn.md
dram_conn.md
mooncake_conn.md
disaggregated_prefill/index.md
:::

180 changes: 180 additions & 0 deletions docs/source/getting-started/example/mooncake_conn.md
@@ -0,0 +1,180 @@
# Mooncake Connector

This document provides a usage example and configuration guide for the **Mooncake Connector**. The connector offloads KV cache from GPU HBM to a CPU-side Mooncake store, which reduces GPU memory pressure and helps support larger models or batch sizes.

## Performance

| tokens | mooncake-first | mooncake-second | default |
| ------ | ------------------ | ------------------ | ------------------ |
| 2k | 1.9231491860002279 | 0.8265988459810615 | 0.5419427898712457 |
| 4k | 3.9460434830747544 | 1.5273493870627135 | 0.991630249004811 |
| 8k | 7.577957597002387 | 2.7632693520281464 | 2.0716467570047827 |
| 16k | 16.823639799049126 | 5.515289016952738 | 4.742832682048902 |
| 32k | 81.98759594326839 | 14.217441103421152 | 12.310140203218907 |

Mooncake compared with the default configuration:
<p align="center">
<img alt="UCM" src="../../images/mooncake_performance.png" width="40%">
</p>

## Features

The Mooncake connector supports the following operations:

- `dump`: Offload KV cache blocks from HBM to Mooncake.
- `load`: Load KV cache blocks from Mooncake back to HBM.
- `lookup`: Look up KV blocks stored in Mooncake by block hash.
- `wait`: Ensure that all copy streams between CPU and GPU have completed.
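The four operations above can be illustrated with a toy, in-memory stand-in. This is only a sketch: `ToyStore` is a hypothetical class, not `UcmMooncakeStore`, it runs synchronously, and it uses `bytes` instead of tensors. It follows the contract that the tests in this change exercise: `lookup` returns one boolean per block, `dump` and `load` return a task handle, and `wait` returns `0` on success and a non-zero value on failure.

```python
from typing import Dict, List


class ToyStore:
    """Hypothetical in-memory stand-in; NOT the real UcmMooncakeStore."""

    def __init__(self) -> None:
        self._blocks: Dict[str, bytes] = {}
        self._results: Dict[int, int] = {}
        self._next_task = 0

    def _finish(self, status: int) -> int:
        # Record the status of a (synchronously completed) task.
        task_id = self._next_task
        self._next_task += 1
        self._results[task_id] = status
        return task_id

    def dump(self, block_ids: List[str], src: List[bytes]) -> int:
        """Copy blocks into the store, keyed by block hash."""
        for block_id, data in zip(block_ids, src):
            self._blocks[block_id] = bytes(data)
        return self._finish(0)

    def load(self, block_ids: List[str], dst: List[bytearray]) -> int:
        """Copy blocks back out; the task fails if any block is missing."""
        if not all(block_id in self._blocks for block_id in block_ids):
            return self._finish(1)
        for block_id, buf in zip(block_ids, dst):
            buf[:] = self._blocks[block_id]
        return self._finish(0)

    def lookup(self, block_ids: List[str]) -> List[bool]:
        """Report, per block hash, whether the block is stored."""
        return [block_id in self._blocks for block_id in block_ids]

    def wait(self, task: int) -> int:
        """Return the task status: 0 on success, non-zero on failure."""
        return self._results.pop(task)


store = ToyStore()
ids = ["hash-a", "hash-b"]
blocks = [b"\x01\x02", b"\x03\x04"]

assert store.lookup(ids) == [False, False]
assert store.wait(store.dump(ids, blocks)) == 0

out = [bytearray(2), bytearray(2)]
assert store.wait(store.load(ids, out)) == 0
```

The real connector additionally takes an `offset` list and tensor arguments (`src_tensor`, `dst_tensor`), as the test file in this change shows.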

## Configuration

### Start Mooncake Services

1. Follow the [Mooncake official guide](https://github.com/kvcache-ai/Mooncake/blob/v0.3.4/doc/en/build.md) to build Mooncake.

> **[Warning]**: This connector currently supports only Mooncake v0.3.4. Support for newer versions is being worked on.

2. Start Mooncake Store Service

Change the IP addresses and ports in the following steps to match your environment.

```bash
# Unset HTTP proxies
unset http_proxy https_proxy no_proxy HTTP_PROXY HTTPS_PROXY NO_PROXY
# Navigate to the metadata server directory (the HTTP server is used here as an example).
cd $MOONCAKE_ROOT_DIR/mooncake-transfer-engine/example/http-metadata-server
# Start the metadata service (runs in the foreground; keep this terminal open)
go run . --addr=0.0.0.0:23790
# Start the master service (in a second terminal)
mooncake_master --port 50001
```
- Replace `$MOONCAKE_ROOT_DIR` with the root path of your Mooncake source tree.
- Unset any HTTP proxies to prevent networking issues.
- Use ports that are appropriate for your environment.
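Before launching inference, it can help to confirm that both services are accepting connections. The following is a minimal sketch using only the standard library; `is_listening` is a hypothetical helper, and the host and ports are the example values from the commands above.

```python
import socket


def is_listening(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


# Example ports from this guide; adjust them to your environment.
for name, port in (("metadata service", 23790), ("master service", 50001)):
    state = "up" if is_listening("127.0.0.1", port) else "not reachable"
    print(f"{name} on port {port}: {state}")
```

A successful TCP connection only shows that something is listening on the port; it does not verify that the service behind it is healthy.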



### Required Parameters

To use the Mooncake connector, configure the `ucm_connector_config` dictionary inside `kv_connector_extra_config` in your model's launch configuration.

- `local_hostname`:
The IP address of the current node, used to communicate with the metadata server.
- `metadata_server`:
The address of the Mooncake transfer engine's metadata server.
- `master_server_address`:
The IP address and port of the MooncakeStore master daemon process.
- `protocol` *(optional)*:
The transport protocol. Defaults to **tcp** if not provided.
- `device_name` *(optional)*:
The device used for data transmission. Required when `protocol` is set to `rdma`. Multiple NIC devices can be listed, separated by commas and without spaces, for example `erdma_0,erdma_1`.
- `global_segment_size` *(optional)*:
The size of each global segment in bytes. `DEFAULT_GLOBAL_SEGMENT_SIZE = 3355443200` (**3.125 GiB**).
- `local_buffer_size` *(optional)*:
The size of the local buffer in bytes. `DEFAULT_LOCAL_BUFFER_SIZE = 1073741824` (**1.0 GiB**).
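The rules in this list can be checked before the configuration is handed to the connector. The sketch below is illustrative only: `check_mooncake_config` is a hypothetical helper and not part of the connector, and it encodes just what the list states (three required keys, the `tcp` default, the `rdma` requirement on `device_name`, and the two default sizes).

```python
from typing import Any, Dict

GIB = 1024 ** 3
# Defaults quoted in the parameter list above.
DEFAULT_GLOBAL_SEGMENT_SIZE = 3355443200  # 3.125 GiB
DEFAULT_LOCAL_BUFFER_SIZE = 1073741824  # 1.0 GiB
REQUIRED_KEYS = ("local_hostname", "metadata_server", "master_server_address")


def check_mooncake_config(config: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: validate a config dict and fill in the documented defaults."""
    missing = [key for key in REQUIRED_KEYS if not config.get(key)]
    if missing:
        raise ValueError(f"missing required parameters: {missing}")

    checked = dict(config)
    checked.setdefault("protocol", "tcp")
    checked.setdefault("global_segment_size", DEFAULT_GLOBAL_SEGMENT_SIZE)
    checked.setdefault("local_buffer_size", DEFAULT_LOCAL_BUFFER_SIZE)

    device_name = checked.get("device_name", "")
    if checked["protocol"] == "rdma" and not device_name:
        raise ValueError("device_name is required when protocol is 'rdma'")
    if " " in device_name:
        raise ValueError("device_name must be comma-separated without spaces")
    return checked


config = check_mooncake_config(
    {
        "local_hostname": "127.0.0.1",
        "metadata_server": "http://127.0.0.1:23790/metadata",
        "master_server_address": "127.0.0.1:50001",
    }
)
print(config["protocol"], config["global_segment_size"] / GIB)
```

Whether the connector itself performs equivalent checks is not stated here, so treat this as a pre-flight convenience rather than a description of its behavior.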


### Example:

```python
kv_connector_extra_config={
    "ucm_connector_name": "UcmMooncakeStore",
    "ucm_connector_config": {
        "local_hostname": "127.0.0.1",
        "metadata_server": "http://127.0.0.1:23790/metadata",
        "protocol": "tcp",
        "device_name": "",
        "master_server_address": "127.0.0.1:50001"
    }
}
```

## Launching Inference

### Offline Inference

To start **offline inference** with the Mooncake connector, modify the script `examples/offline_inference.py` to include the `kv_connector_extra_config` for the Mooncake connector:

```python
# In examples/offline_inference.py
ktc = KVTransferConfig(
    ...
    kv_connector_extra_config={
        "ucm_connector_name": "UcmMooncakeStore",
        "ucm_connector_config": {
            "local_hostname": "127.0.0.1",
            "metadata_server": "http://127.0.0.1:23790/metadata",
            "protocol": "tcp",
            "device_name": "",
            "master_server_address": "127.0.0.1:50001"
        }
    }
)
```

Then run the script as follows:

```bash
cd examples/
python offline_inference.py
```

### Online Inference

For **online inference**, vLLM with our connector can also be deployed as a server that implements the OpenAI API protocol.

First, set the Python hash seed:
```bash
export PYTHONHASHSEED=123456
```
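This guide does not state why the seed is pinned. What is certain is that Python randomizes the hash of `str` and `bytes` objects per process unless `PYTHONHASHSEED` is fixed. A plausible reason, offered here as an assumption, is that hash-derived block identifiers then agree across processes and restarts. The sketch below demonstrates the underlying Python behavior only; `str_hash_in_subprocess` is a hypothetical helper.

```python
import os
import subprocess
import sys


def str_hash_in_subprocess(text: str, seed: str) -> str:
    """Run a fresh interpreter with PYTHONHASHSEED=seed and return hash(text)."""
    env = dict(os.environ, PYTHONHASHSEED=seed)
    result = subprocess.run(
        [sys.executable, "-c", f"print(hash({text!r}))"],
        env=env,
        capture_output=True,
        text=True,
        check=True,
    )
    return result.stdout.strip()


# Two separate interpreter processes with the same seed agree on the hash.
first = str_hash_in_subprocess("Shanghai is a", "123456")
second = str_hash_in_subprocess("Shanghai is a", "123456")
print(first == second)
```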

Run the following command to start the vLLM server with the Qwen/Qwen2.5-14B-Instruct model:

```bash
vllm serve /home/models/Qwen2.5-14B-Instruct \
    --max-model-len 20000 \
    --tensor-parallel-size 2 \
    --gpu-memory-utilization 0.87 \
    --trust-remote-code \
    --port 7800 \
    --kv-transfer-config \
    '{
        "kv_connector": "UnifiedCacheConnectorV1",
        "kv_connector_module_path": "unifiedcache.integration.vllm.uc_connector",
        "kv_role": "kv_both",
        "kv_connector_extra_config": {
            "ucm_connector_name": "UcmMooncakeStore",
            "ucm_connector_config": {
                "local_hostname": "127.0.0.1",
                "metadata_server": "http://127.0.0.1:23790/metadata",
                "protocol": "tcp",
                "device_name": "",
                "master_server_address": "127.0.0.1:50001"
            }
        }
    }'
```
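Nested JSON typed by hand on a command line is easy to unbalance. One way to avoid that, sketched here, is to build the `--kv-transfer-config` value from a Python dictionary and let `json.dumps` and `shlex.quote` handle the braces and quoting. The keys and values are the ones from the command above; the surrounding script is only an illustration.

```python
import json
import shlex

kv_transfer_config = {
    "kv_connector": "UnifiedCacheConnectorV1",
    "kv_connector_module_path": "unifiedcache.integration.vllm.uc_connector",
    "kv_role": "kv_both",
    "kv_connector_extra_config": {
        "ucm_connector_name": "UcmMooncakeStore",
        "ucm_connector_config": {
            "local_hostname": "127.0.0.1",
            "metadata_server": "http://127.0.0.1:23790/metadata",
            "protocol": "tcp",
            "device_name": "",
            "master_server_address": "127.0.0.1:50001",
        },
    },
}

# json.dumps always emits balanced, valid JSON.
payload = json.dumps(kv_transfer_config)

# shlex.quote makes the string safe to paste into a POSIX shell command.
print("--kv-transfer-config", shlex.quote(payload))
```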

If you see log output like the following:

```bash
INFO: Started server process [321290]
INFO: Waiting for application startup.
INFO: Application startup complete.
```

Congratulations, you have successfully started the vLLM server with Mooncake Connector!

After the vLLM server has started, you can interact with the API as follows:

```bash
curl http://localhost:7800/v1/completions \
-H "Content-Type: application/json" \
-d '{
"model": "/home/models/Qwen2.5-14B-Instruct",
"prompt": "Shanghai is a",
"max_tokens": 7,
"temperature": 0
}'
```
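The same request can be sent from Python. The sketch below uses only the standard library and mirrors the `curl` call above; `build_completion_request` and `send` are hypothetical helper names, and the response is returned as parsed JSON without assuming anything further about its shape.

```python
import json
import urllib.request


def build_completion_request(
    base_url: str, model: str, prompt: str
) -> urllib.request.Request:
    """Build a POST request equivalent to the curl example."""
    body = {"model": model, "prompt": prompt, "max_tokens": 7, "temperature": 0}
    return urllib.request.Request(
        url=f"{base_url}/v1/completions",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


def send(request: urllib.request.Request, timeout: float = 60.0) -> dict:
    """Send the request and return the parsed JSON response."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.load(response)


request = build_completion_request(
    "http://localhost:7800",
    "/home/models/Qwen2.5-14B-Instruct",
    "Shanghai is a",
)
# Uncomment once the server from the previous step is running:
# print(send(request))
```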
Binary file added docs/source/images/mooncake_performance.png
155 changes: 155 additions & 0 deletions test/test_mooncake.py
@@ -0,0 +1,155 @@
import hashlib
import uuid

import torch

from unifiedcache.logger import init_logger
from unifiedcache.ucm_connector.base import Task
from unifiedcache.ucm_connector.ucm_mooncake import UcmMooncakeStore

logger = init_logger(__name__)

mooncake_dict_config = {
    "local_hostname": "127.0.0.1",
    "metadata_server": "http://127.0.0.1:23790/metadata",
    "protocol": "tcp",
    "device_name": "",
    "master_server_address": "127.0.0.1:50001",
}


def tensor_hash(tensor: torch.Tensor) -> str:
    """Calculate the hash value of the tensor."""
    tensor_bytes = tensor.clone().detach().cpu().numpy().tobytes()
    hash_object = hashlib.blake2b(tensor_bytes)
    hash_hex = hash_object.hexdigest()
    return str(int(hash_hex[:16], 16))


def test_lookup_not_found():
    """Test that lookup returns False for non-existent block IDs."""
    store = UcmMooncakeStore(mooncake_dict_config)
    block_ids = [uuid.uuid4().hex for _ in range(10)]
    masks = store.lookup(block_ids)
    assert all(mask is False for mask in masks)


def test_lookup_found():
    """Test that lookup returns True for existing block IDs after dumping data."""
    src_block_data = [
        torch.randint(0, 1000, (1, 100), dtype=torch.int) for _ in range(5)
    ]
    block_ids = [tensor_hash(data) for data in src_block_data]
    offset = [0] * len(block_ids)

    store = UcmMooncakeStore(mooncake_dict_config)
    task: Task = store.dump(
        block_ids=block_ids, offset=offset, src_tensor=src_block_data
    )
    ret = store.wait(task)
    assert ret == 0
    masks = store.lookup(block_ids)
    assert all(mask is True for mask in masks)


def test_dump_once():
    """Test dumping data once and verifying it exists in the store."""
    src_block_data = [
        torch.randint(0, 1000, (1, 100), dtype=torch.int) for _ in range(5)
    ]
    block_ids = [tensor_hash(data) for data in src_block_data]
    offset = [0] * len(block_ids)

    store = UcmMooncakeStore(mooncake_dict_config)
    task: Task = store.dump(
        block_ids=block_ids, offset=offset, src_tensor=src_block_data
    )
    ret = store.wait(task)
    assert ret == 0
    masks = store.lookup(block_ids)
    assert all(mask is True for mask in masks)


def test_dump_repeated():
    """Test that repeated dumping of the same data doesn't cause errors."""
    src_block_data = [
        torch.randint(0, 1000, (1, 100), dtype=torch.int) for _ in range(5)
    ]
    block_ids = [tensor_hash(data) for data in src_block_data]
    offset = [0] * len(block_ids)

    store = UcmMooncakeStore(mooncake_dict_config)
    task: Task = store.dump(
        block_ids=block_ids, offset=offset, src_tensor=src_block_data
    )
    ret = store.wait(task)
    assert ret == 0
    masks = store.lookup(block_ids)
    assert all(mask is True for mask in masks)

    # Dump the same blocks a second time; this must also succeed.
    task = store.dump(
        block_ids=block_ids, offset=offset, src_tensor=src_block_data
    )
    ret = store.wait(task)
    assert ret == 0


def test_load_existing_data():
    """Test loading data that was previously dumped into the store."""
    src_block_data = [
        torch.randint(0, 1000, (1, 100), dtype=torch.int) for _ in range(5)
    ]
    dst_block_data = [
        torch.empty(data.shape, dtype=data.dtype) for data in src_block_data
    ]
    block_ids = [tensor_hash(data) for data in src_block_data]
    offset = [0] * len(block_ids)

    store = UcmMooncakeStore(mooncake_dict_config)
    task: Task = store.dump(
        block_ids=block_ids, offset=offset, src_tensor=src_block_data
    )
    ret = store.wait(task)
    assert ret == 0

    masks = store.lookup(block_ids)
    assert all(mask is True for mask in masks)

    task = store.load(
        block_ids=block_ids, offset=offset, dst_tensor=dst_block_data
    )
    ret = store.wait(task)
    assert ret == 0
    # Every loaded block must match the block that was dumped.
    assert all(
        torch.equal(src, dst) for src, dst in zip(src_block_data, dst_block_data)
    )


def test_load_non_existent_data():
    """Test that loading data absent from the store fails and does not fill the destination with the source data."""
    src_block_data = [
        torch.randint(0, 1000, (1, 100), dtype=torch.int) for _ in range(5)
    ]
    dst_block_data = [
        torch.empty(data.shape, dtype=data.dtype) for data in src_block_data
    ]
    block_ids = [tensor_hash(data) for data in src_block_data]
    offset = [0] * len(block_ids)
    store = UcmMooncakeStore(mooncake_dict_config)
    masks = store.lookup(block_ids)
    assert all(mask is False for mask in masks)

    task: Task = store.load(
        block_ids=block_ids, offset=offset, dst_tensor=dst_block_data
    )
    ret = store.wait(task)
    assert ret != 0
    # The failed load must not have produced the source data in the destination.
    assert all(
        not torch.equal(src, dst) for src, dst in zip(src_block_data, dst_block_data)
    )
3 changes: 3 additions & 0 deletions unifiedcache/ucm_connector/factory.py
@@ -63,3 +63,6 @@ def create_connector(cls, connector_name: str, config: dict) -> UcmKVStoreBase:
UcmConnectorFactory.register_connector(
    "UcmNfsStore", "unifiedcache.ucm_connector.ucm_nfs_store", "UcmNfsStore"
)
UcmConnectorFactory.register_connector(
    "UcmMooncakeStore", "unifiedcache.ucm_connector.ucm_mooncake", "UcmMooncakeStore"
)
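
The registration call takes a connector name, a module path, and a class name. One common way to implement such a registry is to defer the import until the connector is first requested. The sketch below shows that pattern under stated assumptions: `ToyConnectorFactory` is hypothetical and is not the `UcmConnectorFactory` source, whose internals are not shown in this change. A standard-library class stands in for a connector so the example is self-contained.

```python
import importlib
from typing import Any, Callable, Dict


class ToyConnectorFactory:
    """Hypothetical sketch of a lazy registry; not the UcmConnectorFactory source."""

    _registry: Dict[str, Callable[[], type]] = {}

    @classmethod
    def register_connector(cls, name: str, module_path: str, class_name: str) -> None:
        if name in cls._registry:
            raise ValueError(f"connector {name!r} is already registered")

        def loader() -> type:
            # The import happens only when the connector is first requested.
            module = importlib.import_module(module_path)
            return getattr(module, class_name)

        cls._registry[name] = loader

    @classmethod
    def create_connector(cls, name: str, *args: Any, **kwargs: Any) -> Any:
        if name not in cls._registry:
            raise ValueError(f"unknown connector {name!r}")
        connector_cls = cls._registry[name]()
        return connector_cls(*args, **kwargs)


# A standard-library class stands in for a real connector here.
ToyConnectorFactory.register_connector("ordered", "collections", "OrderedDict")
store = ToyConnectorFactory.create_connector("ordered", {"k": "v"})
```

With lazy loading, registering a connector does not require its optional dependencies to be installed until that connector is actually selected.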