Skip to content

Commit 97f4ea8

Browse files
committed
Improved threads creation
1 parent 475fe76 commit 97f4ea8

File tree

3 files changed

+13
-17
lines changed

3 files changed

+13
-17
lines changed

src/main/java/blog/buildon/aws/streaming/kafka/AllOrdersConsumer.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package blog.buildon.aws.streaming.kafka;
22

33
import java.time.Duration;
4-
import java.util.ArrayList;
54
import java.util.Arrays;
6-
import java.util.List;
75
import java.util.Properties;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
88

99
import org.apache.kafka.clients.consumer.ConsumerConfig;
1010
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -18,7 +18,7 @@
1818

1919
public class AllOrdersConsumer {
2020

21-
private class ConsumerThread extends Thread {
21+
private class ConsumerThread implements Runnable {
2222

2323
private String threadName;
2424
private KafkaConsumer<String, String> consumer;
@@ -55,18 +55,16 @@ public void run() {
5555

5656
}
5757

58-
private final List<ConsumerThread> consumerThreads = new ArrayList<>();
59-
6058
private void run(int numberOfThreads, Properties configs) {
59+
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
6160
for (int i = 0; i < numberOfThreads; i++) {
6261
String threadName = String.format("Consumer-Thread-%d", i);
63-
consumerThreads.add(new ConsumerThread(threadName, configs));
62+
executorService.submit(new ConsumerThread(threadName, configs));
6463
}
65-
consumerThreads.stream().forEach(ct -> ct.start());
6664
}
6765

6866
public static void main(String[] args) {
69-
createTopic(ALL_ORDERS, 6, (short) 3);
67+
createTopic(ALL_ORDERS, 6, (short) 1);
7068
if (args.length >= 1) {
7169
int numberOfThreads = Integer.parseInt(args[0]);
7270
new AllOrdersConsumer().run(numberOfThreads, getConfigs());

src/main/java/blog/buildon/aws/streaming/kafka/AllOrdersProducer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ private void run(Properties configs) {
5555
}
5656

5757
public static void main(String[] args) {
58-
createTopic(ALL_ORDERS, 6, (short) 3);
58+
createTopic(ALL_ORDERS, 6, (short) 1);
5959
new AllOrdersProducer().run(getConfigs());
6060
}
6161

src/main/java/blog/buildon/aws/streaming/kafka/BucketBasedConsumer.java

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package blog.buildon.aws.streaming.kafka;
22

33
import java.time.Duration;
4-
import java.util.ArrayList;
54
import java.util.Arrays;
6-
import java.util.List;
75
import java.util.Properties;
6+
import java.util.concurrent.ExecutorService;
7+
import java.util.concurrent.Executors;
88

99
import org.apache.kafka.clients.consumer.ConsumerConfig;
1010
import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -21,7 +21,7 @@
2121

2222
public class BucketBasedConsumer {
2323

24-
private class ConsumerThread extends Thread {
24+
private class ConsumerThread implements Runnable {
2525

2626
private String threadName;
2727
private KafkaConsumer<String, String> consumer;
@@ -69,14 +69,12 @@ public void run() {
6969

7070
}
7171

72-
private final List<ConsumerThread> consumerThreads = new ArrayList<>();
73-
7472
private void run(String bucketName, int numberOfThreads, Properties configs) {
73+
ExecutorService executorService = Executors.newFixedThreadPool(numberOfThreads);
7574
for (int i = 0; i < numberOfThreads; i++) {
76-
String threadName = String.format("%s-Thread-%d", bucketName, i);
77-
consumerThreads.add(new ConsumerThread(bucketName, threadName, configs));
75+
String threadName = String.format("Consumer-Thread-%d", i);
76+
executorService.submit(new ConsumerThread(bucketName, threadName, configs));
7877
}
79-
consumerThreads.stream().forEach(ct -> ct.start());
8078
}
8179

8280
public static void main(String[] args) {

0 commit comments

Comments
 (0)