-
Notifications
You must be signed in to change notification settings - Fork 26
[WIP] Implementation of DRAM connector #285
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: develop
Are you sure you want to change the base?
Changes from 56 commits
be95065
c0f05aa
e40b5ea
ba89e00
add41d4
1199074
880986c
1275c0f
9029a70
b3bee5f
6c79f65
60f72c7
1670d2f
5fe6167
50612be
6dde723
11726cd
5306a45
4d446ab
61cda77
4c618e9
71072ff
ae69392
dbc7e26
851e323
db39526
6711566
7441433
8261cf1
bb2ee75
3ca9651
22e1031
1347e46
9fc2223
ce73e08
1598d95
488dca6
c73466a
e349854
407dea7
04aeb72
cf90c05
00c2bc2
5352792
2069c76
ec55a36
7e9dd61
114c3b2
609f80e
fc6689c
a03d6a7
e574023
27a0b4d
432b43f
b38bc2f
b64d30b
935109e
ce0cbb7
b8c7210
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -24,27 +24,86 @@ | |
| #include "dramstore.h" | ||
| #include "logger/logger.h" | ||
| #include "status/status.h" | ||
| #include "tsf_task/dram_tsf_task_manager.h" | ||
| #include "tsf_task/dram_tsf_task.h" | ||
| #include "memory/memory_pool.h" | ||
|
|
||
| namespace UC { | ||
|
|
||
| /* DRAMStore implementation that forwards block bookkeeping to a MemoryPool and transfers to a DramTsfTaskManager. */ class DRAMStoreImpl : public DRAMStore { | ||
| public: | ||
| int32_t Setup(const size_t ioSize, const size_t capacity, const int32_t deviceId) { return -1; } | ||
| int32_t Alloc(const std::string& block) override { return -1; } | ||
| bool Lookup(const std::string& block) override { return false; } | ||
| void Commit(const std::string& block, const bool success) override {} | ||
| int32_t Setup(const Config& config) { | ||
| // Config previously had no blockSize field, so stub it with a fixed value for now to be safe. | ||
| int32_t blockSize = 128; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need blocksize? config.def_readwrite("ioSize", &UC::DRAMStorePy::Config::ioSize) we have iosize here |
||
| this->memPool_ = std::make_unique<MemoryPool>(config.capacity, blockSize).release(); | ||
| // Is this the right way to initialize memPool? What should happen if it fails? NOTE(review): release() hands ownership to the raw memPool_ pointer and no matching delete is visible in this class - confirm who frees it. | ||
| int32_t streamNumber = 60; // whether this parameter is needed, and how it should be passed in, is still under discussion | ||
| int32_t timeoutMs = 10000; // whether this parameter is needed, and how it should be passed in, is still under discussion | ||
|
||
| auto status = this->transMgr_.Setup(config.deviceId, streamNumber, timeoutMs, this->memPool_); | ||
| if (status.Failure()) { | ||
| UC_ERROR("Failed({}) to setup DramTransferTaskManager.", status); | ||
| return status.Underlying(); | ||
| } | ||
| return Status::OK().Underlying(); | ||
| } | ||
|
|
||
| /* Single-block allocation: returns the underlying code of MemoryPool::NewBlock. */ int32_t Alloc(const std::string& block) override { | ||
| return this->memPool_->NewBlock(block).Underlying(); | ||
| } | ||
|
|
||
| bool Lookup(const std::string& block) override { | ||
| return this->memPool_->LookupBlock(block); | ||
| } | ||
|
|
||
| void Commit(const std::string& block, const bool success) override { | ||
| /* the status code of CommitBlock is computed but discarded here */ this->memPool_->CommitBlock(block, success).Underlying(); | ||
| } | ||
|
|
||
| std::list<int32_t> Alloc(const std::list<std::string>& blocks) override | ||
| { | ||
| return std::list<int32_t>(); | ||
| std::list<int32_t> results; | ||
| for (const auto &block : blocks) { | ||
| results.emplace_back(this->Alloc(block)); | ||
| } | ||
| return results; | ||
| } | ||
|
|
||
| std::list<bool> Lookup(const std::list<std::string>& blocks) override | ||
| { | ||
| return std::list<bool>(); | ||
| std::list<bool> founds; | ||
| for (const auto &block : blocks) { | ||
| founds.emplace_back(this->Lookup(block)); | ||
| } | ||
| return founds; | ||
| } | ||
| void Commit(const std::list<std::string>& blocks, const bool success) override {} | ||
| size_t Submit(Task&& task) override { return 0; } | ||
| int32_t Wait(const size_t task) override { return -1; } | ||
| int32_t Check(const size_t task, bool& finish) override { return -1; } | ||
|
|
||
| void Commit(const std::list<std::string>& blocks, const bool success) override { | ||
| for (const auto &block : blocks) { | ||
| this->Commit(block, success); | ||
| } | ||
| } | ||
|
|
||
| /* Converts every shard of the task into a DramTsfTask and submits them; yields CCStore::invalidTaskId when the manager reports failure. */ size_t Submit(Task&& task) override { | ||
| std::list<DramTsfTask> tasks; | ||
| for (auto& shard : task.shards) { | ||
| tasks.push_back({task.type, shard.block, shard.offset, shard.address, task.size}); | ||
| } | ||
| size_t taskId; | ||
| return this->transMgr_.Submit(tasks, task.number * task.size, task.number, task.brief, taskId).Success() ? taskId : CCStore::invalidTaskId; | ||
| } | ||
|
|
||
| int32_t Wait(const size_t task) override { | ||
| return this->transMgr_.Wait(task).Underlying(); | ||
| } | ||
|
|
||
| int32_t Check(const size_t task, bool& finish) override { | ||
| return this->transMgr_.Check(task, finish).Underlying(); | ||
| } | ||
|
|
||
| private: | ||
| // DramSpaceManager spaceMgr_; | ||
| MemoryPool* memPool_; /* assigned in Setup(const Config&); not initialized before that call */ | ||
| DramTsfTaskManager transMgr_; | ||
| }; | ||
|
|
||
| int32_t DRAMStore::Setup(const Config& config) | ||
|
|
@@ -55,7 +114,7 @@ int32_t DRAMStore::Setup(const Config& config) | |
| return Status::OutOfMemory().Underlying(); | ||
| } | ||
| this->impl_ = impl; | ||
| return impl->Setup(config.ioSize, config.capacity, config.deviceId); | ||
| return impl->Setup(config); | ||
| } | ||
|
|
||
| } // namespace UC | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,61 @@ | ||
| /** | ||
| * MIT License | ||
| * | ||
| * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| * of this software and associated documentation files (the "Software"), to deal | ||
| * in the Software without restriction, including without limitation the rights | ||
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the Software is | ||
| * furnished to do so, subject to the following conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be included in all | ||
| * copies or substantial portions of the Software. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| * SOFTWARE. | ||
| * */ | ||
| #ifndef UNIFIEDCACHE_DRAM_TSF_TASK_H | ||
| #define UNIFIEDCACHE_DRAM_TSF_TASK_H | ||
|
|
||
| #include "ucmstore.h" | ||
| #include "dram_tsf_task_waiter.h" | ||
|
|
||
| namespace UC { | ||
|
|
||
| /** | ||
|  * One unit of transfer work: a slice of a block moved to or from a device address. | ||
|  * All fields are public and are filled in by the submitter and by the task manager. | ||
|  */ | ||
| class DramTsfTask { | ||
| public: | ||
|     using Type = CCStore::Task::Type; | ||
|     // No Location alias is kept here: only D2H and H2D transfers exist for this connector. | ||
| | ||
|     /** Default task: a DUMP of zero bytes with an empty block id. */ | ||
|     DramTsfTask() : DramTsfTask{Type::DUMP, {}, 0, 0, 0} {} | ||
| | ||
|     /** Builds a task that is not yet attached to any owner or waiter. */ | ||
|     DramTsfTask(const Type type, const std::string& blockId, const size_t offset, | ||
|                 const uintptr_t address, const size_t length) | ||
|         : type{type}, blockId{blockId}, offset{offset}, address{address}, length{length} | ||
|     { | ||
|     } | ||
| | ||
|     Type type; | ||
|     // blockId and offset are still required per task because they come from the | ||
|     // upper layer (see dramstore.py.cc). | ||
|     std::string blockId; | ||
|     size_t offset; | ||
|     uintptr_t address; // address on the device (GPU) side | ||
|     size_t length;     // number of bytes to transfer | ||
| | ||
|     size_t owner{0};   // task id of the enclosing (parent) task | ||
|     std::shared_ptr<DramTsfTaskWaiter> waiter{nullptr}; | ||
|     // No staging "hub" buffer is kept: in nfsstore it served as a host-side relay, | ||
|     // which the DRAM connector does not need. | ||
| }; | ||
|
|
||
| } // namespace UC | ||
|
|
||
| #endif |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,106 @@ | ||
| /** | ||
| * MIT License | ||
| * | ||
| * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| * of this software and associated documentation files (the "Software"), to deal | ||
| * in the Software without restriction, including without limitation the rights | ||
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the Software is | ||
| * furnished to do so, subject to the following conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be included in all | ||
| * copies or substantial portions of the Software. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| * SOFTWARE. | ||
| * */ | ||
| #include "dram_tsf_task_manager.h" | ||
|
|
||
| namespace UC { | ||
|
|
||
| /** | ||
|  * @brief Create and set up one transfer queue per stream and record the wait timeout. | ||
|  * | ||
|  * @param deviceId     Device id forwarded to every queue's Setup(). | ||
|  * @param streamNumber Number of queues to create. Must be non-zero, because task | ||
|  *                     distribution indexes the queues modulo the queue count. | ||
|  * @param timeoutMs    Timeout later handed to the waiter in Wait(). | ||
|  * @param memPool      Memory pool forwarded to every queue's Setup(). | ||
|  * @return Status::Error() when streamNumber is zero, the failing queue's status when a | ||
|  *         queue cannot be set up, Status::OK() otherwise. | ||
|  */ | ||
| Status DramTsfTaskManager::Setup(const int32_t deviceId, const size_t streamNumber, | ||
|                                  const size_t timeoutMs, MemoryPool* memPool) | ||
| { | ||
|     if (streamNumber == 0) { | ||
|         UC_ERROR("Invalid stream number({}) for DramTsfTaskManager.", streamNumber); | ||
|         return Status::Error(); | ||
|     } | ||
|     this->_queues.reserve(streamNumber); | ||
|     for (size_t i = 0; i < streamNumber; ++i) { | ||
|         auto& queue = this->_queues.emplace_back(std::make_unique<DramTsfTaskQueue>()); | ||
|         auto status = queue->Setup(deviceId, &this->_failureSet, memPool); | ||
|         if (status.Failure()) { | ||
|             // Never keep a queue that failed to initialize: work could be pushed to it later. | ||
|             this->_queues.pop_back(); | ||
|             return status; | ||
|         } | ||
|     } | ||
|     this->_timeoutMs = timeoutMs; | ||
|     return Status::OK(); | ||
| } | ||
|
|
||
| /** | ||
|  * @brief Register a waiter for a new task id and hand the sub-tasks to the queues. | ||
|  * | ||
|  * @param tasks  Sub-tasks to run; the list is emptied because its nodes are spliced away. | ||
|  * @param size   Forwarded to the DramTsfTaskWaiter constructor. | ||
|  * @param number Forwarded to the DramTsfTaskWaiter constructor. | ||
|  * @param brief  Forwarded to the DramTsfTaskWaiter constructor. | ||
|  * @param taskId Receives the newly issued id; left untouched when no queue exists. | ||
|  * @return Status::Error() when no queue has been set up, Status::OutOfMemory() when the | ||
|  *         waiter cannot be registered, Status::OK() otherwise. | ||
|  */ | ||
| Status DramTsfTaskManager::Submit(std::list<DramTsfTask>& tasks, const size_t size, const size_t number, | ||
|                                   const std::string& brief, size_t& taskId) | ||
| { | ||
|     std::unique_lock<std::mutex> lk(this->_mutex); | ||
|     // With no queues both Dispatch() and the index update below would evaluate `x % 0`. | ||
|     if (this->_queues.empty()) { | ||
|         UC_ERROR("Failed to submit task({}): no transfer queue is available.", brief); | ||
|         return Status::Error(); | ||
|     } | ||
|     taskId = ++this->_taskIdSeed; | ||
|     auto [iter, success] = this->_waiters.emplace( | ||
|         taskId, std::make_shared<DramTsfTaskWaiter>(taskId, size, number, brief)); | ||
|     if (!success) { return Status::OutOfMemory(); } | ||
|     std::vector<std::list<DramTsfTask>> lists; | ||
|     this->Dispatch(tasks, lists, taskId, iter->second); | ||
|     for (auto& batch : lists) { | ||
|         if (batch.empty()) { continue; } | ||
|         // Only non-empty batches advance the queue cursor, so consecutive submissions | ||
|         // keep rotating over the queues. | ||
|         this->_queues[this->_qIdx]->Push(batch); | ||
|         this->_qIdx = (this->_qIdx + 1) % this->_queues.size(); | ||
|     } | ||
|     return Status::OK(); | ||
| } | ||
|
|
||
| /** | ||
|  * @brief Block until the task identified by taskId has finished. | ||
|  * | ||
|  * The waiter is removed from the registry before waiting. If the timed wait returns | ||
|  * false, the id is added to the failure set and the call keeps waiting without a timeout. | ||
|  * | ||
|  * @return Status::NotFound() for an unknown id, Status::Error() when the id is in the | ||
|  *         failure set after waiting, Status::OK() otherwise. | ||
|  */ | ||
| Status DramTsfTaskManager::Wait(const size_t taskId) | ||
| { | ||
|     std::shared_ptr<DramTsfTaskWaiter> pending; | ||
|     { | ||
|         // Hold the lock only while the waiter is taken out of the registry. | ||
|         std::unique_lock<std::mutex> guard(this->_mutex); | ||
|         auto found = this->_waiters.find(taskId); | ||
|         if (found == this->_waiters.end()) { return Status::NotFound(); } | ||
|         pending = found->second; | ||
|         this->_waiters.erase(found); | ||
|     } | ||
|     if (!pending->Wait(this->_timeoutMs)) { | ||
|         this->_failureSet.Insert(taskId); | ||
|         pending->Wait(); | ||
|     } | ||
|     bool failed = this->_failureSet.Contains(taskId); | ||
|     this->_failureSet.Remove(taskId); | ||
|     if (!failed) { return Status::OK(); } | ||
|     UC_ERROR("Transfer task({}) failed.", taskId); | ||
|     return Status::Error(); | ||
| } | ||
|
|
||
| /** | ||
|  * @brief Report, without blocking on the task, whether it has finished. | ||
|  * | ||
|  * @param taskId Id returned by Submit(). | ||
|  * @param finish Set to the waiter's Finish() result; left untouched for an unknown id. | ||
|  * @return Status::NotFound() when no waiter is registered for taskId, else Status::OK(). | ||
|  */ | ||
| Status DramTsfTaskManager::Check(const size_t taskId, bool& finish) | ||
| { | ||
|     std::lock_guard<std::mutex> guard(this->_mutex); | ||
|     auto entry = this->_waiters.find(taskId); | ||
|     if (entry != this->_waiters.end()) { | ||
|         finish = entry->second->Finish(); | ||
|         return Status::OK(); | ||
|     } | ||
|     return Status::NotFound(); | ||
| } | ||
|
|
||
| /** | ||
|  * @brief Tag every sub-task with its owner and waiter and deal them out over the queues. | ||
|  * | ||
|  * Sub-task k is appended to targets[k % queueCount]. Nodes are spliced, not copied, so | ||
|  * tasks is empty afterwards. When no queue exists nothing is moved. | ||
|  * | ||
|  * @param tasks   Source list; drained by this call. | ||
|  * @param targets Resized to the queue count; receives the distributed sub-tasks. | ||
|  * @param taskId  Stored in each sub-task's owner field. | ||
|  * @param waiter  Stored in each sub-task's waiter field. | ||
|  */ | ||
| void DramTsfTaskManager::Dispatch(std::list<DramTsfTask>& tasks, std::vector<std::list<DramTsfTask>>& targets, | ||
|                                   const size_t taskId, std::shared_ptr<DramTsfTaskWaiter> waiter) const | ||
| { | ||
|     const auto qNumber = this->_queues.size(); | ||
|     targets.resize(qNumber); | ||
|     // `index % 0` is undefined behaviour, so bail out when there is nowhere to dispatch to. | ||
|     if (qNumber == 0) { return; } | ||
|     for (size_t index = 0; !tasks.empty(); ++index) { | ||
|         auto& task = tasks.front(); | ||
|         task.owner = taskId; | ||
|         task.waiter = waiter; | ||
|         auto& target = targets[index % qNumber]; | ||
|         // Splicing the front node keeps the original order and avoids copying the task. | ||
|         target.splice(target.end(), tasks, tasks.begin()); | ||
|     } | ||
| } | ||
|
|
||
| } // namespace UC |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,60 @@ | ||
| /** | ||
| * MIT License | ||
| * | ||
| * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. | ||
| * | ||
| * Permission is hereby granted, free of charge, to any person obtaining a copy | ||
| * of this software and associated documentation files (the "Software"), to deal | ||
| * in the Software without restriction, including without limitation the rights | ||
| * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell | ||
| * copies of the Software, and to permit persons to whom the Software is | ||
| * furnished to do so, subject to the following conditions: | ||
| * | ||
| * The above copyright notice and this permission notice shall be included in all | ||
| * copies or substantial portions of the Software. | ||
| * | ||
| * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR | ||
| * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, | ||
| * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE | ||
| * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER | ||
| * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | ||
| * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE | ||
| * SOFTWARE. | ||
| * */ | ||
| #ifndef UNIFIEDCACHE_DRAM_TSF_TASK_MANAGER_H | ||
| #define UNIFIEDCACHE_DRAM_TSF_TASK_MANAGER_H | ||
|
|
||
| #include <memory> | ||
| #include <unordered_map> | ||
| #include <vector> | ||
| #include "dram_tsf_task_queue.h" | ||
|
|
||
| namespace UC { | ||
|
|
||
| /* Owns the transfer queues and the per-task waiters; hands out task ids and spreads sub-tasks over the queues. */ class DramTsfTaskManager { | ||
| public: | ||
| /* Creates streamNumber queues, sets each one up, and stores timeoutMs. */ Status Setup(const int32_t deviceId, const size_t streamNumber, | ||
| // const size_t bufferSize, const size_t bufferNumber, // these two are probably not needed for DRAM, since that buffer was only used as a staging area for data? | ||
| const size_t timeoutMs, MemoryPool* memPool); | ||
| /* Issues a new id through taskId, registers a waiter for it and pushes the sub-tasks to the queues. */ Status Submit(std::list<DramTsfTask>& tasks, const size_t size, const size_t number, | ||
| const std::string& brief, size_t& taskId); | ||
| /* Removes the waiter for taskId and blocks until it completes; NotFound for an unknown id, Error when the id ends up in the failure set. */ Status Wait(const size_t taskId); | ||
| /* Stores the waiter's Finish() result in finish; NotFound for an unknown id. */ Status Check(const size_t taskId, bool& finish); | ||
|
|
||
| private: | ||
| /* Sets owner and waiter on each sub-task and splices sub-task k into targets[k % queue count]. */ void Dispatch(std::list<DramTsfTask>& tasks, std::vector<std::list<DramTsfTask>>& targets, | ||
| const size_t taskId, std::shared_ptr<DramTsfTaskWaiter> waiter) const; | ||
|
|
||
| private: | ||
| std::mutex _mutex; /* held by Submit, Wait and Check while they touch _waiters, _taskIdSeed and _qIdx */ | ||
| DramTsfTaskSet _failureSet; /* task ids recorded as failed; Wait inserts an id here when its timed wait expires */ | ||
| std::unordered_map<size_t, std::shared_ptr<DramTsfTaskWaiter>> _waiters; /* task id -> waiter, erased by Wait */ | ||
| std::vector<std::unique_ptr<DramTsfTaskQueue>> _queues; | ||
| size_t _qIdx{0}; /* index of the queue that receives the next non-empty batch in Submit */ | ||
| size_t _taskIdSeed{0}; /* last issued task id; pre-incremented in Submit */ | ||
| size_t _timeoutMs{0}; /* timeout passed to the waiter's timed Wait */ | ||
| }; | ||
|
|
||
| } // namespace UC | ||
|
|
||
| #endif |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we get the block size here from vllm.config?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, you are right, but blockSize has not been added to the
config yet. If possible, it will be added to the Config class soon.