Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions src/thread_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -440,25 +440,31 @@ impl ThreadPool {
}
// The size decreased
cmp::Ordering::Less => {
let mut heartbeat_handle = None;

// Halt the heartbeat thread when scaling to zero.
if let Some(control) = state.managed_threads.heartbeat.take() {
if new_size == 0
&& let Some(control) = state.managed_threads.heartbeat.take()
{
control.halt.store(true, Ordering::Relaxed);
let _ = control.handle.join();
heartbeat_handle = Some(control.handle);
}

// Pull the workers we intend to halt out of the thread manager.
let terminating_workers = state.managed_threads.workers.split_off(new_size);

drop(state);

// Terminate the workers.
for worker in &terminating_workers {
// Tell the worker to halt.
worker.control.halt.store(true, Ordering::Relaxed);
}

// Wake any sleeping workers to ensure they will eventually see the termination notice.
// self.job_is_ready.notify_all();
for seat in &state.seats {
seat.data.sleep_controller.wake();
}

drop(state);

let own_lease = Worker::map_current(|worker| worker.lease.index);

Expand All @@ -470,6 +476,10 @@ impl ThreadPool {
let _ = worker.control.handle.join();
}
}

if let Some(handle) = heartbeat_handle {
let _ = handle.join();
}
}
}

Expand Down Expand Up @@ -906,7 +916,7 @@ impl Worker {
/// Cooperatively yields execution to the threadpool, allowing it to execute
/// some work.
///
/// Tis function may execute either local or shared work: work already
/// This function may execute either local or shared work: work already
/// queued on the worker, or work off-loaded by a different worker. If there
/// is no work on the pool, this will lock the thread-pool mutex, so it
/// should not be called within a hot loop. Consider using
Expand Down Expand Up @@ -1134,7 +1144,7 @@ impl Worker {
// the queue must point to `stack_job`, implying that
// `stack_job` cannot have been executed yet.
let a = unsafe { stack_job.unwrap() };
// Execute the closure directly and return the results. This is
// Execute the closure directly and return the results. This
// allows the compiler to inline and optimize `a`.
let result_a = a(self);
return (result_a, result_b);
Expand Down Expand Up @@ -1553,7 +1563,7 @@ fn heartbeat_loop(thread_pool: &'static ThreadPool, halt: Arc<AtomicBool>) {
queued_to_heartbeat = (seat_index + 1) % num_seats;
}

// Count every occupied slot, even if we didn't sent them a heartbeat.
// Count every occupied slot, even if we didn't send them a heartbeat.
num_occupied += 1;
}
}
Expand Down