-
Notifications
You must be signed in to change notification settings - Fork 26
[WIP] Implementation of DRAM connector #285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
ChenyuZhu1
wants to merge
59
commits into
ModelEngine-Group:develop
Choose a base branch
from
ChenyuZhu1:develop
base: develop
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
59 commits
Select commit
Hold shift + click to select a range
be95065
naive interface definition of dramstore
ChenyuZhu1 c0f05aa
fix
ChenyuZhu1 e40b5ea
naive naive naive implementation
ChenyuZhu1 ba89e00
fix
ChenyuZhu1 add41d4
fix
ChenyuZhu1 1199074
fix
ChenyuZhu1 880986c
fix
ChenyuZhu1 1275c0f
fix
ChenyuZhu1 9029a70
fix
ChenyuZhu1 b3bee5f
fix
ChenyuZhu1 6c79f65
fix
ChenyuZhu1 60f72c7
fix
ChenyuZhu1 1670d2f
fix
ChenyuZhu1 5fe6167
fix
ChenyuZhu1 50612be
fix
ChenyuZhu1 6dde723
fix
ChenyuZhu1 11726cd
fix
ChenyuZhu1 5306a45
fix
ChenyuZhu1 4d446ab
fix
ChenyuZhu1 61cda77
fix
ChenyuZhu1 4c618e9
fix
ChenyuZhu1 71072ff
add memPool in infra/ and remove space/ dir in dramstore
ChenyuZhu1 ae69392
fix
ChenyuZhu1 dbc7e26
fix
ChenyuZhu1 851e323
fix
ChenyuZhu1 db39526
fix
ChenyuZhu1 6711566
fix
ChenyuZhu1 7441433
fix
ChenyuZhu1 8261cf1
fix
ChenyuZhu1 bb2ee75
fix
ChenyuZhu1 3ca9651
fix
ChenyuZhu1 22e1031
fix
ChenyuZhu1 1347e46
fix
ChenyuZhu1 9fc2223
fix
ChenyuZhu1 ce73e08
fix
ChenyuZhu1 1598d95
fix
ChenyuZhu1 488dca6
fix
ChenyuZhu1 c73466a
fix
ChenyuZhu1 e349854
fix
ChenyuZhu1 407dea7
fix
ChenyuZhu1 04aeb72
fix
ChenyuZhu1 cf90c05
fix
ChenyuZhu1 00c2bc2
add test for memory pool
ChenyuZhu1 5352792
fix
ChenyuZhu1 2069c76
fix
ChenyuZhu1 ec55a36
fix
ChenyuZhu1 7e9dd61
add LRU cache
ChenyuZhu1 114c3b2
fix
ChenyuZhu1 609f80e
fix
ChenyuZhu1 fc6689c
fix
ChenyuZhu1 a03d6a7
fix
ChenyuZhu1 e574023
fix
ChenyuZhu1 27a0b4d
fix
ChenyuZhu1 432b43f
fix
ChenyuZhu1 b38bc2f
fix
ChenyuZhu1 b64d30b
fix
ChenyuZhu1 935109e
fix
ChenyuZhu1 ce0cbb7
fix
ChenyuZhu1 b8c7210
fix
ChenyuZhu1 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,27 +24,86 @@ | |
| #include "dramstore.h" | ||
| #include "logger/logger.h" | ||
| #include "status/status.h" | ||
| #include "tsf_task/dram_tsf_task_manager.h" | ||
| #include "tsf_task/dram_tsf_task.h" | ||
| #include "memory/memory_pool.h" | ||
|
|
||
| namespace UC { | ||
|
|
||
| class DRAMStoreImpl : public DRAMStore { /* Implementation behind DRAMStore: block bookkeeping goes to a MemoryPool, transfers go to a DramTsfTaskManager. */ | ||
| public: | ||
| int32_t Setup(const size_t ioSize, const size_t capacity, const int32_t deviceId) { return -1; } /* NOTE(review): this and the other one-line stubs in this class look like the removed side of the diff - confirm */ | ||
| int32_t Alloc(const std::string& block) override { return -1; } | ||
| bool Lookup(const std::string& block) override { return false; } | ||
| void Commit(const std::string& block, const bool success) override {} | ||
| int32_t Setup(const Config& config) { /* Creates the memory pool, sets up the transfer manager and returns the underlying status code. */ | ||
| // Config did not carry a blockSize before, so a stub value is used here for now to be safe. | ||
| int32_t blockSize = 128; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need blocksize? config.def_readwrite("ioSize", &UC::DRAMStorePy::Config::ioSize) we have iosize here |
||
| this->memPool_ = std::make_unique<MemoryPool>(config.capacity, blockSize).release(); /* NOTE(review): release() leaves the pool in a raw pointer and nothing in this class deletes it - confirm ownership */ | ||
| // Is this the right way to initialise memPool? What should happen if it fails? | ||
| // int32_t streamNumber = 60; // Whether this parameter is needed, and how to pass it, is still to be discussed | ||
| // int32_t timeoutMs = 10000; // Whether this parameter is needed, and how to pass it, is still to be discussed | ||
| auto status = this->transMgr_.Setup(config.deviceId, config.streamNumber, config.timeoutMs, this->memPool_); | ||
| if (status.Failure()) { | ||
| UC_ERROR("Failed({}) to setup DramTransferTaskManager.", status); | ||
| return status.Underlying(); | ||
| } | ||
| return Status::OK().Underlying(); | ||
| } | ||
|
|
||
| int32_t Alloc(const std::string& block) override { /* Forwards to MemoryPool::NewBlock and returns its underlying status code. */ | ||
| return this->memPool_->NewBlock(block).Underlying(); | ||
| } | ||
|
|
||
| bool Lookup(const std::string& block) override { /* Forwards to MemoryPool::LookupBlock. */ | ||
| return this->memPool_->LookupBlock(block); | ||
| } | ||
|
|
||
| void Commit(const std::string& block, const bool success) override { /* Forwards to MemoryPool::CommitBlock. */ | ||
| this->memPool_->CommitBlock(block, success).Underlying(); /* the resulting status code is computed but not used */ | ||
| } | ||
|
|
||
| std::list<int32_t> Alloc(const std::list<std::string>& blocks) override /* Batch form: one status code per block, in input order. */ | ||
| { | ||
| return std::list<int32_t>(); | ||
| std::list<int32_t> results; | ||
| for (const auto &block : blocks) { | ||
| results.emplace_back(this->Alloc(block)); | ||
| } | ||
| return results; | ||
| } | ||
|
|
||
| std::list<bool> Lookup(const std::list<std::string>& blocks) override /* Batch form: one lookup result per block, in input order. */ | ||
| { | ||
| return std::list<bool>(); | ||
| std::list<bool> founds; | ||
| for (const auto &block : blocks) { | ||
| founds.emplace_back(this->Lookup(block)); | ||
| } | ||
| return founds; | ||
| } | ||
| void Commit(const std::list<std::string>& blocks, const bool success) override {} | ||
| size_t Submit(Task&& task) override { return 0; } | ||
| int32_t Wait(const size_t task) override { return -1; } | ||
| int32_t Check(const size_t task, bool& finish) override { return -1; } | ||
|
|
||
| void Commit(const std::list<std::string>& blocks, const bool success) override { /* Batch form: commits every block with the same success flag. */ | ||
| for (const auto &block : blocks) { | ||
| this->Commit(block, success); | ||
| } | ||
| } | ||
|
|
||
| size_t Submit(Task&& task) override { /* Turns each shard of the task into a DramTsfTask and hands the list to the transfer manager. */ | ||
| std::list<DramTsfTask> tasks; | ||
| for (auto& shard : task.shards) { | ||
| tasks.push_back({task.type, shard.block, shard.offset, shard.address, task.size}); | ||
| } | ||
| size_t taskId; /* written by transMgr_.Submit; only returned when that call succeeds */ | ||
| return this->transMgr_.Submit(tasks, task.number * task.size, task.number, task.brief, taskId).Success() ? taskId : CCStore::invalidTaskId; | ||
| } | ||
|
|
||
| int32_t Wait(const size_t task) override { /* Forwards to the transfer manager and returns its underlying status code. */ | ||
| return this->transMgr_.Wait(task).Underlying(); | ||
| } | ||
|
|
||
| int32_t Check(const size_t task, bool& finish) override { /* Forwards to the transfer manager and returns its underlying status code. */ | ||
| return this->transMgr_.Check(task, finish).Underlying(); | ||
| } | ||
|
|
||
| private: | ||
| // DramSpaceManager spaceMgr_; | ||
| MemoryPool* memPool_; /* assigned in Setup(const Config&); has no in-class initializer */ | ||
| DramTsfTaskManager transMgr_; | ||
| }; | ||
|
|
||
| int32_t DRAMStore::Setup(const Config& config) | ||
|
|
@@ -55,7 +114,7 @@ int32_t DRAMStore::Setup(const Config& config) | |
| return Status::OutOfMemory().Underlying(); | ||
| } | ||
| this->impl_ = impl; | ||
| return impl->Setup(config.ioSize, config.capacity, config.deviceId); | ||
| return impl->Setup(config); | ||
| } | ||
|
|
||
| } // namespace UC | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| /** | ||
| * MIT License | ||
| * | ||
| * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| * of this software and associated documentation files (the "Software"), to deal | ||
| * in the Software without restriction, including without limitation the rights | ||
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the Software is | ||
| * furnished to do so, subject to the following conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be included in all | ||
| * copies or substantial portions of the Software. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| * SOFTWARE. | ||
| * */ | ||
| #ifndef UNIFIEDCACHE_DRAM_TSF_TASK_H | ||
| #define UNIFIEDCACHE_DRAM_TSF_TASK_H | ||
|
|
||
| #include "ucmstore.h" | ||
| #include "dram_tsf_task_waiter.h" | ||
|
|
||
| namespace UC { | ||
|
|
||
| class DramTsfTask { /* One transfer request: a type, a block id with offset, an address and a length, plus owner and waiter bookkeeping. */ | ||
| public: | ||
| using Type = CCStore::Task::Type; | ||
| // using Location = CCStore::Task::Location; // No longer needed, because there are only two kinds of transfer: D2H and H2D | ||
|
|
||
| public: | ||
| DramTsfTask(const Type type, const std::string& blockId, | ||
| const size_t offset, const uintptr_t address, const size_t length) | ||
| : type{type}, blockId{blockId}, offset{offset}, address{address}, | ||
| length{length}, owner{0}, waiter{nullptr} /* owner and waiter start empty */ | ||
| { | ||
| } | ||
| DramTsfTask() : DramTsfTask{Type::DUMP, {}, 0, 0, 0} {} // Default constructor | ||
|
|
||
| public: | ||
| Type type; | ||
| // Location location; // No longer needed | ||
| std::string blockId; // A task still needs this blockId and the offset on the next line, because both come from the upper layer (see dramstore.py.cc). | ||
| size_t offset; | ||
| uintptr_t address; // Address on the device (GPU) | ||
| size_t length; // Length of the data transfer | ||
|
|
||
| size_t owner; // TaskId of the enclosing (parent) Task | ||
| std::shared_ptr<DramTsfTaskWaiter> waiter; | ||
| // std::shared_ptr<std::byte> hub; // In nfsstore this is a staging hub (there the host only relays data), so it is no longer needed in dram_connector | ||
| }; | ||
|
|
||
| } // namespace UC | ||
|
|
||
| #endif |
106 changes: 106 additions & 0 deletions
106
ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.cc
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /** | ||
| * MIT License | ||
| * | ||
| * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| * of this software and associated documentation files (the "Software"), to deal | ||
| * in the Software without restriction, including without limitation the rights | ||
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the Software is | ||
| * furnished to do so, subject to the following conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be included in all | ||
| * copies or substantial portions of the Software. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| * SOFTWARE. | ||
| * */ | ||
| #include "dram_tsf_task_manager.h" | ||
|
|
||
| namespace UC { | ||
|
|
||
| // Builds `streamNumber` transfer queues and records the wait timeout. | ||
| // Each queue is set up with the device id, this manager's failure set and `memPool`. | ||
| // Returns the first failing queue status, or OK once every queue is ready. | ||
| Status DramTsfTaskManager::Setup(const int32_t deviceId, const size_t streamNumber, | ||
| const size_t timeoutMs, MemoryPool* memPool) | ||
| { | ||
| auto& queues = this->_queues; | ||
| queues.reserve(streamNumber); | ||
| size_t remaining = streamNumber; | ||
| while (remaining-- > 0) { | ||
| queues.push_back(std::make_unique<DramTsfTaskQueue>()); | ||
| auto result = queues.back()->Setup(deviceId, &this->_failureSet, memPool); | ||
| if (result.Failure()) { return result; } | ||
| } | ||
| // The timeout is stored only after all queues were set up successfully. | ||
| this->_timeoutMs = timeoutMs; | ||
| return Status::OK(); | ||
| } | ||
|
|
||
| // Registers a waiter under a fresh task id, spreads `tasks` over the queues and enqueues them. | ||
| // tasks: shard-level tasks; Dispatch splices them out of this list. | ||
| // size, number, brief: forwarded to the DramTsfTaskWaiter constructor. | ||
| // taskId: receives the new id; left untouched when no queue is available. | ||
| // Returns Error when no queue exists, OutOfMemory when the waiter cannot be registered, OK otherwise. | ||
| Status DramTsfTaskManager::Submit(std::list<DramTsfTask>& tasks, const size_t size, const size_t number, | ||
| const std::string& brief, size_t& taskId) | ||
| { | ||
| std::unique_lock<std::mutex> lk(this->_mutex); | ||
| // Dispatch computes index % _queues.size(); with no queue that is a division by zero. | ||
| if (this->_queues.empty()) { | ||
| UC_ERROR("Failed to submit transfer task({}): no transfer queue is set up.", brief); | ||
| return Status::Error(); | ||
| } | ||
| taskId = ++this->_taskIdSeed; | ||
| auto [iter, success] = this->_waiters.emplace( | ||
| taskId, std::make_shared<DramTsfTaskWaiter>(taskId, size, number, brief)); | ||
| if (!success) { return Status::OutOfMemory(); } | ||
| std::vector<std::list<DramTsfTask>> lists; | ||
| this->Dispatch(tasks, lists, taskId, iter->second); | ||
| // Non-empty batches go to consecutive queues, continuing from where the previous Submit stopped. | ||
| for (auto& batch : lists) { | ||
| if (batch.empty()) { continue; } | ||
| this->_queues[this->_qIdx]->Push(batch); | ||
| this->_qIdx = (this->_qIdx + 1) % this->_queues.size(); | ||
| } | ||
| return Status::OK(); | ||
| } | ||
|
|
||
| Status DramTsfTaskManager::Wait(const size_t taskId) /* Waits for task taskId; returns NotFound for an unknown id, Error if the id is in the failure set afterwards, OK otherwise. */ | ||
| { | ||
| std::shared_ptr<DramTsfTaskWaiter> waiter = nullptr; | ||
| { | ||
| std::unique_lock<std::mutex> lk(this->_mutex); /* the lock covers only the map lookup and erase, not the waiting below */ | ||
| auto iter = this->_waiters.find(taskId); | ||
| if (iter == this->_waiters.end()) { return Status::NotFound(); } | ||
| waiter = iter->second; | ||
| this->_waiters.erase(iter); /* the id is removed here, so a later Wait or Check on it returns NotFound */ | ||
| } | ||
| if (!waiter->Wait(this->_timeoutMs)) { /* first a wait bounded by the configured timeout */ | ||
| this->_failureSet.Insert(taskId); /* a false result marks the task as failed */ | ||
| waiter->Wait(); /* then a second wait with no timeout argument */ | ||
| } | ||
| bool failure = this->_failureSet.Contains(taskId); | ||
| this->_failureSet.Remove(taskId); /* the entry is cleared once its outcome has been read */ | ||
| if (failure) { UC_ERROR("Transfer task({}) failed.", taskId); } | ||
| return failure ? Status::Error() : Status::OK(); | ||
| } | ||
|
|
||
| // Looks up the waiter registered for `taskId` and copies its Finish() result into `finish`. | ||
| // Returns NotFound, leaving `finish` untouched, when the id is not registered; OK otherwise. | ||
| Status DramTsfTaskManager::Check(const size_t taskId, bool& finish) | ||
| { | ||
| std::lock_guard<std::mutex> guard(this->_mutex); | ||
| const auto found = this->_waiters.find(taskId); | ||
| if (found != this->_waiters.end()) { | ||
| finish = found->second->Finish(); | ||
| return Status::OK(); | ||
| } | ||
| return Status::NotFound(); | ||
| } | ||
|
|
||
| // Moves every element of `tasks` into `targets`, one list per queue, assigning them in rotation. | ||
| // Each moved task is first stamped with the owning task id and the shared waiter. | ||
| // `targets` is resized to the queue count; `tasks` is empty afterwards. | ||
| void DramTsfTaskManager::Dispatch(std::list<DramTsfTask>& tasks, std::vector<std::list<DramTsfTask>>& targets, | ||
| const size_t taskId, std::shared_ptr<DramTsfTaskWaiter> waiter) const | ||
| { | ||
| const auto queueCount = this->_queues.size(); | ||
| targets.resize(queueCount); | ||
| size_t slot = 0; | ||
| for (auto cursor = tasks.begin(); cursor != tasks.end(); ++slot) { | ||
| // Step past the element before it is spliced out of `tasks`. | ||
| auto current = cursor++; | ||
| current->owner = taskId; | ||
| current->waiter = waiter; | ||
| auto& bucket = targets[slot % queueCount]; | ||
| bucket.splice(bucket.end(), tasks, current); | ||
| } | ||
| } | ||
|
|
||
| } // namespace UC |
60 changes: 60 additions & 0 deletions
60
ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.h
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| /** | ||
| * MIT License | ||
| * | ||
| * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| * of this software and associated documentation files (the "Software"), to deal | ||
| * in the Software without restriction, including without limitation the rights | ||
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the Software is | ||
| * furnished to do so, subject to the following conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be included in all | ||
| * copies or substantial portions of the Software. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| * SOFTWARE. | ||
| * */ | ||
| #ifndef UNIFIEDCACHE_DRAM_TSF_TASK_MANAGER_H | ||
| #define UNIFIEDCACHE_DRAM_TSF_TASK_MANAGER_H | ||
|
|
||
| #include <memory> | ||
| #include <unordered_map> | ||
| #include <vector> | ||
| #include "dram_tsf_task_queue.h" | ||
|
|
||
| namespace UC { | ||
|
|
||
| class DramTsfTaskManager { /* Owns the transfer queues, hands out task ids and tracks one waiter per submitted task. */ | ||
| public: | ||
| Status Setup(const int32_t deviceId, const size_t streamNumber, | ||
| // const size_t bufferSize, const size_t bufferNumber, // These two may not be needed for DRAM, since this buffer is used for staging data? | ||
| const size_t timeoutMs, MemoryPool* memPool); | ||
| Status Submit(std::list<DramTsfTask>& tasks, const size_t size, const size_t number, | ||
| const std::string& brief, size_t& taskId); | ||
| Status Wait(const size_t taskId); | ||
| Status Check(const size_t taskId, bool& finish); | ||
|
|
||
| private: | ||
| void Dispatch(std::list<DramTsfTask>& tasks, std::vector<std::list<DramTsfTask>>& targets, | ||
| const size_t taskId, std::shared_ptr<DramTsfTaskWaiter> waiter) const; | ||
|
|
||
| private: | ||
| std::mutex _mutex; /* locked by Submit, Wait and Check */ | ||
| DramTsfTaskSet _failureSet; /* its address is passed to every queue in Setup; Wait reads the task outcome from it */ | ||
| std::unordered_map<size_t, std::shared_ptr<DramTsfTaskWaiter>> _waiters; /* task id to waiter; added in Submit, erased in Wait */ | ||
| std::vector<std::unique_ptr<DramTsfTaskQueue>> _queues; /* one queue per stream, created in Setup */ | ||
| size_t _qIdx{0}; /* index of the queue that receives the next batch in Submit */ | ||
| size_t _taskIdSeed{0}; /* pre-incremented in Submit to produce each new task id */ | ||
| size_t _timeoutMs{0}; /* passed to the first waiter Wait call in Wait() */ | ||
| }; | ||
|
|
||
| } // namespace UC | ||
|
|
||
| #endif |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we get blockSize here from vllm.config?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right, but blockSize has not been added to the
config yet. If possible, it will be added to the Config class soon.