66import java .util .List ;
77import java .util .Map ;
88import java .util .Optional ;
9- import java .util .concurrent .ConcurrentHashMap ;
10- import java .util .concurrent .Executors ;
11- import java .util .concurrent .Future ;
12- import java .util .concurrent .ScheduledExecutorService ;
13- import java .util .concurrent .TimeUnit ;
9+ import java .util .concurrent .*;
1410import java .util .function .Consumer ;
1511
1612import com .amazonaws .services .sqs .model .QueueNameExistsException ;
@@ -190,17 +186,17 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {
190186 throw new IllegalArgumentException ();
191187 }
192188
189+ String retentionPeriodString = retentionPeriod .get ().toString ();
190+ long currentTimestamp = System .currentTimeMillis ();
193191 CreateQueueRequest superRequest = request .clone ()
194192 .withQueueName (queueName )
195- .withAttributes (attributes );
193+ .withAttributes (attributes )
194+ .addTagsEntry (IDLE_QUEUE_RETENTION_PERIOD_TAG , retentionPeriodString )
195+ .addTagsEntry (LAST_HEARTBEAT_TIMESTAMP_TAG , String .valueOf (currentTimestamp ));
196196
197197 CreateQueueResult result = super .createQueue (superRequest );
198198 String queueUrl = result .getQueueUrl ();
199199
200- String retentionPeriodString = retentionPeriod .get ().toString ();
201- amazonSqsToBeExtended .tagQueue (queueUrl ,
202- Collections .singletonMap (IDLE_QUEUE_RETENTION_PERIOD_TAG , retentionPeriodString ));
203-
204200 // TODO-RS: Filter more carefully to all attributes valid for createQueue
205201 List <String > attributeNames = Arrays .asList (QueueAttributeName .ReceiveMessageWaitTimeSeconds .toString (),
206202 QueueAttributeName .VisibilityTimeout .toString ());
@@ -210,8 +206,9 @@ public CreateQueueResult createQueue(CreateQueueRequest request) {
210206 QueueMetadata metadata = new QueueMetadata (queueName , queueUrl , createdAttributes );
211207 queues .put (queueUrl , metadata );
212208
213- metadata .heartbeater = executor .scheduleAtFixedRate (() -> heartbeatToQueue (queueUrl ),
214- 0 , heartbeatIntervalSeconds , TimeUnit .SECONDS );
209+ long initialDelay = ThreadLocalRandom .current ().nextLong (heartbeatIntervalSeconds );
210+ metadata .heartbeater = executor .scheduleAtFixedRate (() -> heartbeatToQueue (queueUrl ),
211+ initialDelay , heartbeatIntervalSeconds , TimeUnit .SECONDS );
215212
216213 return result ;
217214 }
0 commit comments