diff --git a/.gitignore b/.gitignore
index 692e68f..b7e2b3a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,4 +1,10 @@
target/
.idea/
*.iml
-.DS_STORE
\ No newline at end of file
+.DS_STORE
+/.classpath
+*/.classpath
+/.project
+*/.project
+*/.settings
+bin/
diff --git a/README.md b/README.md
index b520e72..ced3cc5 100644
--- a/README.md
+++ b/README.md
@@ -18,4 +18,4 @@ Happy learning!
- Word Count to learn the basic API
- Favourite Colour for a more advanced example (`Scala` version included)
- Bank Balance to demonstrate exactly once semantics
- - User Event matcher to learn about joins between `KStream` and `GlobalKTable`.
\ No newline at end of file
+ - User Event matcher to learn about joins between `KStream` and `GlobalKTable`.
diff --git a/bank-balance-exactly-once/pom.xml b/bank-balance-exactly-once/pom.xml
index 95e0266..dbfbebc 100644
--- a/bank-balance-exactly-once/pom.xml
+++ b/bank-balance-exactly-once/pom.xml
@@ -16,7 +16,7 @@
org.apache.kafka
kafka-streams
- 0.11.0.1
+ 1.0.0
@@ -24,7 +24,7 @@
org.apache.kafka
kafka-clients
- 0.11.0.1
+ 1.0.0
diff --git a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java
index a71f573..4bcb4cb 100644
--- a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java
+++ b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java
@@ -8,16 +8,20 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.streams.Consumed;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.*;
+import org.apache.kafka.streams.processor.ThreadMetadata;
+import org.apache.kafka.streams.state.KeyValueStore;
import java.time.Instant;
import java.util.Properties;
+import java.util.Set;
public class BankBalanceExactlyOnceApp {
@@ -33,18 +37,17 @@ public static void main(String[] args) {
// Exactly once processing!!
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
-
+
// json Serde
final Serializer jsonSerializer = new JsonSerializer();
final Deserializer jsonDeserializer = new JsonDeserializer();
final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
+
+ StreamsBuilder builder = new StreamsBuilder();
-
- KStreamBuilder builder = new KStreamBuilder();
-
- KStream bankTransactions =
- builder.stream(Serdes.String(), jsonSerde, "bank-transactions");
-
+ KStream bankTransactions = builder.stream("bank-transactions",
+ Consumed.with(Serdes.String(), jsonSerde));
+
// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
@@ -53,22 +56,23 @@ public static void main(String[] args) {
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());
KTable bankBalance = bankTransactions
- .groupByKey(Serdes.String(), jsonSerde)
+ .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(
() -> initialBalance,
(key, transaction, balance) -> newBalance(transaction, balance),
- jsonSerde,
- "bank-balance-agg"
+ Materialized.>as("bank-balance-agg")
+ .withKeySerde(Serdes.String())
+ .withValueSerde(jsonSerde)
);
- bankBalance.to(Serdes.String(), jsonSerde,"bank-balance-exactly-once");
+ bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp();
streams.start();
// print the topology
- System.out.println(streams.toString());
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
diff --git a/favourite-colour-java/pom.xml b/favourite-colour-java/pom.xml
index a973825..eef3d1a 100644
--- a/favourite-colour-java/pom.xml
+++ b/favourite-colour-java/pom.xml
@@ -15,7 +15,7 @@
org.apache.kafka
kafka-streams
- 0.11.0.0
+ 1.0.0
diff --git a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java
index 4828745..c5d115f 100644
--- a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java
+++ b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java
@@ -2,16 +2,20 @@
import java.util.Properties;
import java.util.Arrays;
-import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.state.KeyValueStore;
public class FavouriteColourApp {
@@ -26,8 +30,7 @@ public static void main(String[] args) {
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
- KStreamBuilder builder = new KStreamBuilder();
-
+ StreamsBuilder builder = new StreamsBuilder();
// Step 1: We create the topic of users keys to colours
KStream textLines = builder.stream("favourite-colour-input");
@@ -43,6 +46,9 @@ public static void main(String[] args) {
usersAndColours.to("user-keys-and-colours");
+ Serde stringSerde = Serdes.String();
+ Serde longSerde = Serdes.Long();
+
// step 2 - we read that topic as a KTable so that updates are read correctly
KTable usersAndColoursTable = builder.table("user-keys-and-colours");
@@ -50,18 +56,20 @@ public static void main(String[] args) {
KTable favouriteColours = usersAndColoursTable
// 5 - we group by colour within the KTable
.groupBy((user, colour) -> new KeyValue<>(colour, colour))
- .count("CountsByColours");
+ .count(Materialized.>as("CountsByColours")
+ .withKeySerde(stringSerde)
+ .withValueSerde(longSerde));
// 6 - we output the results to a Kafka Topic - don't forget the serializers
- favouriteColours.to(Serdes.String(), Serdes.Long(),"favourite-colour-output");
+ favouriteColours.toStream().to("favourite-colour-output", Produced.with(Serdes.String(),Serdes.Long()));
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
// only do this in dev - not in prod
streams.cleanUp();
streams.start();
// print the topology
- System.out.println(streams.toString());
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
diff --git a/favourite-colour-scala/build.sbt b/favourite-colour-scala/build.sbt
index ba01c4a..de6c6f2 100644
--- a/favourite-colour-scala/build.sbt
+++ b/favourite-colour-scala/build.sbt
@@ -5,7 +5,7 @@ scalaVersion := "2.12.3"
// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies ++= Seq(
- "org.apache.kafka" % "kafka-streams" % "0.11.0.0",
+ "org.apache.kafka" % "kafka-streams" % "1.0.0",
"org.slf4j" % "slf4j-api" % "1.7.25",
"org.slf4j" % "slf4j-log4j12" % "1.7.25"
)
diff --git a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala
index d3f1073..18d5c25 100644
--- a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala
+++ b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala
@@ -4,9 +4,11 @@ import java.lang
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.common.serialization.Serdes
-import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
-import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
+import org.apache.kafka.common.serialization.{Serde, Serdes}
+import org.apache.kafka.common.utils.Bytes
+import org.apache.kafka.streams.kstream._
+import org.apache.kafka.streams.state.KeyValueStore
+import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig}
object FavouriteColourAppScala {
def main(args: Array[String]): Unit = {
@@ -21,7 +23,7 @@ object FavouriteColourAppScala {
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")
- val builder: KStreamBuilder = new KStreamBuilder
+ val builder: StreamsBuilder = new StreamsBuilder
// Step 1: We create the topic of users keys to colours
val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input")
@@ -42,21 +44,29 @@ object FavouriteColourAppScala {
// step 2 - we read that topic as a KTable so that updates are read correctly
val usersAndColoursTable: KTable[String, String] = builder.table(intermediaryTopic)
+ val stringSerde: Serde[String] = Serdes.String
+ val longSerde: Serde[lang.Long] = Serdes.Long
+
// step 3 - we count the occurences of colours
val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable
// 5 - we group by colour within the KTable
- .groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour))
- .count("CountsByColours")
+ .groupBy(
+ (user: String, colour: String) => new KeyValue[String, String](colour, colour),
+ Serialized.`with`(stringSerde, stringSerde)
+ )
+ .count(Materialized.as[String, lang.Long, KeyValueStore[Bytes, Array[Byte]]]("CountsByColours")
+ .withKeySerde(stringSerde)
+ .withValueSerde(longSerde))
// 6 - we output the results to a Kafka Topic - don't forget the serializers
- favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala")
+ favouriteColours.toStream.to("favourite-colour-output-scala", Produced.`with`(stringSerde, longSerde))
- val streams: KafkaStreams = new KafkaStreams(builder, config)
+ val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.cleanUp()
streams.start()
// print the topology
- System.out.println(streams.toString)
+ streams.localThreadsMetadata().forEach(t => System.out.print(t.toString))
// shutdown hook to correctly close the streams application
Runtime.getRuntime.addShutdownHook(new Thread {
diff --git a/streams-starter-project/pom.xml b/streams-starter-project/pom.xml
index 84f1104..3e5713c 100644
--- a/streams-starter-project/pom.xml
+++ b/streams-starter-project/pom.xml
@@ -13,7 +13,7 @@
org.apache.kafka
kafka-streams
- 0.11.0.0
+ 1.0.0
diff --git a/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java b/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java
index 5a88d5f..3f4f842 100644
--- a/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java
+++ b/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java
@@ -3,9 +3,9 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
@@ -20,18 +20,18 @@ public static void main(String[] args) {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
KStream kStream = builder.stream("input-topic-name");
// do stuff
kStream.to("word-count-output");
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp(); // only do this in dev - not in prod
streams.start();
// print the topology
- System.out.println(streams.toString());
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
diff --git a/user-event-enricher/pom.xml b/user-event-enricher/pom.xml
index 0be5452..d8d6cef 100644
--- a/user-event-enricher/pom.xml
+++ b/user-event-enricher/pom.xml
@@ -14,7 +14,7 @@
org.apache.kafka
kafka-streams
- 0.11.0.0
+ 1.0.0
diff --git a/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java b/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java
index 180aae6..314e1fd 100644
--- a/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java
+++ b/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java
@@ -3,10 +3,10 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import java.util.Properties;
@@ -20,7 +20,7 @@ public static void main(String[] args) {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
// we get a global table out of Kafka. This table will be replicated on each Kafka Streams application
// the key of our globalKTable is the user ID
@@ -55,12 +55,12 @@ public static void main(String[] args) {
userPurchasesEnrichedLeftJoin.to("user-purchases-enriched-left-join");
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp(); // only do this in dev - not in prod
streams.start();
// print the topology
- System.out.println(streams.toString());
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
diff --git a/word-count/.gitignore b/word-count/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/word-count/.gitignore
@@ -0,0 +1 @@
+/bin/
diff --git a/word-count/pom.xml b/word-count/pom.xml
index 8e0f93f..ea7b6d6 100644
--- a/word-count/pom.xml
+++ b/word-count/pom.xml
@@ -15,7 +15,7 @@
org.apache.kafka
kafka-streams
- 0.11.0.0
+ 1.0.0
diff --git a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java
index 21f64a9..2cdbf20 100644
--- a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java
+++ b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java
@@ -6,10 +6,12 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
public class WordCountApp {
public static void main(String[] args) {
@@ -20,7 +22,7 @@ public static void main(String[] args) {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
- KStreamBuilder builder = new KStreamBuilder();
+ StreamsBuilder builder = new StreamsBuilder();
// 1 - stream from Kafka
KStream textLines = builder.stream("word-count-input");
@@ -36,18 +38,21 @@ public static void main(String[] args) {
// 5 - group by key before aggregation
.groupByKey()
// 6 - count occurences
- .count("Counts");
+ .count(Materialized.as("Counts"));
// 7 - to in order to write the results back to kafka
- wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output");
+ wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long()));
- KafkaStreams streams = new KafkaStreams(builder, config);
+ KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
+ // shutdown hook to correctly close the streams application
+ Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
+
// Update:
// print the topology every 10 seconds for learning purposes
while(true){
- System.out.println(streams.toString());
+ streams.localThreadsMetadata().forEach(data -> System.out.println(data));
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
@@ -55,7 +60,6 @@ public static void main(String[] args) {
}
}
- // shutdown hook to correctly close the streams application
- Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
+
}
}