Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 103 additions & 43 deletions src/cpp/fastdds/publisher/DataWriterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
#ifdef FASTDDS_STATISTICS
#include <statistics/fastdds/domain/DomainParticipantImpl.hpp>
#include <statistics/types/monitorservice_types.h>
#endif //FASTDDS_STATISTICS
#endif // FASTDDS_STATISTICS

using namespace eprosima::fastrtps;
using namespace eprosima::fastrtps::rtps;
Expand Down Expand Up @@ -397,12 +397,7 @@ ReturnCode_t DataWriterImpl::enable()
// In case it has been loaded from the persistence DB, rebuild instances on history
history_.rebuild_instances();

deadline_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(),
[&]() -> bool
{
return deadline_missed();
},
qos_.deadline().period.to_ns() * 1e-6);
configure_deadline_timer_();

lifespan_timer_ = new TimedEvent(publisher_->get_participant()->get_resource_event(),
[&]() -> bool
Expand Down Expand Up @@ -642,8 +637,8 @@ ReturnCode_t DataWriterImpl::check_write_preconditions(
type_.get()->getKey(data, &instance_handle, is_key_protected);
}

//Check if the Handle is different from the special value HANDLE_NIL and
//does not correspond with the instance referred by the data
// Check if the Handle is different from the special value HANDLE_NIL and
// does not correspond with the instance referred by the data
if (handle.isDefined() && handle != instance_handle)
{
return ReturnCode_t::RETCODE_PRECONDITION_NOT_MET;
Expand Down Expand Up @@ -992,6 +987,7 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
}
}

// new_change seeds the next per-instance deadline and reschedules the timer for the next sample
CacheChange_t* ch = writer_->new_change(change_kind, handle);
if (ch != nullptr)
{
Expand Down Expand Up @@ -1024,7 +1020,8 @@ ReturnCode_t DataWriterImpl::perform_create_new_change(
return ReturnCode_t::RETCODE_TIMEOUT;
}

if (qos_.deadline().period != c_TimeInfinite)
if (qos_.deadline().period.to_ns() > 0 && qos_.deadline().period != c_TimeInfinite &&
deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max())
{
if (!history_.set_next_deadline(
handle,
Expand Down Expand Up @@ -1135,7 +1132,7 @@ void DataWriterImpl::publisher_qos_updated()
{
if (writer_ != nullptr)
{
//NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
// NOTIFY THE BUILTIN PROTOCOLS THAT THE WRITER HAS CHANGED
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
publisher_->rtps_participant()->updateWriter(writer_, get_topic_attributes(qos_, *topic_, type_), wqos);
}
Expand Down Expand Up @@ -1170,6 +1167,9 @@ ReturnCode_t DataWriterImpl::set_qos(
return ReturnCode_t::RETCODE_IMMUTABLE_POLICY;
}

// Take a snapshot of the current QoS before mutating it
const DataWriterQos old_qos = qos_;

set_qos(qos_, qos_to_set, !enabled);

if (enabled)
Expand All @@ -1185,33 +1185,30 @@ ReturnCode_t DataWriterImpl::set_qos(
writer_->updateAttributes(w_att);
}

//Notify the participant that a Writer has changed its QOS
// Notify the participant that a Writer has changed its QOS
fastrtps::TopicAttributes topic_att = get_topic_attributes(qos_, *topic_, type_);
WriterQos wqos = qos_.get_writerqos(get_publisher()->get_qos(), topic_->get_qos());
publisher_->rtps_participant()->updateWriter(writer_, topic_att, wqos);

// Deadline
if (qos_.deadline().period != c_TimeInfinite)
// If the deadline period actually changed, (re)configure the timer.
if (old_qos.deadline().period != qos_.deadline().period)
{
deadline_duration_us_ =
duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);
deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
}
else
{
deadline_timer_->cancel_timer();
configure_deadline_timer_();
}

// Lifespan
if (qos_.lifespan().duration != c_TimeInfinite)
if (old_qos.lifespan().duration != qos_.lifespan().duration)
{
lifespan_duration_us_ =
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
}
else
{
lifespan_timer_->cancel_timer();
if (qos_.lifespan().duration != c_TimeInfinite)
{
lifespan_duration_us_ =
duration<double, std::ratio<1, 1000000>>(qos_.lifespan().duration.to_ns() * 1e-3);
lifespan_timer_->update_interval_millisec(qos_.lifespan().duration.to_ns() * 1e-6);
}
else
{
lifespan_timer_->cancel_timer();
}
}
}

Expand Down Expand Up @@ -1283,7 +1280,7 @@ void DataWriterImpl::InnerDataWriterListener::on_offered_incompatible_qos(

#ifdef FASTDDS_STATISTICS
notify_status_observer(statistics::INCOMPATIBLE_QOS);
#endif //FASTDDS_STATISTICS
#endif // FASTDDS_STATISTICS

data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
}
Expand Down Expand Up @@ -1322,7 +1319,7 @@ void DataWriterImpl::InnerDataWriterListener::on_liveliness_lost(

#ifdef FASTDDS_STATISTICS
notify_status_observer(statistics::LIVELINESS_LOST);
#endif //FASTDDS_STATISTICS
#endif // FASTDDS_STATISTICS

data_writer_->user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
}
Expand Down Expand Up @@ -1366,7 +1363,7 @@ void DataWriterImpl::InnerDataWriterListener::notify_status_observer(
}
}

#endif //FASTDDS_STATISTICS
#endif // FASTDDS_STATISTICS

ReturnCode_t DataWriterImpl::wait_for_acknowledgments(
const Duration_t& max_wait)
Expand Down Expand Up @@ -1459,10 +1456,12 @@ ReturnCode_t DataWriterImpl::get_publication_matched_status(

bool DataWriterImpl::deadline_timer_reschedule()
{
assert(qos_.deadline().period != c_TimeInfinite);

std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

assert(qos_.deadline().period != c_TimeInfinite);
assert(deadline_timer_ != nullptr);
assert(deadline_missed_status_.total_count < std::numeric_limits<uint32_t>::max());

steady_clock::time_point next_deadline_us;
if (!history_.get_next_deadline(timer_owner_, next_deadline_us))
{
Expand All @@ -1475,28 +1474,89 @@ bool DataWriterImpl::deadline_timer_reschedule()
return true;
}

bool DataWriterImpl::deadline_missed()
void DataWriterImpl::configure_deadline_timer_()
{
assert(qos_.deadline().period != c_TimeInfinite);

std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

deadline_missed_status_.total_count++;
deadline_missed_status_.total_count_change++;
deadline_missed_status_.last_instance_handle = timer_owner_;
// Create the timer once
if (deadline_timer_ == nullptr)
{
deadline_timer_ = new TimedEvent(
publisher_->rtps_participant()->get_resource_event(),
[this]() -> bool
{
return deadline_missed();
},
// Park timer with a huge interval (prevents spurious callbacks); we'll arm/cancel explicitly
std::numeric_limits<double>::max()
);
}

// Handle "infinite" and "zero" outside the callback
if (qos_.deadline().period == c_TimeInfinite)
{
deadline_duration_us_ = std::chrono::duration<double, std::micro>::max();
deadline_timer_->cancel_timer();
return;
}

deadline_duration_us_ =
std::chrono::duration<double, std::ratio<1, 1000000>>(qos_.deadline().period.to_ns() * 1e-3);

if (qos_.deadline().period.to_ns() == 0)
{
deadline_timer_->cancel_timer();

deadline_missed_status_.total_count = std::numeric_limits<uint32_t>::max();
deadline_missed_status_.total_count_change = std::numeric_limits<uint32_t>::max();
EPROSIMA_LOG_WARNING(
DATA_WRITER,
"Deadline period is 0, it will be ignored from now on.");

// Bump once and notify listener exactly once.
notify_deadline_missed_nts_();
return;
}

deadline_timer_->update_interval_millisec(qos_.deadline().period.to_ns() * 1e-6);
}

void DataWriterImpl::notify_deadline_missed_nts_()
{
StatusMask notify_status = StatusMask::offered_deadline_missed();
auto listener = get_listener_for(notify_status);
if (nullptr != listener)
if (auto* listener = get_listener_for(notify_status))
{
listener->on_offered_deadline_missed(user_datawriter_, deadline_missed_status_);
deadline_missed_status_.total_count_change = 0;
}

#ifdef FASTDDS_STATISTICS
writer_listener_.notify_status_observer(statistics::DEADLINE_MISSED);
#endif //FASTDDS_STATISTICS
#endif // FASTDDS_STATISTICS

user_datawriter_->get_statuscondition().get_impl()->set_status(notify_status, true);
}

bool DataWriterImpl::deadline_missed()
{
std::unique_lock<RecursiveTimedMutex> lock(writer_->getMutex());

assert(qos_.deadline().period != c_TimeInfinite);

deadline_missed_status_.total_count++;
deadline_missed_status_.total_count_change++;
deadline_missed_status_.last_instance_handle = timer_owner_;

notify_deadline_missed_nts_();

// If we just reached the max -> log ONCE, stop timer, and bail.
if (deadline_missed_status_.total_count == std::numeric_limits<uint32_t>::max())
{
EPROSIMA_LOG_WARNING(DATA_WRITER,
"Maximum number of deadline missed messages reached. Stopping deadline timer.");
deadline_timer_->cancel_timer();
return false; // do not reschedule
}

if (!history_.set_next_deadline(
timer_owner_,
Expand Down
18 changes: 18 additions & 0 deletions src/cpp/fastdds/publisher/DataWriterImpl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#define _FASTRTPS_DATAWRITERIMPL_HPP_

#include <memory>
#include <mutex>

#include <fastdds/dds/core/status/BaseStatus.hpp>
#include <fastdds/dds/core/status/IncompatibleQosStatus.hpp>
Expand Down Expand Up @@ -578,6 +579,7 @@ class DataWriterImpl : protected rtps::IReaderDataFilter

/**
* @brief A method to reschedule the deadline timer
* @return true if deadline rescheduling succeeded, false otherwise
*/
bool deadline_timer_reschedule();

Expand Down Expand Up @@ -736,6 +738,22 @@ class DataWriterImpl : protected rtps::IReaderDataFilter

private:

/**
* (Re)configures the deadline timer:
* In case of infinite deadline period cancel it, for 0 warn and notify once (with max counts), and
* for non-infinite positive values store period.
*/
void configure_deadline_timer_();

/**
* Notifies listeners that a deadline has been missed.
*/
void notify_deadline_missed_nts_();

void create_history(
const std::shared_ptr<IPayloadPool>& payload_pool,
const std::shared_ptr<IChangePool>& change_pool);

DataWriterQos get_datawriter_qos_from_settings(
const DataWriterQos& qos);

Expand Down
Loading
Loading