Skip to content

Commit 55dcc3a

Browse files
committed
Check specifically if the channel has been created
1 parent 90a02c1 commit 55dcc3a

File tree

1 file changed

+20
-8
lines changed

1 file changed

+20
-8
lines changed

spec/inputs/redis_spec.rb

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -314,6 +314,8 @@ def process(conf, event_count)
314314

315315
before { subject.register }
316316

317+
let(:channel_name) { 'foo' }
318+
317319
def run_it_thread(inst)
318320
Thread.new(inst) do |subj|
319321
subj.run(queue)
@@ -342,6 +344,15 @@ def close_thread(inst, rt)
342344
end
343345
end
344346

347+
def wait_for_channel(timeout = 2)
348+
end_time = Time.now + timeout
349+
redis_client = subject.send(:new_redis_instance)
350+
until redis_client.pubsub("channels").include?(channel_name)
351+
break if Time.now > end_time
352+
sleep(0.1)
353+
end
354+
end
355+
345356
before(:example, type: :mocked) do
346357
subject.register
347358
allow_any_instance_of( Redis::Client ).to receive(:connected?).and_return true, false
@@ -368,26 +379,27 @@ def close_thread(inst, rt)
368379
it 'calling the run method, adds events to the queue' do
369380
#simulate the input thread
370381
rt = run_it_thread(subject)
371-
sleep(0.5) # Give the subscribe request a chance to complete
382+
wait_for_channel
372383
#simulate the other system thread
373384
publish_thread(subject.send(:new_redis_instance), 'c').join
374385
#simulate the pipeline thread
375386
close_thread(subject, rt).join
376387

377388
expect(queue.size).to eq(2)
378389
end
390+
379391
it 'events had redis_channel' do
380392
#simulate the input thread
381393
rt = run_it_thread(subject)
382-
sleep(0.5) # Give the subscribe request a chance to complete
394+
wait_for_channel
383395
#simulate the other system thread
384396
publish_thread(subject.send(:new_redis_instance), 'c').join
385397
#simulate the pipeline thread
386398
close_thread(subject, rt).join
387399
e1 = queue.pop
388400
e2 = queue.pop
389-
expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
390-
expect(e2.get('[@metadata][redis_channel]')).to eq('foo')
401+
expect(e1.get('[@metadata][redis_channel]')).to eq(channel_name)
402+
expect(e2.get('[@metadata][redis_channel]')).to eq(channel_name)
391403
end
392404
end
393405
end
@@ -408,7 +420,7 @@ def close_thread(inst, rt)
408420
it 'calling the run method, adds events to the queue' do
409421
#simulate the input thread
410422
rt = run_it_thread(subject)
411-
sleep(0.5) # Give the subscribe request a chance to complete
423+
wait_for_channel
412424
#simulate the other system thread
413425
publish_thread(subject.send(:new_redis_instance), 'pc').join
414426
#simulate the pipeline thread
@@ -420,15 +432,15 @@ def close_thread(inst, rt)
420432
it 'events had redis_channel' do
421433
#simulate the input thread
422434
rt = run_it_thread(subject)
423-
sleep(0.5) # Give the subscribe request a chance to complete
435+
wait_for_channel
424436
#simulate the other system thread
425437
publish_thread(subject.send(:new_redis_instance), 'pc').join
426438
#simulate the pipeline thread
427439
close_thread(subject, rt).join
428440
e1 = queue.pop
429441
e2 = queue.pop
430-
expect(e1.get('[@metadata][redis_channel]')).to eq('foo')
431-
expect(e2.get('[@metadata][redis_channel]')).to eq('foo')
442+
expect(e1.get('[@metadata][redis_channel]')).to eq(channel_name)
443+
expect(e2.get('[@metadata][redis_channel]')).to eq(channel_name)
432444
end
433445
end
434446
end

0 commit comments

Comments
 (0)