diff --git a/ucm/store/dramstore/CMakeLists.txt b/ucm/store/dramstore/CMakeLists.txt index 53c9bce1..aceb3aa4 100644 --- a/ucm/store/dramstore/CMakeLists.txt +++ b/ucm/store/dramstore/CMakeLists.txt @@ -1,7 +1,10 @@ file(GLOB_RECURSE UCMSTORE_DRAM_CC_SOURCE_FILES "./cc/*.cc") add_library(dramstore STATIC ${UCMSTORE_DRAM_CC_SOURCE_FILES}) -target_include_directories(dramstore PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/cc) -target_link_libraries(dramstore PUBLIC storeinfra) +target_include_directories(dramstore PUBLIC + ${CMAKE_CURRENT_SOURCE_DIR}/cc/api + ${CMAKE_CURRENT_SOURCE_DIR}/cc/domain +) +target_link_libraries(dramstore PUBLIC storeinfra storedevice) file(GLOB_RECURSE UCMSTORE_DRAM_CPY_SOURCE_FILES "./cpy/*.cc") pybind11_add_module(ucmdramstore ${UCMSTORE_DRAM_CPY_SOURCE_FILES}) diff --git a/ucm/store/dramstore/cc/api/dramstore.cc b/ucm/store/dramstore/cc/api/dramstore.cc index 56b4350f..e601a9e8 100644 --- a/ucm/store/dramstore/cc/api/dramstore.cc +++ b/ucm/store/dramstore/cc/api/dramstore.cc @@ -24,27 +24,86 @@ #include "dramstore.h" #include "logger/logger.h" #include "status/status.h" +#include "tsf_task/dram_tsf_task_manager.h" +#include "tsf_task/dram_tsf_task.h" +#include "memory/memory_pool.h" namespace UC { class DRAMStoreImpl : public DRAMStore { public: - int32_t Setup(const size_t ioSize, const size_t capacity, const int32_t deviceId) { return -1; } - int32_t Alloc(const std::string& block) override { return -1; } - bool Lookup(const std::string& block) override { return false; } - void Commit(const std::string& block, const bool success) override {} + int32_t Setup(const Config& config) { + // config里之前没有blockSize,稳妥起见还是先打桩 + int32_t blockSize = 128; + this->memPool_ = std::make_unique(config.capacity, blockSize).release(); + // 初始化memPool的办法是否正确?如果失败的话怎么办? + // int32_t streamNumber = 60; // 这个参数是否需要,以及怎么传,还要讨论 + // int32_t timeoutMs = 10000; // 这个参数是否需要,以及怎么传,还要讨论 + auto status = this->transMgr_.Setup(config.deviceId, config.streamNumber, config.timeoutMs, this->memPool_); + if (status.Failure()) { + UC_ERROR("Failed({}) to setup DramTransferTaskManager.", status); + return status.Underlying(); + } + return Status::OK().Underlying(); + } + + int32_t Alloc(const std::string& block) override { + return this->memPool_->NewBlock(block).Underlying(); + } + + bool Lookup(const std::string& block) override { + return this->memPool_->LookupBlock(block); + } + + void Commit(const std::string& block, const bool success) override { + this->memPool_->CommitBlock(block, success).Underlying(); + } + std::list Alloc(const std::list& blocks) override { - return std::list(); + std::list results; + for (const auto &block : blocks) { + results.emplace_back(this->Alloc(block)); + } + return results; } + std::list Lookup(const std::list& blocks) override { - return std::list(); + std::list founds; + for (const auto &block : blocks) { + founds.emplace_back(this->Lookup(block)); + } + return founds; } - void Commit(const std::list& blocks, const bool success) override {} - size_t Submit(Task&& task) override { return 0; } - int32_t Wait(const size_t task) override { return -1; } - int32_t Check(const size_t task, bool& finish) override { return -1; } + + void Commit(const std::list& blocks, const bool success) override { + for (const auto &block : blocks) { + this->Commit(block, success); + } + } + + size_t Submit(Task&& task) override { + std::list tasks; + for (auto& shard : task.shards) { + tasks.push_back({task.type, shard.block, shard.offset, shard.address, task.size}); + } + size_t taskId; + return this->transMgr_.Submit(tasks, task.number * task.size, task.number, task.brief, taskId).Success() ? taskId : CCStore::invalidTaskId; + } + + int32_t Wait(const size_t task) override { + return this->transMgr_.Wait(task).Underlying(); + } + + int32_t Check(const size_t task, bool& finish) override { + return this->transMgr_.Check(task, finish).Underlying(); + } + +private: + // DramSpaceManager spaceMgr_; + MemoryPool* memPool_; + DramTsfTaskManager transMgr_; }; int32_t DRAMStore::Setup(const Config& config) @@ -55,7 +114,7 @@ int32_t DRAMStore::Setup(const Config& config) return Status::OutOfMemory().Underlying(); } this->impl_ = impl; - return impl->Setup(config.ioSize, config.capacity, config.deviceId); + return impl->Setup(config); } } // namespace UC diff --git a/ucm/store/dramstore/cc/api/dramstore.h b/ucm/store/dramstore/cc/api/dramstore.h index 1dc97573..b05c36d0 100644 --- a/ucm/store/dramstore/cc/api/dramstore.h +++ b/ucm/store/dramstore/cc/api/dramstore.h @@ -34,8 +34,10 @@ class DRAMStore : public CCStore { size_t ioSize; size_t capacity; int32_t deviceId; - Config(const size_t ioSize, const size_t capacity) - : ioSize{ioSize}, capacity{capacity}, deviceId{-1} + size_t streamNumber; + size_t timeoutMs; + Config(const size_t ioSize, const size_t capacity, const size_t streamNumber, const size_t timeoutMs) + : ioSize{ioSize}, capacity{capacity}, deviceId{-1}, streamNumber(streamNumber), timeoutMs(timeoutMs) { } }; diff --git a/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task.h b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task.h new file mode 100644 index 00000000..0d33b0f3 --- /dev/null +++ b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task.h @@ -0,0 +1,61 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DRAM_TSF_TASK_H +#define UNIFIEDCACHE_DRAM_TSF_TASK_H + +#include "ucmstore.h" +#include "dram_tsf_task_waiter.h" + +namespace UC { + +class DramTsfTask { +public: + using Type = CCStore::Task::Type; + // using Location = CCStore::Task::Location; // 这个不再需要了,因为只有D2H和H2D两种传输 + +public: + DramTsfTask(const Type type, const std::string& blockId, + const size_t offset, const uintptr_t address, const size_t length) + : type{type}, blockId{blockId}, offset{offset}, address{address}, + length{length}, owner{0}, waiter{nullptr} + { + } + DramTsfTask() : DramTsfTask{Type::DUMP, {}, 0, 0, 0} {} // 无参构造函数 + +public: + Type type; + // Location location; // 不需要了 + std::string blockId; // 对于一个task来说,这个bloockID和下一行的offset还是需要的,因为它们本质上是上层传来的。(参考dramstore.py.cc) + size_t offset; + uintptr_t address; // 在显卡上的地址 + size_t length; // 数据传输的长度 + + size_t owner; // 大的Task的TaskId + std::shared_ptr waiter; + // std::shared_ptr hub; // 在nfsstore中,这个的意思是中转站(对于nfsstore来说,host的目的就是数据中转),因此在dram_connector中不再需要 +}; + +} // namespace UC + +#endif diff --git a/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.cc b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.cc new file mode 100644 index 00000000..b3eb7c3f --- /dev/null +++ b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.cc @@ -0,0 +1,106 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include "dram_tsf_task_manager.h" + +namespace UC { + +Status DramTsfTaskManager::Setup(const int32_t deviceId, const size_t streamNumber, + const size_t timeoutMs, MemoryPool* memPool) +{ + this->_queues.reserve(streamNumber); + for (size_t i = 0; i < streamNumber; ++i) { + auto& queue = this->_queues.emplace_back(std::make_unique()); + auto status = queue->Setup(deviceId, &this->_failureSet, memPool); + if (status.Failure()) { return status; } + } + this->_timeoutMs = timeoutMs; + return Status::OK(); +} + +Status DramTsfTaskManager::Submit(std::list& tasks, const size_t size, const size_t number, + const std::string& brief, size_t& taskId) +{ + std::unique_lock lk(this->_mutex); + taskId = ++this->_taskIdSeed; + auto [iter, success] = this->_waiters.emplace( + taskId, std::make_shared(taskId, size, number, brief)); + if (!success) { return Status::OutOfMemory(); } + std::vector> lists; + this->Dispatch(tasks, lists, taskId, iter->second); + for (size_t i = 0; i < lists.size(); i++) { + if (lists[i].empty()) { continue; } + this->_queues[this->_qIdx]->Push(lists[i]); + this->_qIdx = (this->_qIdx + 1) % this->_queues.size(); + } + return Status::OK(); +} + +Status DramTsfTaskManager::Wait(const size_t taskId) +{ + std::shared_ptr waiter = nullptr; + { + std::unique_lock lk(this->_mutex); + auto iter = this->_waiters.find(taskId); + if (iter == this->_waiters.end()) { return Status::NotFound(); } + waiter = iter->second; + this->_waiters.erase(iter); + } + if (!waiter->Wait(this->_timeoutMs)) { + this->_failureSet.Insert(taskId); + waiter->Wait(); + } + bool failure = this->_failureSet.Contains(taskId); + this->_failureSet.Remove(taskId); + if (failure) { UC_ERROR("Transfer task({}) failed.", taskId); } + return failure ? Status::Error() : Status::OK(); +} + +Status DramTsfTaskManager::Check(const size_t taskId, bool& finish) +{ + std::lock_guard lk(this->_mutex); + auto iter = this->_waiters.find(taskId); + if (iter == this->_waiters.end()) { return Status::NotFound(); } + finish = iter->second->Finish(); + return Status::OK(); +} + +void DramTsfTaskManager::Dispatch(std::list& tasks, std::vector>& targets, + const size_t taskId, std::shared_ptr waiter) const +{ + auto qNumber = this->_queues.size(); + auto index = size_t(0); + targets.resize(qNumber); + auto it = tasks.begin(); + while (it != tasks.end()) { + auto next = std::next(it); + it->owner = taskId; + it->waiter = waiter; + auto& target = targets[index % qNumber]; + target.splice(target.end(), tasks, it); + index++; + it = next; + } +} + +} // namespace UC diff --git a/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.h b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.h new file mode 100644 index 00000000..d33348ac --- /dev/null +++ b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_manager.h @@ -0,0 +1,60 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DRAM_TSF_TASK_MANAGER_H +#define UNIFIEDCACHE_DRAM_TSF_TASK_MANAGER_H + +#include +#include +#include +#include "dram_tsf_task_queue.h" + +namespace UC { + +class DramTsfTaskManager { +public: + Status Setup(const int32_t deviceId, const size_t streamNumber, + // const size_t bufferSize, const size_t bufferNumber, // 这两个可能不需要,对于dram来说,因为这个buffer是用来数据中转的? + const size_t timeoutMs, MemoryPool* memPool); + Status Submit(std::list& tasks, const size_t size, const size_t number, + const std::string& brief, size_t& taskId); + Status Wait(const size_t taskId); + Status Check(const size_t taskId, bool& finish); + +private: + void Dispatch(std::list& tasks, std::vector>& targets, + const size_t taskId, std::shared_ptr waiter) const; + +private: + std::mutex _mutex; + DramTsfTaskSet _failureSet; + std::unordered_map> _waiters; + std::vector> _queues; + size_t _qIdx{0}; + size_t _taskIdSeed{0}; + size_t _timeoutMs{0}; +}; + +} // namespace UC + +#endif diff --git a/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_queue.cc b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_queue.cc new file mode 100644 index 00000000..11c471fe --- /dev/null +++ b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_queue.cc @@ -0,0 +1,145 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ + +#include "dram_tsf_task_queue.h" + +namespace UC { + +#define UC_TASK_ERROR(s, t) \ + do { \ + UC_ERROR("Failed({}) to run task({},{},{},{}).", (s), (t).owner, (t).blockId, (t).offset, \ + (t).length); \ + } while (0) + +Status DramTsfTaskQueue::Setup(const int32_t deviceId, DramTsfTaskSet* failureSet, MemoryPool* memPool) +{ + this->_failureSet = failureSet; + this->_memPool = memPool; + if (deviceId >= 0) { + this->_device = DeviceFactory::Make(deviceId, 0, 0); // 这里不需要buffer,暂时都先传0吧 + if (!this->_device) { return Status::OutOfMemory(); } + } + if (!this->_streamOper.Setup([this](DramTsfTask& task) { this->StreamOper(task); }, + [this] { return this->_device->Setup().Success(); } )) { + return Status::Error(); + } + return Status::OK(); +} + +void DramTsfTaskQueue::Push(std::list& tasks) +{ + this->_streamOper.Push(tasks); +} + +void DramTsfTaskQueue::StreamOper(DramTsfTask& task) +{ + if (this->_failureSet->Contains(task.owner)) { + this->Done(task, false); + return; + } + if (task.type == DramTsfTask::Type::LOAD) { + this->H2D(task); + } else { + this->D2H(task); + } +} + +// 这个H2D和D2H函数是重点要重新实现的。 +void DramTsfTaskQueue::H2D(DramTsfTask& task) +{ + // TODO 这里地址要重新写逻辑 + auto block_addr = this->_memPool->GetAddress(task.blockId); + auto host_src = block_addr + task.offset; + if (!host_src) { + UC_TASK_ERROR(Status::Error(), task); + this->Done(task, false); + return; + } + auto status = this->_device->H2DAsync((std::byte*)task.address, (std::byte*)host_src, task.length); + if (status.Failure()) { + UC_TASK_ERROR(status, task); + this->Done(task, false); + return; + } + status = this->_device->AppendCallback([this, task](bool success) mutable { + if (!success) { UC_TASK_ERROR(Status::Error(), task); } + this->Done(task, success); + // 这里是否需要return? + }); + if (status.Failure()) { + UC_TASK_ERROR(status, task); + this->Done(task, false); + return; + } +} + +// 这个函数也是重点要重新实现的。 +void DramTsfTaskQueue::D2H(DramTsfTask& task) +{ + // TODO 这里地址要重新写逻辑 + auto block_addr = this->_memPool->GetAddress(task.blockId); + if (!block_addr) { + // 如果还没有,那么临时分配 + this->_memPool->NewBlock(task.blockId); + block_addr = this->_memPool->GetAddress(task.blockId); + if (!block_addr) { + UC_TASK_ERROR(Status::Error(), task); + this->Done(task, false); + return; + } + } + auto host_dst = block_addr + task.offset; + if (!host_dst) { + UC_TASK_ERROR(Status::Error(), task); + this->Done(task, false); + return; + } + auto status = this->_device->D2HAsync((std::byte*)host_dst, (std::byte*)task.address, task.length); + if (status.Failure()) { + UC_TASK_ERROR(status, task); + this->Done(task, false); + return; + } + status = this->_device->AppendCallback([this, task](bool success) mutable { + if (!success) { + UC_TASK_ERROR(Status::Error(), task); + this->Done(task, false); + return; // 这里是否需要return? + } + this->Done(task, true); + }); + if (status.Failure()) { + UC_TASK_ERROR(status, task); + this->Done(task, false); + return; + } +} + +void DramTsfTaskQueue::Done(const DramTsfTask& task, bool success) +{ + if (!success) { this->_failureSet->Insert(task.owner); } + task.waiter->Done(); +} + +} // namespace UC diff --git a/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_queue.h b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_queue.h new file mode 100644 index 00000000..ebcc84f8 --- /dev/null +++ b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_queue.h @@ -0,0 +1,56 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DRAM_TSF_TAKS_QUEUE_H +#define UNIFIEDCACHE_DRAM_TSF_TAKS_QUEUE_H + +#include "idevice.h" +#include "thread/thread_pool.h" +#include "dram_tsf_task.h" +#include "dram_tsf_task_set.h" +#include "memory/memory_pool.h" + +namespace UC { + +class DramTsfTaskQueue { +public: + Status Setup(const int32_t deviceId, + DramTsfTaskSet* failureSet, MemoryPool* memPool); + void Push(std::list& tasks); + +private: + void StreamOper(DramTsfTask& task); + void H2D(DramTsfTask& task); + void D2H(DramTsfTask& task); + void Done(const DramTsfTask& task, bool success); + +private: + ThreadPool _streamOper; + std::unique_ptr _device; + DramTsfTaskSet* _failureSet; + MemoryPool* _memPool; +}; + +} // namespace UC + +#endif diff --git a/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_set.h b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_set.h new file mode 100644 index 00000000..655ee1d0 --- /dev/null +++ b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_set.h @@ -0,0 +1,35 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DRAM_TSF_TASK_SET_H +#define UNIFIEDCACHE_DRAM_TSF_TASK_SET_H + +#include "template/hashset.h" + +namespace UC { + +class DramTsfTaskSet : public HashSet {}; + +} // namespace UC + +#endif \ No newline at end of file diff --git a/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_waiter.h b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_waiter.h new file mode 100644 index 00000000..61d920af --- /dev/null +++ b/ucm/store/dramstore/cc/domain/tsf_task/dram_tsf_task_waiter.h @@ -0,0 +1,84 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_DRAM_TSF_TASK_WAITER_H +#define UNIFIEDCACHE_DRAM_TSF_TASK_WAITER_H + +#include "logger/logger.h" +#include "thread/latch.h" +#include "time/stopwatch.h" + +namespace UC { + +class DramTsfTaskWaiter : public Latch { +public: + DramTsfTaskWaiter(const size_t id, const size_t size, const size_t number, const std::string& brief) + : Latch{number}, id_{id}, size_{size}, number_{number}, brief_{brief} + { + } + void Done() + { + if (Latch::Done() == 0) { + auto elapsed = this->sw_.Elapsed().count(); + UC_DEBUG("Task({},{},{},{}) finished, elapsed={:.06f}s, bw={:.06f}GB/s.", this->id_, + this->brief_, this->number_, this->size_, elapsed, + this->size_ / elapsed / (1ULL << 30)); + this->Notify(); + } + } + using Latch::Wait; + bool Wait(const size_t timeoutMs) + { + if (timeoutMs == 0) { + this->Wait(); + return true; + } + auto finish = false; + { + std::unique_lock lk(this->mutex_); + if (this->counter_ == 0) { return true; } + auto elapsed = (size_t)this->sw_.ElapsedMs().count(); + if (elapsed < timeoutMs) { + finish = this->cv_.wait_for(lk, std::chrono::milliseconds(timeoutMs - elapsed), + [this] { return this->counter_ == 0; }); + } + } + if (!finish) { + UC_WARN("Task({},{},{},{}) timeout, elapsed={:.06f}s.", this->id_, this->brief_, + this->number_, this->size_, this->sw_.Elapsed().count()); + } + return finish; + } + bool Finish() { return this->counter_ == 0; } + +private: + size_t id_; + size_t size_; + size_t number_; + std::string brief_; + StopWatch sw_; +}; + +} // namespace UC + +#endif // UNIFIEDCACHE_DRAM_TSF_TASK_WAITER_H diff --git a/ucm/store/dramstore/cpy/dramstore.py.cc b/ucm/store/dramstore/cpy/dramstore.py.cc index 635e9144..2cf52382 100644 --- a/ucm/store/dramstore/cpy/dramstore.py.cc +++ b/ucm/store/dramstore/cpy/dramstore.py.cc @@ -21,7 +21,7 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. * */ -#include "api/dramstore.h" +#include "dramstore.h" #include namespace py = pybind11; @@ -100,10 +100,12 @@ PYBIND11_MODULE(ucmdramstore, module) module.attr("build_type") = UCM_BUILD_TYPE; auto store = py::class_(module, "DRAMStore"); auto config = py::class_(store, "Config"); - config.def(py::init(), py::arg("ioSize"), py::arg("capacity")); + config.def(py::init(), py::arg("ioSize"), py::arg("capacity"), py::arg("streamNumber"), py::arg("timeoutMs")); config.def_readwrite("ioSize", &UC::DRAMStorePy::Config::ioSize); config.def_readwrite("capacity", &UC::DRAMStorePy::Config::capacity); config.def_readwrite("deviceId", &UC::DRAMStorePy::Config::deviceId); + config.def_readwrite("streamNumber", &UC::DRAMStorePy::Config::streamNumber); + config.def_readwrite("timeoutMs", &UC::DRAMStorePy::Config::timeoutMs); store.def(py::init<>()); store.def("CCStoreImpl", &UC::DRAMStorePy::CCStoreImpl); store.def("Setup", &UC::DRAMStorePy::Setup); diff --git a/ucm/store/infra/CMakeLists.txt b/ucm/store/infra/CMakeLists.txt index f3e0ce72..7cc1a68d 100644 --- a/ucm/store/infra/CMakeLists.txt +++ b/ucm/store/infra/CMakeLists.txt @@ -10,11 +10,13 @@ endif() file(GLOB_RECURSE UCMSTORE_COMMON_STATUS_SOURCE_FILES "status/*.cc") file(GLOB_RECURSE UCMSTORE_COMMON_TEMPLATE_SOURCE_FILES "template/*.cc") file(GLOB_RECURSE UCMSTORE_COMMON_THREAD_SOURCE_FILES "thread/*.cc") +file(GLOB_RECURSE UCMSTORE_COMMON_MEMORY_SOURCE_FILES "memory/*.cc") target_sources(storeinfra PRIVATE ${UCMSTORE_COMMON_FILE_SOURCE_FILES}) target_sources(storeinfra PRIVATE ${UCMSTORE_COMMON_LOGGER_SOURCE_FILES}) target_sources(storeinfra PRIVATE ${UCMSTORE_COMMON_STATUS_SOURCE_FILES}) target_sources(storeinfra PRIVATE ${UCMSTORE_COMMON_TEMPLATE_SOURCE_FILES}) target_sources(storeinfra PRIVATE ${UCMSTORE_COMMON_THREAD_SOURCE_FILES}) +target_sources(storeinfra PRIVATE ${UCMSTORE_COMMON_MEMORY_SOURCE_FILES}) target_link_libraries(storeinfra PUBLIC fmt) if(LOGGER_BACKEND STREQUAL "spdlog") target_link_libraries(storeinfra PUBLIC spdlog) diff --git a/ucm/store/infra/memory/memory_pool.h b/ucm/store/infra/memory/memory_pool.h new file mode 100644 index 00000000..6106d27a --- /dev/null +++ b/ucm/store/infra/memory/memory_pool.h @@ -0,0 +1,157 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#ifndef UNIFIEDCACHE_MEMORY_POOL_H +#define UNIFIEDCACHE_MEMORY_POOL_H + +#include +#include +#include +#include +#include +#include +#include "status/status.h" + +namespace UC { + +class MemoryPool { +public: + MemoryPool(uint32_t capacity, uint32_t blockSize) + : pool_(new char[capacity]), + capacity_(capacity), + blockSize_(blockSize) { + if (!pool_) { + throw std::bad_alloc(); + } + uint32_t slotNum = capacity / blockSize; + for (uint32_t i = 0; i < slotNum; ++i) { + // 将所有槽位都预先占好,插入LRU队列中。 + std::string dummy = "__slot_" + std::to_string(i); + char* addr = pool_ + i * blockSize_; + lruList_.push_front(dummy); + lruIndex_[dummy] = lruList_.begin(); + addressMap_[dummy] = addr; + } + } + + ~MemoryPool() { + delete[] pool_; + } + + MemoryPool(const MemoryPool&) = delete; + MemoryPool& operator=(const MemoryPool&) = delete; + + Status NewBlock(const std::string& blockId) { + if (addressMap_.count(blockId)) { + return Status::DuplicateKey(); + } + if (lruList_.empty()) { + // 所有空间里的块都正在写,那么就不能够分配 + return Status::Error(); + } + char* addr = LRUEvictOne(); + addressMap_[blockId] = addr; + return Status::OK(); + } + + bool LookupBlock(const std::string& blockId) const { + return availableBlocks_.count(blockId); + } + + char* GetAddress(const std::string& blockId) const { + auto it = addressMap_.find(blockId); + return it == addressMap_.end() ? nullptr : it->second; + } + + Status CommitBlock(const std::string& blockId, bool success) { + if (success) { + availableBlocks_.insert(blockId); + touchUnsafe(blockId); + } else { + resetSpaceOfBlock(blockId); + } + return Status::OK(); + } + + // 单元测试用,外部应该用不到 + char* GetFirstAddr() { + return pool_; + } + +private: + char* pool_ = nullptr; + uint32_t capacity_; + uint32_t blockSize_; + + std::unordered_map addressMap_; + std::set availableBlocks_; + + using ListType = std::list; + ListType lruList_; + std::unordered_map lruIndex_; + + void touchUnsafe(const std::string& blockId) { + auto it = lruIndex_.find(blockId); + if (it != lruIndex_.end()) { + lruList_.splice(lruList_.begin(), lruList_, it->second); + } + else { + lruList_.push_front(blockId); // 访问一次,该块就是最近使用了的,所以放到LRU队列的头部。这就是一般LRU的逻辑 + lruIndex_[blockId] = lruList_.begin(); + } + } + + char* LRUEvictOne() { + const std::string& victim = lruList_.back(); + // 真实数据块,才从availableBlocks_中删掉 + if (victim.rfind("__slot_", 0) != 0) { + availableBlocks_.erase(victim); + } + char* addr = addressMap_[victim]; + addressMap_.erase(victim); + lruIndex_.erase(victim); + lruList_.pop_back(); + return addr; + } + + void resetSpaceOfBlock(const std::string& blockId) { + // availableBlocks_.erase(blockId); // 这句大概不需要? + auto it = addressMap_.find(blockId); + char* addr = it->second; + int32_t offset = static_cast(addr - pool_); + std::string dummy = "__slot_" + std::to_string(offset / blockSize_); + addressMap_.erase(blockId); + + auto lit = lruIndex_.find(blockId); + if (lit != lruIndex_.end()) { + lruList_.erase(lit->second); + lruIndex_.erase(lit); + } + lruList_.push_back(dummy); // 将一个块commit false后,回收之前分配的内存,并且要将其放到LRU队列的尾部(下次可以写的时候,要马上就写。因为该块的优先级高于已经写了的块) + lruIndex_[dummy] = std::prev(lruList_.end()); + addressMap_[dummy] = addr; + } +}; + +} // namespace UC +#endif \ No newline at end of file diff --git a/ucm/store/test/case/dramstore/dram_tsf_task_waiter_test.cc b/ucm/store/test/case/dramstore/dram_tsf_task_waiter_test.cc new file mode 100644 index 00000000..77df2355 --- /dev/null +++ b/ucm/store/test/case/dramstore/dram_tsf_task_waiter_test.cc @@ -0,0 +1,62 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ +#include +#include +#include +#include "tsf_task/dram_tsf_task_waiter.h" + +class UCDramTsfTaskWaiterTest : public ::testing::Test {}; + +TEST_F(UCDramTsfTaskWaiterTest, TaskTimeout) +{ + UC::DramTsfTaskWaiter waiter{1, 1024, 1, "xxx"}; + auto fut = std::async([&] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + waiter.Done(); + }); + ASSERT_FALSE(waiter.Wait(1)); + fut.get(); + ASSERT_TRUE(waiter.Finish()); +} + +TEST_F(UCDramTsfTaskWaiterTest, TaskSuccess) +{ + UC::DramTsfTaskWaiter waiter{1, 1024, 1, "xxx"}; + auto fut = std::async([&] { waiter.Done(); }); + ASSERT_TRUE(waiter.Wait(1000)); + ASSERT_TRUE(waiter.Finish()); + fut.get(); +} + +TEST_F(UCDramTsfTaskWaiterTest, TaskTimeoutButSuccess) +{ + UC::DramTsfTaskWaiter waiter{1, 1024, 1, "xxx"}; + auto fut = std::async([&] { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + waiter.Done(); + }); + fut.get(); + ASSERT_TRUE(waiter.Finish()); + ASSERT_TRUE(waiter.Wait(1)); +} diff --git a/ucm/store/test/case/infra/mem_pool_test.cc b/ucm/store/test/case/infra/mem_pool_test.cc new file mode 100644 index 00000000..ede7f3cb --- /dev/null +++ b/ucm/store/test/case/infra/mem_pool_test.cc @@ -0,0 +1,136 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + * */ + +#include "infra/memory/memory_pool.h" +#include + +class UCMemoryPoolTest : public ::testing::Test {}; + +TEST_F(UCMemoryPoolTest, NewBlockAllocateAndCommit) +{ + UC::MemoryPool memPool(10, 2); // 初始化内存池 + const std::string block1 = "block1"; + ASSERT_FALSE(memPool.LookupBlock(block1)); + ASSERT_EQ(memPool.GetAddress(block1), nullptr); + ASSERT_EQ(memPool.NewBlock(block1), UC::Status::OK()); + ASSERT_FALSE(memPool.LookupBlock(block1)); + ASSERT_NE(memPool.GetAddress(block1), nullptr); + ASSERT_EQ(memPool.NewBlock(block1), UC::Status::DuplicateKey()); + ASSERT_EQ(memPool.CommitBlock(block1, true), UC::Status::OK()); + ASSERT_TRUE(memPool.LookupBlock(block1)); +} + +TEST_F(UCMemoryPoolTest, EvictOldBlock) +{ + UC::MemoryPool memPool(10, 5); // 初始化内存池 + const std::string block1 = "block1"; + const std::string block2 = "block2"; + const std::string block3 = "block3"; + ASSERT_EQ(memPool.NewBlock(block1), UC::Status::OK()); + ASSERT_NE(memPool.GetAddress(block1), nullptr); + ASSERT_EQ(memPool.NewBlock(block2), UC::Status::OK()); + ASSERT_NE(memPool.GetAddress(block2), nullptr); + memPool.CommitBlock(block1, true); + memPool.CommitBlock(block2, true); + ASSERT_EQ(memPool.NewBlock(block3), UC::Status::OK()); + ASSERT_NE(memPool.GetAddress(block3), nullptr); + ASSERT_EQ(memPool.GetAddress(block1), nullptr); + ASSERT_NE(memPool.GetAddress(block2), nullptr); + ASSERT_FALSE(memPool.LookupBlock(block1)); + ASSERT_TRUE(memPool.LookupBlock(block2)); + ASSERT_FALSE(memPool.LookupBlock(block3)); +} + +TEST_F(UCMemoryPoolTest, OldBlockCommitFalse) +{ + UC::MemoryPool memPool(32, 8); // 初始化内存池 + const std::string block1 = "block1"; + const std::string block2 = "block2"; + const std::string block3 = "block3"; + const std::string block4 = "block4"; + const std::string block5 = "block5"; + ASSERT_EQ(memPool.NewBlock(block1), UC::Status::OK()); + ASSERT_NE(memPool.GetAddress(block1), nullptr); + ASSERT_EQ(memPool.NewBlock(block2), UC::Status::OK()); + ASSERT_NE(memPool.GetAddress(block2), nullptr); + ASSERT_EQ(memPool.NewBlock(block3), UC::Status::OK()); + ASSERT_NE(memPool.GetAddress(block3), nullptr); + memPool.CommitBlock(block1, true); + memPool.CommitBlock(block2, false); + ASSERT_TRUE(memPool.LookupBlock(block1)); + ASSERT_FALSE(memPool.LookupBlock(block2)); + ASSERT_FALSE(memPool.LookupBlock(block3)); + ASSERT_EQ(memPool.NewBlock(block4), UC::Status::OK()); + ASSERT_EQ(static_cast(memPool.GetAddress((block4)) - memPool.GetFirstAddr()), 8); + ASSERT_EQ(memPool.NewBlock(block5), UC::Status::OK()); + ASSERT_EQ(static_cast(memPool.GetAddress((block5)) - memPool.GetFirstAddr()), 24); + memPool.CommitBlock(block3, true); + memPool.CommitBlock(block4, true); + memPool.CommitBlock(block5, true); + ASSERT_TRUE(memPool.LookupBlock(block1)); + ASSERT_FALSE(memPool.LookupBlock(block2)); + ASSERT_TRUE(memPool.LookupBlock(block3)); + ASSERT_TRUE(memPool.LookupBlock(block4)); + ASSERT_TRUE(memPool.LookupBlock(block5)); + + ASSERT_EQ(memPool.NewBlock(block1), UC::Status::DuplicateKey()); + ASSERT_EQ(memPool.NewBlock(block2), UC::Status::OK()); + ASSERT_EQ(static_cast(memPool.GetAddress((block2)) - memPool.GetFirstAddr()), 0); + ASSERT_FALSE(memPool.LookupBlock(block1)); + ASSERT_FALSE(memPool.LookupBlock(block2)); + memPool.CommitBlock(block2, true); + ASSERT_TRUE(memPool.LookupBlock(block2)); +} + +TEST_F(UCMemoryPoolTest, NoCommittedBlock) +{ + UC::MemoryPool memPool(32, 8); // 初始化内存池 + const std::string block1 = "block1"; + const std::string block2 = "block2"; + const std::string block3 = "block3"; + const std::string block4 = "block4"; + const std::string block5 = "block5"; + const std::string block6 = "block6"; + ASSERT_EQ(memPool.NewBlock(block1), UC::Status::OK()); + ASSERT_EQ(memPool.NewBlock(block2), UC::Status::OK()); + ASSERT_EQ(memPool.NewBlock(block3), UC::Status::OK()); + ASSERT_EQ(memPool.NewBlock(block4), UC::Status::OK()); + ASSERT_EQ(memPool.NewBlock(block5), UC::Status::Error()); + memPool.CommitBlock(block1, true); + ASSERT_TRUE(memPool.LookupBlock(block1)); + ASSERT_EQ(memPool.NewBlock(block5), UC::Status::OK()); + ASSERT_EQ(static_cast(memPool.GetAddress((block5)) - memPool.GetFirstAddr()), 0); + ASSERT_FALSE(memPool.LookupBlock(block1)); + ASSERT_EQ(memPool.NewBlock(block6), UC::Status::Error()); + ASSERT_EQ(static_cast(memPool.GetAddress((block2)) - memPool.GetFirstAddr()), 8); + memPool.CommitBlock(block2, false); + ASSERT_EQ(memPool.GetAddress((block2)), nullptr); + ASSERT_FALSE(memPool.LookupBlock(block1)); + ASSERT_EQ(memPool.NewBlock(block6), UC::Status::OK()); + ASSERT_EQ(static_cast(memPool.GetAddress((block6)) - memPool.GetFirstAddr()), 8); + ASSERT_FALSE(memPool.LookupBlock(block6)); + memPool.CommitBlock(block6, true); + ASSERT_TRUE(memPool.LookupBlock(block6)); + ASSERT_EQ(static_cast(memPool.GetAddress((block6)) - memPool.GetFirstAddr()), 8); +} \ No newline at end of file