Skip to content

Commit aab61ef

Browse files
authored
Merge pull request #4 from mmolimar/develop
Release 0.2 version
2 parents 92feb65 + e45c777 commit aab61ef

File tree

13 files changed

+166
-74
lines changed

13 files changed

+166
-74
lines changed

.travis.yml

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,30 @@
11
before_install:
2+
- git clone https://github.com/confluentinc/common.git
3+
- cd common
4+
- git checkout v4.1.0-rc3
5+
- mv pom.xml pom.xml.bak && sed '58s/.*/ <kafka.version>1.1.0<\/kafka.version>/' pom.xml.bak > pom.xml
6+
- mvn -B clean install -Dmaven.test.skip=true
7+
- cd ..
8+
- git clone https://github.com/confluentinc/support-metrics-common.git
9+
- cd support-metrics-common
10+
- git checkout v4.1.0-rc3
11+
- mvn -B clean install -Dmaven.test.skip=true
12+
- cd ..
13+
- git clone https://github.com/confluentinc/rest-utils.git
14+
- cd rest-utils
15+
- git checkout v4.1.0-rc3
16+
- mvn -B clean install -Dmaven.test.skip=true
17+
- cd ..
18+
- git clone https://github.com/confluentinc/schema-registry.git
19+
- cd schema-registry
20+
- git checkout v4.1.0-rc3
21+
- mvn -B clean install -Dmaven.test.skip=true
22+
- cd ..
223
- git clone https://github.com/confluentinc/ksql.git
324
- cd ksql
4-
- git checkout 0.1.x
5-
- mvn -pl '!ksql-cli' clean install -Dmaven.test.skip=true
25+
- git checkout v4.1.0-rc3
26+
- ex -sc '55d7|x' ksql-cli/pom.xml && ex -sc '37d7|x' ksql-engine/pom.xml && ex -sc '25d7|x' ksql-parser/pom.xml && ex -sc '84d7|x' ksql-rest-app/pom.xml && ex -sc '37d7|x' ksql-tools/pom.xml
27+
- mvn -B clean install -Dmaven.test.skip=true
628
- cd ..
729
script:
830
- sbt clean coverage test it:test coverageReport && sbt coverageAggregate

README.md

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,22 @@ perform queries to Kafka and then, the engine translates those requests to Kafka
1010

1111
### Building from source ###
1212

13-
First of all, the KSQL lib has to be installed into your local repo (till now, there isn't a release available).
13+
First of all, the KSQL lib has to be installed into your local repo.
1414

1515
So, cloning the KSQL repo:
1616

17-
``git clone https://github.com/confluentinc/ksql.git && cd ksql && git checkout 0.1.x``
17+
``git clone https://github.com/confluentinc/ksql.git && cd ksql && git v4.1.0-rc3``
1818

1919
and installing it:
2020

2121
``mvn clean install -Dmaven.skip.test=true``
2222

23+
Probably, you'll have to do the same things for these Confluent projects (previous to the KSQL project installation):
24+
* [Confluent common](https://github.com/confluentinc/common.git)
25+
* [Confluent support-metrics-common](https://github.com/confluentinc/support-metrics-common.git)
26+
* [Confluent rest-utils](https://github.com/confluentinc/rest-utils.git)
27+
* [Confluent schema-registry](https://github.com/confluentinc/schema-registry.git)
28+
2329
Once you did that, just have to clone the ksql-jdbc-driver repo and package it:
2430

2531
``git clone https://github.com/mmolimar/ksql-jdbc-driver.git && cd ksql-jdbc-driver``
@@ -57,15 +63,16 @@ where:
5763

5864
* **\<ksql-engine>**: represents the KSQL engine host.
5965
* **\<port>**: is the KSQL engine port.
60-
* **\<propertyN>**: are the custom client properties (optionals). Fow now, there is only one property and it's to
61-
set if the KSQL connection is secured or not. The property name is ``secured`` and its value is a boolean
62-
(``true``|``false``). Its default value is ``false``.
66+
* **\<propertyN>**: are the custom client properties (optionals). Available properties:
67+
* ``secured``: sets if the KSQL connection is secured or not. It's a boolean (``true``|``false``) and its default
68+
value is ``false``.
69+
* ``timeout``: sets the max wait time between each message when receiving them. It's a long and its default
70+
value is ``0`` which means that is infinite.
6371

6472
## TODO's
6573

6674
- [ ] Standalone mode: connecting directly to Kafka brokers.
6775
- [ ] Make the driver more compliant with the JDBC spec.
68-
- [ ] Enable a timeout when waiting for messages in a query.
6976

7077
## Contribute
7178

build.sbt

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
name := "ksql-jdbc-driver"
22

3-
version := "0.2-SNAPSHOT"
3+
version := "0.2"
44

55
initialize := {
66
assert(Integer.parseInt(sys.props("java.specification.version").split("\\.")(1)) >= 7, "Java 7 or above required")
@@ -9,10 +9,11 @@ initialize := {
99
scalaVersion := "2.11.11"
1010

1111
resolvers += "Confluent Maven Repo" at "http://packages.confluent.io/maven/"
12+
resolvers += "Confluent Snapshots Maven Repo" at "https://s3-us-west-2.amazonaws.com/confluent-snapshots/"
1213
resolvers += Resolver.mavenLocal
1314

14-
libraryDependencies += "io.confluent.ksql" % "ksql-rest-app" % "0.1-SNAPSHOT"
15-
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.11.0.0-cp1" % "test"
16-
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.4" % "test"
15+
libraryDependencies += "io.confluent.ksql" % "ksql-rest-app" % "4.1.0"
16+
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0" % "test"
17+
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.5" % "test"
1718
libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "3.6.0" % "test"
1819

src/it/scala/com/github/mmolimar/ksql/jdbc/KsqlDriverIntegrationTest.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,9 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
1515
val kafkaCluster = new EmbeddedKafkaCluster(zkServer.getConnection)
1616
val ksqlEngine = new EmbeddedKsqlEngine(kafkaCluster.getBrokerList)
1717

18-
val kafkaProducer = TestUtils.buildProducer(kafkaCluster.getBrokerList)
18+
lazy val kafkaProducer = TestUtils.buildProducer(kafkaCluster.getBrokerList)
1919

20-
val ksqlUrl = s"jdbc:ksql://localhost:${ksqlEngine.getPort}"
20+
val ksqlUrl = s"jdbc:ksql://localhost:${ksqlEngine.getPort}?timeout=20000"
2121
var ksqlConnection: Connection = _
2222
val topic = TestUtils.randomString()
2323

@@ -224,15 +224,15 @@ class KsqlDriverIntegrationTest extends WordSpec with Matchers with BeforeAndAft
224224
|}
225225
""".stripMargin.getBytes
226226
val record = new ProducerRecord[Array[Byte], Array[Byte]](topic, key, value)
227-
kafkaProducer.send(record).get(1000, TimeUnit.MILLISECONDS)
227+
kafkaProducer.send(record).get(10000, TimeUnit.MILLISECONDS)
228228
Thread.sleep(100)
229229

230230
}
231231

232232
private def createTestTableOrStream(str: String, isStream: Boolean = false) = {
233233
ksqlConnection.createStatement.execute(s"CREATE ${if (isStream) "STREAM" else "TABLE"} $str " +
234234
s"(FIELD1 INT, FIELD2 DOUBLE, FIELD3 VARCHAR) " +
235-
s"WITH (KAFKA_TOPIC='$topic', VALUE_FORMAT='JSON');") should be(true)
235+
s"WITH (KAFKA_TOPIC='$topic', VALUE_FORMAT='JSON', KEY='FIELD1');") should be(true)
236236
}
237237

238238
override def beforeAll = {

src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKafkaCluster.scala

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,9 @@ import java.io.File
44
import java.util.Properties
55

66
import com.github.mmolimar.ksql.jdbc.utils.TestUtils
7-
import kafka.admin.AdminUtils
87
import kafka.server.{KafkaConfig, KafkaServer}
9-
import kafka.utils.{Logging, ZkUtils}
8+
import kafka.utils.Logging
9+
import kafka.zk.AdminZkClient
1010

1111
class EmbeddedKafkaCluster(zkConnection: String,
1212
ports: Seq[Int] = Seq(TestUtils.getAvailablePort),
@@ -17,6 +17,9 @@ class EmbeddedKafkaCluster(zkConnection: String,
1717
private var brokers: Seq[KafkaServer] = Seq.empty
1818
private var logDirs: Seq[File] = Seq.empty
1919

20+
private lazy val zkClient = TestUtils.buildZkClient(zkConnection)
21+
private lazy val adminZkClient = new AdminZkClient(zkClient)
22+
2023
def startup = {
2124
info("Starting up embedded Kafka brokers")
2225

@@ -58,21 +61,19 @@ class EmbeddedKafkaCluster(zkConnection: String,
5861

5962
def createTopic(topic: String, numPartitions: Int = 1, replicationFactor: Int = 1) = {
6063
info(s"Creating topic $topic")
61-
AdminUtils.createTopic(getZkUtils, topic, numPartitions, replicationFactor)
64+
adminZkClient.createTopic(topic, numPartitions, replicationFactor)
6265
}
6366

6467
def deleteTopic(topic: String) {
6568
info(s"Deleting topic $topic")
66-
AdminUtils.deleteTopic(getZkUtils, topic)
69+
adminZkClient.deleteTopic(topic)
6770
}
6871

6972
def deleteTopics(topics: Seq[String]) = topics.foreach(deleteTopic(_))
7073

71-
def existTopic(topic: String): Boolean = AdminUtils.topicExists(getZkUtils, topic)
72-
73-
def listTopics = getZkUtils.getAllTopics
74+
def existTopic(topic: String): Boolean = zkClient.topicExists(topic)
7475

75-
private def getZkUtils: ZkUtils = if (brokers.isEmpty) null else brokers.head.zkUtils
76+
def listTopics = zkClient.getAllTopicsInCluster
7677

7778
private def resolvePort(port: Int) = if (port <= 0) TestUtils.getAvailablePort else port
7879

src/it/scala/com/github/mmolimar/ksql/jdbc/embedded/EmbeddedKsqlEngine.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import java.io.IOException
44

55
import com.github.mmolimar.ksql.jdbc.utils.TestUtils
66
import io.confluent.ksql.rest.server.{KsqlRestApplication, KsqlRestConfig}
7+
import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent
78
import kafka.utils.Logging
89
import org.apache.kafka.clients.producer.ProducerConfig
910

@@ -22,7 +23,7 @@ class EmbeddedKsqlEngine(brokerList: String, port: Int = TestUtils.getAvailableP
2223
"ksql.command.topic.suffix" -> "commands"
2324
))
2425

25-
lazy val ksqlEngine = KsqlRestApplication.buildApplication(config, true)
26+
lazy val ksqlEngine = KsqlRestApplication.buildApplication(config, true, new KsqlVersionCheckerAgent)
2627

2728
@throws[IOException]
2829
def startup = {

src/main/scala/com/github/mmolimar/ksql/jdbc/Exceptions.scala

Lines changed: 22 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,47 +2,44 @@ package com.github.mmolimar.ksql.jdbc
22

33
import java.sql.{SQLException, SQLFeatureNotSupportedException}
44

5-
sealed trait KsqlException
5+
sealed trait KsqlException {
6+
def message: String = ""
7+
}
68

7-
case class InvalidUrl(url: String) extends KsqlException
9+
case class InvalidUrl(url: String) extends KsqlException {
10+
override def message = s"URL with value ${url} is not valid. It must match de regex ${KsqlDriver.urlRegex}"
11+
}
812

9-
case class CannotConnect(url: String, msg: String) extends KsqlException
13+
case class CannotConnect(url: String, msg: String) extends KsqlException {
14+
override def message = s"Cannot connect to this URL ${url}. Error message: ${msg}"
15+
}
1016

11-
case class InvalidProperty(name: String) extends KsqlException
17+
case class InvalidProperty(name: String) extends KsqlException {
18+
override def message = s"Invalid property ${name}."
19+
}
1220

13-
case class NotSupported(msg: String = "Feature not supported") extends KsqlException
21+
case class NotSupported(override val message: String = "Feature not supported") extends KsqlException
1422

15-
case class KsqlQueryError(msg: String = "Error executing query") extends KsqlException
23+
case class KsqlQueryError(override val message: String = "Error executing query") extends KsqlException
1624

17-
case class KsqlCommandError(msg: String = "Error executing command") extends KsqlException
25+
case class KsqlCommandError(override val message: String = "Error executing command") extends KsqlException
1826

19-
case class InvalidColumn(msg: String = "Invalid column") extends KsqlException
27+
case class InvalidColumn(override val message: String = "Invalid column") extends KsqlException
2028

21-
case class EmptyRow(msg: String = "Current row is empty") extends KsqlException
29+
case class EmptyRow(override val message: String = "Current row is empty") extends KsqlException
2230

23-
case class UnknownTableType(msg: String = "Table type does not exist") extends KsqlException
31+
case class UnknownTableType(override val message: String = "Table type does not exist") extends KsqlException
2432

25-
case class UnknownCatalog(msg: String = "Catalog does not exist") extends KsqlException
33+
case class UnknownCatalog(override val message: String = "Catalog does not exist") extends KsqlException
2634

27-
case class UnknownSchema(msg: String = "Schema does not exist") extends KsqlException
35+
case class UnknownSchema(override val message: String = "Schema does not exist") extends KsqlException
2836

2937
object Exceptions {
3038

3139
implicit def wrapException(error: KsqlException): SQLException = {
3240
error match {
33-
case e: InvalidUrl => new SQLException(s"URL with value ${e.url} is not valid." +
34-
s"It must match de regex ${KsqlDriver.urlRegex}")
35-
case e: CannotConnect => new SQLException(s"Cannot connect to this URL ${e.url}. Error message: ${e.msg}")
36-
case e: InvalidProperty => new SQLException(e.name)
37-
case e: NotSupported => new SQLFeatureNotSupportedException(e.msg)
38-
case e: KsqlQueryError => new SQLException(e.msg)
39-
case e: KsqlCommandError => new SQLException(e.msg)
40-
case e: InvalidColumn => new SQLException(e.msg)
41-
case e: EmptyRow => new SQLException(e.msg)
42-
case e: UnknownTableType => new SQLException(e.msg)
43-
case e: UnknownCatalog => new SQLException(e.msg)
44-
case e: UnknownSchema => new SQLException(e.msg)
45-
case _ => new SQLException("Unknown KSQL Exception")
41+
case ns: NotSupported => new SQLFeatureNotSupportedException(ns.message)
42+
case e => new SQLException(e.message)
4643
}
4744
}
4845

src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlConnection.scala

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,16 @@ import scala.collection.JavaConverters._
1212
import scala.util.{Failure, Success, Try}
1313

1414
case class KsqlConnectionValues(ksqlServer: String, port: Int, config: Map[String, String]) {
15+
1516
def getKsqlUrl: String = {
1617
val protocol = if (isSecured) "https://" else "http://"
1718
protocol + ksqlServer + ":" + port
1819
}
1920

20-
def isSecured: Boolean = config.get("secured").getOrElse("false").toBoolean
21+
def isSecured: Boolean = config.getOrElse("secured", "false").toBoolean
22+
23+
def timeout: Long = config.getOrElse("timeout", "0").toLong
24+
2125
}
2226

2327
class KsqlConnection(values: KsqlConnectionValues, properties: Properties) extends Connection with WrapperNotSupported {
@@ -88,7 +92,7 @@ class KsqlConnection(values: KsqlConnectionValues, properties: Properties) exten
8892
throw NotSupported("ResultSetType, ResultSetConcurrency and ResultSetHoldability must be" +
8993
" TYPE_FORWARD_ONLY, CONCUR_READ_ONLY, HOLD_CURSORS_OVER_COMMIT respectively ")
9094
}
91-
new KsqlStatement(ksqlClient)
95+
new KsqlStatement(ksqlClient, values.timeout)
9296
}
9397

9498
override def getHoldability: Int = throw NotSupported()

src/main/scala/com/github/mmolimar/ksql/jdbc/KsqlStatement.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ import com.github.mmolimar.ksql.jdbc.Exceptions._
66
import com.github.mmolimar.ksql.jdbc.resultset.KsqlResultSet
77
import io.confluent.ksql.rest.client.KsqlRestClient
88

9-
class KsqlStatement(private val ksqlClient: KsqlRestClient) extends Statement with WrapperNotSupported {
9+
class KsqlStatement(private val ksqlClient: KsqlRestClient, val timeout: Long = 0)
10+
extends Statement with WrapperNotSupported {
1011

1112
override def setMaxFieldSize(max: Int): Unit = throw NotSupported()
1213

@@ -104,6 +105,6 @@ class KsqlStatement(private val ksqlClient: KsqlRestClient) extends Statement wi
104105

105106
private def fixSql(sql: String) = if (sql.trim.endsWith(";")) sql else sql + ";"
106107

107-
private implicit def toResultSet(stream: KsqlRestClient.QueryStream): ResultSet = new KsqlResultSet(stream)
108+
private implicit def toResultSet(stream: KsqlRestClient.QueryStream): ResultSet = new KsqlResultSet(stream, timeout)
108109

109110
}

src/main/scala/com/github/mmolimar/ksql/jdbc/resultset/KsqlResultSet.scala

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,20 +9,35 @@ import io.confluent.ksql.rest.client.KsqlRestClient
99
import io.confluent.ksql.rest.entity.StreamedRow
1010

1111
import scala.collection.JavaConversions._
12+
import scala.concurrent.ExecutionContext.Implicits.global
13+
import scala.concurrent.duration._
14+
import scala.concurrent.{Await, Future, TimeoutException}
15+
import scala.util.{Failure, Success, Try}
1216

13-
class KsqlResultSet(private[jdbc] val stream: KsqlRestClient.QueryStream) extends AbstractResultSet[StreamedRow](stream) {
17+
class KsqlResultSet(private[jdbc] val stream: KsqlRestClient.QueryStream, val timeout: Long = 0)
18+
extends AbstractResultSet[StreamedRow](stream) {
1419

1520
private val emptyRow: StreamedRow = new StreamedRow(new GenericRow, null)
1621

17-
override def next: Boolean = stream.hasNext match {
18-
case true =>
19-
stream.next match {
20-
case record if Option(record.getRow) == None => false
21-
case record =>
22-
currentRow = Some(record)
23-
true
24-
}
25-
case false => false
22+
private val waitDuration = if (timeout > 0) timeout millis else Duration.Inf
23+
24+
override def next: Boolean = {
25+
def hasNext = stream.hasNext match {
26+
case true =>
27+
stream.next match {
28+
case record if Option(record.getRow) == None => false
29+
case record =>
30+
currentRow = Some(record)
31+
true
32+
}
33+
case false => false
34+
}
35+
36+
Try(Await.result(Future(hasNext), waitDuration)) match {
37+
case Success(r) => r
38+
case Failure(_: TimeoutException) => false
39+
case Failure(e) => throw e
40+
}
2641
}
2742

2843
override def isBeforeFirst: Boolean = false

0 commit comments

Comments
 (0)