@@ -77,41 +77,31 @@ deduplicate_message(Config) ->
77
77
78
78
% % Deduplication header present
79
79
% % String
80
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
81
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
82
-
80
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 13 ),
83
81
{# 'basic.get_ok' {delivery_tag = Tag1 }, _ } = amqp_channel :call (Channel , Get ),
84
82
# 'basic.get_empty' {} = amqp_channel :call (Channel , Get ),
85
83
amqp_channel :cast (Channel , # 'basic.ack' {delivery_tag = Tag1 }),
86
84
87
- % % Integer
88
- publish_message (Channel , <<" test" >>, 42 ),
89
- publish_message (Channel , <<" test" >>, 42 ),
90
-
85
+ % % %% Integer
86
+ publish_messages (Channel , <<" test" >>, 42 , 3 ),
91
87
{# 'basic.get_ok' {delivery_tag = Tag2 }, _ } = amqp_channel :call (Channel , Get ),
92
88
# 'basic.get_empty' {} = amqp_channel :call (Channel , Get ),
93
89
amqp_channel :cast (Channel , # 'basic.ack' {delivery_tag = Tag2 }),
94
90
95
- % % Float
96
- publish_message (Channel , <<" test" >>, 4.2 ),
97
- publish_message (Channel , <<" test" >>, 4.2 ),
98
-
91
+ % % %% Float
92
+ publish_messages (Channel , <<" test" >>, 4.2 , 3 ),
99
93
{# 'basic.get_ok' {delivery_tag = Tag3 }, _ } = amqp_channel :call (Channel , Get ),
100
94
# 'basic.get_empty' {} = amqp_channel :call (Channel , Get ),
101
95
amqp_channel :cast (Channel , # 'basic.ack' {delivery_tag = Tag3 }),
102
96
103
- % % None/null/nil/void/undefined
104
- publish_message (Channel , <<" test" >>, undefined ),
105
- publish_message (Channel , <<" test" >>, undefined ),
106
-
97
+ % % %% None/null/nil/void/undefined
98
+ publish_messages (Channel , <<" test" >>, undefined , 3 ),
107
99
{# 'basic.get_ok' {delivery_tag = Tag4 }, _ } = amqp_channel :call (Channel , Get ),
108
100
# 'basic.get_empty' {} = amqp_channel :call (Channel , Get ),
109
101
amqp_channel :cast (Channel , # 'basic.ack' {delivery_tag = Tag4 }),
110
102
111
103
% % Deduplication header absent
112
- publish_message (Channel , <<" test" >>),
113
- publish_message (Channel , <<" test" >>),
114
-
104
+ publish_messages (Channel , <<" test" >>, 2 ),
115
105
{# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
116
106
{# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
117
107
@@ -125,17 +115,15 @@ deduplicate_message_ttl(Config) ->
125
115
bind_new_exchange (Channel , <<" test" >>, <<" test" >>),
126
116
127
117
% % Queue default TTL
128
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
118
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 1 ),
129
119
timer :sleep (2000 ),
130
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
131
-
120
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 1 ),
132
121
{# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
133
122
134
123
% % Message TTL override
135
- publish_message (Channel , <<" test" >>, " deduplicate-that" , <<" 500" >>),
124
+ publish_messages (Channel , <<" test" >>, " deduplicate-that" , <<" 500" >>, 1 ),
136
125
timer :sleep (800 ),
137
- publish_message (Channel , <<" test" >>, " deduplicate-that" , <<" 500" >>),
138
-
126
+ publish_messages (Channel , <<" test" >>, " deduplicate-that" , <<" 500" >>, 1 ),
139
127
{# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
140
128
141
129
deduplicate_message_confirm (Config ) ->
@@ -149,11 +137,10 @@ deduplicate_message_confirm(Config) ->
149
137
bind_new_exchange (Channel , <<" test" >>, <<" test" >>),
150
138
151
139
% % Publish and wait for confirmation
152
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
140
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 1 ),
153
141
true = amqp_channel :wait_for_confirms (Channel , 3 ),
154
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
142
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 3 ),
155
143
false = amqp_channel :wait_for_confirms (Channel , 3 ),
156
-
157
144
{# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
158
145
159
146
message_acknowledged (Config ) ->
@@ -163,14 +150,11 @@ message_acknowledged(Config) ->
163
150
# 'queue.declare_ok' {} = amqp_channel :call (Channel , make_queue (<<" test" >>)),
164
151
bind_new_exchange (Channel , <<" test" >>, <<" test" >>),
165
152
166
- publish_message ( Channel , << " test " >>, " deduplicate-this " ),
167
-
153
+ % % Acked message does not get deduplicated
154
+ publish_messages ( Channel , << " test " >>, " deduplicate-this " , 1 ),
168
155
{# 'basic.get_ok' {delivery_tag = Tag }, _ } = amqp_channel :call (Channel , Get ),
169
-
170
156
amqp_channel :cast (Channel , # 'basic.ack' {delivery_tag = Tag }),
171
-
172
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
173
-
157
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 1 ),
174
158
{# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ).
175
159
176
160
queue_overflow (Config ) ->
@@ -182,10 +166,10 @@ queue_overflow(Config) ->
182
166
make_queue (<<" test" >>, Args )),
183
167
bind_new_exchange (Channel , <<" test" >>, <<" test" >>),
184
168
185
- publish_message ( Channel , << " test " >>, " deduplicate-this " ),
186
- publish_message (Channel , <<" test" >>, " deduplicate-that " ),
187
- publish_message (Channel , <<" test" >>, " deduplicate-this " ),
188
-
169
+ % % If queue overflows, same happens for cache
170
+ publish_messages (Channel , <<" test" >>, " deduplicate-this " , 1 ),
171
+ publish_messages (Channel , <<" test" >>, " deduplicate-that " , 1 ),
172
+ publish_messages ( Channel , << " test " >>, " deduplicate-this " , 1 ),
189
173
{# 'basic.get_ok' {},
190
174
# amqp_msg {props = # 'P_basic' {headers = [
191
175
{<<" x-deduplication-header" >>,
@@ -206,15 +190,13 @@ dead_letter(Config) ->
206
190
# 'queue.declare_ok' {} = amqp_channel :call (Channel , make_queue (<<" test" >>, Args )),
207
191
bind_new_exchange (Channel , <<" test" >>, <<" test" >>),
208
192
209
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
193
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 1 ),
210
194
{# 'basic.get_ok' {delivery_tag = Tag }, _ } = amqp_channel :call (Channel , Get ),
211
-
212
195
amqp_channel :cast (Channel , # 'basic.reject' {delivery_tag = Tag , requeue = false }),
213
196
214
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
197
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 1 ),
215
198
{# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , Get ),
216
-
217
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
199
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 1 ),
218
200
{# 'basic.get_ok' {}, _ } = amqp_channel :call (Channel , DLGet ).
219
201
220
202
consume_no_ack (Config ) ->
@@ -229,14 +211,14 @@ consume_no_ack(Config) ->
229
211
# 'basic.consume_ok' {} -> ok
230
212
end ,
231
213
232
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
214
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 1 ),
233
215
receive
234
216
{# 'basic.deliver' {}, _ } -> ok
235
217
after 1000 ->
236
218
error (message_not_received )
237
219
end ,
238
220
239
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
221
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 1 ),
240
222
receive
241
223
{# 'basic.deliver' {}, _ } -> ok
242
224
after 1000 ->
@@ -254,9 +236,7 @@ queue_policy(Config) ->
254
236
% % Wait for policy propagation
255
237
timer :sleep (1000 ),
256
238
257
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
258
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
259
-
239
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 2 ),
260
240
Get = # 'basic.get' {queue = <<" test" >>},
261
241
{# 'basic.get_ok' {delivery_tag = Tag }, _ } = amqp_channel :call (Channel , Get ),
262
242
# 'basic.get_empty' {} = amqp_channel :call (Channel , Get ),
@@ -266,9 +246,7 @@ queue_policy(Config) ->
266
246
# 'queue.declare_ok' {} = amqp_channel :call (Channel , # 'queue.declare' {queue = <<" test0" >>}),
267
247
bind_new_exchange (Channel , <<" test0" >>, <<" test0" >>),
268
248
269
- publish_message (Channel , <<" test0" >>, " deduplicate-this" ),
270
- publish_message (Channel , <<" test0" >>, " deduplicate-this" ),
271
-
249
+ publish_messages (Channel , <<" test0" >>, " deduplicate-this" , 2 ),
272
250
Get0 = # 'basic.get' {queue = <<" test0" >>},
273
251
{# 'basic.get_ok' {delivery_tag = Tag0 }, _ } = amqp_channel :call (Channel , Get0 ),
274
252
# 'basic.get_empty' {} = amqp_channel :call (Channel , Get0 ),
@@ -280,9 +258,7 @@ queue_policy(Config) ->
280
258
% Policy is cleared, default arguments are restored
281
259
rabbit_ct_broker_helpers :clear_policy (Config , 0 , <<" policy-test" >>),
282
260
283
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
284
- publish_message (Channel , <<" test" >>, " deduplicate-this" ),
285
-
261
+ publish_messages (Channel , <<" test" >>, " deduplicate-this" , 2 ),
286
262
Get = # 'basic.get' {queue = <<" test" >>},
287
263
{# 'basic.get_ok' {delivery_tag = Tag1 }, _ } = amqp_channel :call (Channel , Get ),
288
264
{# 'basic.get_ok' {delivery_tag = Tag2 }, _ } = amqp_channel :call (Channel , Get ),
@@ -311,12 +287,17 @@ bind_new_exchange(Ch, Ex, Q) ->
311
287
Binding = # 'queue.bind' {queue = Q , exchange = Ex , routing_key = <<" #" >>},
312
288
# 'queue.bind_ok' {} = amqp_channel :call (Ch , Binding ).
313
289
314
- publish_message (Ch , Ex ) ->
290
+ publish_messages (_Ch , _Ex , 0 ) ->
291
+ ok ;
292
+ publish_messages (Ch , Ex , N ) ->
315
293
Publish = # 'basic.publish' {exchange = Ex , routing_key = <<" #" >>},
316
294
Msg = # amqp_msg {payload = <<" payload" >>},
317
- amqp_channel :cast (Ch , Publish , Msg ).
295
+ amqp_channel :cast (Ch , Publish , Msg ),
296
+ publish_messages (Ch , Ex , N - 1 ).
318
297
319
- publish_message (Ch , Ex , D ) ->
298
+ publish_messages (_Ch , _Ex , _D , 0 ) ->
299
+ ok ;
300
+ publish_messages (Ch , Ex , D , N ) ->
320
301
Type = case D of
321
302
D when is_integer (D ) -> long ;
322
303
D when is_float (D ) -> float ;
@@ -326,11 +307,15 @@ publish_message(Ch, Ex, D) ->
326
307
Props = # 'P_basic' {headers = [{<<" x-deduplication-header" >>, Type , D }]},
327
308
Publish = # 'basic.publish' {exchange = Ex , routing_key = <<" #" >>},
328
309
Msg = # amqp_msg {props = Props , payload = <<" payload" >>},
329
- amqp_channel :cast (Ch , Publish , Msg ).
310
+ amqp_channel :cast (Ch , Publish , Msg ),
311
+ publish_messages (Ch , Ex , D , N - 1 ).
330
312
331
- publish_message (Ch , Ex , D , E ) ->
313
+ publish_messages (_Ch , _Ex , _D , _E , 0 ) ->
314
+ ok ;
315
+ publish_messages (Ch , Ex , D , E , N ) ->
332
316
Props = # 'P_basic' {headers = [{<<" x-deduplication-header" >>, longstr , D }],
333
317
expiration = E },
334
318
Publish = # 'basic.publish' {exchange = Ex , routing_key = <<" #" >>},
335
319
Msg = # amqp_msg {props = Props , payload = <<" payload" >>},
336
- amqp_channel :cast (Ch , Publish , Msg ).
320
+ amqp_channel :cast (Ch , Publish , Msg ),
321
+ publish_messages (Ch , Ex , D , E , N - 1 ).
0 commit comments