@@ -314,6 +314,8 @@ def process(conf, event_count)
314314
315315 before { subject . register }
316316
317+ let ( :channel_name ) { 'foo' }
318+
317319 def run_it_thread ( inst )
318320 Thread . new ( inst ) do |subj |
319321 subj . run ( queue )
@@ -324,7 +326,7 @@ def publish_thread(new_redis, prefix)
324326 Thread . new ( new_redis , prefix ) do |r , p |
325327 sleep 0.1
326328 2 . times do |i |
327- r . publish ( 'foo' , { data : "#{ p } #{ i . next } " } . to_json )
329+ r . publish ( channel_name , { data : "#{ p } #{ i . next } " } . to_json )
328330 end
329331 end
330332 end
@@ -342,6 +344,15 @@ def close_thread(inst, rt)
342344 end
343345 end
344346
347+ def wait_for_channel ( timeout = 2 )
348+ end_time = Time . now + timeout
349+ redis_client = subject . send ( :new_redis_instance )
350+ until redis_client . pubsub ( "channels" ) . include? ( channel_name )
351+ break if Time . now > end_time
352+ sleep ( 0.1 )
353+ end
354+ end
355+
345356 before ( :example , type : :mocked ) do
346357 subject . register
347358 allow_any_instance_of ( Redis ::Client ) . to receive ( :connected? ) . and_return true , false
@@ -368,26 +379,27 @@ def close_thread(inst, rt)
368379 it 'calling the run method, adds events to the queue' do
369380 #simulate the input thread
370381 rt = run_it_thread ( subject )
371- sleep ( 0.5 ) # Give the subscribe request a chance to complete
382+ wait_for_channel
372383 #simulate the other system thread
373384 publish_thread ( subject . send ( :new_redis_instance ) , 'c' ) . join
374385 #simulate the pipeline thread
375386 close_thread ( subject , rt ) . join
376387
377388 expect ( queue . size ) . to eq ( 2 )
378389 end
390+
379391 it 'events had redis_channel' do
380392 #simulate the input thread
381393 rt = run_it_thread ( subject )
382- sleep ( 0.5 ) # Give the subscribe request a chance to complete
394+ wait_for_channel
383395 #simulate the other system thread
384396 publish_thread ( subject . send ( :new_redis_instance ) , 'c' ) . join
385397 #simulate the pipeline thread
386398 close_thread ( subject , rt ) . join
387399 e1 = queue . pop
388400 e2 = queue . pop
389- expect ( e1 . get ( '[@metadata][redis_channel]' ) ) . to eq ( 'foo' )
390- expect ( e2 . get ( '[@metadata][redis_channel]' ) ) . to eq ( 'foo' )
401+ expect ( e1 . get ( '[@metadata][redis_channel]' ) ) . to eq ( channel_name )
402+ expect ( e2 . get ( '[@metadata][redis_channel]' ) ) . to eq ( channel_name )
391403 end
392404 end
393405 end
@@ -408,7 +420,7 @@ def close_thread(inst, rt)
408420 it 'calling the run method, adds events to the queue' do
409421 #simulate the input thread
410422 rt = run_it_thread ( subject )
411- sleep ( 0.5 ) # Give the subscribe request a chance to complete
423+ wait_for_channel
412424 #simulate the other system thread
413425 publish_thread ( subject . send ( :new_redis_instance ) , 'pc' ) . join
414426 #simulate the pipeline thread
@@ -420,15 +432,15 @@ def close_thread(inst, rt)
420432 it 'events had redis_channel' do
421433 #simulate the input thread
422434 rt = run_it_thread ( subject )
423- sleep ( 0.5 ) # Give the subscribe request a chance to complete
435+ wait_for_channel
424436 #simulate the other system thread
425437 publish_thread ( subject . send ( :new_redis_instance ) , 'pc' ) . join
426438 #simulate the pipeline thread
427439 close_thread ( subject , rt ) . join
428440 e1 = queue . pop
429441 e2 = queue . pop
430- expect ( e1 . get ( '[@metadata][redis_channel]' ) ) . to eq ( 'foo' )
431- expect ( e2 . get ( '[@metadata][redis_channel]' ) ) . to eq ( 'foo' )
442+ expect ( e1 . get ( '[@metadata][redis_channel]' ) ) . to eq ( channel_name )
443+ expect ( e2 . get ( '[@metadata][redis_channel]' ) ) . to eq ( channel_name )
432444 end
433445 end
434446 end
0 commit comments