@@ -50,6 +50,8 @@ import java.util.concurrent.TimeUnit
5050import kotlin.test.assertEquals
5151import kotlin.time.Duration.Companion.seconds
5252
53+ private val testIterations: Int =
54+ System .getProperties().getProperty(" io.github.nomisrev.kafka.TEST_ITERATIONS" )?.toIntOrNull() ? : 1
5355
5456@TestInstance(TestInstance .Lifecycle .PER_CLASS )
5557abstract class KafkaSpec {
@@ -63,7 +65,7 @@ abstract class KafkaSpec {
6365 fun destroy () {
6466 kafka.stop()
6567 }
66-
68+
6769 @BeforeAll
6870 @JvmStatic
6971 fun setup () {
@@ -86,10 +88,10 @@ abstract class KafkaSpec {
8688 withEnv(" KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND" , " true" )
8789 withReuse(true )
8890 }
89-
91+
9092 fun KafkaReceiver (): KafkaReceiver <String , String > =
9193 KafkaReceiver (receiverSetting())
92-
94+
9395 fun receiverSetting (): ReceiverSettings <String , String > =
9496 ReceiverSettings (
9597 bootstrapServers = kafka.bootstrapServers,
@@ -169,15 +171,17 @@ abstract class KafkaSpec {
169171 partitions : Int = 4,
170172 replicationFactor : Short = 1,
171173 test : suspend TopicTestScope .(NewTopic ) -> Unit
172- ): Unit = runTest {
173- val topic = NewTopic (nextTopicName(), partitions, replicationFactor).configs(topicConfig)
174- admin {
175- createTopic(topic)
176- try {
177- TopicTestScope (topic, this @runTest).test(topic)
178- } finally {
179- topic.shouldBeEmpty()
180- deleteTopic(topic.name())
174+ ): Unit = repeat(testIterations) {
175+ runTest {
176+ val topic = NewTopic (nextTopicName(), partitions, replicationFactor).configs(topicConfig)
177+ admin {
178+ createTopic(topic)
179+ try {
180+ TopicTestScope (topic, this @runTest).test(topic)
181+ } finally {
182+ topic.shouldBeEmpty()
183+ deleteTopic(topic.name())
184+ }
181185 }
182186 }
183187 }
@@ -298,6 +302,7 @@ abstract class KafkaSpec {
298302 object : Producer <String , String > {
299303 override fun clientInstanceId (p0 : Duration ? ): Uuid =
300304 producer.clientInstanceId(p0)
305+
301306 override fun close () {}
302307
303308 override fun close (timeout : Duration ? ) {}
0 commit comments