README.md (18 changes: 16 additions & 2 deletions)
@@ -267,7 +267,7 @@ Not all methods have been implemented. Please check [store types method support
##### keyValueStore
Using defaults, for a state store named "my-kv-store", the following CQL schema applies:
```sql
-CREATE TABLE IF NOT EXISTS my_kv_store_kstreams_store (
+CREATE TABLE IF NOT EXISTS my_kv_kstreams_store (
partition int,
key blob,
time timestamp,
@@ -279,14 +279,28 @@ CREATE TABLE IF NOT EXISTS my_kv_store_kstreams_store (
##### globalKeyValueStore
Using defaults, for a state store named "global-kv-store", the following CQL schema applies:
```sql
-CREATE TABLE IF NOT EXISTS global_kv_store_kstreams_store (
+CREATE TABLE IF NOT EXISTS global_kv_kstreams_store (
key blob,
time timestamp,
value blob,
PRIMARY KEY (key)
) WITH compaction = { 'class' : 'LeveledCompactionStrategy' }
```
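
A sketch of wiring such a store into a topology: only `CassandraStores.builder(...)` and `.withKeyspace(...)` appear verbatim in the tests added by this PR; the `globalKeyValueStore()` method name is assumed from this section's heading, and topic, keyspace, and serde choices are illustrative.
```java
import com.datastax.oss.driver.api.core.CqlSession;
import dev.thriving.oss.kafka.streams.cassandra.state.store.CassandraStores;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;

CqlSession session = CqlSession.builder().build(); // illustrative session setup
StreamsBuilder builder = new StreamsBuilder();

// Assumed builder method, named after this section's heading.
KeyValueBytesStoreSupplier supplier = CassandraStores.builder(session, "global-kv-store")
        .withKeyspace("my_keyspace") // illustrative keyspace
        .globalKeyValueStore();

// Materialize a global table backed by the Cassandra store.
builder.globalTable("input-topic",
        Materialized.<String, String>as(supplier)
                .withKeySerde(Serdes.String())
                .withValueSerde(Serdes.String()));
```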

##### windowStore
Using defaults, for a state store named "some-window-store", the following CQL schema applies:
```sql
CREATE TABLE IF NOT EXISTS some_window_kstreams_store (
partition int,
start_time bigint,
end_time bigint,
key blob,
time timestamp,
value blob,
PRIMARY KEY ((partition, start_time), end_time, key)
) WITH compaction = { 'class' : 'LeveledCompactionStrategy' }
```
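
The matching store supplier comes from the builder's `windowBytesStore(...)` method, as exercised by `StreamToStreamJoinIntegrationTest` below. A sketch; argument meanings are inferred from the test's call site (`windowBytesStore(retentionPeriod, -1, windowSize, retainDuplicates)`), and the `-1` simply mirrors the test's value for the segment-related argument:
```java
import java.time.Duration;

import com.datastax.oss.driver.api.core.CqlSession;
import dev.thriving.oss.kafka.streams.cassandra.state.store.CassandraStores;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;

CqlSession session = CqlSession.builder().build(); // illustrative session setup

Duration windowSize = Duration.ofSeconds(10);
long retentionPeriodMs = Duration.ofHours(24).toMillis(); // must cover window size + grace

// Mirrors the test usage: windowBytesStore(retentionPeriod, -1, windowSize, retainDuplicates).
WindowBytesStoreSupplier supplier = CassandraStores.builder(session, "some-window-store")
        .withKeyspace("my_keyspace") // illustrative keyspace
        .windowBytesStore(retentionPeriodMs, -1, windowSize.toMillis(), true);
```
For the full join wiring, see `StreamJoined.withThisStoreSupplier(...)` / `.withOtherStoreSupplier(...)` in the test.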

#### Feat: Cassandra table with default TTL

💡 **Tip:** Cassandra has a table option `default_time_to_live` (default expiration time (“TTL”) in seconds for a table) which can be useful for certain use cases where data (state) can or should expire.
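
For example, a default TTL can be set on an existing store table with a plain CQL `ALTER TABLE` statement; a sketch executed through the driver (table name and TTL value are illustrative):
```java
import com.datastax.oss.driver.api.core.CqlSession;

// Sketch: expire state after 12 hours (43,200 seconds); table name is illustrative.
try (CqlSession session = CqlSession.builder().build()) {
    session.execute("ALTER TABLE my_kv_kstreams_store WITH default_time_to_live = 43200;");
}
```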
StreamToStreamJoinIntegrationTest.java (new file)
@@ -0,0 +1,185 @@
/*
* Copyright Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.thriving.oss.kafka.streams.cassandra.state.store;

import com.datastax.oss.driver.api.core.CqlSession;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;
import org.rnorth.ducttape.unreliables.Unreliables;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.assertj.core.api.Assertions.assertThat;

/**
* End-to-end integration test that demonstrates how to perform a join between two KStreams.
*
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class StreamToStreamJoinIntegrationTest extends AbstractIntegrationTest {

private static final String adImpressionsTopic = "adImpressions";
private static final String adClicksTopic = "adClicks";
private static final String outputTopic = "output-topic";

@Test
public void shouldJoinTwoStreams() throws ExecutionException, InterruptedException, TimeoutException {
// Input 1: Ad impressions
final List<KeyValue<String, String>> inputAdImpressions = Arrays.asList(
new KeyValue<>("car-advertisement", "shown"),
new KeyValue<>("newspaper-advertisement", "shown"),
new KeyValue<>("gadget-advertisement", "shown")
);

// Input 2: Ad clicks
final List<KeyValue<String, String>> inputAdClicks = Arrays.asList(
new KeyValue<>("newspaper-advertisement", "clicked"),
new KeyValue<>("gadget-advertisement", "clicked"),
new KeyValue<>("newspaper-advertisement", "clicked")
);

final List<KeyValue<String, String>> expectedResults = Arrays.asList(
new KeyValue<>("car-advertisement", "shown/not-clicked-yet"),
new KeyValue<>("newspaper-advertisement", "shown/not-clicked-yet"),
new KeyValue<>("gadget-advertisement", "shown/not-clicked-yet"),
new KeyValue<>("newspaper-advertisement", "shown/clicked"),
new KeyValue<>("gadget-advertisement", "shown/clicked"),
new KeyValue<>("newspaper-advertisement", "shown/clicked")
);

// configure and start the processor topology.
final Serde<String> stringSerde = Serdes.String();
final Properties props = getStreamsProperties();

// when
try (
final AdminClient adminClient = initAdminClient();
final KafkaProducer<String, String> producer = initProducer(stringSerde, stringSerde);
final KafkaConsumer<String, String> consumer = initConsumer(stringSerde, stringSerde);
final CqlSession session = initSession();
final KafkaStreams streams = initStreams(props, session)
) {
// setup input and output topics.
Collection<NewTopic> topics = Arrays.asList(
new NewTopic(adImpressionsTopic, 6, (short) 1),
new NewTopic(adClicksTopic, 6, (short) 1),
new NewTopic(outputTopic, 3, (short) 1)
);
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

consumer.subscribe(Collections.singletonList(outputTopic));

// start streams.
streams.start();

// produce some input data to the input topics.
inputAdImpressions.forEach(it -> {
try {
producer.send(new ProducerRecord<>(adImpressionsTopic, it.key, it.value)).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});
inputAdClicks.forEach(it -> {
try {
producer.send(new ProducerRecord<>(adClicksTopic, it.key, it.value)).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

// consume and collect streams output
final List<KeyValue<String, String>> results = new ArrayList<>();
Unreliables.retryUntilTrue(600, TimeUnit.SECONDS, () -> {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
records.iterator().forEachRemaining(record -> results.add(KeyValue.pair(record.key(), record.value())));

return results.size() >= expectedResults.size();
});

// then verify the application's output data.
assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
}
}

// note: adapted from https://github.com/confluentinc/kafka-streams-examples/blob/v7.5.0-148/src/test/java/io/confluent/examples/streams/StreamToStreamJoinIntegrationTest.java
@NotNull
private KafkaStreams initStreams(Properties streamsConfiguration, CqlSession session) {
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, String> impressions = builder.stream(adImpressionsTopic);
final KStream<String, String> clicks = builder.stream(adClicksTopic);

// In this example, we opt to perform an OUTER JOIN between the two streams. We picked this
// join type to show how the Streams API will send further join updates downstream whenever,
// for the same join key (e.g. "newspaper-advertisement"), we receive an update from either of
// the two joined streams during the defined join window.
Duration joinTimeDifference = Duration.ofSeconds(5);
Duration windowDuration = joinTimeDifference.multipliedBy(2);
// Mirrors Kafka Streams' deprecated default 24h grace period; the join's window
// stores must retain records long enough to cover the join window plus grace.
long DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD = 24 * 60 * 60 * 1000L;
long retentionPeriod = DEPRECATED_DEFAULT_24_HR_GRACE_PERIOD;
final KStream<String, String> impressionsAndClicks = impressions.outerJoin(
clicks,
(impressionValue, clickValue) ->
(clickValue == null) ? impressionValue + "/not-clicked-yet" : impressionValue + "/" + clickValue,
// KStream-KStream joins are always windowed joins, hence we must provide a join window.
JoinWindows.of(joinTimeDifference),
// In this specific example, we don't need to define join serdes explicitly because the key, left value, and
// right value are all of type String, which matches our default serdes configured for the application. However,
// we want to showcase the use of `StreamJoined.with(...)` in case your code needs a different type setup.
StreamJoined.with(
Serdes.String(), /* key */
Serdes.String(), /* left value */
Serdes.String() /* right value */
)
.withThisStoreSupplier(
CassandraStores.builder(session, "store-this")
.withKeyspace(CASSANDRA_KEYSPACE)
.windowBytesStore(retentionPeriod, -1, windowDuration.toMillis(), true)
)
.withOtherStoreSupplier(
CassandraStores.builder(session, "store-other")
.withKeyspace(CASSANDRA_KEYSPACE)
.windowBytesStore(retentionPeriod, -1, windowDuration.toMillis(), true)
)
.withLoggingDisabled()
);

// Write the results to the output topic.
impressionsAndClicks.to(outputTopic);

return new KafkaStreams(builder.build(), streamsConfiguration);
}
}
ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java (new file)
@@ -0,0 +1,156 @@
/*
* Copyright Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package dev.thriving.oss.kafka.streams.cassandra.state.store;

import com.datastax.oss.driver.api.core.CqlSession;
import dev.thriving.oss.kafka.streams.cassandra.state.store.utils.IntegrationTestUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStore;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Test;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.kafka.streams.StoreQueryParameters.fromNameAndType;

/**
* Demonstrates how to validate an application's expected state through interactive queries.
* <p>
* Note: This example uses lambda expressions and thus works with Java 8+ only.
*/
public class ValidateStateWithInteractiveQueriesLambdaIntegrationTest extends AbstractIntegrationTest {

private static final String inputTopic = "inputTopic";
private static final String STORE_NAME_MAX = "max-store";
private static final String STORE_NAME_MAX_WINDOW = "max-window-store";

@Test
public void shouldCalculateMaxClicksPerUser() throws ExecutionException, InterruptedException, TimeoutException {
// input: A user may be listed multiple times.
final List<KeyValue<String, Long>> inputUserClicks = Arrays.asList(
new KeyValue<>("alice", 13L),
new KeyValue<>("bob", 4L),
new KeyValue<>("chao", 25L),
new KeyValue<>("bob", 19L),
new KeyValue<>("chao", 56L),
new KeyValue<>("alice", 78L),
new KeyValue<>("alice", 40L),
new KeyValue<>("bob", 3L)
);

final Map<String, Long> expectedMaxClicksPerUser = Map.of(
"alice", 78L,
"bob", 19L,
"chao", 56L
);

// configure and start the processor topology.
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();
final Properties props = getStreamsProperties();

// The commit interval for flushing records to state stores and downstream must be lower than
// this integration test's timeout (30 secs) to ensure we observe the expected processing results.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2 * 1000);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Long().getClass().getName());

// when
try (
final AdminClient adminClient = initAdminClient();
final KafkaProducer<String, Long> producer = initProducer(stringSerde, longSerde);
final CqlSession session = initSession();
final KafkaStreams streams = initStreams(props, session)
) {
// setup input and output topics.
Collection<NewTopic> topics = List.of(
new NewTopic(inputTopic, 6, (short) 1)
);
adminClient.createTopics(topics).all().get(30, TimeUnit.SECONDS);

// start streams.
streams.start();

// produce some input data to the input topics.
inputUserClicks.forEach(it -> {
try {
producer.send(new ProducerRecord<>(inputTopic, it.key, it.value)).get();
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
});

// then verify the application's output data.
final ReadOnlyKeyValueStore<String, Long> keyValueStore =
streams.store(fromNameAndType(STORE_NAME_MAX, QueryableStoreTypes.keyValueStore()));

final ReadOnlyWindowStore<String, Long> windowStore =
streams.store(fromNameAndType(STORE_NAME_MAX_WINDOW, QueryableStoreTypes.windowStore()));

IntegrationTestUtils.assertThatKeyValueStoreContains(keyValueStore, expectedMaxClicksPerUser);
IntegrationTestUtils.assertThatOldestWindowContains(windowStore, expectedMaxClicksPerUser);
}
}

// note: adapted from https://github.com/confluentinc/kafka-streams-examples/blob/v7.5.0-148/src/test/java/io/confluent/examples/streams/ValidateStateWithInteractiveQueriesLambdaIntegrationTest.java
@NotNull
private KafkaStreams initStreams(Properties streamsConfiguration, CqlSession session) {
final StreamsBuilder builder = new StreamsBuilder();

final KStream<String, Long> stream = builder.stream(inputTopic);

// rolling MAX() aggregation
final String maxStore = STORE_NAME_MAX;
stream.groupByKey().aggregate(
() -> Long.MIN_VALUE,
(aggKey, value, aggregate) -> Math.max(value, aggregate),
Materialized.as(maxStore)
);

// windowed MAX() aggregation
final String maxWindowStore = STORE_NAME_MAX_WINDOW;
stream.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1L)).grace(Duration.ZERO))
.aggregate(
() -> Long.MIN_VALUE,
(aggKey, value, aggregate) -> Math.max(value, aggregate),
Materialized.<String, Long, WindowStore<Bytes, byte[]>>as(maxWindowStore).withRetention(Duration.ofMinutes(5L)));

return new KafkaStreams(builder.build(), streamsConfiguration);
}
}