Skip to content

Commit 9bf6dad

Browse files
committed
Cleanup threadpool
1 parent 1acd93e commit 9bf6dad

File tree

2 files changed

+29
-37
lines changed

2 files changed

+29
-37
lines changed

common/threadpool.hpp

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -11,17 +11,8 @@
1111

1212
namespace ton {
1313

14-
class ThreadPool {
15-
public:
16-
ThreadPool(size_t num_threads = std::thread::hardware_concurrency()) = delete;
17-
~ThreadPool() = delete;
18-
19-
template <typename Iter, typename OIter>
20-
static void invoke_task_group(Iter tasks_beg, Iter tasks_end, OIter res_beg, size_t num_threads = 0);
21-
};
22-
23-
template <typename Iter, typename OIter>
24-
void ThreadPool::invoke_task_group(Iter tasks_beg, Iter tasks_end, OIter res_beg, size_t num_threads) {
14+
template <typename Iter, typename OIter>
15+
void invoke_task_group(Iter tasks_beg, Iter tasks_end, OIter res_beg, size_t num_threads = 0) {
2516
if (num_threads == 0) {
2617
num_threads = std::thread::hardware_concurrency();
2718
}
@@ -34,14 +25,14 @@ void ThreadPool::invoke_task_group(Iter tasks_beg, Iter tasks_end, OIter res_beg
3425
}
3526
num_threads = std::min(num_threads, n);
3627
std::atomic_size_t cur_pos{0};
37-
std::mutex panic_mutex;
28+
std::mutex error_mutex;
3829
std::optional<std::exception_ptr> error;
3930

4031
{
4132
std::vector<std::thread> workers;
4233
workers.reserve(num_threads);
4334
for (size_t id = 0; id < num_threads && cur_pos.load() < n; id++) {
44-
workers.emplace_back([&cur_pos, n, in, out, &error, &panic_mutex]() -> void {
35+
workers.emplace_back([&cur_pos, n, in, out, &error, &error_mutex]() -> void {
4536
while (true) {
4637
size_t pos = cur_pos.fetch_add(1);
4738
if (pos >= n) {
@@ -50,14 +41,14 @@ void ThreadPool::invoke_task_group(Iter tasks_beg, Iter tasks_end, OIter res_beg
5041
try {
5142
out[pos] = in[pos]();
5243
} catch (...) {
53-
std::lock_guard panic_lock(panic_mutex);
44+
std::lock_guard error_lock(error_mutex);
5445
error = std::current_exception();
5546
}
5647
}
5748
});
5849
}
5950

60-
for (auto &worker : workers) {
51+
for (auto& worker : workers) {
6152
worker.join();
6253
}
6354
}
@@ -71,4 +62,4 @@ void ThreadPool::invoke_task_group(Iter tasks_beg, Iter tasks_end, OIter res_beg
7162
}
7263
}
7364

74-
}
65+
} // namespace ton

validator/impl/validate-query.cpp

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6168,29 +6168,30 @@ bool ValidateQuery::check_transactions() {
61686168
LOG(INFO) << "checking all transactions";
61696169
std::deque<StdSmcAddress> account_addresses;
61706170
std::deque<CheckAccountTxsCtx> account_contexts;
6171-
std::vector<std::function<unsigned char /*bool*/ ()>> account_tasks;
6172-
6173-
account_blocks_dict_->check_for_each_extra([this, &account_addresses, &account_contexts, &account_tasks] (Ref<vm::CellSlice> value, Ref<vm::CellSlice> extra, td::ConstBitPtr key, int key_len) {
6174-
CHECK(key_len == 256);
6175-
StdSmcAddress address = key;
6176-
account_addresses.push_back(address);
6177-
6178-
account_contexts.emplace_back();
6179-
CheckAccountTxsCtx& ctx = account_contexts.back();
6180-
if (account_expected_defer_all_messages_.count(address)) {
6181-
ctx.defer_all_messages = true;
6182-
}
6171+
std::vector<std::function<bool()>> account_tasks;
6172+
6173+
account_blocks_dict_->check_for_each_extra(
6174+
[this, &account_addresses, &account_contexts, &account_tasks](Ref<vm::CellSlice> value, Ref<vm::CellSlice> extra,
6175+
td::ConstBitPtr key, int key_len) {
6176+
CHECK(key_len == 256);
6177+
StdSmcAddress address = key;
6178+
account_addresses.push_back(address);
6179+
6180+
account_contexts.emplace_back();
6181+
CheckAccountTxsCtx& ctx = account_contexts.back();
6182+
if (account_expected_defer_all_messages_.count(address)) {
6183+
ctx.defer_all_messages = true;
6184+
}
61836185

6184-
account_tasks.emplace_back([this, address, &ctx, acc_tr = std::move(value)] {
6185-
unsigned char result = check_account_transactions_ts(address, acc_tr, ctx);
6186-
return result;
6187-
});
6188-
return true;
6189-
});
6186+
account_tasks.emplace_back([this, address, &ctx, acc_tr = std::move(value)] {
6187+
return check_account_transactions_ts(address, acc_tr, ctx);
6188+
});
6189+
return true;
6190+
});
61906191

61916192
try {
6192-
std::vector<unsigned char /*bool*/> account_results(account_tasks.size(), false);
6193-
ThreadPool::invoke_task_group(account_tasks.begin(), account_tasks.end(), account_results.begin());
6193+
std::vector<int> account_results(account_tasks.size(), false);
6194+
invoke_task_group(account_tasks.begin(), account_tasks.end(), account_results.begin());
61946195
for (auto& ok : account_results) {
61956196
if (!ok) {
61966197
return false;
@@ -6203,7 +6204,7 @@ bool ValidateQuery::check_transactions() {
62036204
}
62046205

62056206
for (size_t pos = 0; pos < account_addresses.size(); pos++) {
6206-
auto &ctx = account_contexts[pos];
6207+
auto& ctx = account_contexts[pos];
62076208

62086209
for (auto& e : ctx.msg_proc_lt) {
62096210
msg_proc_lt_.emplace_back(std::move(e));

0 commit comments

Comments
 (0)