Skip to content

Commit f29cd75

Browse files
Merge pull request #37 from pollen-robotics/36-split-reliable-and-unreliable-commands
enhancement #36: add reliable and lossy channels
2 parents 973de02 + 98f05fe commit f29cd75

File tree

11 files changed

+131
-74
lines changed

11 files changed

+131
-74
lines changed

Plugin/src/GstDataPipeline.cpp

Lines changed: 54 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,35 @@
77
#include <gst/sdp/sdp.h>
88
#include <gst/webrtc/webrtc.h>
99

10-
GstDataPipeline::GstDataPipeline() : GstBasePipeline("DataPipeline") {
11-
}
12-
10+
GstDataPipeline::GstDataPipeline() : GstBasePipeline("DataPipeline") {}
1311

14-
void GstDataPipeline::CreatePipeline()
12+
void GstDataPipeline::CreatePipeline()
1513
{
1614
Debug::Log("GstDataPipeline create pipeline", Level::Info);
1715
GstBasePipeline::CreatePipeline();
1816

1917
webrtcbin_ = add_webrtcbin();
20-
auto state = gst_element_set_state(this->pipeline_, GstState::GST_STATE_READY);
18+
auto state = gst_element_set_state(this->pipeline_, GstState::GST_STATE_READY);
2119

2220
CreateBusThread();
2321
}
2422

2523
void GstDataPipeline::DestroyPipeline()
2624
{
2725
gst_webrtc_data_channel_close(channel_service_);
28-
gst_webrtc_data_channel_close(channel_command_);
26+
gst_webrtc_data_channel_close(channel_command_reliable_);
27+
gst_webrtc_data_channel_close(channel_command_lossy_);
2928
gst_webrtc_data_channel_close(channel_audit_);
3029

3130
channel_service_ = nullptr;
3231
channel_audit_ = nullptr;
33-
channel_command_ = nullptr;
32+
channel_command_reliable_ = nullptr;
33+
channel_command_lossy_ = nullptr;
3434

3535
GstBasePipeline::DestroyPipeline();
3636
}
3737

38-
void GstDataPipeline::SetOffer(const char* sdp_offer)
38+
void GstDataPipeline::SetOffer(const char* sdp_offer)
3939
{
4040
Debug::Log("SDP Offer: " + std::string(sdp_offer));
4141
GstSDPMessage* sdpmsg = nullptr;
@@ -44,20 +44,18 @@ void GstDataPipeline::SetOffer(const char* sdp_offer)
4444
GstWebRTCSDPType sdp_type = GST_WEBRTC_SDP_TYPE_OFFER;
4545
GstWebRTCSessionDescription* offer = gst_webrtc_session_description_new(sdp_type, sdpmsg);
4646

47-
48-
GstPromise* promise =
49-
gst_promise_new_with_change_func(on_offer_set, webrtcbin_, nullptr);
47+
GstPromise* promise = gst_promise_new_with_change_func(on_offer_set, webrtcbin_, nullptr);
5048

5149
g_signal_emit_by_name(webrtcbin_, "set-remote-description", offer, promise, nullptr);
5250

53-
//gst_webrtc_session_description_free(offer); //raise breakpoint?
51+
// gst_webrtc_session_description_free(offer); //raise breakpoint?
5452
gst_sdp_message_free(sdpmsg);
5553
gst_promise_unref(promise);
5654
}
5755

58-
void GstDataPipeline::SetICECandidate(const char* candidate, int mline_index)
56+
void GstDataPipeline::SetICECandidate(const char* candidate, int mline_index)
5957
{
60-
Debug::Log("Add ICE Candidate: " + std::string(candidate) + " "+ std::to_string(mline_index));
58+
Debug::Log("Add ICE Candidate: " + std::string(candidate) + " " + std::to_string(mline_index));
6159
g_signal_emit_by_name(webrtcbin_, "add-ice-candidate", mline_index, candidate);
6260
}
6361

@@ -72,7 +70,6 @@ void GstDataPipeline::on_offer_set(GstPromise* promise, gpointer user_data)
7270
gst_promise_unref(promise);
7371
}
7472

75-
7673
void GstDataPipeline::on_answer_created(GstPromise* promise, gpointer user_data)
7774
{
7875
Debug::Log("create answer");
@@ -85,11 +82,10 @@ void GstDataPipeline::on_answer_created(GstPromise* promise, gpointer user_data)
8582
gst_structure_get(reply, "answer", GST_TYPE_WEBRTC_SESSION_DESCRIPTION, &answer, nullptr);
8683
gst_promise_unref(promise);
8784

88-
//promise = gst_promise_new_with_change_func(on_answer_set, webrtc, nullptr);
85+
// promise = gst_promise_new_with_change_func(on_answer_set, webrtc, nullptr);
8986
g_signal_emit_by_name(webrtc, "set-local-description", answer, nullptr);
90-
//gst_promise_interrupt(promise);
91-
// gst_promise_unref(promise);
92-
87+
// gst_promise_interrupt(promise);
88+
// gst_promise_unref(promise);
9389

9490
if (callbackICEInstance != nullptr)
9591
{
@@ -115,7 +111,6 @@ void GstDataPipeline::on_ice_candidate(GstElement* webrtcbin, guint mline_index,
115111
}
116112
}
117113

118-
119114
/* void GstDataPipeline::on_message_data(GstWebRTCDataChannel* channel, GBytes* data, gpointer user_data)
120115
{
121116
Debug::Log("Data channel message received", Level::Info);
@@ -142,7 +137,6 @@ void GstDataPipeline::on_ice_candidate(GstElement* webrtcbin, guint mline_index,
142137
gst_structure_free(options);
143138
}*/
144139

145-
146140
void GstDataPipeline::on_ice_gathering_state_notify(GstElement* webrtcbin, GParamSpec* pspec, gpointer user_data)
147141
{
148142
GstWebRTCICEGatheringState ice_gather_state;
@@ -161,7 +155,7 @@ void GstDataPipeline::on_ice_gathering_state_notify(GstElement* webrtcbin, GPara
161155
new_state = "complete";
162156
break;
163157
}
164-
Debug::Log("ICE gathering state changed to "+ new_state);
158+
Debug::Log("ICE gathering state changed to " + new_state);
165159
}
166160

167161
bool GstDataPipeline::starts_with(const std::string& str, const std::string& prefix)
@@ -186,36 +180,43 @@ void GstDataPipeline::on_data_channel(GstElement* webrtcbin, GstWebRTCDataChanne
186180

187181
if (label_str == CHANNEL_SERVICE)
188182
{
189-
self->channel_service_ = channel;
183+
self->channel_service_ = channel;
190184

191185
g_signal_connect(channel, "on-message-data", G_CALLBACK(on_message_data_service), nullptr);
192186

193-
if (callbackChannelServiceOpenInstance != nullptr)
194-
callbackChannelServiceOpenInstance();
187+
if (callbackChannelServiceOpenInstance != nullptr)
188+
callbackChannelServiceOpenInstance();
195189
else
196190
Debug::Log("Fails to notify opening of service channel", Level::Warning);
197191
}
198-
else if (starts_with(label_str,CHANNEL_REACHY_STATE))
192+
else if (starts_with(label_str, CHANNEL_REACHY_STATE))
199193
{
200194
g_signal_connect(channel, "on-message-data", G_CALLBACK(on_message_data_state), nullptr);
201195
}
202196
else if (starts_with(label_str, CHANNEL_REACHY_AUDIT))
203197
{
204198
g_signal_connect(channel, "on-message-data", G_CALLBACK(on_message_data_audit), nullptr);
205199
}
206-
else if (starts_with(label_str, CHANNEL_REACHY_COMMAND))
200+
else if (starts_with(label_str, CHANNEL_REACHY_COMMAND_RELIABLE))
201+
{
202+
self->channel_command_reliable_ = channel;
203+
if (callbackChannelCommandReliableOpenInstance != nullptr)
204+
callbackChannelCommandReliableOpenInstance();
205+
else
206+
Debug::Log("Fails to notify opening of reliable command channel", Level::Warning);
207+
}
208+
else if (starts_with(label_str, CHANNEL_REACHY_COMMAND_LOSSY))
207209
{
208-
self->channel_command_ = channel;
209-
if (callbackChannelCommandOpenInstance != nullptr)
210-
callbackChannelCommandOpenInstance();
210+
self->channel_command_lossy_ = channel;
211+
if (callbackChannelCommandLossyOpenInstance != nullptr)
212+
callbackChannelCommandLossyOpenInstance();
211213
else
212-
Debug::Log("Fails to notify opening of command channel", Level::Warning);
214+
Debug::Log("Fails to notify opening of lossy command channel", Level::Warning);
213215
}
214216
else
215217
{
216218
Debug::Log("unknown data channel : " + label_str, Level::Warning);
217219
}
218-
219220
}
220221

221222
void GstDataPipeline::send_byte_array(GstWebRTCDataChannel* channel, const unsigned char* data, size_t size)
@@ -227,25 +228,33 @@ void GstDataPipeline::send_byte_array(GstWebRTCDataChannel* channel, const unsig
227228
g_bytes_unref(bytes);
228229
}
229230

230-
void GstDataPipeline::send_byte_array_channel_service(const unsigned char* data, size_t size)
231+
void GstDataPipeline::send_byte_array_channel_service(const unsigned char* data, size_t size)
231232
{
232233
if (channel_service_ != nullptr)
233-
send_byte_array(channel_service_, data, size);
234+
send_byte_array(channel_service_, data, size);
234235
else
235236
Debug::Log("channel service is not initialized ", Level::Warning);
236237
}
237238

238-
void GstDataPipeline::send_byte_array_channel_command(const unsigned char* data, size_t size)
239+
void GstDataPipeline::send_byte_array_channel_command_reliable(const unsigned char* data, size_t size)
240+
{
241+
if (channel_command_reliable_ != nullptr)
242+
send_byte_array(channel_command_reliable_, data, size);
243+
else
244+
Debug::Log("channel reliable command is not initialized ", Level::Warning);
245+
}
246+
247+
void GstDataPipeline::send_byte_array_channel_command_lossy(const unsigned char* data, size_t size)
239248
{
240-
if (channel_command_ != nullptr)
241-
send_byte_array(channel_command_, data, size);
249+
if (channel_command_reliable_ != nullptr)
250+
send_byte_array(channel_command_lossy_, data, size);
242251
else
243-
Debug::Log("channel command is not initialized ", Level::Warning);
252+
Debug::Log("channel lossy command is not initialized ", Level::Warning);
244253
}
245254

246255
void GstDataPipeline::on_message_data_service(GstWebRTCDataChannel* channel, GBytes* data, gpointer user_data)
247256
{
248-
//Debug::Log("Data channel service message received", Level::Info);
257+
// Debug::Log("Data channel service message received", Level::Info);
249258
if (callbackChannelServiceDataInstance != nullptr)
250259
{
251260
gsize size = g_bytes_get_size(data);
@@ -256,7 +265,7 @@ void GstDataPipeline::on_message_data_service(GstWebRTCDataChannel* channel, GBy
256265

257266
void GstDataPipeline::on_message_data_state(GstWebRTCDataChannel* channel, GBytes* data, gpointer user_data)
258267
{
259-
//Debug::Log("Data channel state message received", Level::Info);
268+
// Debug::Log("Data channel state message received", Level::Info);
260269
if (callbackChannelStateDataInstance != nullptr)
261270
{
262271
gsize size = g_bytes_get_size(data);
@@ -294,18 +303,19 @@ GstElement* GstDataPipeline::add_webrtcbin()
294303
return webrtcbin;
295304
}
296305

297-
298306
// Create a callback delegate
299307
void RegisterICECallback(FuncCallBackICE cb) { callbackICEInstance = cb; }
300308
void RegisterSDPCallback(FuncCallBackSDP cb) { callbackSDPInstance = cb; }
301-
void RegisterChannelCommandOpenCallback(FuncCallBackChannelOpen cb) { callbackChannelCommandOpenInstance = cb; }
309+
void RegisterChannelReliableCommandOpenCallback(FuncCallBackChannelOpen cb) { callbackChannelCommandReliableOpenInstance = cb; }
310+
void RegisterChannelLossyCommandOpenCallback(FuncCallBackChannelOpen cb) { callbackChannelCommandLossyOpenInstance = cb; }
302311
void RegisterChannelServiceOpenCallback(FuncCallBackChannelOpen cb) { callbackChannelServiceOpenInstance = cb; }
303312
void RegisterChannelServiceDataCallback(FuncCallBackChannelData cb) { callbackChannelServiceDataInstance = cb; }
304313
void RegisterChannelStateDataCallback(FuncCallBackChannelData cb) { callbackChannelStateDataInstance = cb; }
305314
void RegisterChannelAuditDataCallback(FuncCallBackChannelData cb) { callbackChannelAuditDataInstance = cb; }
306315

307-
//const
316+
// const
308317
const std::string GstDataPipeline::CHANNEL_SERVICE = "service";
309318
const std::string GstDataPipeline::CHANNEL_REACHY_STATE = "reachy_state";
310-
const std::string GstDataPipeline::CHANNEL_REACHY_COMMAND = "reachy_command";
319+
const std::string GstDataPipeline::CHANNEL_REACHY_COMMAND_RELIABLE = "reachy_command_reliable";
320+
const std::string GstDataPipeline::CHANNEL_REACHY_COMMAND_LOSSY = "reachy_command_lossy";
311321
const std::string GstDataPipeline::CHANNEL_REACHY_AUDIT = "reachy_audit";

Plugin/src/GstDataPipeline.h

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,11 @@ extern "C"
2525
static FuncCallBackChannelOpen callbackChannelServiceOpenInstance = nullptr;
2626
DLLExport void RegisterChannelServiceOpenCallback(FuncCallBackChannelOpen cb);
2727

28-
static FuncCallBackChannelOpen callbackChannelCommandOpenInstance = nullptr;
29-
DLLExport void RegisterChannelCommandOpenCallback(FuncCallBackChannelOpen cb);
28+
static FuncCallBackChannelOpen callbackChannelCommandReliableOpenInstance = nullptr;
29+
DLLExport void RegisterChannelReliableCommandOpenCallback(FuncCallBackChannelOpen cb);
30+
31+
static FuncCallBackChannelOpen callbackChannelCommandLossyOpenInstance = nullptr;
32+
DLLExport void RegisterChannelLossyCommandOpenCallback(FuncCallBackChannelOpen cb);
3033

3134
typedef void (*FuncCallBackChannelData)(const uint8_t * message, int size);
3235
static FuncCallBackChannelData callbackChannelServiceDataInstance = nullptr;
@@ -45,10 +48,12 @@ class GstDataPipeline : GstBasePipeline
4548
GstElement* webrtcbin_ = nullptr;
4649
static const std::string CHANNEL_SERVICE;
4750
static const std::string CHANNEL_REACHY_STATE;
48-
static const std::string CHANNEL_REACHY_COMMAND;
51+
static const std::string CHANNEL_REACHY_COMMAND_RELIABLE;
52+
static const std::string CHANNEL_REACHY_COMMAND_LOSSY;
4953
static const std::string CHANNEL_REACHY_AUDIT;
5054
GstWebRTCDataChannel* channel_service_ = nullptr;
51-
GstWebRTCDataChannel* channel_command_ = nullptr;
55+
GstWebRTCDataChannel* channel_command_reliable_ = nullptr;
56+
GstWebRTCDataChannel* channel_command_lossy_ = nullptr;
5257
GstWebRTCDataChannel* channel_audit_ = nullptr;
5358

5459

@@ -59,7 +64,8 @@ class GstDataPipeline : GstBasePipeline
5964
void SetOffer(const char* sdp_offer);
6065
void SetICECandidate(const char* candidate, int mline_index);
6166
void send_byte_array_channel_service(const unsigned char * data, size_t size);
62-
void send_byte_array_channel_command(const unsigned char* data, size_t size);
67+
void send_byte_array_channel_command_reliable(const unsigned char* data, size_t size);
68+
void send_byte_array_channel_command_lossy(const unsigned char* data, size_t size);
6369

6470
private:
6571
GstElement* add_webrtcbin();

Plugin/src/RenderingPlugin.cpp

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,15 @@ extern "C" void UNITY_INTERFACE_EXPORT UNITY_INTERFACE_API SendBytesChannelServi
6363
gstDataPipeline->send_byte_array_channel_service(data, size);
6464
}
6565

66-
extern "C" void UNITY_INTERFACE_EXPORT UNITY_INTERFACE_API SendBytesChannelCommand(const unsigned char* data, size_t size)
66+
extern "C" void UNITY_INTERFACE_EXPORT UNITY_INTERFACE_API SendBytesChannelReliableCommand(const unsigned char* data, size_t size)
6767
{
68-
gstDataPipeline->send_byte_array_channel_command(data, size);
68+
gstDataPipeline->send_byte_array_channel_command_reliable(data, size);
69+
}
70+
71+
extern "C" void UNITY_INTERFACE_EXPORT UNITY_INTERFACE_API SendBytesChannelLossyCommand(const unsigned char* data,
72+
size_t size)
73+
{
74+
gstDataPipeline->send_byte_array_channel_command_lossy(data, size);
6975
}
7076

7177
// --------------------------------------------------------------------------

UnityProject/Assets/Scenes/gstreamer_scene.unity

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -566,7 +566,13 @@ MonoBehaviour:
566566
m_EditorClassIdentifier:
567567
leftRawImage: {fileID: 380550528}
568568
rightRawImage: {fileID: 31189211}
569-
ip_address: 10.0.1.36
569+
ip_address: 10.0.1.30
570+
event_OnPipelineRenderingRunning:
571+
m_PersistentCalls:
572+
m_Calls: []
573+
event_OnPipelineDataRunning:
574+
m_PersistentCalls:
575+
m_Calls: []
570576
--- !u!1 &1875013641
571577
GameObject:
572578
m_ObjectHideFlags: 0
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
version https://git-lfs.github.com/spec/v1
2-
oid sha256:ac7fca39f57c511564e4b0125c0cd73926022b9f0f5813382e3d92a03fbfb272
3-
size 53248
2+
oid sha256:17cd0a625be133bfcdbeb18e4da84607ad86e1a32e74e2e7831e4e95ea138aa5
3+
size 54272

0 commit comments

Comments
 (0)