@@ -7,7 +7,7 @@ import java.util.concurrent.atomic.AtomicBoolean
77import com .github .mmolimar .ksql .jdbc .KsqlEntityHeaders ._
88import com .github .mmolimar .ksql .jdbc .embedded .{EmbeddedKafkaCluster , EmbeddedKsqlEngine , EmbeddedZookeeperServer }
99import com .github .mmolimar .ksql .jdbc .utils .TestUtils
10- import org .apache .kafka .clients .producer .ProducerRecord
10+ import org .apache .kafka .clients .producer .{ KafkaProducer , ProducerRecord }
1111import org .scalatest ._
1212
1313class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAfterAll {
@@ -16,14 +16,14 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
1616 val kafkaCluster = new EmbeddedKafkaCluster (zkServer.getConnection)
1717 val ksqlEngine = new EmbeddedKsqlEngine (kafkaCluster.getBrokerList)
1818
19- lazy val kafkaProducer = TestUtils .buildProducer(kafkaCluster.getBrokerList)
19+ lazy val kafkaProducer : KafkaProducer [ Array [ Byte ], Array [ Byte ]] = TestUtils .buildProducer(kafkaCluster.getBrokerList)
2020
2121 val ksqlUrl = s " jdbc:ksql://localhost: ${ksqlEngine.getPort}?timeout=20000 "
2222 var ksqlConnection : Connection = _
23- val topic = TestUtils .randomString()
23+ val topic : String = TestUtils .randomString()
2424
2525 val stop = new AtomicBoolean (false )
26- val producerThread = new BackgroundOps (stop, () => produceMessages)
26+ val producerThread = new BackgroundOps (stop, () => produceMessages() )
2727
2828 " A KsqlConnection" when {
2929
@@ -35,24 +35,24 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
3535 " create the table properly" in {
3636 val resultSet = createTestTableOrStream(table)
3737 resultSet.next should be(true )
38- resultSet.getString(commandStatusEntity( 0 ) .name) should be(" TABLE" )
38+ resultSet.getString(commandStatusEntity.head .name) should be(" TABLE" )
3939 resultSet.getString(commandStatusEntity(1 ).name) should be(table.toUpperCase)
4040 resultSet.getString(commandStatusEntity(2 ).name) should be(" CREATE" )
4141 resultSet.getString(commandStatusEntity(3 ).name) should be(" SUCCESS" )
4242 resultSet.getString(commandStatusEntity(4 ).name) should be(" Table created" )
4343 resultSet.next should be(false )
44- resultSet.close
44+ resultSet.close()
4545 }
4646
4747 " list the table already created" in {
4848 val resultSet = ksqlConnection.createStatement.executeQuery(s " SHOW TABLES " )
4949 resultSet.next should be(true )
50- resultSet.getString(tablesListEntity( 0 ) .name) should be(table.toUpperCase)
50+ resultSet.getString(tablesListEntity.head .name) should be(table.toUpperCase)
5151 resultSet.getString(tablesListEntity(1 ).name) should be(topic)
5252 resultSet.getString(tablesListEntity(2 ).name) should be(" JSON" )
5353 resultSet.getBoolean(tablesListEntity(3 ).name) should be(false )
5454 resultSet.next should be(false )
55- resultSet.close
55+ resultSet.close()
5656 }
5757
5858 " be able to get the execution plan for a query in a table" in {
@@ -61,17 +61,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
6161 resultSet.getString(queryDescriptionEntity(1 ).name) should be(" ROWTIME, ROWKEY, FIELD1, FIELD2, FIELD3" )
6262 resultSet.getString(queryDescriptionEntity(2 ).name) should be(table.toUpperCase)
6363 resultSet.next should be(false )
64- resultSet.close
64+ resultSet.close()
6565 }
6666
6767 " be able to query all fields in the table" in {
6868 var counter = 0
6969 val statement = ksqlConnection.createStatement
7070 statement.setMaxRows(maxRecords)
71+ statement.getMoreResults(1 ) should be(false )
7172 val resultSet = statement.executeQuery(s " SELECT * FROM $table" )
73+ statement.getMoreResults(1 ) should be(true )
7274 while (resultSet.next) {
7375 resultSet.getLong(1 ) should not be (- 1 )
74- Option (resultSet.getString(2 )) should not be ( None )
76+ Option (resultSet.getString(2 )) should not be None
7577 resultSet.getInt(3 ) should be(123 )
7678 resultSet.getDouble(4 ) should be(45.4 )
7779 resultSet.getString(5 ) should be(" lorem ipsum" )
@@ -81,9 +83,10 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
8183 counter += 1
8284 }
8385 counter should be(maxRecords)
86+ statement.getMoreResults() should be(false )
8487
85- resultSet.close
86- statement.close
88+ resultSet.close()
89+ statement.close()
8790
8891 val metadata = resultSet.getMetaData
8992 metadata.getColumnCount should be(5 )
@@ -168,19 +171,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
168171 resultSet.getString(sourceDescriptionEntity(3 ).name) should be(" TABLE" )
169172 resultSet.getString(sourceDescriptionEntity(4 ).name) should be(" JSON" )
170173 resultSet.next should be(false )
171- resultSet.close
174+ resultSet.close()
172175 }
173176
174177 " drop the table" in {
175178 val resultSet = ksqlConnection.createStatement.executeQuery(s " DROP TABLE $table" )
176179 resultSet.next should be(true )
177- resultSet.getString(commandStatusEntity( 0 ) .name) should be(" TABLE" )
180+ resultSet.getString(commandStatusEntity.head .name) should be(" TABLE" )
178181 resultSet.getString(commandStatusEntity(1 ).name) should be(table.toUpperCase)
179182 resultSet.getString(commandStatusEntity(2 ).name) should be(" DROP" )
180183 resultSet.getString(commandStatusEntity(3 ).name) should be(" SUCCESS" )
181- resultSet.getString(commandStatusEntity(4 ).name) should be(s " Source ${table.toUpperCase} was dropped. " )
184+ resultSet.getString(commandStatusEntity(4 ).name) should be(s " Source ${table.toUpperCase} (topic: $topic ) was dropped." )
182185 resultSet.next should be(false )
183- resultSet.close
186+ resultSet.close()
184187 }
185188 }
186189
@@ -190,25 +193,25 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
190193 val stream = TestUtils .randomString()
191194
192195 " create the stream properly" in {
193- val resultSet = createTestTableOrStream(stream, true )
196+ val resultSet = createTestTableOrStream(str = stream, isStream = true )
194197 resultSet.next should be(true )
195- resultSet.getString(commandStatusEntity( 0 ) .name) should be(" STREAM" )
198+ resultSet.getString(commandStatusEntity.head .name) should be(" STREAM" )
196199 resultSet.getString(commandStatusEntity(1 ).name) should be(stream.toUpperCase)
197200 resultSet.getString(commandStatusEntity(2 ).name) should be(" CREATE" )
198201 resultSet.getString(commandStatusEntity(3 ).name) should be(" SUCCESS" )
199202 resultSet.getString(commandStatusEntity(4 ).name) should be(" Stream created" )
200203 resultSet.next should be(false )
201- resultSet.close
204+ resultSet.close()
202205 }
203206
204207 " list the stream already created" in {
205208 val resultSet = ksqlConnection.createStatement.executeQuery(s " SHOW STREAMS " )
206209 resultSet.next should be(true )
207- resultSet.getString(streamsListEntity( 0 ) .name) should be(stream.toUpperCase)
210+ resultSet.getString(streamsListEntity.head .name) should be(stream.toUpperCase)
208211 resultSet.getString(streamsListEntity(1 ).name) should be(topic)
209212 resultSet.getString(streamsListEntity(2 ).name) should be(" JSON" )
210213 resultSet.next should be(false )
211- resultSet.close
214+ resultSet.close()
212215 }
213216
214217 " be able to get the execution plan for a query in a stream" in {
@@ -217,17 +220,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
217220 resultSet.getString(queryDescriptionEntity(1 ).name) should be(" ROWTIME, ROWKEY, FIELD1, FIELD2, FIELD3" )
218221 resultSet.getString(queryDescriptionEntity(2 ).name) should be(stream.toUpperCase)
219222 resultSet.next should be(false )
220- resultSet.close
223+ resultSet.close()
221224 }
222225
223226 " be able to query all fields in the stream" in {
224227 var counter = 0
225228 val statement = ksqlConnection.createStatement
226229 statement.setMaxRows(maxRecords)
230+ statement.getMoreResults(1 ) should be(false )
227231 val resultSet = statement.executeQuery(s " SELECT * FROM $stream" )
232+ statement.getMoreResults(1 ) should be(true )
228233 while (resultSet.next) {
229234 resultSet.getLong(1 ) should not be (- 1 )
230- Option (resultSet.getString(2 )) should not be ( None )
235+ Option (resultSet.getString(2 )) should not be None
231236 resultSet.getInt(3 ) should be(123 )
232237 resultSet.getDouble(4 ) should be(45.4 )
233238 resultSet.getString(5 ) should be(" lorem ipsum" )
@@ -237,9 +242,10 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
237242 counter += 1
238243 }
239244 counter should be(maxRecords)
245+ statement.getMoreResults(1 ) should be(false )
240246
241- resultSet.close
242- statement.close
247+ resultSet.close()
248+ statement.close()
243249
244250 val metadata = resultSet.getMetaData
245251 metadata.getColumnCount should be(5 )
@@ -323,19 +329,19 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
323329 resultSet.getString(sourceDescriptionEntity(3 ).name) should be(" STREAM" )
324330 resultSet.getString(sourceDescriptionEntity(4 ).name) should be(" JSON" )
325331 resultSet.next should be(false )
326- resultSet.close
332+ resultSet.close()
327333 }
328334
329335 " drop the stream" in {
330336 val resultSet = ksqlConnection.createStatement.executeQuery(s " DROP STREAM $stream" )
331337 resultSet.next should be(true )
332- resultSet.getString(commandStatusEntity( 0 ) .name) should be(" STREAM" )
338+ resultSet.getString(commandStatusEntity.head .name) should be(" STREAM" )
333339 resultSet.getString(commandStatusEntity(1 ).name) should be(stream.toUpperCase)
334340 resultSet.getString(commandStatusEntity(2 ).name) should be(" DROP" )
335341 resultSet.getString(commandStatusEntity(3 ).name) should be(" SUCCESS" )
336- resultSet.getString(commandStatusEntity(4 ).name) should be(s " Source ${stream.toUpperCase} was dropped. " )
342+ resultSet.getString(commandStatusEntity(4 ).name) should be(s " Source ${stream.toUpperCase} (topic: $topic ) was dropped." )
337343 resultSet.next should be(false )
338- resultSet.close
344+ resultSet.close()
339345 }
340346 }
341347
@@ -344,19 +350,22 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
344350 " show the content of that topic" in {
345351 val statement = ksqlConnection.createStatement
346352 statement.setMaxRows(3 )
353+ statement.getMoreResults(1 ) should be(false )
347354 val resultSet = statement.executeQuery(s " PRINT ' $topic' " )
355+ statement.getMoreResults(1 ) should be(true )
348356 resultSet.next should be(true )
349- resultSet.getString(printTopic( 0 ) .name) should be(" Format:STRING" )
357+ resultSet.getString(printTopic.head .name) should be(" Format:STRING" )
350358 resultSet.next should be(true )
351359 resultSet.next should be(true )
352360 resultSet.next should be(false )
353- resultSet.close
354- statement.close
361+ statement.getMoreResults() should be(false )
362+ resultSet.close()
363+ statement.close()
355364 }
356365 }
357366 }
358367
359- private def produceMessages : Unit = {
368+ private def produceMessages () : Unit = {
360369 val key = TestUtils .randomString().getBytes
361370 val value =
362371 """
@@ -378,53 +387,53 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
378387 s " WITH (KAFKA_TOPIC=' $topic', VALUE_FORMAT='JSON', KEY='FIELD1'); " )
379388 }
380389
381- override def beforeAll = {
382- DriverManager .registerDriver(new KsqlDriver );
390+ override def beforeAll () : Unit = {
391+ DriverManager .registerDriver(new KsqlDriver )
383392
384- zkServer.startup
393+ zkServer.startup()
385394 TestUtils .waitTillAvailable(" localhost" , zkServer.getPort, 5000 )
386395
387- kafkaCluster.startup
396+ kafkaCluster.startup()
388397 kafkaCluster.getPorts.foreach { port =>
389398 TestUtils .waitTillAvailable(" localhost" , port, 5000 )
390399 }
391400
392401 kafkaCluster.createTopic(topic)
393402 kafkaCluster.existTopic(topic) should be(true )
394- producerThread.start
403+ producerThread.start()
395404
396- ksqlEngine.startup
405+ ksqlEngine.startup()
397406 TestUtils .waitTillAvailable(" localhost" , ksqlEngine.getPort, 5000 )
398407
399408 ksqlConnection = DriverManager .getConnection(ksqlUrl)
400409
401410 }
402411
403- override def afterAll = {
412+ override def afterAll () : Unit = {
404413 info(s " Produced ${producerThread.getNumExecs} messages " )
405414 stop.set(true )
406- TestUtils .swallow(producerThread.interrupt)
415+ TestUtils .swallow(producerThread.interrupt() )
407416
408- TestUtils .swallow(ksqlConnection.close)
409- ksqlEngine.shutdown
410- TestUtils .swallow(kafkaProducer.close)
417+ TestUtils .swallow(ksqlConnection.close() )
418+ ksqlEngine.shutdown()
419+ TestUtils .swallow(kafkaProducer.close() )
411420
412- kafkaCluster.shutdown
413- zkServer.shutdown
421+ kafkaCluster.shutdown()
422+ zkServer.shutdown()
414423 }
415424
416425}
417426
418427class BackgroundOps (stop : AtomicBoolean , exec : () => Unit ) extends Thread {
419428 private var count = 0L
420429
421- override def run = {
430+ override def run () : Unit = {
422431 while (! stop.get) {
423432 exec()
424433 this .count += 1
425434 }
426435 }
427436
428- def getNumExecs = this .count
437+ def getNumExecs : Long = this .count
429438}
430439
0 commit comments