From 7fd176cbb73420e47e65e430e819860372ff1b4b Mon Sep 17 00:00:00 2001 From: Zeljko Jovanovic <128973134+zesk1999@users.noreply.github.com> Date: Fri, 10 Oct 2025 13:02:36 +0200 Subject: [PATCH] Handle maximum deadline misses case (#6016) * Refs #23289. Handle maximum deadline misses case. Data writer implementation. Signed-off-by: zesk1999 * Refs #23289. Handle maximum deadline misses case. Data reader implementation. Signed-off-by: zesk1999 * Refs #23289. Handle maximum deadline misses case. Tests. Signed-off-by: zesk1999 --------- Signed-off-by: zesk1999 (cherry picked from commit 3230d1d7ffa0b70e0d3e4200887c7ec3d8edcafc) Signed-off-by: zesk1999 # Conflicts: # src/cpp/fastdds/publisher/DataWriterImpl.cpp # src/cpp/fastdds/publisher/DataWriterImpl.hpp # src/cpp/fastdds/subscriber/DataReaderImpl.cpp # test/blackbox/CMakeLists.txt # test/blackbox/common/BlackboxTestsDeadlineQos.cpp --- src/cpp/fastdds/publisher/DataWriterImpl.cpp | 146 ++++++++++---- src/cpp/fastdds/publisher/DataWriterImpl.hpp | 18 ++ src/cpp/fastdds/subscriber/DataReaderImpl.cpp | 169 +++++++++++----- src/cpp/fastdds/subscriber/DataReaderImpl.hpp | 12 ++ test/blackbox/CMakeLists.txt | 6 +- test/blackbox/api/dds-pim/PubSubReader.hpp | 38 +++- test/blackbox/api/dds-pim/PubSubWriter.hpp | 23 ++- .../api/fastrtps_deprecated/PubSubReader.hpp | 59 +++++- .../api/fastrtps_deprecated/PubSubWriter.hpp | 54 ++++- .../common/BlackboxTestsDeadlineQos.cpp | 185 +++++++++++++++++- test/utils/LogCounter.hpp | 149 ++++++++++++++ 11 files changed, 734 insertions(+), 125 deletions(-) create mode 100644 test/utils/LogCounter.hpp diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.cpp b/src/cpp/fastdds/publisher/DataWriterImpl.cpp index c1c871f45ab..d12c490eac1 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.cpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.cpp @@ -53,7 +53,7 @@ #ifdef FASTDDS_STATISTICS #include #include -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; @@ -397,12 +397,7 @@ ReturnCode_t DataWriterImpl::enable() // In case it has been loaded from the persistence DB, rebuild instances on history history_.rebuild_instances(); - deadline_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(), - [&]() -> bool - { - return deadline_missed(); - }, - qos_.deadline().period.to_ns() * 1e-6); + configure_deadline_timer_(); lifespan_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(), [&]() -> bool @@ -642,8 +637,8 @@ ReturnCode_t DataWriterImpl::check_write_preconditions( type_.get()->getKey(data, &instance_handle, is_key_protected); } - //Check if the Handle is different from the special value HANDLE_NIL and - //does not correspond with the instance referred by the data + // Check if the Handle is different from the special value HANDLE_NIL and + // does not correspond with the instance referred by the data if (handle.isDefined() && handle != instance_handle) { return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET; @@ -992,6 +987,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change( } } + // new_change seeds the next per-instance deadline and reschedules the timer for the next sample CacheChange_t* ch = writer_->new_change(change_kind, handle); if (ch != nullptr) { @@ -1024,7 +1020,8 @@ ReturnCode_t DataWriterImpl::perform_create_new_change( return ReturnCode_t::RETCODE_TIMEOUT; } - if (qos_.deadline().period != c_TimeInfinite) + if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != c_TimeInfinite && + deadline_missed_status_.total_count < std::numeric_limits::max()) { if (!history_.set_next_deadline( handle, @@ -1135,7 +1132,7 @@ void DataWriterImpl::publisher_qos_updated() { if (writer_ != nullptr) { - //NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED + // NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos()); publisher_->rtps_participant()->updateWriter(writer_, get_topic_attributes(qos_, *topic_, type_), wqos); } @@ -1170,6 +1167,9 @@ ReturnCode_t DataWriterImpl::set_qos( return ReturnCode_t::RETCODE_IMMUTABLE_POLICY; } + // Take a snapshot of the current QoS before mutating it + const DataWriterQos old_qos = qos_; + set_qos(qos_, qos_to_set, !enabled); if (enabled) @@ -1185,33 +1185,30 @@ ReturnCode_t DataWriterImpl::set_qos( writer_->updateAttributes(w_att); } - //Notify the participant that a Writer has changed its QOS + // Notify the participant that a Writer has changed its QOS fastrtps::TopicAttributes topic_att = get_topic_attributes(qos_, *topic_, type_); WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos()); publisher_->rtps_participant()->updateWriter(writer_, topic_att, wqos); - // Deadline - if (qos_.deadline().period != c_TimeInfinite) + // If the deadline period actually changed, (re)configure the timer. + if (old_qos.deadline().period != qos_.deadline().period) { - deadline_duration_us_ = - duration>(qos_.deadline().period.to_ns() * 1e-3); - deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); - } - else - { - deadline_timer_->cancel_timer(); + configure_deadline_timer_(); } // Lifespan - if (qos_.lifespan().duration != c_TimeInfinite) + if (old_qos.lifespan().duration != qos_.lifespan().duration) { - lifespan_duration_us_ = - duration>(qos_.lifespan().duration.to_ns() * 1e-3); - lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); - } - else - { - lifespan_timer_->cancel_timer(); + if (qos_.lifespan().duration != c_TimeInfinite) + { + lifespan_duration_us_ = + duration>(qos_.lifespan().duration.to_ns() * 1e-3); + lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); + } + else + { + lifespan_timer_->cancel_timer(); + } } } @@ -1283,7 +1280,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::INCOMPATIBLE_QOS); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1322,7 +1319,7 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::LIVELINESS_LOST); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1366,7 +1363,7 @@ void DataWriterImpl::InnerDataWriterListener::notify_status_observer( } } -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS ReturnCode_t DataWriterImpl::wait_for_acknowledgments( const Duration_t& max_wait) @@ -1459,10 +1456,12 @@ ReturnCode_t DataWriterImpl::get_publication_matched_status( bool DataWriterImpl::deadline_timer_reschedule() { - assert(qos_.deadline().period != c_TimeInfinite); - std::unique_lock lock(writer_->getMutex()); + assert(qos_.deadline().period != c_TimeInfinite); + assert(deadline_timer_ != nullptr); + assert(deadline_missed_status_.total_count < std::numeric_limits::max()); + steady_clock::time_point next_deadline_us; if (!history_.get_next_deadline(timer_owner_, next_deadline_us)) { @@ -1475,18 +1474,57 @@ bool DataWriterImpl::deadline_timer_reschedule() return true; } -bool DataWriterImpl::deadline_missed() +void DataWriterImpl::configure_deadline_timer_() { - assert(qos_.deadline().period != c_TimeInfinite); - std::unique_lock lock(writer_->getMutex()); - deadline_missed_status_.total_count++; - deadline_missed_status_.total_count_change++; - deadline_missed_status_.last_instance_handle = timer_owner_; + // Create the timer once + if (deadline_timer_ == nullptr) + { + deadline_timer_ = new TimedEvent( + publisher_->rtps_participant()->get_resource_event(), + [this]() -> bool + { + return deadline_missed(); + }, + // Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly + std::numeric_limits::max() + ); + } + + // Handle "infinite" and "zero" outside the callback + if (qos_.deadline().period == c_TimeInfinite) + { + deadline_duration_us_ = std::chrono::duration::max(); + deadline_timer_->cancel_timer(); + return; + } + + deadline_duration_us_ = + std::chrono::duration>(qos_.deadline().period.to_ns() * 1e-3); + + if (qos_.deadline().period.to_ns() == 0) + { + deadline_timer_->cancel_timer(); + + deadline_missed_status_.total_count = std::numeric_limits::max(); + deadline_missed_status_.total_count_change = std::numeric_limits::max(); + EPROSIMA_LOG_WARNING( + DATA_WRITER, + "Deadline period is 0, it will be ignored from now on."); + + // Bump once and notify listener exactly once. + notify_deadline_missed_nts_(); + return; + } + + deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); +} + +void DataWriterImpl::notify_deadline_missed_nts_() +{ StatusMask notify_status = StatusMask::offered_deadline_missed(); - auto listener = get_listener_for(notify_status); - if (nullptr != listener) + if (auto* listener = get_listener_for(notify_status)) { listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_); deadline_missed_status_.total_count_change = 0; @@ -1494,9 +1532,31 @@ bool DataWriterImpl::deadline_missed() #ifdef FASTDDS_STATISTICS writer_listener_.notify_status_observer(statistics::DEADLINE_MISSED); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +bool DataWriterImpl::deadline_missed() +{ + std::unique_lock lock(writer_->getMutex()); + + assert(qos_.deadline().period != c_TimeInfinite); + + deadline_missed_status_.total_count++; + deadline_missed_status_.total_count_change++; + deadline_missed_status_.last_instance_handle = timer_owner_; + + notify_deadline_missed_nts_(); + + // If we just reached the max -> log ONCE, stop timer, and bail. + if (deadline_missed_status_.total_count == std::numeric_limits::max()) + { + EPROSIMA_LOG_WARNING(DATA_WRITER, + "Maximum number of deadline missed messages reached. Stopping deadline timer."); + deadline_timer_->cancel_timer(); + return false; // do not reschedule + } if (!history_.set_next_deadline( timer_owner_, diff --git a/src/cpp/fastdds/publisher/DataWriterImpl.hpp b/src/cpp/fastdds/publisher/DataWriterImpl.hpp index 98b6119b5df..80304226e3b 100644 --- a/src/cpp/fastdds/publisher/DataWriterImpl.hpp +++ b/src/cpp/fastdds/publisher/DataWriterImpl.hpp @@ -20,6 +20,7 @@ #define _FASTRTPS_DATAWRITERIMPL_HPP_ #include +#include #include #include @@ -578,6 +579,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter /** * @brief A method to reschedule the deadline timer + * @return true if deadline rescheduling succeeded, false otherwise */ bool deadline_timer_reschedule(); @@ -736,6 +738,22 @@ class DataWriterImpl : protected rtps::IReaderDataFilter private: + /** + * (Re)configures the deadline timer: + * In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and + * for non-infinite positive values store period. + */ + void configure_deadline_timer_(); + + /** + * Notifies listeners that a deadline has been missed. + */ + void notify_deadline_missed_nts_(); + + void create_history( + const std::shared_ptr& payload_pool, + const std::shared_ptr& change_pool); + DataWriterQos get_datawriter_qos_from_settings( const DataWriterQos& qos); diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp index 2d6f2561286..dde79de6873 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.cpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.cpp @@ -55,7 +55,7 @@ #ifdef FASTDDS_STATISTICS #include #include -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS using eprosima::fastrtps::RecursiveTimedMutex; using eprosima::fastrtps::c_TimeInfinite; @@ -260,12 +260,7 @@ ReturnCode_t DataReaderImpl::enable() reader_ = reader; - deadline_timer_ = new TimedEvent(subscriber_->get_participant()->get_resource_event(), - [&]() -> bool - { - return deadline_missed(); - }, - qos_.deadline().period.to_ns() * 1e-6); + configure_deadline_timer_(); lifespan_timer_ = new TimedEvent(subscriber_->get_participant()->get_resource_event(), [&]() -> bool @@ -341,7 +336,7 @@ void DataReaderImpl::stop() DataReaderImpl::~DataReaderImpl() { - // assert there are no pending conditions + // Assert there are no pending conditions assert(read_conditions_.empty()); // Disable the datareader to prevent receiving data in the middle of deleting it @@ -868,6 +863,9 @@ ReturnCode_t DataReaderImpl::set_qos( return ReturnCode_t::RETCODE_IMMUTABLE_POLICY; } + // Take a snapshot of the current QoS before mutating it + const DataReaderQos old_qos = qos_; + set_qos(qos_, qos_to_set, !enabled); if (enabled) @@ -875,27 +873,25 @@ ReturnCode_t DataReaderImpl::set_qos( // NOTIFY THE BUILTIN PROTOCOLS THAT THE READER HAS CHANGED update_rtps_reader_qos(); - // Deadline - if (qos_.deadline().period != c_TimeInfinite) + // If the deadline period actually changed, (re)configure the timer. + if (old_qos.deadline().period != qos_.deadline().period) { - deadline_duration_us_ = duration>(qos_.deadline().period.to_ns() * 1e-3); - deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); - } - else - { - deadline_timer_->cancel_timer(); + configure_deadline_timer_(); } // Lifespan - if (qos_.lifespan().duration != c_TimeInfinite) + if (old_qos.lifespan().duration != qos_.lifespan().duration) { - lifespan_duration_us_ = - std::chrono::duration>(qos_.lifespan().duration.to_ns() * 1e-3); - lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); - } - else - { - lifespan_timer_->cancel_timer(); + if (qos_.lifespan().duration != c_TimeInfinite) + { + lifespan_duration_us_ = + std::chrono::duration>(qos_.lifespan().duration.to_ns() * 1e-3); + lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6); + } + else + { + lifespan_timer_->cancel_timer(); + } } } @@ -978,7 +974,7 @@ void DataReaderImpl::InnerDataReaderListener::on_liveliness_changed( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::LIVELINESS_CHANGED); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1001,7 +997,7 @@ void DataReaderImpl::InnerDataReaderListener::on_requested_incompatible_qos( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::INCOMPATIBLE_QOS); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1024,7 +1020,7 @@ void DataReaderImpl::InnerDataReaderListener::on_sample_lost( #ifdef FASTDDS_STATISTICS notify_status_observer(statistics::SAMPLE_LOST); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS data_reader_->user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); } @@ -1058,12 +1054,12 @@ void DataReaderImpl::InnerDataReaderListener::notify_status_observer( { if (!statistics_pp_impl->get_status_observer()->on_local_entity_status_change(data_reader_->guid(), status_id)) { - EPROSIMA_LOG_ERROR(DATA_WRITER, "Could not set entity status"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not set entity status"); } } } -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS bool DataReaderImpl::on_data_available( const GUID_t& writer_guid, @@ -1102,13 +1098,14 @@ bool DataReaderImpl::on_new_cache_change_added( return false; } - if (qos_.deadline().period != c_TimeInfinite) + if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != c_TimeInfinite && + deadline_missed_status_.total_count < std::numeric_limits::max()) { if (!history_.set_next_deadline( change->instanceHandle, steady_clock::now() + duration_cast(deadline_duration_us_))) { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not set next deadline in the history"); } else if (timer_owner_ == change->instanceHandle || timer_owner_ == InstanceHandle_t()) { @@ -1150,7 +1147,7 @@ bool DataReaderImpl::on_new_cache_change_added( } else { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "A change was added to history that could not be retrieved"); + EPROSIMA_LOG_ERROR(DATA_READER, "A change was added to history that could not be retrieved"); } // Update and restart the timer @@ -1203,14 +1200,16 @@ ReturnCode_t DataReaderImpl::get_subscription_matched_status( bool DataReaderImpl::deadline_timer_reschedule() { - assert(qos_.deadline().period != c_TimeInfinite); - std::unique_lock lock(reader_->getMutex()); + assert(qos_.deadline().period != c_TimeInfinite); + assert(deadline_timer_ != nullptr); + assert(deadline_missed_status_.total_count < std::numeric_limits::max()); + steady_clock::time_point next_deadline_us; if (!history_.get_next_deadline(timer_owner_, next_deadline_us)) { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not get the next deadline from the history"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not get the next deadline from the history"); return false; } auto interval_ms = duration_cast(next_deadline_us - steady_clock::now()); @@ -1219,15 +1218,55 @@ bool DataReaderImpl::deadline_timer_reschedule() return true; } -bool DataReaderImpl::deadline_missed() +void DataReaderImpl::configure_deadline_timer_() { - assert(qos_.deadline().period != c_TimeInfinite); - std::unique_lock lock(reader_->getMutex()); - deadline_missed_status_.total_count++; - deadline_missed_status_.total_count_change++; - deadline_missed_status_.last_instance_handle = timer_owner_; + // Create the timer once + if (deadline_timer_ == nullptr) + { + deadline_timer_ = new TimedEvent( + subscriber_->rtps_participant()->get_resource_event(), + [this]() -> bool + { + return deadline_missed(); + }, + // Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly + std::numeric_limits::max() + ); + } + + // Handle "infinite" and "zero" outside the callback + if (qos_.deadline().period == c_TimeInfinite) + { + deadline_duration_us_ = std::chrono::duration::max(); + deadline_timer_->cancel_timer(); + return; + } + + deadline_duration_us_ = + std::chrono::duration>(qos_.deadline().period.to_ns() * 1e-3); + + if (qos_.deadline().period.to_ns() == 0) + { + deadline_timer_->cancel_timer(); + + deadline_missed_status_.total_count = std::numeric_limits::max(); + deadline_missed_status_.total_count_change = std::numeric_limits::max(); + EPROSIMA_LOG_WARNING( + DATA_READER, + "Deadline period is 0, it will be ignored from now on."); + + // Bump once and notify listener exactly once. + notify_deadline_missed_nts_(); + return; + } + + deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6); +} + +void DataReaderImpl::notify_deadline_missed_nts_() +{ StatusMask notify_status = StatusMask::requested_deadline_missed(); auto listener = get_listener_for(notify_status); if (nullptr != listener) @@ -1238,15 +1277,37 @@ bool DataReaderImpl::deadline_missed() #ifdef FASTDDS_STATISTICS reader_listener_.notify_status_observer(statistics::DEADLINE_MISSED); -#endif //FASTDDS_STATISTICS +#endif // FASTDDS_STATISTICS user_datareader_->get_statuscondition().get_impl()->set_status(notify_status, true); +} + +bool DataReaderImpl::deadline_missed() +{ + std::unique_lock lock(reader_->getMutex()); + + assert(qos_.deadline().period != c_TimeInfinite); + + deadline_missed_status_.total_count++; + deadline_missed_status_.total_count_change++; + deadline_missed_status_.last_instance_handle = timer_owner_; + + notify_deadline_missed_nts_(); + + // If we just reached the max -> log ONCE, stop timer, and bail + if (deadline_missed_status_.total_count == std::numeric_limits::max()) + { + EPROSIMA_LOG_WARNING(DATA_READER, + "Maximum number of deadline missed messages reached. Stopping deadline timer."); + deadline_timer_->cancel_timer(); + return false; // do not reschedule + } if (!history_.set_next_deadline( timer_owner_, steady_clock::now() + duration_cast(deadline_duration_us_), true)) { - EPROSIMA_LOG_ERROR(SUBSCRIBER, "Could not set next deadline in the history"); + EPROSIMA_LOG_ERROR(DATA_READER, "Could not set next deadline in the history"); return false; } return deadline_timer_reschedule(); @@ -1913,7 +1974,7 @@ ReturnCode_t DataReaderImpl::check_datasharing_compatible( return ReturnCode_t::RETCODE_OK; break; default: - EPROSIMA_LOG_ERROR(DATA_WRITER, "Unknown data sharing kind."); + EPROSIMA_LOG_ERROR(DATA_READER, "Unknown data sharing kind."); return ReturnCode_t::RETCODE_BAD_PARAMETER; } } @@ -1945,14 +2006,14 @@ ReturnCode_t DataReaderImpl::delete_contained_entities() // Check pending ReadConditions for (detail::ReadConditionImpl* impl : read_conditions_) { - // should be alive + // Should be alive auto keep_alive = impl->shared_from_this(); assert((bool)keep_alive); - // free ReadConditions + // Free ReadConditions impl->detach_all_conditions(); } - // release the colection + // Release the collection read_conditions_.clear(); return ReturnCode_t::RETCODE_OK; @@ -2062,12 +2123,12 @@ ReadCondition* DataReaderImpl::create_readcondition( if (it != read_conditions_.end()) { - // already there + // Already there impl = (*it)->shared_from_this(); } else { - // create a new one + // Create a new one impl = std::make_shared(*this, key); impl->set_trigger_value(current_mask); // Add the implementation object to the collection @@ -2078,7 +2139,7 @@ ReadCondition* DataReaderImpl::create_readcondition( ReadCondition* cond = new ReadCondition(); auto ret_code = impl->attach_condition(cond); - // attach cannot fail in this scenario + // Attach cannot fail in this scenario assert(!!ret_code); (void)ret_code; @@ -2114,7 +2175,7 @@ ReturnCode_t DataReaderImpl::delete_readcondition( # ifdef __cpp_lib_enable_shared_from_this std::weak_ptr wp = impl->weak_from_this(); # else - // remove when C++17 is enforced + // Remove when C++17 is enforced auto wp = std::weak_ptr(impl->shared_from_this()); # endif // ifdef __cpp_lib_enable_shared_from_this @@ -2123,10 +2184,10 @@ ReturnCode_t DataReaderImpl::delete_readcondition( if (!!ret_code) { - // delete the condition + // Delete the condition delete a_condition; - // check if we must remove the implementation object + // Check if we must remove the implementation object if (wp.expired()) { read_conditions_.erase(it); @@ -2171,7 +2232,7 @@ void DataReaderImpl::try_notify_read_conditions() noexcept last_mask_state_.instance_states & ~old_mask.instance_states; } - // traverse the conditions notifying + // Traverse the conditions notifying std::lock_guard _(get_conditions_mutex()); for (detail::ReadConditionImpl* impl : read_conditions_) { diff --git a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp index 857a336a2cf..49be7c0cd58 100644 --- a/src/cpp/fastdds/subscriber/DataReaderImpl.hpp +++ b/src/cpp/fastdds/subscriber/DataReaderImpl.hpp @@ -647,6 +647,18 @@ class DataReaderImpl private: + /** + * (Re)configures the deadline timer: + * In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and + * for non-infinite positive values store period. + */ + void configure_deadline_timer_(); + + /** + * Notifies listeners that a deadline has been missed. + */ + void notify_deadline_missed_nts_(); + void update_rtps_reader_qos(); DataReaderQos get_datareader_qos_from_settings( diff --git a/test/blackbox/CMakeLists.txt b/test/blackbox/CMakeLists.txt index a9f3566168d..130251f30b9 100644 --- a/test/blackbox/CMakeLists.txt +++ b/test/blackbox/CMakeLists.txt @@ -226,7 +226,8 @@ if(FASTRTPS_API_TESTS) ) target_include_directories(BlackboxTests_FastRTPS PRIVATE ${Asio_INCLUDE_DIR} - api/fastrtps_deprecated) + api/fastrtps_deprecated + ${PROJECT_SOURCE_DIR}/test/utils) target_link_libraries(BlackboxTests_FastRTPS fastrtps fastcdr @@ -282,7 +283,8 @@ if(FASTDDS_PIM_API_TESTS) ) target_include_directories(BlackboxTests_DDS_PIM PRIVATE ${Asio_INCLUDE_DIR} - api/dds-pim) + api/dds-pim + ${PROJECT_SOURCE_DIR}/test/utils) target_link_libraries(BlackboxTests_DDS_PIM fastrtps fastcdr diff --git a/test/blackbox/api/dds-pim/PubSubReader.hpp b/test/blackbox/api/dds-pim/PubSubReader.hpp index 494b18e77f1..f80364bb96b 100644 --- a/test/blackbox/api/dds-pim/PubSubReader.hpp +++ b/test/blackbox/api/dds-pim/PubSubReader.hpp @@ -173,7 +173,6 @@ class PubSubReader Listener( PubSubReader& reader) : reader_(reader) - , times_deadline_missed_(0) { } @@ -223,8 +222,8 @@ class PubSubReader const eprosima::fastrtps::RequestedDeadlineMissedStatus& status) override { (void)datareader; - - times_deadline_missed_ = status.total_count; + std::lock_guard lk(mutex_); + requested_deadline_status_ = status; } void on_requested_incompatible_qos( @@ -275,7 +274,14 @@ class PubSubReader unsigned int missed_deadlines() const { - return times_deadline_missed_; + std::lock_guard lk(mutex_); + return requested_deadline_status_.total_count; + } + + unsigned int missed_deadlines_change() const + { + std::lock_guard lk(mutex_); + return requested_deadline_status_.total_count_change; } private: @@ -284,6 +290,9 @@ class PubSubReader const Listener&) = delete; PubSubReader& reader_; + mutable std::mutex mutex_; + + eprosima::fastdds::dds::RequestedDeadlineMissedStatus requested_deadline_status_{}; //! Number of times deadline was missed unsigned int times_deadline_missed_; @@ -1760,6 +1769,11 @@ class PubSubReader return listener_.missed_deadlines(); } + unsigned int missed_deadlines_change() const + { + return listener_.missed_deadlines_change(); + } + void liveliness_lost() { std::unique_lock lock(liveliness_mutex_); @@ -1838,6 +1852,22 @@ class PubSubReader return status; } + bool set_qos() + { + return (ReturnCode_t::RETCODE_OK == datareader_->set_qos(datareader_qos_)); + } + + bool set_qos( + const eprosima::fastdds::dds::DataReaderQos& att) + { + return (ReturnCode_t::RETCODE_OK == datareader_->set_qos(att)); + } + + eprosima::fastdds::dds::DataReaderQos get_qos() + { + return (datareader_->get_qos()); + } + bool is_matched() const { return matched_ > 0; diff --git a/test/blackbox/api/dds-pim/PubSubWriter.hpp b/test/blackbox/api/dds-pim/PubSubWriter.hpp index bc1d9dbcc4f..510489a1d79 100644 --- a/test/blackbox/api/dds-pim/PubSubWriter.hpp +++ b/test/blackbox/api/dds-pim/PubSubWriter.hpp @@ -180,7 +180,6 @@ class PubSubWriter Listener( PubSubWriter& writer) : writer_(writer) - , times_deadline_missed_(0) , times_liveliness_lost_(0) , times_unack_sample_removed_(0) { @@ -211,7 +210,8 @@ class PubSubWriter const eprosima::fastrtps::OfferedDeadlineMissedStatus& status) override { static_cast(datawriter); - times_deadline_missed_ = status.total_count; + std::lock_guard lk(mutex_); + offered_deadline_status_ = status; } void on_offered_incompatible_qos( @@ -242,7 +242,14 @@ class PubSubWriter unsigned int missed_deadlines() const { - return times_deadline_missed_; + std::lock_guard lk(mutex_); + return offered_deadline_status_.total_count; + } + + unsigned int missed_deadlines_change() const + { + std::lock_guard lk(mutex_); + return offered_deadline_status_.total_count_change; } unsigned int times_liveliness_lost() const @@ -266,9 +273,10 @@ class PubSubWriter const Listener&) = delete; PubSubWriter& writer_; + mutable std::mutex mutex_; + + eprosima::fastdds::dds::OfferedDeadlineMissedStatus offered_deadline_status_{}; - //! The number of times deadline was missed - unsigned int times_deadline_missed_; //! The number of times liveliness was lost unsigned int times_liveliness_lost_; //! The number of times a sample has been removed unacknowledged @@ -1669,6 +1677,11 @@ class PubSubWriter return listener_.missed_deadlines(); } + unsigned int missed_deadlines_change() const + { + return listener_.missed_deadlines_change(); + } + unsigned int times_liveliness_lost() const { return listener_.times_liveliness_lost(); diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp index 45b2c02df0b..aa4bcfbac68 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubReader.hpp @@ -144,7 +144,6 @@ class PubSubReader Listener( PubSubReader& reader) : reader_(reader) - , times_deadline_missed_(0) { } @@ -191,8 +190,8 @@ class PubSubReader const eprosima::fastrtps::RequestedDeadlineMissedStatus& status) override { (void)sub; - - times_deadline_missed_ = status.total_count; + std::lock_guard lk(mutex_); + requested_deadline_status_ = status; } void on_liveliness_changed( @@ -217,7 +216,14 @@ class PubSubReader unsigned int missed_deadlines() const { - return times_deadline_missed_; + std::lock_guard lk(mutex_); + return requested_deadline_status_.total_count; + } + + unsigned int missed_deadlines_change() const + { + std::lock_guard lk(mutex_); + return requested_deadline_status_.total_count_change; } private: @@ -226,9 +232,9 @@ class PubSubReader const Listener&) = delete; PubSubReader& reader_; + mutable std::mutex mutex_; - //! Number of times deadline was missed - unsigned int times_deadline_missed_; + eprosima::fastrtps::RequestedDeadlineMissedStatus requested_deadline_status_{}; } listener_; @@ -1229,7 +1235,7 @@ class PubSubReader PubSubReader& ownership_exclusive() { - subscriber_attr_.qos.m_ownership.kind = eprosima::fastdds::dds::EXCLUSIVE_OWNERSHIP_QOS; + subscriber_attr_.qos.m_ownership.kind = eprosima::fastrtps::EXCLUSIVE_OWNERSHIP_QOS; return *this; } @@ -1393,6 +1399,11 @@ class PubSubReader return listener_.missed_deadlines(); } + unsigned int missed_deadlines_change() const + { + return listener_.missed_deadlines_change(); + } + void liveliness_lost() { std::unique_lock lock(liveliness_mutex_); @@ -1436,6 +1447,40 @@ class PubSubReader return liveliness_changed_status_; } + struct ReaderQosView + { + eprosima::fastrtps::SubscriberAttributes* att; + eprosima::fastrtps::DeadlineQosPolicy& deadline() + { + return att->qos.m_deadline; // has .period + } + + }; + + bool set_qos() + { + return subscriber_->updateAttributes(subscriber_attr_); + } + + bool set_qos( + const ReaderQosView& v) + { + (void)v; + return subscriber_->updateAttributes(subscriber_attr_); + } + + bool set_qos( + const eprosima::fastrtps::SubscriberAttributes& att) + { + subscriber_attr_ = att; + return subscriber_->updateAttributes(subscriber_attr_); + } + + ReaderQosView get_qos() + { + return ReaderQosView{& subscriber_attr_ }; + } + bool is_matched() const { return matched_ > 0; diff --git a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp index 7419b6f4bed..82c11cd5315 100644 --- a/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp +++ b/test/blackbox/api/fastrtps_deprecated/PubSubWriter.hpp @@ -164,7 +164,6 @@ class PubSubWriter Listener( PubSubWriter& writer) : writer_(writer) - , times_deadline_missed_(0) , times_liveliness_lost_(0) { } @@ -194,7 +193,8 @@ class PubSubWriter const eprosima::fastrtps::OfferedDeadlineMissedStatus& status) override { (void)pub; - times_deadline_missed_ = status.total_count; + std::lock_guard lk(mutex_); + offered_deadline_status_ = status; } void on_liveliness_lost( @@ -208,7 +208,14 @@ class PubSubWriter unsigned int missed_deadlines() const { - return times_deadline_missed_; + std::lock_guard lk(mutex_); + return offered_deadline_status_.total_count; + } + + unsigned int missed_deadlines_change() const + { + std::lock_guard lk(mutex_); + return offered_deadline_status_.total_count_change; } unsigned int times_liveliness_lost() const @@ -222,9 +229,10 @@ class PubSubWriter const Listener&) = delete; PubSubWriter& writer_; + mutable std::mutex mutex_; + + eprosima::fastrtps::OfferedDeadlineMissedStatus offered_deadline_status_{}; - //! The number of times deadline was missed - unsigned int times_deadline_missed_; //! The number of times liveliness was lost unsigned int times_liveliness_lost_; @@ -1236,7 +1244,7 @@ class PubSubWriter PubSubWriter& ownership_strength( uint32_t strength) { - publisher_attr_.qos.m_ownership.kind = eprosima::fastdds::dds::EXCLUSIVE_OWNERSHIP_QOS; + publisher_attr_.qos.m_ownership.kind = eprosima::fastrtps::EXCLUSIVE_OWNERSHIP_QOS; publisher_attr_.qos.m_ownershipStrength.value = strength; return *this; } @@ -1332,11 +1340,40 @@ class PubSubWriter return publisher_->updateAttributes(publisher_attr_); } + struct WriterQosView + { + eprosima::fastrtps::PublisherAttributes* att; + eprosima::fastrtps::DeadlineQosPolicy& deadline() + { + return att->qos.m_deadline; // has .period + } + + }; + bool set_qos() { return publisher_->updateAttributes(publisher_attr_); } + bool set_qos( + const WriterQosView& v) + { + (void)v; + return publisher_->updateAttributes(publisher_attr_); + } + + bool set_qos( + const eprosima::fastrtps::PublisherAttributes& att) + { + publisher_attr_ = att; + return publisher_->updateAttributes(publisher_attr_); + } + + WriterQosView get_qos() + { + return WriterQosView{& publisher_attr_ }; + } + bool remove_all_changes( size_t* number_of_changes_removed) { @@ -1370,6 +1407,11 @@ class PubSubWriter return listener_.missed_deadlines(); } + unsigned int missed_deadlines_change() const + { + return listener_.missed_deadlines_change(); + } + unsigned int times_liveliness_lost() const { return listener_.times_liveliness_lost(); diff --git a/test/blackbox/common/BlackboxTestsDeadlineQos.cpp b/test/blackbox/common/BlackboxTestsDeadlineQos.cpp index b97bd366859..2db884b3766 100644 --- a/test/blackbox/common/BlackboxTestsDeadlineQos.cpp +++ b/test/blackbox/common/BlackboxTestsDeadlineQos.cpp @@ -25,8 +25,11 @@ #include "ReqRepAsReliableHelloWorldReplier.hpp" #include "ReqRepAsReliableHelloWorldRequester.hpp" +#include + using namespace eprosima::fastrtps; using namespace eprosima::fastrtps::rtps; +using fastlog = eprosima::fastdds::dds::Log; enum communication_type { @@ -35,7 +38,7 @@ enum communication_type DATASHARING }; -class DeadlineQos : public testing::TestWithParam +class DeadlineQos : public ::testing::TestWithParam { public: @@ -45,7 +48,8 @@ class DeadlineQos : public testing::TestWithParam switch (GetParam()) { case INTRAPROCESS: - library_settings.intraprocess_delivery = IntraprocessDeliveryType::INTRAPROCESS_FULL; + library_settings.intraprocess_delivery = + IntraprocessDeliveryType::INTRAPROCESS_FULL; xmlparser::XMLProfileManager::library_settings(library_settings); break; case DATASHARING: @@ -312,6 +316,179 @@ TEST_P(DeadlineQos, KeyedTopicBestEffortReaderVolatileWriterSetDeadline) EXPECT_GE(writer.missed_deadlines(), 1u); } +/** + * Testing Redmine issue #23289. + * Writer-side version of ZeroDeadlinePeriod. + * Regression test for the zero-deadline period bug. + * Creating a DataWriter with a deadline of 0. + * Checking if a warning is logged exactly once, the timer is cancelled without missed deadline + * messages and a total count and count change set to max integer. + * Checking warnings, total count and count change when changing the deadline. + */ +TEST_P(DeadlineQos, ZeroDeadlinePeriodWriter) +{ + auto observer = std::make_shared(/*store=*/ false); + auto consumer = std::make_unique(observer); + + fastlog::ClearConsumers(); + fastlog::RegisterConsumer(std::move(consumer)); + fastlog::SetVerbosity(fastlog::Kind::Warning); + + observer->set_global_needle("Deadline period is 0"); + + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS); + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + + writer.deadline_period(0.0).init(); + reader.init(); + ASSERT_TRUE(writer.isInitialized()); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyedhelloworld_data_generator(1); + writer.send_sample(data.front()); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + // Writer offered-deadline counters should be saturated + EXPECT_EQ(writer.missed_deadlines(), + std::numeric_limits::max()) << "Expected the max value after a zero-deadline warning."; + EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits::max()); + + const auto prev = observer->matched_global(); + EXPECT_EQ(prev, 1u) << "Expected exactly one 'deadline=0' warning\n"; + + const auto pre_total = writer.missed_deadlines(); + const auto pre_change = writer.missed_deadlines_change(); + + // Wait for a period long enough to expect a new miss if the timer were still active + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + const auto post_total = writer.missed_deadlines(); + const auto post_change = writer.missed_deadlines_change(); + + EXPECT_EQ(pre_total, post_total) << "The total count should not change, as the timer was canceled."; + EXPECT_EQ(pre_change, post_change) << "The total_count_change should not change, as the timer was canceled."; + EXPECT_EQ(observer->matched_global(), prev) << "No extra warnings after cancel."; + + auto q = writer.get_qos(); + q.deadline().period = Duration_t(0.1); + + ASSERT_TRUE(writer.set_qos(q)); // Update 0 -> finite + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(writer.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev) << "No new warning when moving reader from 0 -> finite"; + + q.deadline().period = Duration_t(0.0); + + ASSERT_TRUE(writer.set_qos(q)); // Update finite -> 0 + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(writer.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(writer.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev + 1) << "Exactly one new warning."; + + fastlog::ClearConsumers(); +} + +/** + * Testing Redmine issue #23289. + * Regression test for the zero-deadline period bug. + * Reader-side version of ZeroDeadlinePeriod. + * Creating a DataReader with a deadline of 0. + * Checking if a warning is logged exactly once, the timer is cancelled without missed deadline + * messages and a total count and count change set to max integer. + * Checking warnings, total count and count change when changing the deadline. + */ +TEST_P(DeadlineQos, ZeroDeadlinePeriodReader) +{ + PubSubWriter writer(TEST_TOPIC_NAME); + PubSubReader reader(TEST_TOPIC_NAME); + + writer.durability_kind(eprosima::fastdds::dds::VOLATILE_DURABILITY_QOS); + writer.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + reader.reliability(eprosima::fastdds::dds::RELIABLE_RELIABILITY_QOS); + + // Writer deadline must also be 0 to satisfy the matching rule and ensure discovery + writer.deadline_period(0.0).init(); + ASSERT_TRUE(writer.isInitialized()); + + auto observer = std::make_shared(/*store=*/ false); + auto consumer = std::make_unique(observer); + + fastlog::ClearConsumers(); + fastlog::RegisterConsumer(std::move(consumer)); + fastlog::SetVerbosity(fastlog::Kind::Warning); + + observer->set_global_needle("Deadline period is 0"); + + // Zero deadline on the READER + reader.deadline_period(0.0).init(); + ASSERT_TRUE(reader.isInitialized()); + + writer.wait_discovery(); + reader.wait_discovery(); + + auto data = default_keyedhelloworld_data_generator(1); + writer.send_sample(data.front()); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + // Reader requested-deadline counters should be saturated + EXPECT_EQ(reader.missed_deadlines(), + std::numeric_limits::max()) << "Expected the max value after a zero-deadline warning."; + EXPECT_EQ(reader.missed_deadlines_change(), std::numeric_limits::max()); + + const auto prev = observer->matched_global(); + EXPECT_EQ(prev, 1u) << "Expected exactly one 'deadline=0' warning\n"; + + const auto pre_total = reader.missed_deadlines(); + const auto pre_change = reader.missed_deadlines_change(); + + // Wait for a period long enough to expect a new miss if the timer were still active + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + const auto post_total = reader.missed_deadlines(); + const auto post_change = reader.missed_deadlines_change(); + + EXPECT_EQ(pre_total, post_total) << "Timer canceled on reader; total must not change."; + EXPECT_EQ(pre_change, post_change) << "Timer canceled on reader; total_count_change must not change."; + EXPECT_EQ(observer->matched_global(), prev) << "No extra warnings after cancel."; + + // Now change reader's deadline from 0 -> finite; still no additional warning; counters remain saturated + auto q = reader.get_qos(); + q.deadline().period = Duration_t(0.1); + ASSERT_TRUE(reader.set_qos(q)); + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(reader.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(reader.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev) << "No new warning when moving reader from 0 -> finite"; + + q.deadline().period = Duration_t(0.0); + + ASSERT_TRUE(reader.set_qos(q)); // Update finite -> 0 + + std::this_thread::sleep_for(std::chrono::milliseconds(150)); + + EXPECT_EQ(reader.missed_deadlines(), std::numeric_limits::max()); + EXPECT_EQ(reader.missed_deadlines_change(), std::numeric_limits::max()); + EXPECT_EQ(observer->matched_global(), prev + 1) << "Exactly one new warning."; + + fastlog::ClearConsumers(); +} + #ifdef INSTANTIATE_TEST_SUITE_P #define GTEST_INSTANTIATE_TEST_MACRO(x, y, z, w) INSTANTIATE_TEST_SUITE_P(x, y, z, w) #else @@ -320,8 +497,8 @@ TEST_P(DeadlineQos, KeyedTopicBestEffortReaderVolatileWriterSetDeadline) GTEST_INSTANTIATE_TEST_MACRO(DeadlineQos, DeadlineQos, - testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING), - [](const testing::TestParamInfo& info) + ::testing::Values(TRANSPORT, INTRAPROCESS, DATASHARING), + [](const ::testing::TestParamInfo& info) { switch (info.param) { diff --git a/test/utils/LogCounter.hpp b/test/utils/LogCounter.hpp new file mode 100644 index 00000000000..a75556e626f --- /dev/null +++ b/test/utils/LogCounter.hpp @@ -0,0 +1,149 @@ +// Copyright 2025 Proyectos y Sistemas de Mantenimiento SL (eProsima). +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/** + * @file LogCounter.hpp + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +#include + +namespace eprosima { +namespace fastdds { +namespace testing { + +/** + * This class holds all counting/state logic: + * - Counts per Log::Kind (Warning, Error, Info, ...) + * - Optional storage of full entries (disabled by default) + * - "Needle" matcher for exact message occurrences + * Intended to be used behind a LogCounterConsumer + */ +class LogCounterObserver +{ +public: + + using Log = eprosima::fastdds::dds::Log; + using Kind = Log::Kind; + + explicit LogCounterObserver( + bool store_logs = false) + : store_(store_logs) + { + matched_.store(0, std::memory_order_relaxed); + } + + // Set / reset the message substring to match and count + void set_global_needle( + std::string s) + { + std::lock_guard lk(m_); + needle_ = std::move(s); + matched_.store(0, std::memory_order_relaxed); + } + + // Number of messages that matched the current needle (substring match) + size_t matched_global() const + { + return matched_.load(std::memory_order_relaxed); + } + + // Count of logs for a specific kind (Warning, Error, ...) + size_t count( + Kind k) const + { + std::lock_guard lk(m_); + auto it = counts_.find(k); + return (it == counts_.end()) ? 0 : it->second; + } + + // Get stored entries (only if constructed with store_logs=true) + const std::vector& entries() const + { + return entries_; + } + + // Called by the consumer + void on_log( + const Log::Entry& e) + { + std::string local_needle; + { + std::lock_guard lk(m_); + ++counts_[e.kind]; + local_needle = needle_; + if (store_) + { + entries_.push_back(e); + } + } + if (!local_needle.empty() && e.message.find(local_needle) != std::string::npos) + { + matched_.fetch_add(1, std::memory_order_relaxed); + } + } + +private: + + std::map counts_; + std::atomic matched_{0}; + + bool store_; + mutable std::mutex m_; + std::string needle_; + std::vector entries_; +}; + +/** + * Class holding a shared_ptr that ensures the observer + * outlives asynchronous logging, so tests can safely read counters even if a + * failure occurs mid-test. + */ +class LogCounterConsumer : public eprosima::fastdds::dds::LogConsumer +{ +public: + + using Log = eprosima::fastdds::dds::Log; + + explicit LogCounterConsumer( + std::shared_ptr obs) + : observer_(std::move(obs)) + { + } + + void Consume( + const Log::Entry& e) override + { + if (observer_) + { + observer_->on_log(e); + } + } + +private: + + std::shared_ptr observer_; +}; + +} // namespace testing +} // namespace fastdds +} // namespace eprosima