-
Notifications
You must be signed in to change notification settings - Fork 567
Description
Description
By using Roslibjs
to subscribe to topics using CBOR
compression, I'm having a Rosbridge
error with some custom messages. I can subscribe to standard messages with the compression, but not some of my custom ones, particularly when containing vectors of other custom messages.
- Library Version: Last commit of
humble
branch. - ROS Version:
Humble
- Platform / OS:
Ubuntu 22.04
Expected Behavior
All messages arriving through Roslibjs
on the wep application, when CBOR
compression is requested.
Actual Behavior
The messages don't arrive, and Rosbridge
prints the following error:
[rosbridge_websocket-4] for slot, slot_type in msg.get_fields_and_field_types().items():
[rosbridge_websocket-4] AttributeError: 'numpy.ndarray' object has no attribute 'get_fields_and_field_types'
I don't know where this numpy.array
comes from since my nodes are C++
. I'm guessing is some conversion to Python
on the message generators in between.
Possible solution
I was able to got it to work, but I'm unsure of the solution. Or, if the problem is caused by another completely different issue and this workaround is not the best approach.
On cbor_conversion.py
, I added to the beginning of extract_cbor_values
function:
if isinstance(msg, np.ndarray):
return msg.tolist()
And imported numpy. The final result being:
# (...)
import numpy as np
# (...)
def extract_cbor_values(msg):
"""Extract a dictionary of CBOR-friendly values from a ROS message.
Primitive values will be casted to specific Python primitives.
Typed arrays will be tagged and packed into byte arrays.
"""
if isinstance(msg, np.ndarray):
return msg.tolist()
out = {}
for slot, slot_type in msg.get_fields_and_field_types().items():
val = getattr(msg, slot)
# string
if slot_type in STRING_TYPES:
out[slot] = str(val)
# bool
elif slot_type in BOOL_TYPES:
out[slot] = bool(val)
# integers
elif slot_type in INT_TYPES:
out[slot] = int(val)
# floats
elif slot_type in FLOAT_TYPES:
out[slot] = float(val)
# time/duration
elif slot_type in TIME_TYPES:
out[slot] = {
"sec": int(val.sec),
"nanosec": int(val.nanosec),
}
# byte array
elif slot_type in BYTESTREAM_TYPES:
out[slot] = bytes(val)
# bool array
elif slot_type in BOOL_ARRAY_TYPES:
out[slot] = [bool(i) for i in val]
elif slot_type in STRING_ARRAY_TYPES:
out[slot] = [str(i) for i in val]
# numeric arrays
elif slot_type in TAGGED_ARRAY_FORMATS:
tag, fmt = TAGGED_ARRAY_FORMATS[slot_type]
fmt_to_length = fmt.format(len(val))
packed = struct.pack(fmt_to_length, *val)
out[slot] = Tag(tag=tag, value=packed)
# array of messages
elif type(val) in LIST_TYPES:
out[slot] = [extract_cbor_values(i) for i in val]
# message
else:
out[slot] = extract_cbor_values(val)
return out
It seems to be working for every message I request, and my custom messages are being processed much faster with the compression='cbor'
flag now.
Any tips or suggestions?
Thanks!