Skip to content

Commit ad982ac

Browse files
committed
refactor(clients): remove unnecessary object classes
1 parent 10f4f8b commit ad982ac

File tree

8 files changed

+20
-31
lines changed

8 files changed

+20
-31
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ val consumerWorker: ConsumerWorker<String, String> = kafka("localhost:9092") {
163163
autoOffsetReset(AutoOffsetReset.Earliest)
164164
}
165165

166-
onDeserializationError(silentlyReplaceWithNull())
166+
onDeserializationError(replaceWithNullOnInvalidRecord())
167167

168168
onPartitionsAssigned { _: Consumer<*, *>, partitions ->
169169
println("Partitions assigned: $partitions")

clients/src/main/kotlin/io/streamthoughts/kafka/clients/consumer/KafkaConsumerWorker.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,9 @@ package io.streamthoughts.kafka.clients.consumer
2020

2121
import io.streamthoughts.kafka.clients.consumer.KafkaConsumerWorker.KafkaConsumerWorker
2222
import io.streamthoughts.kafka.clients.consumer.error.ConsumedErrorHandler
23-
import io.streamthoughts.kafka.clients.consumer.error.ConsumedErrorHandlers.closeTaskOnConsumedError
23+
import io.streamthoughts.kafka.clients.consumer.error.closeTaskOnConsumedError
2424
import io.streamthoughts.kafka.clients.consumer.error.serialization.DeserializationErrorHandler
25-
import io.streamthoughts.kafka.clients.consumer.error.serialization.DeserializationErrorHandlers
25+
import io.streamthoughts.kafka.clients.consumer.error.serialization.logAndFailOnInvalidRecord
2626
import io.streamthoughts.kafka.clients.consumer.listener.ConsumerBatchRecordsListener
2727
import io.streamthoughts.kafka.clients.consumer.listener.noop
2828
import io.streamthoughts.kafka.clients.loggerFor
@@ -209,7 +209,7 @@ class KafkaConsumerWorker<K, V> (
209209
SimpleConsumerAwareRebalanceListener(),
210210
batchRecordListener ?: noop(),
211211
onConsumedError ?: closeTaskOnConsumedError(),
212-
onDeserializationError ?: DeserializationErrorHandlers.logAndFail(),
212+
onDeserializationError ?: logAndFailOnInvalidRecord(),
213213
consumerFactory ?: ConsumerFactory.DefaultConsumerFactory
214214
)
215215

clients/src/main/kotlin/io/streamthoughts/kafka/clients/consumer/error/ConsumedErrorHandlers.kt

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,8 @@ import org.slf4j.Logger
2727
import java.time.Duration
2828
import kotlin.math.max
2929

30-
31-
object ConsumedErrorHandlers {
32-
fun closeTaskOnConsumedError(): ConsumedErrorHandler = CloseTaskOnConsumedError
33-
fun logAndCommitOnConsumedError(): ConsumedErrorHandler = LogAndCommitOnConsumedError
34-
}
30+
fun closeTaskOnConsumedError(): ConsumedErrorHandler = CloseTaskOnConsumedError
31+
fun logAndCommitOnConsumedError(): ConsumedErrorHandler = LogAndCommitOnConsumedError
3532

3633
/**
3734
* Stops the [ConsumerTask] when an error is thrown while a non-empty batch of [ConsumerRecord] is being processed

clients/src/main/kotlin/io/streamthoughts/kafka/clients/consumer/error/serialization/DeserializationErrorHandlers.kt

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,16 +21,13 @@ package io.streamthoughts.kafka.clients.consumer.error.serialization
2121
import io.streamthoughts.kafka.clients.loggerFor
2222
import org.apache.kafka.clients.consumer.ConsumerRecord
2323

24-
object DeserializationErrorHandlers {
24+
fun <K, V> replaceWithOnInvalidRecord(key: K, value: V): DeserializationErrorHandler<K, V> = ReplaceErrorHandler(key, value)
2525

26-
fun <K, V> silentlyReplaceWith(key: K, value: V): DeserializationErrorHandler<K, V> = ReplaceErrorHandler(key, value)
26+
fun <K, V> replaceWithNullOnInvalidRecord(): DeserializationErrorHandler<K, V> = ReplaceErrorHandler()
2727

28-
fun <K, V> silentlyReplaceWithNull(): DeserializationErrorHandler<K, V> = ReplaceErrorHandler()
28+
fun <K, V> logAndFailOnInvalidRecord(): DeserializationErrorHandler<K, V> = LogAndFailErrorHandler()
2929

30-
fun <K, V> logAndFail(): DeserializationErrorHandler<K, V> = LogAndSkipErrorHandler()
31-
32-
fun <K, V> logAndSkip(): DeserializationErrorHandler<K, V> = LogAndFailErrorHandler()
33-
}
30+
fun <K, V> logAndSkipOnInvalidRecord(): DeserializationErrorHandler<K, V> = LogAndSkipErrorHandler()
3431

3532
private class ReplaceErrorHandler<K, V>(
3633
private val key: K? = null,

clients/src/test/kotlin/io/streamthoughts/kafka/clients/consumer/KafkaConsumerConfigsTest.kt

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
*/
1919
package io.streamthoughts.kafka.clients.consumer
2020

21-
import io.streamthoughts.kafka.clients.Kafka
22-
import io.streamthoughts.kafka.clients.KafkaClientConfigs
2321
import org.apache.kafka.clients.consumer.ConsumerConfig
2422
import org.apache.kafka.common.serialization.StringDeserializer
2523
import org.junit.jupiter.api.Assertions.assertEquals
@@ -29,9 +27,6 @@ import org.junit.jupiter.api.TestInstance
2927
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
3028
class KafkaConsumerConfigsTest {
3129

32-
private val kafka = Kafka(bootstrapServers = arrayOf("dummy:1234"))
33-
private val client = KafkaClientConfigs(kafka = kafka).clientId("clientId")
34-
3530
@Test
3631
fun should_return_valid_kafka_consumer_config() {
3732
val configs: KafkaConsumerConfigs = consumerConfigsOf()

clients/src/test/kotlin/io/streamthoughts/kafka/clients/consumer/KafkaConsumerTaskTest.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,8 @@
1919
package io.streamthoughts.kafka.clients.consumer
2020

2121
import io.streamthoughts.kafka.clients.consumer.error.ConsumedErrorHandler
22-
import io.streamthoughts.kafka.clients.consumer.error.ConsumedErrorHandlers
23-
import io.streamthoughts.kafka.clients.consumer.error.serialization.DeserializationErrorHandlers
22+
import io.streamthoughts.kafka.clients.consumer.error.closeTaskOnConsumedError
23+
import io.streamthoughts.kafka.clients.consumer.error.serialization.logAndFailOnInvalidRecord
2424
import io.streamthoughts.kafka.clients.consumer.listener.ConsumerBatchRecordsListener
2525
import io.streamthoughts.kafka.clients.loggerFor
2626
import io.streamthoughts.kafka.clients.producer.Acks
@@ -121,7 +121,7 @@ class KafkaConsumerTaskTest(private val cluster: TestingEmbeddedKafka) {
121121

122122
val consumer = createConsumerFor(
123123
listener = failingListener,
124-
consumedErrorHandler = ConsumedErrorHandlers.closeTaskOnConsumedError()
124+
consumedErrorHandler = closeTaskOnConsumedError()
125125
)
126126
runBlocking {
127127
val job = GlobalScope.launch { consumer.run() }
@@ -147,7 +147,7 @@ class KafkaConsumerTaskTest(private val cluster: TestingEmbeddedKafka) {
147147
}
148148

149149
private fun createConsumerFor(listener: ConsumerBatchRecordsListener<String, String>,
150-
consumedErrorHandler: ConsumedErrorHandler = ConsumedErrorHandlers.closeTaskOnConsumedError()
150+
consumedErrorHandler: ConsumedErrorHandler = closeTaskOnConsumedError()
151151
): KafkaConsumerTask<String, String> {
152152
return KafkaConsumerTask<String, String>(
153153
consumerFactory = ConsumerFactory.DefaultConsumerFactory,
@@ -157,7 +157,7 @@ class KafkaConsumerTaskTest(private val cluster: TestingEmbeddedKafka) {
157157
valueDeserializer = StringDeserializer(),
158158
listener = listener,
159159
clientId = "test-client",
160-
deserializationErrorHandler = DeserializationErrorHandlers.logAndFail(),
160+
deserializationErrorHandler = logAndFailOnInvalidRecord(),
161161
consumedErrorHandler = consumedErrorHandler
162162
)
163163
}

clients/src/test/kotlin/io/streamthoughts/kafka/clients/producer/KafkaProducerContainerTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import org.junit.jupiter.api.TestInstance
3636
import org.junit.jupiter.api.extension.ExtendWith
3737
import org.slf4j.Logger
3838
import java.time.Duration
39-
import java.util.*
39+
import java.util.Properties
4040

4141
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
4242
@ExtendWith(EmbeddedSingleNodeKafkaCluster::class)

examples/src/main/kotlin/io/streamthoughts/kafka/client/examples/ConsumerKotlinDSLExample.kt

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ package io.streamthoughts.kafka.client.examples
2121
import io.streamthoughts.kafka.clients.consumer.AutoOffsetReset
2222
import io.streamthoughts.kafka.clients.consumer.ConsumerTask
2323
import io.streamthoughts.kafka.clients.consumer.ConsumerWorker
24-
import io.streamthoughts.kafka.clients.consumer.error.ConsumedErrorHandlers
25-
import io.streamthoughts.kafka.clients.consumer.error.serialization.DeserializationErrorHandlers
24+
import io.streamthoughts.kafka.clients.consumer.error.closeTaskOnConsumedError
25+
import io.streamthoughts.kafka.clients.consumer.error.serialization.replaceWithNullOnInvalidRecord
2626
import io.streamthoughts.kafka.clients.consumer.listener.forEach
2727
import io.streamthoughts.kafka.clients.kafka
2828
import kotlinx.coroutines.delay
@@ -47,9 +47,9 @@ fun main(args: Array<String>) {
4747
autoOffsetReset(AutoOffsetReset.Earliest)
4848
}
4949

50-
onDeserializationError(DeserializationErrorHandlers.silentlyReplaceWithNull())
50+
onDeserializationError(replaceWithNullOnInvalidRecord())
5151

52-
onConsumedError(ConsumedErrorHandlers.closeTaskOnConsumedError())
52+
onConsumedError(closeTaskOnConsumedError())
5353

5454
onPartitionsAssigned { _: Consumer<*, *>, partitions ->
5555
println("Partitions assigned: $partitions")

0 commit comments

Comments
 (0)