Skip to content

Commit 81e5392

Browse files
committed
Add cancel() method to measurement
1 parent 6b34e90 commit 81e5392

File tree

3 files changed

+14
-3
lines changed

3 files changed

+14
-3
lines changed

SEFramework/SEFramework/Pipeline/Measurement.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
/** Copyright © 2019-2022 Université de Genève, LMU Munich - Faculty of Physics, IAP-CNRS/Sorbonne Université
1+
/**
2+
* Copyright © 2019-2022 Université de Genève, LMU Munich - Faculty of Physics, IAP-CNRS/Sorbonne Université
23
*
34
* This library is free software; you can redistribute it and/or modify it under
45
* the terms of the GNU Lesser General Public License as published by the Free
@@ -39,6 +40,7 @@ class Measurement : public PipelineReceiver<SourceGroupInterface>, public Pipeli
3940
virtual void startThreads() = 0;
4041
virtual void stopThreads() = 0;
4142
virtual void synchronizeThreads() = 0;
43+
virtual void cancel() = 0;
4244
};
4345

4446
}

SEImplementation/SEImplementation/Measurement/MultithreadedMeasurement.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ class MultithreadedMeasurement : public Measurement {
4646
, m_group_counter(0)
4747
, m_input_done(false)
4848
, m_abort_raised(false)
49+
, m_cancel(false)
4950
, m_semaphore(max_queue_size) {}
5051

5152
~MultithreadedMeasurement() override;
@@ -57,6 +58,10 @@ class MultithreadedMeasurement : public Measurement {
5758
void stopThreads() override;
5859
void synchronizeThreads() override;
5960

61+
void cancel() override {
62+
m_cancel = true;
63+
}
64+
6065
private:
6166
using QueuePair = std::pair<int, std::unique_ptr<SourceGroupInterface>>;
6267
// We want O(1) for the *lowest* value (received order)
@@ -70,7 +75,7 @@ class MultithreadedMeasurement : public Measurement {
7075
std::unique_ptr<std::thread> m_output_thread;
7176

7277
int m_group_counter;
73-
std::atomic_bool m_input_done, m_abort_raised;
78+
std::atomic_bool m_input_done, m_abort_raised, m_cancel;
7479

7580
std::condition_variable m_new_output;
7681
OutputQueue m_output_queue;

SEImplementation/src/lib/Measurement/MultithreadedMeasurement.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,10 @@ void MultithreadedMeasurement::receiveSource(std::unique_ptr<SourceGroupInterfac
8080
// Put the new SourceGroup into the input queue
8181
auto order_number = m_group_counter;
8282
auto lambda = [this, order_number, source_group = std::move(source_group)]() mutable {
83+
// Flush the queue without measurements on cancel
84+
if (m_cancel) {
85+
return;
86+
}
8387
// Trigger measurements
8488
for (const auto& source : *source_group) {
8589
m_source_to_row(source);
@@ -114,7 +118,7 @@ void MultithreadedMeasurement::outputThreadStatic(MultithreadedMeasurement* meas
114118
void MultithreadedMeasurement::outputThreadLoop() {
115119
int next_id = 0;
116120

117-
while (m_thread_pool->activeThreads() > 0) {
121+
while (m_thread_pool->activeThreads() > 0 && !m_cancel) {
118122
std::unique_lock<std::mutex> output_lock(m_output_queue_mutex);
119123

120124
// Wait for something in the output queue

0 commit comments

Comments
 (0)