@@ -16,11 +16,17 @@ def populate(key, event_count)
1616 end
1717end
1818
19- def process ( conf , event_count )
19+ def wait_events ( conf , event_count )
2020 events = input ( conf ) do |_ , queue |
2121 sleep 0.1 until queue . size >= event_count
2222 queue . size . times . map { queue . pop }
2323 end
24+ expect ( events . size ) . to eq event_count
25+ events
26+ end
27+
28+ def process ( conf , event_count )
29+ events = wait_events ( conf , event_count )
2430 # due multiple workers we get events out-of-order in the output
2531 events . sort! { |a , b | a . get ( 'sequence' ) <=> b . get ( 'sequence' ) }
2632 expect ( events [ 0 ] . get ( 'sequence' ) ) . to eq ( 0 )
@@ -73,7 +79,7 @@ def process(conf, event_count)
7379 input {
7480 redis {
7581 type => "blah"
76- key => "#{ key } .*"
82+ key => "#{ key_base } .*"
7783 data_type => "pattern_list"
7884 batch_count => 1
7985 }
@@ -85,7 +91,7 @@ def process(conf, event_count)
8591 total_event_count += event_count
8692 populate ( "#{ key_base } .#{ idx } " , event_count )
8793 end
88- process ( conf , total_event_count )
94+ wait_events ( conf , total_event_count )
8995 end
9096
9197 it "should read events from a list pattern using batch_count (default 125)" do
@@ -94,7 +100,7 @@ def process(conf, event_count)
94100 input {
95101 redis {
96102 type => "blah"
97- key => "#{ key } .*"
103+ key => "#{ key_base } .*"
98104 data_type => "pattern_list"
99105 batch_count => 125
100106 }
@@ -106,7 +112,7 @@ def process(conf, event_count)
106112 total_event_count += event_count
107113 populate ( "#{ key_base } .#{ idx } " , event_count )
108114 end
109- process ( conf , total_event_count )
115+ wait_events ( conf , total_event_count )
110116 end
111117end
112118
0 commit comments