Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
be95065
naive interface definition of dramstore
ChenyuZhu1 Oct 13, 2025
c0f05aa
fix
ChenyuZhu1 Oct 13, 2025
e40b5ea
naive naive naive implementation
ChenyuZhu1 Oct 13, 2025
ba89e00
fix
ChenyuZhu1 Oct 14, 2025
add41d4
fix
ChenyuZhu1 Oct 14, 2025
1199074
fix
ChenyuZhu1 Oct 14, 2025
880986c
fix
ChenyuZhu1 Oct 14, 2025
1275c0f
fix
ChenyuZhu1 Oct 14, 2025
9029a70
fix
ChenyuZhu1 Oct 14, 2025
b3bee5f
fix
ChenyuZhu1 Oct 14, 2025
6c79f65
fix
ChenyuZhu1 Oct 14, 2025
60f72c7
fix
ChenyuZhu1 Oct 14, 2025
1670d2f
fix
ChenyuZhu1 Oct 14, 2025
5fe6167
fix
ChenyuZhu1 Oct 14, 2025
50612be
fix
ChenyuZhu1 Oct 14, 2025
6dde723
fix
ChenyuZhu1 Oct 14, 2025
11726cd
fix
ChenyuZhu1 Oct 14, 2025
5306a45
fix
ChenyuZhu1 Oct 14, 2025
4d446ab
fix
ChenyuZhu1 Oct 14, 2025
61cda77
fix
ChenyuZhu1 Oct 14, 2025
4c618e9
fix
ChenyuZhu1 Oct 14, 2025
71072ff
add memPool in infra/ and remove space/ dir in dramstore
ChenyuZhu1 Oct 14, 2025
ae69392
fix
ChenyuZhu1 Oct 14, 2025
dbc7e26
fix
ChenyuZhu1 Oct 15, 2025
851e323
fix
ChenyuZhu1 Oct 15, 2025
db39526
fix
ChenyuZhu1 Oct 15, 2025
6711566
fix
ChenyuZhu1 Oct 15, 2025
7441433
fix
ChenyuZhu1 Oct 15, 2025
8261cf1
fix
ChenyuZhu1 Oct 15, 2025
bb2ee75
fix
ChenyuZhu1 Oct 15, 2025
3ca9651
fix
ChenyuZhu1 Oct 15, 2025
22e1031
fix
ChenyuZhu1 Oct 15, 2025
1347e46
fix
ChenyuZhu1 Oct 15, 2025
9fc2223
fix
ChenyuZhu1 Oct 15, 2025
ce73e08
fix
ChenyuZhu1 Oct 15, 2025
1598d95
fix
ChenyuZhu1 Oct 15, 2025
488dca6
fix
ChenyuZhu1 Oct 15, 2025
c73466a
fix
ChenyuZhu1 Oct 15, 2025
e349854
fix
ChenyuZhu1 Oct 15, 2025
407dea7
fix
ChenyuZhu1 Oct 15, 2025
04aeb72
fix
ChenyuZhu1 Oct 15, 2025
cf90c05
fix
ChenyuZhu1 Oct 15, 2025
00c2bc2
add test for memory pool
ChenyuZhu1 Oct 15, 2025
5352792
fix
ChenyuZhu1 Oct 15, 2025
2069c76
fix
ChenyuZhu1 Oct 15, 2025
ec55a36
fix
ChenyuZhu1 Oct 15, 2025
7e9dd61
add LRU cache
ChenyuZhu1 Oct 16, 2025
114c3b2
fix
ChenyuZhu1 Oct 16, 2025
609f80e
fix
ChenyuZhu1 Oct 16, 2025
fc6689c
fix
ChenyuZhu1 Oct 16, 2025
a03d6a7
fix
ChenyuZhu1 Oct 16, 2025
e574023
fix
ChenyuZhu1 Oct 16, 2025
27a0b4d
fix
ChenyuZhu1 Oct 16, 2025
432b43f
fix
ChenyuZhu1 Oct 16, 2025
b38bc2f
fix
ChenyuZhu1 Oct 16, 2025
b64d30b
fix
ChenyuZhu1 Oct 16, 2025
935109e
fix
ChenyuZhu1 Oct 16, 2025
ce0cbb7
fix
ChenyuZhu1 Oct 16, 2025
b8c7210
fix
ChenyuZhu1 Oct 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions ucm/store/dramstore/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
file(GLOB_RECURSE UCMSTORE_DRAM_CC_SOURCE_FILES "./cc/*.cc")
add_library(dramstore STATIC ${UCMSTORE_DRAM_CC_SOURCE_FILES})
target_include_directories(dramstore PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cc)
target_link_libraries(dramstore PUBLIC storeinfra)
target_include_directories(dramstore PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/cc/api
${CMAKE_CURRENT_SOURCE_DIR}/cc/domain
)
target_link_libraries(dramstore PUBLIC storeinfra storedevice)

file(GLOB_RECURSE UCMSTORE_DRAM_CPY_SOURCE_FILES "./cpy/*.cc")
pybind11_add_module(ucmdramstore ${UCMSTORE_DRAM_CPY_SOURCE_FILES})
Expand Down
81 changes: 70 additions & 11 deletions ucm/store/dramstore/cc/api/dramstore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,27 +24,86 @@
#include "dramstore.h"
#include "logger/logger.h"
#include "status/status.h"
#include "tsf_task/dram_tsf_task_manager.h"
#include "tsf_task/dram_tsf_task.h"
#include "memory/memory_pool.h"

namespace UC {

class DRAMStoreImpl : public DRAMStore {
public:
int32_t Setup(const size_t ioSize, const size_t capacity, const int32_t deviceId) { return -1; }
int32_t Alloc(const std::string& block) override { return -1; }
bool Lookup(const std::string& block) override { return false; }
void Commit(const std::string& block, const bool success) override {}
int32_t Setup(const Config& config) {
// config里之前没有blockSize,稳妥起见还是先打桩
int32_t blockSize = 128;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can get blocksize here from vllm.config?

Copy link
Contributor Author

@ChenyuZhu1 ChenyuZhu1 Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes you are right, but currently blockSize is not added into config. If possible, it will be added into the Config class soon.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need blocksize? config.def_readwrite("ioSize", &UC::DRAMStorePy::Config::ioSize) we have iosize here

this->memPool_ = std::make_unique<MemoryPool>(config.capacity, blockSize).release();
// 初始化memPool的办法是否正确?如果失败的话怎么办?
// int32_t streamNumber = 60; // 这个参数是否需要,以及怎么传,还要讨论
// int32_t timeoutMs = 10000; // 这个参数是否需要,以及怎么传,还要讨论
auto status = this->transMgr_.Setup(config.deviceId, config.streamNumber, config.timeoutMs, this->memPool_);
if (status.Failure()) {
UC_ERROR("Failed({}) to setup DramTransferTaskManager.", status);
return status.Underlying();
}
return Status::OK().Underlying();
}

int32_t Alloc(const std::string& block) override {
return this->memPool_->NewBlock(block).Underlying();
}

bool Lookup(const std::string& block) override {
return this->memPool_->LookupBlock(block);
}

void Commit(const std::string& block, const bool success) override {
this->memPool_->CommitBlock(block, success).Underlying();
}

std::list<int32_t> Alloc(const std::list<std::string>& blocks) override
{
return std::list<int32_t>();
std::list<int32_t> results;
for (const auto &block : blocks) {
results.emplace_back(this->Alloc(block));
}
return results;
}

std::list<bool> Lookup(const std::list<std::string>& blocks) override
{
return std::list<bool>();
std::list<bool> founds;
for (const auto &block : blocks) {
founds.emplace_back(this->Lookup(block));
}
return founds;
}
void Commit(const std::list<std::string>& blocks, const bool success) override {}
size_t Submit(Task&& task) override { return 0; }
int32_t Wait(const size_t task) override { return -1; }
int32_t Check(const size_t task, bool& finish) override { return -1; }

void Commit(const std::list<std::string>& blocks, const bool success) override {
for (const auto &block : blocks) {
this->Commit(block, success);
}
}

size_t Submit(Task&& task) override {
std::list<DramTsfTask> tasks;
for (auto& shard : task.shards) {
tasks.push_back({task.type, shard.block, shard.offset, shard.address, task.size});
}
size_t taskId;
return this->transMgr_.Submit(tasks, task.number * task.size, task.number, task.brief, taskId).Success() ? taskId : CCStore::invalidTaskId;
}

int32_t Wait(const size_t task) override {
return this->transMgr_.Wait(task).Underlying();
}

int32_t Check(const size_t task, bool& finish) override {
return this->transMgr_.Check(task, finish).Underlying();
}

private:
// DramSpaceManager spaceMgr_;
MemoryPool* memPool_;
DramTsfTaskManager transMgr_;
};

int32_t DRAMStore::Setup(const Config& config)
Expand All @@ -55,7 +114,7 @@ int32_t DRAMStore::Setup(const Config& config)
return Status::OutOfMemory().Underlying();
}
this->impl_ = impl;
return impl->Setup(config.ioSize, config.capacity, config.deviceId);
return impl->Setup(config);
}

} // namespace UC
6 changes: 4 additions & 2 deletions ucm/store/dramstore/cc/api/dramstore.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@ class DRAMStore : public CCStore {
size_t ioSize;
size_t capacity;
int32_t deviceId;
Config(const size_t ioSize, const size_t capacity)
: ioSize{ioSize}, capacity{capacity}, deviceId{-1}
size_t streamNumber;
size_t timeoutMs;
Config(const size_t ioSize, const size_t capacity, const size_t streamNumber, const size_t timeoutMs)
: ioSize{ioSize}, capacity{capacity}, deviceId{-1}, streamNumber(streamNumber), timeoutMs(timeoutMs)
{
}
};
Expand Down
61 changes: 61 additions & 0 deletions ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* MIT License
*
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
#ifndef UNIFIEDCACHE_DRAM_TSF_TASK_H
#define UNIFIEDCACHE_DRAM_TSF_TASK_H

#include "ucmstore.h"
#include "dram_tsf_task_waiter.h"

namespace UC {

class DramTsfTask {
public:
using Type = CCStore::Task::Type;
// using Location = CCStore::Task::Location; // 这个不再需要了,因为只有D2H和H2D两种传输

public:
DramTsfTask(const Type type, const std::string& blockId,
const size_t offset, const uintptr_t address, const size_t length)
: type{type}, blockId{blockId}, offset{offset}, address{address},
length{length}, owner{0}, waiter{nullptr}
{
}
DramTsfTask() : DramTsfTask{Type::DUMP, {}, 0, 0, 0} {} // 无参构造函数

public:
Type type;
// Location location; // 不需要了
std::string blockId; // 对于一个task来说,这个bloockID和下一行的offset还是需要的,因为它们本质上是上层传来的。(参考dramstore.py.cc)
size_t offset;
uintptr_t address; // 在显卡上的地址
size_t length; // 数据传输的长度

size_t owner; // 大的Task的TaskId
std::shared_ptr<DramTsfTaskWaiter> waiter;
// std::shared_ptr<std::byte> hub; // 在nfsstore中,这个的意思是中转站(对于nfsstore来说,host的目的就是数据中转),因此在dram_connector中不再需要
};

} // namespace UC

#endif
106 changes: 106 additions & 0 deletions ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/**
* MIT License
*
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
#include "dram_tsf_task_manager.h"

namespace UC {

Status DramTsfTaskManager::Setup(const int32_t deviceId, const size_t streamNumber,
const size_t timeoutMs, MemoryPool* memPool)
{
this->_queues.reserve(streamNumber);
for (size_t i = 0; i < streamNumber; ++i) {
auto& queue = this->_queues.emplace_back(std::make_unique<DramTsfTaskQueue>());
auto status = queue->Setup(deviceId, &this->_failureSet, memPool);
if (status.Failure()) { return status; }
}
this->_timeoutMs = timeoutMs;
return Status::OK();
}

Status DramTsfTaskManager::Submit(std::list<DramTsfTask>& tasks, const size_t size, const size_t number,
const std::string& brief, size_t& taskId)
{
std::unique_lock<std::mutex> lk(this->_mutex);
taskId = ++this->_taskIdSeed;
auto [iter, success] = this->_waiters.emplace(
taskId, std::make_shared<DramTsfTaskWaiter>(taskId, size, number, brief));
if (!success) { return Status::OutOfMemory(); }
std::vector<std::list<DramTsfTask>> lists;
this->Dispatch(tasks, lists, taskId, iter->second);
for (size_t i = 0; i < lists.size(); i++) {
if (lists[i].empty()) { continue; }
this->_queues[this->_qIdx]->Push(lists[i]);
this->_qIdx = (this->_qIdx + 1) % this->_queues.size();
}
return Status::OK();
}

Status DramTsfTaskManager::Wait(const size_t taskId)
{
std::shared_ptr<DramTsfTaskWaiter> waiter = nullptr;
{
std::unique_lock<std::mutex> lk(this->_mutex);
auto iter = this->_waiters.find(taskId);
if (iter == this->_waiters.end()) { return Status::NotFound(); }
waiter = iter->second;
this->_waiters.erase(iter);
}
if (!waiter->Wait(this->_timeoutMs)) {
this->_failureSet.Insert(taskId);
waiter->Wait();
}
bool failure = this->_failureSet.Contains(taskId);
this->_failureSet.Remove(taskId);
if (failure) { UC_ERROR("Transfer task({}) failed.", taskId); }
return failure ? Status::Error() : Status::OK();
}

Status DramTsfTaskManager::Check(const size_t taskId, bool& finish)
{
std::lock_guard<std::mutex> lk(this->_mutex);
auto iter = this->_waiters.find(taskId);
if (iter == this->_waiters.end()) { return Status::NotFound(); }
finish = iter->second->Finish();
return Status::OK();
}

void DramTsfTaskManager::Dispatch(std::list<DramTsfTask>& tasks, std::vector<std::list<DramTsfTask>>& targets,
const size_t taskId, std::shared_ptr<DramTsfTaskWaiter> waiter) const
{
auto qNumber = this->_queues.size();
auto index = size_t(0);
targets.resize(qNumber);
auto it = tasks.begin();
while (it != tasks.end()) {
auto next = std::next(it);
it->owner = taskId;
it->waiter = waiter;
auto& target = targets[index % qNumber];
target.splice(target.end(), tasks, it);
index++;
it = next;
}
}

} // namespace UC
60 changes: 60 additions & 0 deletions ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/**
* MIT License
*
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
#ifndef UNIFIEDCACHE_DRAM_TSF_TASK_MANAGER_H
#define UNIFIEDCACHE_DRAM_TSF_TASK_MANAGER_H

#include <memory>
#include <unordered_map>
#include <vector>
#include "dram_tsf_task_queue.h"

namespace UC {

class DramTsfTaskManager {
public:
Status Setup(const int32_t deviceId, const size_t streamNumber,
// const size_t bufferSize, const size_t bufferNumber, // 这两个可能不需要,对于dram来说,因为这个buffer是用来数据中转的?
const size_t timeoutMs, MemoryPool* memPool);
Status Submit(std::list<DramTsfTask>& tasks, const size_t size, const size_t number,
const std::string& brief, size_t& taskId);
Status Wait(const size_t taskId);
Status Check(const size_t taskId, bool& finish);

private:
void Dispatch(std::list<DramTsfTask>& tasks, std::vector<std::list<DramTsfTask>>& targets,
const size_t taskId, std::shared_ptr<DramTsfTaskWaiter> waiter) const;

private:
std::mutex _mutex;
DramTsfTaskSet _failureSet;
std::unordered_map<size_t, std::shared_ptr<DramTsfTaskWaiter>> _waiters;
std::vector<std::unique_ptr<DramTsfTaskQueue>> _queues;
size_t _qIdx{0};
size_t _taskIdSeed{0};
size_t _timeoutMs{0};
};

} // namespace UC

#endif
Loading