From c0006d84cdf232cf7650cd7822614108e5ba9c03 Mon Sep 17 00:00:00 2001 From: Andriy Gelman Date: Sat, 5 Sep 2020 21:21:52 -0400 Subject: [PATCH 1/2] Fix node discovery with dps middleware in ROS2 Currently node discovery does not work with dps middleware in ROS2. The issue is as follows: When a subscriber comes online it sends out a multicast discovery message. All the subscriber topics (bloom filter outputs) are merged in a union and sent out as an "interest" message. When the multicast message is received by the publishing node, it checks whether any topic is in the set of the received interest. If a match is found a unicast connection is created and the published message is sent to the subscriber. However at the moment, the publisher's bloom filter output takes into account wildcards whereas they are not considered in the subscriber. Thus the publishers bloom filter output will not be in the set of the subscriber's interests. Hence, the unicast connection is not created. This commit makes sure the subscriber's interests are created in a symmetrical way to the publisher side which solves the discovery problem. Also when creating the publisher's discovery interest use the wildcard option. This is primarily so that discovery information is sent out on localhost given the above change. Signed-off-by: Andriy Gelman --- src/discovery.c | 3 +-- src/topics.c | 11 +++++------ 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/discovery.c b/src/discovery.c index ea1e041c..c8ceb820 100644 --- a/src/discovery.c +++ b/src/discovery.c @@ -288,7 +288,6 @@ static DPS_Status DecodeDiscoveryPayload(DPS_DiscoveryService* service, uint8_t DPS_DiscoveryService* DPS_CreateDiscoveryService(DPS_Node* node, const char* serviceId) { - static const int noWildcard = DPS_TRUE; DPS_DiscoveryService* service; DPS_Status ret; @@ -325,7 +324,7 @@ DPS_DiscoveryService* DPS_CreateDiscoveryService(DPS_Node* node, const char* ser if (ret != DPS_OK) { goto Exit; } - ret = DPS_InitPublication(service->pub, (const char**)&service->topic, 1, noWildcard, OnAck); + ret = DPS_InitPublication(service->pub, (const char**)&service->topic, 1, DPS_FALSE, OnAck); if (ret != DPS_OK) { goto Exit; } diff --git a/src/topics.c b/src/topics.c index 8989203f..28e76e90 100644 --- a/src/topics.c +++ b/src/topics.c @@ -144,12 +144,13 @@ DPS_Status DPS_AddTopic(DPS_BitVector* bf, const char* topic, const char* separa tp = topic + strcspn(topic, separators); if (!wc) { DPS_BitVectorBloomInsert(bf, (const uint8_t*)topic, tlen); - if (topicType != DPS_PubTopic) { - return DPS_OK; - } } else if (wc != topic) { DPS_BitVectorBloomInsert(bf, (const uint8_t*)topic, wc - topic); } + if (topicType == DPS_PubNoWild) { + return DPS_OK; + } + segment = malloc(tlen + 1); if (!segment) { return DPS_ERR_RESOURCES; @@ -157,7 +158,7 @@ DPS_Status DPS_AddTopic(DPS_BitVector* bf, const char* topic, const char* separa while (*tp) { size_t len; segment[prefix++] = *tp++; - if (topicType == DPS_PubTopic) { + if (wc != topic) { DPS_BitVectorBloomInsert(bf, (const uint8_t*)topic, tp - topic); } len = strcspn(tp, separators); @@ -176,7 +177,6 @@ DPS_Status DPS_AddTopic(DPS_BitVector* bf, const char* topic, const char* separa tp += len; } if (ret == DPS_OK) { - if (topicType == DPS_PubTopic) { segment[prefix] = INFIX_WILDC; DPS_BitVectorBloomInsert(bf, (uint8_t*)segment, prefix + 1); while (prefix >= 0) { @@ -184,7 +184,6 @@ DPS_Status DPS_AddTopic(DPS_BitVector* bf, const char* topic, const char* separa DPS_BitVectorBloomInsert(bf, (uint8_t*)segment, prefix + 1); --prefix; } - } } free(segment); return ret; From e16f61da15d1d06f13cfce8a667c710f45441d4b Mon Sep 17 00:00:00 2001 From: Andriy Gelman Date: Tue, 8 Sep 2020 16:30:17 -0400 Subject: [PATCH 2/2] Realign after previous commit Signed-off-by: Andriy Gelman --- src/topics.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/topics.c b/src/topics.c index 28e76e90..f99f584c 100644 --- a/src/topics.c +++ b/src/topics.c @@ -177,13 +177,13 @@ DPS_Status DPS_AddTopic(DPS_BitVector* bf, const char* topic, const char* separa tp += len; } if (ret == DPS_OK) { - segment[prefix] = INFIX_WILDC; + segment[prefix] = INFIX_WILDC; + DPS_BitVectorBloomInsert(bf, (uint8_t*)segment, prefix + 1); + while (prefix >= 0) { + segment[prefix] = FINAL_WILDC; DPS_BitVectorBloomInsert(bf, (uint8_t*)segment, prefix + 1); - while (prefix >= 0) { - segment[prefix] = FINAL_WILDC; - DPS_BitVectorBloomInsert(bf, (uint8_t*)segment, prefix + 1); - --prefix; - } + --prefix; + } } free(segment); return ret;