Skip to content

Commit aa4de13

Browse files
committed
[native] Make initializeThreadPools() virtual
Summary: Make this method virtual such that we allow additional thread pool customizations (such as thread pool monitoring) for implementing servers. Differential Revision: D79104048
1 parent 66e4b58 commit aa4de13

File tree

2 files changed

+41
-41
lines changed

2 files changed

+41
-41
lines changed

presto-native-execution/presto_cpp/main/PrestoServer.cpp

Lines changed: 39 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -401,45 +401,6 @@ void PrestoServer::run() {
401401
registerPrestoPlanNodeSerDe();
402402
registerDynamicFunctions();
403403

404-
const auto numExchangeHttpClientIoThreads = std::max<size_t>(
405-
systemConfig->exchangeHttpClientNumIoThreadsHwMultiplier() *
406-
std::thread::hardware_concurrency(),
407-
1);
408-
exchangeHttpIoExecutor_ = std::make_shared<folly::IOThreadPoolExecutor>(
409-
numExchangeHttpClientIoThreads,
410-
std::make_shared<folly::NamedThreadFactory>("ExchangeIO"));
411-
412-
PRESTO_STARTUP_LOG(INFO) << "Exchange Http IO executor '"
413-
<< exchangeHttpIoExecutor_->getName() << "' has "
414-
<< exchangeHttpIoExecutor_->numThreads()
415-
<< " threads.";
416-
for (auto evb : exchangeHttpIoExecutor_->getAllEventBases()) {
417-
evb->setMaxLatency(
418-
std::chrono::milliseconds(systemConfig->exchangeIoEvbViolationThresholdMs()),
419-
[]() { RECORD_METRIC_VALUE(kCounterExchangeIoEvbViolation, 1); },
420-
/*dampen=*/false);
421-
}
422-
423-
const auto numExchangeHttpClientCpuThreads = std::max<size_t>(
424-
systemConfig->exchangeHttpClientNumCpuThreadsHwMultiplier() *
425-
std::thread::hardware_concurrency(),
426-
1);
427-
428-
exchangeHttpCpuExecutor_ = std::make_shared<folly::CPUThreadPoolExecutor>(
429-
numExchangeHttpClientCpuThreads,
430-
std::make_shared<folly::NamedThreadFactory>("ExchangeCPU"));
431-
432-
PRESTO_STARTUP_LOG(INFO) << "Exchange Http CPU executor '"
433-
<< exchangeHttpCpuExecutor_->getName() << "' has "
434-
<< exchangeHttpCpuExecutor_->numThreads()
435-
<< " threads.";
436-
437-
if (systemConfig->exchangeEnableConnectionPool()) {
438-
PRESTO_STARTUP_LOG(INFO) << "Enable exchange Http Client connection pool.";
439-
exchangeSourceConnectionPool_ =
440-
std::make_unique<http::HttpClientConnectionPool>();
441-
}
442-
443404
facebook::velox::exec::ExchangeSource::registerFactory(
444405
[this](
445406
const std::string& taskId,
@@ -855,6 +816,45 @@ void PrestoServer::initializeThreadPools() {
855816
numSpillerCpuThreads,
856817
std::make_shared<folly::NamedThreadFactory>("Spiller"));
857818
}
819+
820+
const auto numExchangeHttpClientIoThreads = std::max<size_t>(
821+
systemConfig->exchangeHttpClientNumIoThreadsHwMultiplier() *
822+
std::thread::hardware_concurrency(),
823+
1);
824+
exchangeHttpIoExecutor_ = std::make_shared<folly::IOThreadPoolExecutor>(
825+
numExchangeHttpClientIoThreads,
826+
std::make_shared<folly::NamedThreadFactory>("ExchangeIO"));
827+
828+
PRESTO_STARTUP_LOG(INFO) << "Exchange Http IO executor '"
829+
<< exchangeHttpIoExecutor_->getName() << "' has "
830+
<< exchangeHttpIoExecutor_->numThreads()
831+
<< " threads.";
832+
for (auto evb : exchangeHttpIoExecutor_->getAllEventBases()) {
833+
evb->setMaxLatency(
834+
std::chrono::milliseconds(systemConfig->exchangeIoEvbViolationThresholdMs()),
835+
[]() { RECORD_METRIC_VALUE(kCounterExchangeIoEvbViolation, 1); },
836+
/*dampen=*/false);
837+
}
838+
839+
const auto numExchangeHttpClientCpuThreads = std::max<size_t>(
840+
systemConfig->exchangeHttpClientNumCpuThreadsHwMultiplier() *
841+
std::thread::hardware_concurrency(),
842+
1);
843+
844+
exchangeHttpCpuExecutor_ = std::make_shared<folly::CPUThreadPoolExecutor>(
845+
numExchangeHttpClientCpuThreads,
846+
std::make_shared<folly::NamedThreadFactory>("ExchangeCPU"));
847+
848+
PRESTO_STARTUP_LOG(INFO) << "Exchange Http CPU executor '"
849+
<< exchangeHttpCpuExecutor_->getName() << "' has "
850+
<< exchangeHttpCpuExecutor_->numThreads()
851+
<< " threads.";
852+
853+
if (systemConfig->exchangeEnableConnectionPool()) {
854+
PRESTO_STARTUP_LOG(INFO) << "Enable exchange Http Client connection pool.";
855+
exchangeSourceConnectionPool_ =
856+
std::make_unique<http::HttpClientConnectionPool>();
857+
}
858858
}
859859

860860
std::unique_ptr<velox::cache::SsdCache> PrestoServer::setupSsdCache() {

presto-native-execution/presto_cpp/main/PrestoServer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ class PrestoServer {
126126

127127
virtual void initializeCoordinatorDiscoverer();
128128

129+
virtual void initializeThreadPools();
130+
129131
virtual std::shared_ptr<velox::exec::TaskListener> getTaskListener();
130132

131133
virtual std::shared_ptr<velox::exec::ExprSetListener> getExprSetListener();
@@ -194,8 +196,6 @@ class PrestoServer {
194196

195197
void initializeVeloxMemory();
196198

197-
void initializeThreadPools();
198-
199199
void registerStatsCounters();
200200

201201
protected:

0 commit comments

Comments
 (0)