From be78cdc4ac6c7a6efe8d1fa7eb7cd0bafd6131d8 Mon Sep 17 00:00:00 2001 From: Carlos Espinoza Curto <148376273+Carlosespicur@users.noreply.github.com> Date: Tue, 26 Aug 2025 09:53:18 +0200 Subject: [PATCH] Set different content filter signatures for each requester (#5972) * Refs #23568: Set different content filter signatures for each requester Signed-off-by: Carlosespicur * Refs #23568: Add test Signed-off-by: Carlosespicur --------- Signed-off-by: Carlosespicur (cherry picked from commit 5e01f498a189e2c1942e030a86df1cc76bef2e3a) # Conflicts: # src/cpp/fastdds/rpc/RequesterImpl.cpp # test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp # test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.hpp # test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp # test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp # test/blackbox/common/DDSBlackboxTestsRPC.cpp --- src/cpp/fastdds/rpc/RequesterImpl.cpp | 20 +++ .../api/dds-pim/ReqRepHelloWorldReplier.cpp | 11 ++ .../api/dds-pim/ReqRepHelloWorldReplier.hpp | 9 ++ .../api/dds-pim/ReqRepHelloWorldRequester.cpp | 11 ++ .../api/dds-pim/ReqRepHelloWorldRequester.hpp | 9 ++ test/blackbox/common/DDSBlackboxTestsRPC.cpp | 142 ++++++++++++++++++ 6 files changed, 202 insertions(+) diff --git a/src/cpp/fastdds/rpc/RequesterImpl.cpp b/src/cpp/fastdds/rpc/RequesterImpl.cpp index d398e74fa7f..84ead4c13af 100644 --- a/src/cpp/fastdds/rpc/RequesterImpl.cpp +++ b/src/cpp/fastdds/rpc/RequesterImpl.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -210,9 +211,15 @@ ReturnCode_t RequesterImpl::create_dds_entities( return RETCODE_ERROR; } + ContentFilteredTopic* reply_topic = service_->get_reply_filtered_topic(); + requester_reader_ = service_->get_subscriber()->create_datareader( +<<<<<<< HEAD service_->get_reply_filtered_topic(), qos.reader_qos, nullptr); +======= + reply_topic, qos.reader_qos, this, StatusMask::subscription_matched()); +>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972)) if (!requester_reader_) { @@ -220,6 +227,19 @@ ReturnCode_t RequesterImpl::create_dds_entities( return RETCODE_ERROR; } +<<<<<<< HEAD +======= + // Set the content filter signature to be different from the one used in other requesters + std::stringstream guid; + guid << requester_reader_->guid(); + std::vector expression_parameters; + reply_topic->set_filter_expression(guid.str(), expression_parameters); + + // Set the related entity key on both entities + requester_reader_->set_related_datawriter(requester_writer_); + requester_writer_->set_related_datareader(requester_reader_); + +>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972)) return RETCODE_OK; } diff --git a/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp b/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp index ad9aa8d9c4c..096dfb80c9b 100644 --- a/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp +++ b/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.cpp @@ -114,6 +114,13 @@ void ReqRepHelloWorldReplier::newNumber( } void ReqRepHelloWorldReplier::wait_discovery() +{ + wait_discovery(1, 1); +} + +void ReqRepHelloWorldReplier::wait_discovery( + unsigned int min_pub_matched, + unsigned int min_sub_matched) { std::unique_lock lock(mutexDiscovery_); @@ -121,7 +128,11 @@ void ReqRepHelloWorldReplier::wait_discovery() cvDiscovery_.wait(lock, [&]() { +<<<<<<< HEAD return matched_ > 1; +======= + return pub_matched_ >= min_pub_matched && sub_matched_ >= min_sub_matched; +>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972)) }); std::cout << "Replier discovery finished..." << std::endl; diff --git a/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.hpp b/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.hpp index e6ec998f732..455ae159092 100644 --- a/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.hpp +++ b/test/blackbox/api/dds-pim/ReqRepHelloWorldReplier.hpp @@ -67,7 +67,16 @@ class ReqRepHelloWorldReplier void wait_discovery(); +<<<<<<< HEAD void matched(); +======= + void wait_discovery( + unsigned int min_pub_matched, + unsigned int min_sub_matched); + + void matched( + bool is_pub); +>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972)) eprosima::fastdds::dds::ReplierQos create_replier_qos(); diff --git a/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp b/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp index 11c8fc87cea..a0e02f93fec 100644 --- a/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp +++ b/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.cpp @@ -148,6 +148,13 @@ void ReqRepHelloWorldRequester::block( } void ReqRepHelloWorldRequester::wait_discovery() +{ + wait_discovery(1, 1); +} + +void ReqRepHelloWorldRequester::wait_discovery( + unsigned int min_pub_matched, + unsigned int min_sub_matched) { std::unique_lock lock(mutexDiscovery_); @@ -155,7 +162,11 @@ void ReqRepHelloWorldRequester::wait_discovery() cvDiscovery_.wait(lock, [&]() { +<<<<<<< HEAD return matched_ > 1; +======= + return pub_matched_ >= min_pub_matched && sub_matched_ >= min_sub_matched; +>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972)) }); std::cout << "Requester discovery finished..." << std::endl; diff --git a/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp b/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp index 751ed8890f5..0d74a09513f 100644 --- a/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp +++ b/test/blackbox/api/dds-pim/ReqRepHelloWorldRequester.hpp @@ -76,7 +76,16 @@ class ReqRepHelloWorldRequester void wait_discovery(); +<<<<<<< HEAD void matched(); +======= + void wait_discovery( + unsigned int min_pub_matched, + unsigned int min_sub_matched); + + void matched( + bool is_pub); +>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972)) /** * Sends a request without checking the matching status. diff --git a/test/blackbox/common/DDSBlackboxTestsRPC.cpp b/test/blackbox/common/DDSBlackboxTestsRPC.cpp index c1717089a03..75fbc19844b 100644 --- a/test/blackbox/common/DDSBlackboxTestsRPC.cpp +++ b/test/blackbox/common/DDSBlackboxTestsRPC.cpp @@ -128,3 +128,145 @@ TEST(RPC, ThrowExceptions) EXPECT_THROW(throw rpc::RemoteUnsupportedError("Still not implemented"), rpc::RpcRemoteException); EXPECT_THROW(throw rpc::RemoteUnsupportedError("Still not implemented"), rpc::RpcException); } +<<<<<<< HEAD +======= + +/** + * RPC enhanced discovery algotithm. + * + * This test checks that the requester correctly behaves when + * the replier is still unmatched and the request is sent. + */ +TEST(RPC, replier_unmatched_before_sending_request) +{ + ReqRepHelloWorldRequester requester; + ReqRepHelloWorldReplier replier; + + // Initialize the requester and replier + requester.init(); + ASSERT_TRUE(requester.isInitialized()); + + // Write a request, expecting it to fail + requester.send(0, []( + eprosima::fastdds::dds::rpc::Requester* requester, + eprosima::fastdds::dds::rpc::RequestInfo* info, + void* request) + { + ASSERT_EQ(requester->send_request(request, + *info), eprosima::fastdds::dds::RETCODE_PRECONDITION_NOT_MET); + }); + + auto future_send = std::async(std::launch::async, [&requester]() + { + // Write a request, this time matching after 300ms + requester.send(0); + }); + + // At the same time, initialize the replier + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + replier.init(); + ASSERT_TRUE(replier.isInitialized()); + + // The requester should now be able to receive the reply + requester.block(std::chrono::seconds(5)); +} + +/** + * RPC enhanced discovery algotithm. + * + * Requester is unmatched during request processing in the server side. + * This test checks that after waiting for the timeout, the send_reply() + * fails. + */ +TEST(RPC, requester_unmatched_during_request_processing) +{ + std::shared_ptr requester = std::make_shared(); + + std::condition_variable replier_finished_cv; + std::atomic finished{false}; + std::mutex replier_finished_mutex; + eprosima::fastdds::dds::Duration_t reply_elapsed; + + // Simulate a Replier with heavy processing + ReqRepHelloWorldReplier replier + ([&replier_finished_cv, &finished, &reply_elapsed](eprosima::fastdds::dds::rpc::RequestInfo& info, + eprosima::fastdds::dds::rpc::Replier* replier, + const void* const request) + { + // Simulate heavy processing + std::this_thread::sleep_for(std::chrono::seconds(2)); + const HelloWorld* hello_request = static_cast(request); + ASSERT_EQ(hello_request->message().compare("HelloWorld"), 0); + HelloWorld reply; + + Duration_t t0, t1; + Duration_t::now(t0); + + // send_reply() should fail because the requester will be unmatched + ASSERT_EQ(replier->send_reply((void*)&reply, info), eprosima::fastdds::dds::RETCODE_NO_DATA); + finished.store(true); + Duration_t::now(t1); + reply_elapsed = t1 - t0; + replier_finished_cv.notify_one(); + }); + + // Initialize the requester and replier + requester->init(); + ASSERT_TRUE(requester->isInitialized()); + replier.init(); + ASSERT_TRUE(replier.isInitialized()); + + // Wait for discovery + requester->wait_discovery(); + replier.wait_discovery(); + + // Write a request + requester->send(0); + std::this_thread::sleep_for(std::chrono::milliseconds(300)); + requester.reset(); + + // Wait for the replier to finish processing + std::unique_lock lock(replier_finished_mutex); + replier_finished_cv.wait(lock, [&finished]() + { + return finished.load(); + }); + ASSERT_TRUE(finished.load()); + // Check that the reply took at least the wait_matching timeout (3 secs) + ASSERT_GT(reply_elapsed, Duration_t{2}); +} + +/** + * Test RPC communication with multiple requesters and one replier. + * + * This test checks that multiple requesters can send requests to a single replier + * and receive replies correctly. + */ +TEST(RPC, multiple_requesters_one_replier) +{ + ReqRepHelloWorldRequester requester_1; + ReqRepHelloWorldRequester requester_2; + ReqRepHelloWorldReplier replier; + + // Initialize the requesters and the replier + requester_1.init(); + ASSERT_TRUE(requester_1.isInitialized()); + requester_2.init(); + ASSERT_TRUE(requester_2.isInitialized()); + replier.init(); + ASSERT_TRUE(replier.isInitialized()); + + // Wait for discovery + requester_1.wait_discovery(); + requester_2.wait_discovery(); + replier.wait_discovery(2, 2); + + // Send requests from both requesters + requester_1.send(1); + requester_2.send(2); + + // Block to wait for replies + requester_1.block(std::chrono::seconds(5)); + requester_2.block(std::chrono::seconds(5)); +} +>>>>>>> 5e01f498 (Set different content filter signatures for each requester (#5972))