diff --git a/pom.xml b/pom.xml
index 1d45db2..b73ca6c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -39,6 +39,11 @@
spring-boot-starter-test
test
+
+ org.apache.kafka
+ kafka-clients
+ 3.1.1
+
diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java
new file mode 100644
index 0000000..b759b73
--- /dev/null
+++ b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KafkaConfiguration.java
@@ -0,0 +1,69 @@
+package com.techprimers.kafka.springbootkafkaproducerexample.config;
+
+import com.techprimers.kafka.springbootkafkaproducerexample.model.User;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@EnableKafka
+@Configuration
+public class KafkaConfiguration {
+
+ public static final String KAFKA_EXAMPLE = "Kafka_Example";
+ public static final String KAFKA_EXAMPLE_JSON = "Kafka_Example_User";
+
+ @Value("${kafka.bootstrapAddress}")
+ private String bootstrapAddress;
+
+ // Create related kafka topics
+ @Bean
+ public NewTopic testTopic() {
+ return new NewTopic(KAFKA_EXAMPLE, 1, (short) 1);
+ }
+
+ @Bean
+ public NewTopic testJsonTopic() {
+ return new NewTopic(KAFKA_EXAMPLE_JSON, 1, (short) 1);
+ }
+
+ @Bean
+ public KafkaTemplate kafkaSimpleMessageTemplate() {
+ return new KafkaTemplate<>(simpleMessageProducerFactory());
+ }
+
+ // Default Factory is to send String messages
+ private ProducerFactory simpleMessageProducerFactory() {
+ final Map configProps = new HashMap<>();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ return new DefaultKafkaProducerFactory<>(configProps);
+ }
+
+ // Object producer factory & template
+ @Bean
+ public KafkaTemplate kafkaObjectTemplate() {
+ return new KafkaTemplate<>(objectProducerFactory());
+ }
+
+ private ProducerFactory objectProducerFactory() {
+ final Map configProps = new HashMap<>();
+ configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
+ configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
+
+ return new DefaultKafkaProducerFactory<>(configProps);
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java
deleted file mode 100644
index 38165d4..0000000
--- a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/config/KakfaConfiguration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package com.techprimers.kafka.springbootkafkaproducerexample.config;
-
-import com.techprimers.kafka.springbootkafkaproducerexample.model.User;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-
-import java.util.HashMap;
-import java.util.Map;
-
-@Configuration
-public class KakfaConfiguration {
-
- @Bean
- public ProducerFactory producerFactory() {
- Map config = new HashMap<>();
-
- config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
- config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
-
- return new DefaultKafkaProducerFactory<>(config);
- }
-
-
- @Bean
- public KafkaTemplate kafkaTemplate() {
- return new KafkaTemplate<>(producerFactory());
- }
-
-
-}
diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java
index 5b66a54..d738dc7 100644
--- a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java
+++ b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/model/User.java
@@ -6,6 +6,9 @@ public class User {
private String dept;
private Long salary;
+ public User() {
+ }
+
public User(String name, String dept, Long salary) {
this.name = name;
this.dept = dept;
@@ -35,4 +38,13 @@ public Long getSalary() {
public void setSalary(Long salary) {
this.salary = salary;
}
+
+ @Override
+ public String toString() {
+ return "User{" +
+ "name='" + name +
+ ", dept='" + dept +
+ ", salary=" + salary +
+ '}';
+ }
}
diff --git a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java
index cd477ab..27b8c6b 100644
--- a/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java
+++ b/src/main/java/com/techprimers/kafka/springbootkafkaproducerexample/resource/UserResource.java
@@ -1,27 +1,39 @@
package com.techprimers.kafka.springbootkafkaproducerexample.resource;
+import com.techprimers.kafka.springbootkafkaproducerexample.config.KafkaConfiguration;
import com.techprimers.kafka.springbootkafkaproducerexample.model.User;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
-@RequestMapping("kafka")
+@RequestMapping("/kafka")
public class UserResource {
@Autowired
- private KafkaTemplate kafkaTemplate;
+ @Qualifier("kafkaSimpleMessageTemplate")
+ private KafkaTemplate kafkaSimpleMessageTemplate;
- private static final String TOPIC = "Kafka_Example";
-
- @GetMapping("/publish/{name}")
- public String post(@PathVariable("name") final String name) {
+ @Autowired
+ @Qualifier("kafkaObjectTemplate")
+ private KafkaTemplate kafkaObjectTemplate;
- kafkaTemplate.send(TOPIC, new User(name, "Technology", 12000L));
+ @GetMapping("/publish/{message}")
+ public String publishMessage(@PathVariable final String message) {
+ kafkaSimpleMessageTemplate.send(KafkaConfiguration.KAFKA_EXAMPLE, message);
+ return "Message : " + message + " published successfully";
+ }
- return "Published successfully";
+ @PostMapping("/publish")
+ public String publishUser(@RequestBody final User user) {
+ kafkaObjectTemplate.send(KafkaConfiguration.KAFKA_EXAMPLE_JSON, user);
+ return "User : " + user + "published successfully";
}
-}
+
+}
\ No newline at end of file
diff --git a/src/main/resources/application.properties b/src/main/resources/application.properties
index bafddce..a0c8a98 100644
--- a/src/main/resources/application.properties
+++ b/src/main/resources/application.properties
@@ -1 +1 @@
-server.port=8081
\ No newline at end of file
+kafka.bootstrapAddress=localhost:9092
\ No newline at end of file