Skip to content
44 changes: 37 additions & 7 deletions spec/inputs/redis_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def process(conf, event_count)
allow_any_instance_of( Redis::Client ).to receive(:call_with_timeout) do |_, command, timeout, &block|
expect(command[0]).to eql :blpop
expect(command[1]).to eql ['foo', 0]
end.and_return ['foo', "{\"foo1\":\"bar\""], nil
end.and_return ['foo', "{\"foo1\":\"bar\"}"], nil

tt = Thread.new do
sleep 0.25
Expand Down Expand Up @@ -250,7 +250,7 @@ def process(conf, event_count)
allow_any_instance_of( Redis ).to receive(:script)
allow_any_instance_of( Redis::Client ).to receive(:call) do |_, command|
expect(command[0]).to eql :evalsha
end.and_return ['{"a": 1}', '{"b":'], []
end.and_return ['{"a": 1}', '{"b": 2}'], []

tt = Thread.new do
sleep 0.25
Expand Down Expand Up @@ -314,6 +314,8 @@ def process(conf, event_count)

before { subject.register }

let(:channel_name) { 'foo' }

def run_it_thread(inst)
Thread.new(inst) do |subj|
subj.run(queue)
Expand All @@ -324,7 +326,7 @@ def publish_thread(new_redis, prefix)
Thread.new(new_redis, prefix) do |r, p|
sleep 0.1
2.times do |i|
r.publish('foo', "#{p}#{i.next}")
r.publish(channel_name, { data: "#{p}#{i.next}" }.to_json)
end
end
end
Expand All @@ -342,6 +344,13 @@ def close_thread(inst, rt)
end
end

def subscription_ready?(timeout = 2)
redis_client = subject.send(:new_redis_instance)
Stud.try(10.times) do
raise unless yield(redis_client)
end
end

before(:example, type: :mocked) do
subject.register
allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false
Expand All @@ -368,24 +377,35 @@ def close_thread(inst, rt)
it 'calling the run method, adds events to the queue' do
#simulate the input thread
rt = run_it_thread(subject)
begin
subscription_ready? { |client| client.pubsub('channels').include?(channel_name) }
rescue
fail "Channel not ready in time for publishing."
end
#simulate the other system thread
publish_thread(subject.send(:new_redis_instance), 'c').join
#simulate the pipeline thread
close_thread(subject, rt).join

expect(queue.size).to eq(2)
end

it 'events had redis_channel' do
#simulate the input thread
rt = run_it_thread(subject)
begin
subscription_ready? { |client| client.pubsub('channels').include?(channel_name) }
rescue
fail "Channel not ready in time for publishing."
end
#simulate the other system thread
publish_thread(subject.send(:new_redis_instance), 'c').join
#simulate the pipeline thread
close_thread(subject, rt).join
e1 = queue.pop
e2 = queue.pop
expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
expect(e2.get('[@metadata][redis_channel]')).to eq('foo')
expect(e1.get('[@metadata][redis_channel]')).to eq(channel_name)
expect(e2.get('[@metadata][redis_channel]')).to eq(channel_name)
end
end
end
Expand All @@ -406,6 +426,11 @@ def close_thread(inst, rt)
it 'calling the run method, adds events to the queue' do
#simulate the input thread
rt = run_it_thread(subject)
begin
subscription_ready? { |client| client.pubsub('numpat') > 0 }
rescue
fail "Channel not ready in time for publishing."
end
#simulate the other system thread
publish_thread(subject.send(:new_redis_instance), 'pc').join
#simulate the pipeline thread
Expand All @@ -417,14 +442,19 @@ def close_thread(inst, rt)
it 'events had redis_channel' do
#simulate the input thread
rt = run_it_thread(subject)
begin
subscription_ready? { |client| client.pubsub('numpat') > 0 }
rescue
fail "Channel not ready in time for publishing."
end
#simulate the other system thread
publish_thread(subject.send(:new_redis_instance), 'pc').join
#simulate the pipeline thread
close_thread(subject, rt).join
e1 = queue.pop
e2 = queue.pop
expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
expect(e2.get('[@metadata][redis_channel]')).to eq('foo')
expect(e1.get('[@metadata][redis_channel]')).to eq(channel_name)
expect(e2.get('[@metadata][redis_channel]')).to eq(channel_name)
end
end
end
Expand Down