Commit 3dd93f8

Add nfsstore bandwidth testing script
1 parent 603dc9f commit 3dd93f8

2 files changed: +216 −185 lines changed

2 files changed

+216
-185
lines changed
Lines changed: 96 additions & 185 deletions
@@ -1,47 +1,26 @@
 # -*- coding: utf-8 -*-
-#
-# MIT License
-#
-# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
-#
-# Permission is hereby granted, free of charge, to any person obtaining a copy
-# of this software and associated documentation files (the "Software"), to deal
-# in the Software without restriction, including without limitation the rights
-# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-# copies of the Software, and to permit persons to whom the Software is
-# furnished to do so, subject to the following conditions:
-#
-# The above copyright notice and this permission notice shall be included in all
-# copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
-# SOFTWARE.
-#
 import csv
 import os
 import secrets
 import time
-from typing import Dict, List
+from typing import Dict, List, Tuple

 import torch

 from ucm.store.nfsstore.nfsstore_connector import UcmNfsStore
 from ucm.store.ucmstore import UcmKVStoreBase


-def setup_store(storage_backends, block_size, device_id, io_size) -> UcmKVStoreBase:
+def setup(
+    storage_backends, block_size, device_id, io_size, transferStreamNumber
+) -> UcmKVStoreBase:
     config = {
         "storage_backends": storage_backends,
         "kv_block_size": block_size,
         "role": "worker",
         "device": device_id,
         "io_size": io_size,
-        "stream_number": 128,
+        "transferStreamNumber": transferStreamNumber,
     }
     return UcmNfsStore(config)

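The hunk above renames setup_store() to setup() and replaces the hard-coded "stream_number": 128 with a caller-supplied "transferStreamNumber" key, so the stream count can be varied as a benchmark parameter. A minimal sketch of calling the new helper; the values are illustrative and not taken from this commit:

    store = setup(
        storage_backends="/mnt/nfs/kvcache",  # assumed NFS mount; the removed main() used "."
        block_size=64 * 1024 * 64,            # io_size * block_layer
        device_id=0,
        io_size=64 * 1024,                    # bytes per layer per block
        transferStreamNumber=32,              # now tunable instead of fixed at 128
    )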
@@ -150,162 +129,94 @@ def fetch(
     return total_size, elapsed_time, throughput_gbps


-def main():
-    storage_backends = "."
-    device_id = 1
-    mla = False
-    repeat = 3
-    block_elem_size = 2
-    num_tokens_list = [2048, 4096, 8192, 16384, 32768]
-
-    if mla:
-        block_lens = [64, 128]
-        block_layer = 61
-        head_size = 576
-        kv = 1
-        model_name = "deepseek-v3"
-        num_head_list = [1]
-    else:
-        block_lens = [128, 256]
-        block_layer = 64
-        head_size = 128
-        kv = 2
-        model_name = "QwQ-32B"
-        num_head_list = [1, 2, 4]
-
-    SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
-    csv_file = os.path.join(SCRIPT_DIR, "embed_fetch_result.csv")
-    need_header = not os.path.exists(csv_file)
-
-    with open(csv_file, "a", newline="", encoding="utf-8") as csv_fp:
-        writer = csv.writer(csv_fp)
-
-        if need_header:
-            writer.writerow(
-                [
-                    "Model",
-                    "Sequence Length",
-                    "Batch Size",
-                    "Layers",
-                    "Element Size",
-                    "KV",
-                    "Num Head",
-                    "Block Size",
-                    "IO Count",
-                    "IO Size(B)",
-                    "Total Size(GB)",
-                    "Write Avg Time(s)",
-                    "Write Avg Bandwidth(GB/s)",
-                    "Read Avg Time(s)",
-                    "Read Avg Bandwidth(GB/s)",
-                ]
-            )
-
-        for num_head in num_head_list:
-            for block_len in block_lens:
-                block_dim = head_size * num_head
-                io_size = block_dim * block_len * block_elem_size
-                block_size = io_size * block_layer
-                real_blocks = max(20, 1200 // num_head)
-
-                for num_tokens in num_tokens_list:
-                    sep = "=" * 60
-                    print(
-                        f"\n{sep}\n= num_head={num_head} | num_tokens={num_tokens:>6} | Repeat {repeat} times =\n{sep}\n"
-                    )
-
-                    batch_size = int(num_tokens / block_len)
-                    io_num = int(num_tokens / block_len * block_layer)
-
-                    w_bw_list, r_bw_list = [], []
-                    w_time_list, r_time_list = [], []
-                    w_size_sum, r_size_sum = 0.0, 0.0
-
-                    for r in range(repeat):
-                        print(f"\n--- Round {r+1} ---")
-                        store = setup_store(
-                            storage_backends, block_size, device_id, io_size
-                        )
-
-                        hashes, kvcaches = make_buffers(
-                            real_blocks,
-                            device_id,
-                            batch_size,
-                            head_size,
-                            block_len,
-                            block_layer,
-                            num_head,
-                        )
-
-                        results = store.create(hashes[:batch_size])
-                        assert sum(results) == 0, "Create operation failed"
-
-                        w_size, w_time, w_bw = embed(
-                            store,
-                            hashes[:batch_size],
-                            kvcaches,
-                            num_tokens,
-                            block_len,
-                            block_layer,
-                            block_dim,
-                        )
-                        store.commit(hashes[:batch_size], True)
-
-                        store_all_hashes(hashes[:batch_size])
-
-                        r_size, r_time, r_bw = fetch(
-                            store,
-                            hashes[:batch_size],
-                            kvcaches,
-                            num_tokens,
-                            block_len,
-                            block_layer,
-                            block_dim,
-                        )
-
-                        w_bw_list.append(w_bw)
-                        r_bw_list.append(r_bw)
-                        w_time_list.append(w_time)
-                        r_time_list.append(r_time)
-                        w_size_sum += w_size
-                        r_size_sum += r_size
-
-                        # Clean up resources
-                        del kvcaches, hashes, store
-                        torch.cuda.empty_cache()
-
-                    avg_w_bw = sum(w_bw_list) / repeat
-                    avg_r_bw = sum(r_bw_list) / repeat
-                    avg_w_time = sum(w_time_list) / repeat
-                    avg_r_time = sum(r_time_list) / repeat
-                    avg_w_size = w_size_sum / (1024**3) / repeat
-                    avg_r_size = r_size_sum / (1024**3) / repeat
-
-                    writer.writerow(
-                        [
-                            model_name,
-                            num_tokens,
-                            batch_size,
-                            block_layer,
-                            block_elem_size,
-                            kv,
-                            num_head,
-                            block_len,
-                            io_num,
-                            io_size,
-                            f"{avg_w_size:.4f}",
-                            f"{avg_w_time:.4f}",
-                            f"{avg_w_bw:.4f}",
-                            f"{avg_r_time:.4f}",
-                            f"{avg_r_bw:.4f}",
-                        ]
-                    )
-
-                    csv_fp.flush()
-
-    print("\n" + "=" * 60 + "\n= All combinations tested =\n" + "=" * 60 + "\n")
-
-
-if __name__ == "__main__":
-    os.environ["UC_LOGGER_LEVEL"] = "debug"
-    main()
+def run(
+    storage_backends: str,
+    device_id: int,
+    repeat: int,
+    num_head: int,
+    block_len: int,
+    transferStreamNumber: int,
+    num_tokens: int,
+    block_layer: int,
+    head_size: int,
+    block_elem_size: int,
+) -> Tuple[float, float, float, float, float, float]:
+    """
+    Run a single test with given parameters and return performance metrics.
+
+    Returns:
+        Tuple of (avg_w_size, avg_w_time, avg_w_bw, avg_r_time, avg_r_bw, avg_r_size)
+    """
+
+    block_dim = head_size * num_head
+    io_size = block_dim * block_len * block_elem_size
+    block_size = io_size * block_layer
+    batch_size = int(num_tokens / block_len)
+    real_blocks = batch_size + 10
+
+    w_bw_list, r_bw_list = [], []
+    w_time_list, r_time_list = [], []
+    w_size_sum, r_size_sum = 0.0, 0.0
+
+    store = setup(
+        storage_backends, block_size, device_id, io_size, transferStreamNumber
+    )
+    for r in range(repeat):
+        print(f"\n--- Round {r+1} ---")
+
+        hashes, kvcaches = make_buffers(
+            real_blocks,
+            device_id,
+            batch_size,
+            head_size,
+            block_len,
+            block_layer,
+            num_head,
+        )
+
+        results = store.create(hashes[:batch_size])
+        assert sum(results) == 0, "Create operation failed"
+
+        w_size, w_time, w_bw = embed(
+            store,
+            hashes[:batch_size],
+            kvcaches,
+            num_tokens,
+            block_len,
+            block_layer,
+            block_dim,
+        )
+        store.commit(hashes[:batch_size], True)
+
+        store_all_hashes(hashes[:batch_size])
+
+        r_size, r_time, r_bw = fetch(
+            store,
+            hashes[:batch_size],
+            kvcaches,
+            num_tokens,
+            block_len,
+            block_layer,
+            block_dim,
+        )
+
+        w_bw_list.append(w_bw)
+        r_bw_list.append(r_bw)
+        w_time_list.append(w_time)
+        r_time_list.append(r_time)
+        w_size_sum += w_size
+        r_size_sum += r_size
+
+        # Clean up resources
+        del kvcaches, hashes
+        torch.cuda.empty_cache()
+
+    del store
+    avg_w_bw = sum(w_bw_list) / repeat
+    avg_r_bw = sum(r_bw_list) / repeat
+    avg_w_time = sum(w_time_list) / repeat
+    avg_r_time = sum(r_time_list) / repeat
+    avg_w_size = w_size_sum / (1024**3) / repeat
+    avg_r_size = r_size_sum / (1024**3) / repeat
+
+    return avg_w_size, avg_w_time, avg_w_bw, avg_r_time, avg_r_bw, avg_r_size
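The removed main() folded the parameter sweep and CSV reporting into this file; after this commit the new run() returns the averaged metrics so a separate driver (presumably the second changed file, which is not shown here) can own the sweep. Below is a minimal sketch of such a driver, assuming run() is imported from this benchmark module; the sweep values, CSV columns, and output file name are illustrative assumptions, not taken from the commit:

    import csv
    import os

    # from <benchmark module> import run   # module path not shown in this commit


    def sweep(csv_path: str = "embed_fetch_result.csv") -> None:
        # Append one row per tested configuration, writing a header on first use.
        need_header = not os.path.exists(csv_path)
        with open(csv_path, "a", newline="", encoding="utf-8") as fp:
            writer = csv.writer(fp)
            if need_header:
                writer.writerow(
                    [
                        "Num Head", "Block Size", "Sequence Length",
                        "Total Size(GB)", "Write Avg Time(s)", "Write Avg Bandwidth(GB/s)",
                        "Read Avg Time(s)", "Read Avg Bandwidth(GB/s)",
                    ]
                )
            for num_head in (1, 2, 4):              # illustrative sweep values
                for block_len in (128, 256):
                    for num_tokens in (2048, 4096, 8192):
                        # run() returns (avg_w_size, avg_w_time, avg_w_bw,
                        #                avg_r_time, avg_r_bw, avg_r_size)
                        w_size, w_time, w_bw, r_time, r_bw, _ = run(
                            storage_backends=".",
                            device_id=0,
                            repeat=3,
                            num_head=num_head,
                            block_len=block_len,
                            transferStreamNumber=32,
                            num_tokens=num_tokens,
                            block_layer=64,
                            head_size=128,
                            block_elem_size=2,
                        )
                        writer.writerow(
                            [
                                num_head, block_len, num_tokens,
                                f"{w_size:.4f}", f"{w_time:.4f}", f"{w_bw:.4f}",
                                f"{r_time:.4f}", f"{r_bw:.4f}",
                            ]
                        )
                        fp.flush()


    if __name__ == "__main__":
        os.environ["UC_LOGGER_LEVEL"] = "debug"  # same logger level the removed main() set
        sweep()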
