diff --git a/.gitignore b/.gitignore index 692e68f..b7e2b3a 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,10 @@ target/ .idea/ *.iml -.DS_STORE \ No newline at end of file +.DS_STORE +/.classpath +*/.classpath +/.project +*/.project +*/.settings +bin/ diff --git a/README.md b/README.md index b520e72..ced3cc5 100644 --- a/README.md +++ b/README.md @@ -18,4 +18,4 @@ Happy learning! - Word Count to learn the basic API - Favourite Colour for a more advanced example (`Scala` version included) - Bank Balance to demonstrate exactly once semantics - - User Event matcher to learn about joins between `KStream` and `GlobalKTable`. \ No newline at end of file + - User Event matcher to learn about joins between `KStream` and `GlobalKTable`. diff --git a/bank-balance-exactly-once/pom.xml b/bank-balance-exactly-once/pom.xml index 95e0266..dbfbebc 100644 --- a/bank-balance-exactly-once/pom.xml +++ b/bank-balance-exactly-once/pom.xml @@ -16,7 +16,7 @@ org.apache.kafka kafka-streams - 0.11.0.1 + 1.0.0 @@ -24,7 +24,7 @@ org.apache.kafka kafka-clients - 0.11.0.1 + 1.0.0 diff --git a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java index a71f573..4bcb4cb 100644 --- a/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java +++ b/bank-balance-exactly-once/src/main/java/com/github/simplesteph/udemy/kafka/streams/BankBalanceExactlyOnceApp.java @@ -8,16 +8,20 @@ import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.connect.json.JsonDeserializer; import org.apache.kafka.connect.json.JsonSerializer; +import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.*; +import org.apache.kafka.streams.processor.ThreadMetadata; +import org.apache.kafka.streams.state.KeyValueStore; import java.time.Instant; import java.util.Properties; +import java.util.Set; public class BankBalanceExactlyOnceApp { @@ -33,18 +37,17 @@ public static void main(String[] args) { // Exactly once processing!! config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); - + // json Serde final Serializer jsonSerializer = new JsonSerializer(); final Deserializer jsonDeserializer = new JsonDeserializer(); final Serde jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer); + + StreamsBuilder builder = new StreamsBuilder(); - - KStreamBuilder builder = new KStreamBuilder(); - - KStream bankTransactions = - builder.stream(Serdes.String(), jsonSerde, "bank-transactions"); - + KStream bankTransactions = builder.stream("bank-transactions", + Consumed.with(Serdes.String(), jsonSerde)); + // create the initial json object for balances ObjectNode initialBalance = JsonNodeFactory.instance.objectNode(); @@ -53,22 +56,23 @@ public static void main(String[] args) { initialBalance.put("time", Instant.ofEpochMilli(0L).toString()); KTable bankBalance = bankTransactions - .groupByKey(Serdes.String(), jsonSerde) + .groupByKey(Serialized.with(Serdes.String(), jsonSerde)) .aggregate( () -> initialBalance, (key, transaction, balance) -> newBalance(transaction, balance), - jsonSerde, - "bank-balance-agg" + Materialized.>as("bank-balance-agg") + .withKeySerde(Serdes.String()) + .withValueSerde(jsonSerde) ); - bankBalance.to(Serdes.String(), jsonSerde,"bank-balance-exactly-once"); + bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde)); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.cleanUp(); streams.start(); // print the topology - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); diff --git a/favourite-colour-java/pom.xml b/favourite-colour-java/pom.xml index a973825..eef3d1a 100644 --- a/favourite-colour-java/pom.xml +++ b/favourite-colour-java/pom.xml @@ -15,7 +15,7 @@ org.apache.kafka kafka-streams - 0.11.0.0 + 1.0.0 diff --git a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java index 4828745..c5d115f 100644 --- a/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java +++ b/favourite-colour-java/src/main/java/com/github/simplesteph/udemy/kafka/streams/FavouriteColourApp.java @@ -2,16 +2,20 @@ import java.util.Properties; import java.util.Arrays; -import java.util.stream.Collectors; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; +import org.apache.kafka.streams.state.KeyValueStore; public class FavouriteColourApp { @@ -26,8 +30,7 @@ public static void main(String[] args) { // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0"); - KStreamBuilder builder = new KStreamBuilder(); - + StreamsBuilder builder = new StreamsBuilder(); // Step 1: We create the topic of users keys to colours KStream textLines = builder.stream("favourite-colour-input"); @@ -43,6 +46,9 @@ public static void main(String[] args) { usersAndColours.to("user-keys-and-colours"); + Serde stringSerde = Serdes.String(); + Serde longSerde = Serdes.Long(); + // step 2 - we read that topic as a KTable so that updates are read correctly KTable usersAndColoursTable = builder.table("user-keys-and-colours"); @@ -50,18 +56,20 @@ public static void main(String[] args) { KTable favouriteColours = usersAndColoursTable // 5 - we group by colour within the KTable .groupBy((user, colour) -> new KeyValue<>(colour, colour)) - .count("CountsByColours"); + .count(Materialized.>as("CountsByColours") + .withKeySerde(stringSerde) + .withValueSerde(longSerde)); // 6 - we output the results to a Kafka Topic - don't forget the serializers - favouriteColours.to(Serdes.String(), Serdes.Long(),"favourite-colour-output"); + favouriteColours.toStream().to("favourite-colour-output", Produced.with(Serdes.String(),Serdes.Long())); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); // only do this in dev - not in prod streams.cleanUp(); streams.start(); // print the topology - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); diff --git a/favourite-colour-scala/build.sbt b/favourite-colour-scala/build.sbt index ba01c4a..de6c6f2 100644 --- a/favourite-colour-scala/build.sbt +++ b/favourite-colour-scala/build.sbt @@ -5,7 +5,7 @@ scalaVersion := "2.12.3" // https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams libraryDependencies ++= Seq( - "org.apache.kafka" % "kafka-streams" % "0.11.0.0", + "org.apache.kafka" % "kafka-streams" % "1.0.0", "org.slf4j" % "slf4j-api" % "1.7.25", "org.slf4j" % "slf4j-log4j12" % "1.7.25" ) diff --git a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala index d3f1073..18d5c25 100644 --- a/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala +++ b/favourite-colour-scala/src/main/scala/com/github/simplesteph/udemy/kafka/streams/FavouriteColourAppScala.scala @@ -4,9 +4,11 @@ import java.lang import java.util.Properties import org.apache.kafka.clients.consumer.ConsumerConfig -import org.apache.kafka.common.serialization.Serdes -import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable} -import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig} +import org.apache.kafka.common.serialization.{Serde, Serdes} +import org.apache.kafka.common.utils.Bytes +import org.apache.kafka.streams.kstream._ +import org.apache.kafka.streams.state.KeyValueStore +import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig} object FavouriteColourAppScala { def main(args: Array[String]): Unit = { @@ -21,7 +23,7 @@ object FavouriteColourAppScala { // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0") - val builder: KStreamBuilder = new KStreamBuilder + val builder: StreamsBuilder = new StreamsBuilder // Step 1: We create the topic of users keys to colours val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input") @@ -42,21 +44,29 @@ object FavouriteColourAppScala { // step 2 - we read that topic as a KTable so that updates are read correctly val usersAndColoursTable: KTable[String, String] = builder.table(intermediaryTopic) + val stringSerde: Serde[String] = Serdes.String + val longSerde: Serde[lang.Long] = Serdes.Long + // step 3 - we count the occurences of colours val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable // 5 - we group by colour within the KTable - .groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour)) - .count("CountsByColours") + .groupBy( + (user: String, colour: String) => new KeyValue[String, String](colour, colour), + Serialized.`with`(stringSerde, stringSerde) + ) + .count(Materialized.as[String, lang.Long, KeyValueStore[Bytes, Array[Byte]]]("CountsByColours") + .withKeySerde(stringSerde) + .withValueSerde(longSerde)) // 6 - we output the results to a Kafka Topic - don't forget the serializers - favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala") + favouriteColours.toStream.to("favourite-colour-output-scala", Produced.`with`(stringSerde, longSerde)) - val streams: KafkaStreams = new KafkaStreams(builder, config) + val streams: KafkaStreams = new KafkaStreams(builder.build(), config) streams.cleanUp() streams.start() // print the topology - System.out.println(streams.toString) + streams.localThreadsMetadata().forEach(t => System.out.print(t.toString)) // shutdown hook to correctly close the streams application Runtime.getRuntime.addShutdownHook(new Thread { diff --git a/streams-starter-project/pom.xml b/streams-starter-project/pom.xml index 84f1104..3e5713c 100644 --- a/streams-starter-project/pom.xml +++ b/streams-starter-project/pom.xml @@ -13,7 +13,7 @@ org.apache.kafka kafka-streams - 0.11.0.0 + 1.0.0 diff --git a/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java b/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java index 5a88d5f..3f4f842 100644 --- a/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java +++ b/streams-starter-project/src/main/java/com/github/simplesteph/udemy/kafka/streams/StreamsStarterApp.java @@ -3,9 +3,9 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import java.util.Properties; @@ -20,18 +20,18 @@ public static void main(String[] args) { config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - KStreamBuilder builder = new KStreamBuilder(); + StreamsBuilder builder = new StreamsBuilder(); KStream kStream = builder.stream("input-topic-name"); // do stuff kStream.to("word-count-output"); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.cleanUp(); // only do this in dev - not in prod streams.start(); // print the topology - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); diff --git a/user-event-enricher/pom.xml b/user-event-enricher/pom.xml index 0be5452..d8d6cef 100644 --- a/user-event-enricher/pom.xml +++ b/user-event-enricher/pom.xml @@ -14,7 +14,7 @@ org.apache.kafka kafka-streams - 0.11.0.0 + 1.0.0 diff --git a/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java b/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java index 180aae6..314e1fd 100644 --- a/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java +++ b/user-event-enricher/src/main/java/com/github/simplesteph/udemy/kafka/streams/UserEventEnricherApp.java @@ -3,10 +3,10 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.GlobalKTable; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import java.util.Properties; @@ -20,7 +20,7 @@ public static void main(String[] args) { config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - KStreamBuilder builder = new KStreamBuilder(); + StreamsBuilder builder = new StreamsBuilder(); // we get a global table out of Kafka. This table will be replicated on each Kafka Streams application // the key of our globalKTable is the user ID @@ -55,12 +55,12 @@ public static void main(String[] args) { userPurchasesEnrichedLeftJoin.to("user-purchases-enriched-left-join"); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.cleanUp(); // only do this in dev - not in prod streams.start(); // print the topology - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); // shutdown hook to correctly close the streams application Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); diff --git a/word-count/.gitignore b/word-count/.gitignore new file mode 100644 index 0000000..ae3c172 --- /dev/null +++ b/word-count/.gitignore @@ -0,0 +1 @@ +/bin/ diff --git a/word-count/pom.xml b/word-count/pom.xml index 8e0f93f..ea7b6d6 100644 --- a/word-count/pom.xml +++ b/word-count/pom.xml @@ -15,7 +15,7 @@ org.apache.kafka kafka-streams - 0.11.0.0 + 1.0.0 diff --git a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java index 21f64a9..2cdbf20 100644 --- a/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java +++ b/word-count/src/main/java/com/github/simplesteph/udemy/kafka/streams/WordCountApp.java @@ -6,10 +6,12 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.kstream.Produced; public class WordCountApp { public static void main(String[] args) { @@ -20,7 +22,7 @@ public static void main(String[] args) { config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - KStreamBuilder builder = new KStreamBuilder(); + StreamsBuilder builder = new StreamsBuilder(); // 1 - stream from Kafka KStream textLines = builder.stream("word-count-input"); @@ -36,18 +38,21 @@ public static void main(String[] args) { // 5 - group by key before aggregation .groupByKey() // 6 - count occurences - .count("Counts"); + .count(Materialized.as("Counts")); // 7 - to in order to write the results back to kafka - wordCounts.to(Serdes.String(), Serdes.Long(), "word-count-output"); + wordCounts.toStream().to("word-count-output", Produced.with(Serdes.String(), Serdes.Long())); - KafkaStreams streams = new KafkaStreams(builder, config); + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); + // shutdown hook to correctly close the streams application + Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); + // Update: // print the topology every 10 seconds for learning purposes while(true){ - System.out.println(streams.toString()); + streams.localThreadsMetadata().forEach(data -> System.out.println(data)); try { Thread.sleep(5000); } catch (InterruptedException e) { @@ -55,7 +60,6 @@ public static void main(String[] args) { } } - // shutdown hook to correctly close the streams application - Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); + } }