Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions src/cpp/fastdds/rpc/RequesterImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include <fastdds/dds/domain/qos/RequesterQos.hpp>
#include <fastdds/dds/log/Log.hpp>
#include <fastdds/dds/rpc/RequestInfo.hpp>
#include <fastdds/dds/topic/ContentFilteredTopic.hpp>
#include <fastdds/rtps/common/Guid.hpp>
#include <fastdds/rtps/common/SequenceNumber.hpp>
#include <fastdds/rtps/common/WriteParams.hpp>
Expand Down Expand Up @@ -210,16 +211,35 @@ ReturnCode_t RequesterImpl::create_dds_entities(
return RETCODE_ERROR;
}

ContentFilteredTopic* reply_topic = service_->get_reply_filtered_topic();

requester_reader_ =
service_->get_subscriber()->create_datareader(
<<<<<<< HEAD
service_->get_reply_filtered_topic(), qos.reader_qos, nullptr);
=======
reply_topic, qos.reader_qos, this, StatusMask::subscription_matched());
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))

if (!requester_reader_)
{
EPROSIMA_LOG_ERROR(REQUESTER, "Error creating reply reader");
return RETCODE_ERROR;
}

<<<<<<< HEAD
=======
// Set the content filter signature to be different from the one used in other requesters
std::stringstream guid;
guid << requester_reader_->guid();
std::vector<std::string> expression_parameters;
reply_topic->set_filter_expression(guid.str(), expression_parameters);

// Set the related entity key on both entities
requester_reader_->set_related_datawriter(requester_writer_);
requester_writer_->set_related_datareader(requester_reader_);

>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))
return RETCODE_OK;
}

Expand Down
11 changes: 11 additions & 0 deletions test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,14 +114,25 @@ void ReqRepHelloWorldReplier::newNumber(
}

void ReqRepHelloWorldReplier::wait_discovery()
{
wait_discovery(1, 1);
}

void ReqRepHelloWorldReplier::wait_discovery(
unsigned int min_pub_matched,
unsigned int min_sub_matched)
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);

std::cout << "Replier is waiting discovery..." << std::endl;

cvDiscovery_.wait(lock, [&]()
{
<<<<<<< HEAD
return matched_ > 1;
=======
return pub_matched_ >= min_pub_matched && sub_matched_ >= min_sub_matched;
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))
});

std::cout << "Replier discovery finished..." << std::endl;
Expand Down
9 changes: 9 additions & 0 deletions test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,16 @@ class ReqRepHelloWorldReplier

void wait_discovery();

<<<<<<< HEAD
void matched();
=======
void wait_discovery(
unsigned int min_pub_matched,
unsigned int min_sub_matched);

void matched(
bool is_pub);
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))

eprosima::fastdds::dds::ReplierQos create_replier_qos();

Expand Down
11 changes: 11 additions & 0 deletions test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,14 +148,25 @@ void ReqRepHelloWorldRequester::block(
}

void ReqRepHelloWorldRequester::wait_discovery()
{
wait_discovery(1, 1);
}

void ReqRepHelloWorldRequester::wait_discovery(
unsigned int min_pub_matched,
unsigned int min_sub_matched)
{
std::unique_lock<std::mutex> lock(mutexDiscovery_);

std::cout << "Requester is waiting discovery..." << std::endl;

cvDiscovery_.wait(lock, [&]()
{
<<<<<<< HEAD
return matched_ > 1;
=======
return pub_matched_ >= min_pub_matched && sub_matched_ >= min_sub_matched;
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))
});

std::cout << "Requester discovery finished..." << std::endl;
Expand Down
9 changes: 9 additions & 0 deletions test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,16 @@ class ReqRepHelloWorldRequester

void wait_discovery();

<<<<<<< HEAD
void matched();
=======
void wait_discovery(
unsigned int min_pub_matched,
unsigned int min_sub_matched);

void matched(
bool is_pub);
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))

/**
* Sends a request without checking the matching status.
Expand Down
142 changes: 142 additions & 0 deletions test/blackbox/common/DDSBlackboxTestsRPC.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,3 +128,145 @@ TEST(RPC, ThrowExceptions)
EXPECT_THROW(throw rpc::RemoteUnsupportedError("Still not implemented"), rpc::RpcRemoteException);
EXPECT_THROW(throw rpc::RemoteUnsupportedError("Still not implemented"), rpc::RpcException);
}
<<<<<<< HEAD
=======

/**
* RPC enhanced discovery algotithm.
*
* This test checks that the requester correctly behaves when
* the replier is still unmatched and the request is sent.
*/
TEST(RPC, replier_unmatched_before_sending_request)
{
ReqRepHelloWorldRequester requester;
ReqRepHelloWorldReplier replier;

// Initialize the requester and replier
requester.init();
ASSERT_TRUE(requester.isInitialized());

// Write a request, expecting it to fail
requester.send(0, [](
eprosima::fastdds::dds::rpc::Requester* requester,
eprosima::fastdds::dds::rpc::RequestInfo* info,
void* request)
{
ASSERT_EQ(requester->send_request(request,
*info), eprosima::fastdds::dds::RETCODE_PRECONDITION_NOT_MET);
});

auto future_send = std::async(std::launch::async, [&requester]()
{
// Write a request, this time matching after 300ms
requester.send(0);
});

// At the same time, initialize the replier
std::this_thread::sleep_for(std::chrono::milliseconds(300));
replier.init();
ASSERT_TRUE(replier.isInitialized());

// The requester should now be able to receive the reply
requester.block(std::chrono::seconds(5));
}

/**
* RPC enhanced discovery algotithm.
*
* Requester is unmatched during request processing in the server side.
* This test checks that after waiting for the timeout, the send_reply()
* fails.
*/
TEST(RPC, requester_unmatched_during_request_processing)
{
std::shared_ptr<ReqRepHelloWorldRequester> requester = std::make_shared<ReqRepHelloWorldRequester>();

std::condition_variable replier_finished_cv;
std::atomic<bool> finished{false};
std::mutex replier_finished_mutex;
eprosima::fastdds::dds::Duration_t reply_elapsed;

// Simulate a Replier with heavy processing
ReqRepHelloWorldReplier replier
([&replier_finished_cv, &finished, &reply_elapsed](eprosima::fastdds::dds::rpc::RequestInfo& info,
eprosima::fastdds::dds::rpc::Replier* replier,
const void* const request)
{
// Simulate heavy processing
std::this_thread::sleep_for(std::chrono::seconds(2));
const HelloWorld* hello_request = static_cast<const HelloWorld*>(request);
ASSERT_EQ(hello_request->message().compare("HelloWorld"), 0);
HelloWorld reply;

Duration_t t0, t1;
Duration_t::now(t0);

// send_reply() should fail because the requester will be unmatched
ASSERT_EQ(replier->send_reply((void*)&reply, info), eprosima::fastdds::dds::RETCODE_NO_DATA);
finished.store(true);
Duration_t::now(t1);
reply_elapsed = t1 - t0;
replier_finished_cv.notify_one();
});

// Initialize the requester and replier
requester->init();
ASSERT_TRUE(requester->isInitialized());
replier.init();
ASSERT_TRUE(replier.isInitialized());

// Wait for discovery
requester->wait_discovery();
replier.wait_discovery();

// Write a request
requester->send(0);
std::this_thread::sleep_for(std::chrono::milliseconds(300));
requester.reset();

// Wait for the replier to finish processing
std::unique_lock<std::mutex> lock(replier_finished_mutex);
replier_finished_cv.wait(lock, [&finished]()
{
return finished.load();
});
ASSERT_TRUE(finished.load());
// Check that the reply took at least the wait_matching timeout (3 secs)
ASSERT_GT(reply_elapsed, Duration_t{2});
}

/**
* Test RPC communication with multiple requesters and one replier.
*
* This test checks that multiple requesters can send requests to a single replier
* and receive replies correctly.
*/
TEST(RPC, multiple_requesters_one_replier)
{
ReqRepHelloWorldRequester requester_1;
ReqRepHelloWorldRequester requester_2;
ReqRepHelloWorldReplier replier;

// Initialize the requesters and the replier
requester_1.init();
ASSERT_TRUE(requester_1.isInitialized());
requester_2.init();
ASSERT_TRUE(requester_2.isInitialized());
replier.init();
ASSERT_TRUE(replier.isInitialized());

// Wait for discovery
requester_1.wait_discovery();
requester_2.wait_discovery();
replier.wait_discovery(2, 2);

// Send requests from both requesters
requester_1.send(1);
requester_2.send(2);

// Block to wait for replies
requester_1.block(std::chrono::seconds(5));
requester_2.block(std::chrono::seconds(5));
}
>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))