Changes from all commits · 22 commits
8 changes: 7 additions & 1 deletion .gitignore
@@ -1,4 +1,10 @@
target/
.idea/
*.iml
.DS_STORE
.DS_STORE
/.classpath
*/.classpath
/.project
*/.project
*/.settings
bin/
2 changes: 1 addition & 1 deletion README.md
@@ -18,4 +18,4 @@ Happy learning!
- Word Count to learn the basic API
- Favourite Colour for a more advanced example (`Scala` version included)
- Bank Balance to demonstrate exactly once semantics
- User Event matcher to learn about joins between `KStream` and `GlobalKTable`.
- User Event matcher to learn about joins between `KStream` and `GlobalKTable`.
4 changes: 2 additions & 2 deletions bank-balance-exactly-once/pom.xml
@@ -16,15 +16,15 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.1</version>
<version>2.0.1</version>
</dependency>

<!--to write the kafka producer-->
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.1</version>
<version>2.0.1</version>
</dependency>


bank-balance-exactly-once — exactly-once streams app (Java source)
@@ -8,13 +8,14 @@
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.time.Instant;
import java.util.Properties;
@@ -33,18 +34,17 @@ public static void main(String[] args) {

// Exactly once processing!!
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

StreamsBuilder builder = new StreamsBuilder();


KStreamBuilder builder = new KStreamBuilder();

KStream<String, JsonNode> bankTransactions =
builder.stream(Serdes.String(), jsonSerde, "bank-transactions");

KStream<String, JsonNode> bankTransactions = builder.stream("bank-transactions",
Consumed.with(Serdes.String(), jsonSerde));


// create the initial json object for balances
ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
@@ -53,22 +53,23 @@ public static void main(String[] args) {
initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

KTable<String, JsonNode> bankBalance = bankTransactions
.groupByKey(Serdes.String(), jsonSerde)
.groupByKey(Serialized.with(Serdes.String(), jsonSerde))
.aggregate(
() -> initialBalance,
(key, transaction, balance) -> newBalance(transaction, balance),
jsonSerde,
"bank-balance-agg"
Materialized.<String, JsonNode, KeyValueStore<Bytes, byte[]>>as("bank-balance-agg")
.withKeySerde(Serdes.String())
.withValueSerde(jsonSerde)
);

bankBalance.to(Serdes.String(), jsonSerde,"bank-balance-exactly-once");
bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));

KafkaStreams streams = new KafkaStreams(builder, config);
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp();
streams.start();

// print the topology
System.out.println(streams.toString());
streams.localThreadsMetadata().forEach(data -> System.out.println(data));

// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
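Since the rendered diff interleaves the removed 0.11 lines with the added 2.0.1 lines, the migrated topology is easier to read in one piece. Below is a minimal sketch under the topic names shown above; the class name, the config boilerplate, and the `newBalance` body are assumptions standing in for parts the diff collapses:

```java
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.json.JsonSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;

import java.time.Instant;
import java.util.Properties;

public class BankBalanceSketch { // hypothetical class name

    public static void main(String[] args) {
        Properties config = new Properties();
        // application id / bootstrap servers are assumed; the diff elides them
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-balance-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // exactly-once processing, as in the diff
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // JSON Serde assembled from the Connect JSON serializer/deserializer
        Serde<JsonNode> jsonSerde = Serdes.serdeFrom(new JsonSerializer(), new JsonDeserializer());

        StreamsBuilder builder = new StreamsBuilder();

        // 2.0 API: serdes ride along in Consumed/Serialized/Materialized/Produced
        KStream<String, JsonNode> bankTransactions =
                builder.stream("bank-transactions", Consumed.with(Serdes.String(), jsonSerde));

        ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
        initialBalance.put("count", 0);
        initialBalance.put("balance", 0);
        initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

        KTable<String, JsonNode> bankBalance = bankTransactions
                .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                .aggregate(
                        () -> initialBalance,
                        (key, transaction, balance) -> newBalance(transaction, balance),
                        Materialized.<String, JsonNode, KeyValueStore<Bytes, byte[]>>as("bank-balance-agg")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(jsonSerde));

        // KTable.to(...) is gone in 2.0; route through toStream() plus Produced
        bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.cleanUp(); // dev only
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    // the real newBalance sits outside the visible hunks; this body is illustrative only
    private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
        ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
        newBalance.put("count", balance.get("count").asInt() + 1);
        newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());
        newBalance.put("time", Instant.now().toString());
        return newBalance;
    }
}
```

The pattern to notice: every positional serde argument from the 0.11 API now travels inside a `Consumed`, `Serialized`, `Materialized`, or `Produced` companion object.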
2 changes: 1 addition & 1 deletion favourite-colour-java/pom.xml
@@ -15,7 +15,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.0</version>
<version>2.0.1</version>
</dependency>


favourite-colour-java — FavouriteColourApp.java
@@ -2,16 +2,20 @@

import java.util.Properties;
import java.util.Arrays;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

public class FavouriteColourApp {

@@ -26,8 +30,7 @@ public static void main(String[] args) {
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

KStreamBuilder builder = new KStreamBuilder();

StreamsBuilder builder = new StreamsBuilder();
// Step 1: We create the topic of user keys to colours
KStream<String, String> textLines = builder.stream("favourite-colour-input");

@@ -43,25 +46,30 @@

usersAndColours.to("user-keys-and-colours");

Serde<String> stringSerde = Serdes.String();
Serde<Long> longSerde = Serdes.Long();

// step 2 - we read that topic as a KTable so that updates are read correctly
KTable<String, String> usersAndColoursTable = builder.table("user-keys-and-colours");

// step 3 - we count the occurrences of colours
KTable<String, Long> favouriteColours = usersAndColoursTable
// 5 - we group by colour within the KTable
.groupBy((user, colour) -> new KeyValue<>(colour, colour))
.count("CountsByColours");
.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsByColours")
.withKeySerde(stringSerde)
.withValueSerde(longSerde));

// 6 - we output the results to a Kafka Topic - don't forget the serializers
favouriteColours.to(Serdes.String(), Serdes.Long(),"favourite-colour-output");
favouriteColours.toStream().to("favourite-colour-output", Produced.with(Serdes.String(),Serdes.Long()));

KafkaStreams streams = new KafkaStreams(builder, config);
KafkaStreams streams = new KafkaStreams(builder.build(), config);
// only do this in dev - not in prod
streams.cleanUp();
streams.start();

// print the topology
System.out.println(streams.toString());
streams.localThreadsMetadata().forEach(data -> System.out.println(data));

// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
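One nuance in this hunk: `System.out.println(streams.toString())` is replaced with `localThreadsMetadata()`, which reports thread and task metadata rather than the topology, so the `// print the topology` comment no longer quite matches. If printing the topology itself is the goal, 2.0 exposes it through `Topology#describe()` — a small sketch (hypothetical class name):

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

public class TopologyDescribeSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("favourite-colour-input").to("user-keys-and-colours");

        // build() yields a Topology; describe() lists sources, processors and sinks
        Topology topology = builder.build();
        System.out.println(topology.describe());
    }
}
```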
9 changes: 6 additions & 3 deletions favourite-colour-scala/build.sbt
@@ -1,16 +1,19 @@
name := "favourite-colour-scala"
organization := "com.github.simplesteph.udemy.kafka.streams"
version := "1.0-SNAPSHOT"
version := "2.0.1-SNAPSHOT"
scalaVersion := "2.12.3"

// needed to resolve the javax.ws.rs-api dependency, whose non-jar packaging type trips up sbt
libraryDependencies += "javax.ws.rs" % "javax.ws.rs-api" % "2.1" artifacts(
Artifact("javax.ws.rs-api", "jar", "jar"))

// https://mvnrepository.com/artifact/org.apache.kafka/kafka-streams
libraryDependencies ++= Seq(
"org.apache.kafka" % "kafka-streams" % "0.11.0.0",
"org.apache.kafka" % "kafka-streams" % "2.0.1",
"org.slf4j" % "slf4j-api" % "1.7.25",
"org.slf4j" % "slf4j-log4j12" % "1.7.25"
)


// leverage java 8
javacOptions ++= Seq("-source", "1.8", "-target", "1.8", "-Xlint")
scalacOptions := Seq("-target:jvm-1.8")
8 changes: 8 additions & 0 deletions favourite-colour-scala/project/PackagingTypePlugin.scala
@@ -0,0 +1,8 @@
import sbt._

object PackagingTypePlugin extends AutoPlugin {
override val buildSettings = {
sys.props += "packaging.type" -> "jar"
Nil
}
}
favourite-colour-scala — FavouriteColourAppScala.scala
@@ -4,9 +4,11 @@ import java.lang
import java.util.Properties

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable}
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsConfig}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.kstream._
import org.apache.kafka.streams.state.KeyValueStore
import org.apache.kafka.streams.{KafkaStreams, KeyValue, StreamsBuilder, StreamsConfig}

object FavouriteColourAppScala {
def main(args: Array[String]): Unit = {
@@ -21,7 +23,7 @@ object FavouriteColourAppScala {
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0")

val builder: KStreamBuilder = new KStreamBuilder
val builder: StreamsBuilder = new StreamsBuilder

// Step 1: We create the topic of user keys to colours
val textLines: KStream[String, String] = builder.stream[String, String]("favourite-colour-input")
@@ -32,7 +34,9 @@
// 2 - we select a key that will be the user id (lowercase for safety)
.selectKey[String]((key: String, value: String) => value.split(",")(0).toLowerCase)
// 3 - we get the colour from the value (lowercase for safety)
.mapValues[String]((value: String) => value.split(",")(1).toLowerCase)
.mapValues[String](new ValueMapper[String, String] {
override def apply(value: String): String = { value.split(",")(1).toLowerCase }
})
// 4 - we filter undesired colours (could be a data sanitization step)
.filter((user: String, colour: String) => List("green", "blue", "red").contains(colour))

@@ -42,21 +46,29 @@
// step 2 - we read that topic as a KTable so that updates are read correctly
val usersAndColoursTable: KTable[String, String] = builder.table(intermediaryTopic)

val stringSerde: Serde[String] = Serdes.String
val longSerde: Serde[lang.Long] = Serdes.Long

// step 3 - we count the occurrences of colours
val favouriteColours: KTable[String, lang.Long] = usersAndColoursTable
// 5 - we group by colour within the KTable
.groupBy((user: String, colour: String) => new KeyValue[String, String](colour, colour))
.count("CountsByColours")
.groupBy(
(user: String, colour: String) => new KeyValue[String, String](colour, colour),
Serialized.`with`(stringSerde, stringSerde)
)
.count(Materialized.as[String, lang.Long, KeyValueStore[Bytes, Array[Byte]]]("CountsByColours")
.withKeySerde(stringSerde)
.withValueSerde(longSerde))

// 6 - we output the results to a Kafka Topic - don't forget the serializers
favouriteColours.to(Serdes.String, Serdes.Long, "favourite-colour-output-scala")
favouriteColours.toStream.to("favourite-colour-output-scala", Produced.`with`(stringSerde, longSerde))

val streams: KafkaStreams = new KafkaStreams(builder, config)
val streams: KafkaStreams = new KafkaStreams(builder.build(), config)
streams.cleanUp()
streams.start()

// print the topology
System.out.println(streams.toString)
streams.localThreadsMetadata().forEach(t => System.out.print(t.toString))

// shutdown hook to correctly close the streams application
Runtime.getRuntime.addShutdownHook(new Thread {
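The change from a plain Scala lambda to an explicit `ValueMapper` anonymous class in `mapValues` is likely forced by overload resolution: `mapValues` gained a `ValueMapperWithKey` overload in Kafka 1.1, and Scala 2.12's SAM conversion appears unable to pick between the two. Java resolves the overload from the lambda's arity, so the equivalent step stays a one-liner — a hypothetical sketch:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

public class MapValuesSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("favourite-colour-input");

        // a one-argument lambda unambiguously selects the ValueMapper overload
        KStream<String, String> colours =
                textLines.mapValues(value -> value.split(",")[1].toLowerCase());

        colours.to("favourite-colour-output");
    }
}
```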
2 changes: 1 addition & 1 deletion streams-starter-project/pom.xml
@@ -13,7 +13,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.0</version>
<version>2.0.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
streams-starter-project — starter streams app (Java source)
@@ -3,9 +3,9 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

@@ -20,18 +20,18 @@ public static void main(String[] args) {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KStreamBuilder builder = new KStreamBuilder();
StreamsBuilder builder = new StreamsBuilder();

KStream<String, String> kStream = builder.stream("input-topic-name");
// do stuff
kStream.to("word-count-output");

KafkaStreams streams = new KafkaStreams(builder, config);
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp(); // only do this in dev - not in prod
streams.start();

// print the topology
System.out.println(streams.toString());
streams.localThreadsMetadata().forEach(data -> System.out.println(data));

// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
2 changes: 1 addition & 1 deletion user-event-enricher/pom.xml
@@ -14,7 +14,7 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.11.0.0</version>
<version>2.0.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
user-event-enricher — user event enricher app (Java source)
@@ -3,10 +3,10 @@
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

import java.util.Properties;

@@ -20,7 +20,7 @@ public static void main(String[] args) {
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KStreamBuilder builder = new KStreamBuilder();
StreamsBuilder builder = new StreamsBuilder();

// we get a global table out of Kafka. This table will be replicated on each Kafka Streams application
// the key of our globalKTable is the user ID
@@ -55,12 +55,12 @@ public static void main(String[] args) {
userPurchasesEnrichedLeftJoin.to("user-purchases-enriched-left-join");


KafkaStreams streams = new KafkaStreams(builder, config);
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp(); // only do this in dev - not in prod
streams.start();

// print the topology
System.out.println(streams.toString());
streams.localThreadsMetadata().forEach(data -> System.out.println(data));

// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
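The collapsed middle of this file contains the actual `KStream`-to-`GlobalKTable` joins. For orientation, a minimal sketch of the 2.0-style left join; the input topic names and the `String` value types are assumptions, since only the left-join output topic is visible above:

```java
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

public class GlobalKTableJoinSketch {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();

        // the global table is replicated in full on every application instance
        GlobalKTable<String, String> usersGlobalTable = builder.globalTable("user-table");
        KStream<String, String> userPurchases = builder.stream("user-purchases");

        KStream<String, String> enriched = userPurchases.leftJoin(
                usersGlobalTable,
                (purchaseKey, purchaseValue) -> purchaseKey,  // map each stream record to a table key
                (purchase, user) -> user != null
                        ? "Purchase=" + purchase + ",UserInfo=[" + user + "]"
                        : "Purchase=" + purchase + ",UserInfo=null");

        enriched.to("user-purchases-enriched-left-join");
    }
}
```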
1 change: 1 addition & 0 deletions word-count/.gitignore
@@ -0,0 +1 @@
/bin/