Skip to content

Commit c028dbc

Browse files
committed
Fix sending and receiving of custom headers.
1 parent f48e37c commit c028dbc

File tree

4 files changed

+111
-11
lines changed

4 files changed

+111
-11
lines changed

lib/rabbitmq/ffi/ext.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
require_relative 'ext/connection_info'
66
require_relative 'ext/field_value'
7+
require_relative 'ext/array'
78
require_relative 'ext/table'
89

910
require_relative 'ext/method'

lib/rabbitmq/ffi/ext/array.rb

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
2+
module RabbitMQ
3+
module FFI
4+
5+
class Array
6+
include Enumerable
7+
8+
def each(*a, &b)
9+
entry_ptr = self[:entries]
10+
entries = self[:num_entries].times.map do |i|
11+
FFI::FieldValue.new(entry_ptr + i * FFI::FieldValue.size)
12+
end
13+
entries.each(*a, &b)
14+
end
15+
16+
def to_a(free=false)
17+
result = self.map do |item|
18+
item.to_value(free)
19+
end
20+
21+
clear if free
22+
result
23+
end
24+
25+
def free!
26+
self.each(&:free!)
27+
FFI.free(self[:entries])
28+
clear
29+
end
30+
31+
def self.from_a(items)
32+
size = items.size
33+
entry_ptr = Util.mem_ptr(size * FFI::FieldValue.size, release: false)
34+
items.each_with_index do |item, idx|
35+
FFI::FieldValue.new(entry_ptr + idx * FFI::FieldValue.size).apply(item)
36+
end
37+
38+
obj = new
39+
obj[:num_entries] = size
40+
obj[:entries] = entry_ptr
41+
obj
42+
end
43+
end
44+
45+
end
46+
end

lib/rabbitmq/ffi/ext/field_value.rb

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ def to_value(free=false)
1919
when :utf8; value.to_s(free).force_encoding(Encoding::UTF_8)
2020
when :timestamp; Time.at(value / 1000.0)
2121
when :table; value.to_h(free)
22-
when :array; value.to_array_not_yet_implemented!
23-
when :decimal; value.to_value_not_yet_implemented!
22+
when :array; value.to_a(free)
2423
else value
2524
end
2625

@@ -29,19 +28,30 @@ def to_value(free=false)
2928
end
3029

3130
def free!
32-
kind = self[:kind]
33-
value = self[:value][value_member(kind)]
31+
kind = self[:kind]
32+
value = self[:value][value_member(kind)]
3433
value.free! if value.respond_to? :free!
35-
clear
34+
self
3635
end
3736

38-
def self.from(value)
39-
obj = new
40-
obj[:kind], obj[:value] = case value
41-
when String; [:bytes, FieldValueValue.new(Bytes.from_s(value).pointer)]
42-
else raise NotImplementedError
37+
def apply(value)
38+
self[:kind], self[:value] = case value
39+
when ::String; [:bytes, FieldValueValue.new(Bytes.from_s(value).pointer)]
40+
when ::Symbol; [:bytes, FieldValueValue.new(Bytes.from_s(value.to_s).pointer)]
41+
when ::Array; [:array, FieldValueValue.new(Array.from_a(value).pointer)]
42+
when ::Hash; [:table, FieldValueValue.new(Table.from(value).pointer)]
43+
when ::Fixnum; [:i64, (v=FieldValueValue.new; v[:i64]=value; v)]
44+
when ::Float; [:f64, (v=FieldValueValue.new; v[:f64]=value; v)]
45+
when ::Time; [:timestamp, (v=FieldValueValue.new; v[:u64]=value.to_i*1000; v)]
46+
when true; [:boolean, (v=FieldValueValue.new; v[:boolean]=true; v)]
47+
when false; [:boolean, (v=FieldValueValue.new; v[:boolean]=false; v)]
48+
else raise NotImplementedError, "#{self.class}.from(#<#{value.class}>)"
4349
end
44-
obj
50+
self
51+
end
52+
53+
def self.from(value)
54+
new.apply(value)
4555
end
4656
end
4757

spec/channel_spec.rb

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,4 +274,47 @@
274274
subject.queue_delete("my_queue")
275275
end
276276

277+
it "can send and receive messages with custom headers" do
278+
subject.queue_delete("my_queue")
279+
subject.queue_declare("my_queue")
280+
time = Time.now
281+
282+
res = subject.basic_publish("message_body", "", "my_queue",
283+
headers: { "x-my-header" => [
284+
{
285+
"foo" => 88,
286+
"bar" => 88.8,
287+
"baz" => "string1",
288+
"time" => (time + 1),
289+
"bool" => true
290+
},
291+
{
292+
"foo" => 99,
293+
"bar" => 99.9,
294+
"baz" => "string2",
295+
"time" => (time - 1),
296+
"bool" => false
297+
},
298+
]}
299+
)
300+
res.should eq true
301+
302+
res = subject.basic_get("my_queue", no_ack: true)
303+
headers = res[:header][:headers]
304+
headers.should be_a Hash
305+
headers["x-my-header"][0]["foo"].should eq 88
306+
headers["x-my-header"][1]["foo"].should eq 99
307+
headers["x-my-header"][0]["bar"].should eq 88.8
308+
headers["x-my-header"][1]["bar"].should eq 99.9
309+
headers["x-my-header"][0]["baz"].should eq "string1"
310+
headers["x-my-header"][1]["baz"].should eq "string2"
311+
headers["x-my-header"][0]["time"].to_i.should eq((time + 1).to_i)
312+
headers["x-my-header"][1]["time"].to_i.should eq((time - 1).to_i)
313+
headers["x-my-header"][0]["bool"].should eq true
314+
headers["x-my-header"][1]["bool"].should eq false
315+
316+
res = subject.basic_get("my_queue", no_ack: true)
317+
res.has_key?(:header).should_not be
318+
end
319+
277320
end

0 commit comments

Comments
 (0)