@@ -4,14 +4,18 @@ import java.io.{ByteArrayInputStream, InputStream}
44import java .sql .{ResultSet , SQLException , SQLFeatureNotSupportedException }
55
66import com .github .mmolimar .ksql .jdbc .utils .TestUtils ._
7- import io .confluent .ksql .metastore .{KsqlStream , KsqlTopic }
7+ import io .confluent .ksql .metastore .SerdeFactory
8+ import io .confluent .ksql .metastore .model .{KeyField , KsqlStream , KsqlTopic }
89import io .confluent .ksql .rest .client .{KsqlRestClient , RestResponse }
910import io .confluent .ksql .rest .entity .{ExecutionPlan , KafkaTopicsList , QueryDescriptionEntity , QueryDescriptionList , _ }
10- import io .confluent .ksql .serde .DataSource .DataSourceSerDe
11- import io .confluent .ksql .serde .json .KsqlJsonTopicSerDe
11+ import io .confluent .ksql .rest .server .computation .CommandId
12+ import io .confluent .ksql .schema .ksql .KsqlSchema
13+ import io .confluent .ksql .serde .Format
14+ import io .confluent .ksql .serde .json .KsqlJsonSerdeFactory
1215import io .confluent .ksql .util .timestamp .LongColumnTimestampExtractionPolicy
1316import javax .ws .rs .core .Response
14- import org .apache .kafka .connect .data .SchemaBuilder
17+ import org .apache .kafka .common .serialization .{Serde , Serdes }
18+ import org .apache .kafka .connect .data .{Schema , SchemaBuilder }
1519import org .scalamock .scalatest .MockFactory
1620import org .scalatest .{Matchers , OneInstancePerTest , WordSpec }
1721
@@ -31,12 +35,12 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One
3135
3236 " throw not supported exception if not supported" in {
3337
34- (mockedKsqlRestClient.makeQueryRequest _).expects(* )
38+ (mockedKsqlRestClient.makeQueryRequest _).expects(* , * )
3539 .returns(RestResponse .successful[KsqlRestClient .QueryStream ](mockQueryStream(mockResponse)))
3640 .noMoreThanOnce
3741
3842 val methods = implementedMethods[KsqlStatement ]
39- reflectMethods[KsqlStatement ](methods, false , statement)
43+ reflectMethods[KsqlStatement ](methods = methods, implemented = false , obj = statement)
4044 .foreach(method => {
4145 assertThrows[SQLFeatureNotSupportedException ] {
4246 method()
@@ -60,27 +64,27 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One
6064 }
6165
6266 assertThrows[SQLException ] {
63- (mockedKsqlRestClient.makeQueryRequest _).expects(* )
67+ (mockedKsqlRestClient.makeQueryRequest _).expects(* , * )
6468 .returns(RestResponse .erroneous(new KsqlErrorMessage (- 1 , " error" )))
6569 .once
6670 statement.execute(" select * from test" )
6771 }
6872
6973 assertThrows[SQLException ] {
70- (mockedKsqlRestClient.makeQueryRequest _).expects(* )
74+ (mockedKsqlRestClient.makeQueryRequest _).expects(* , * )
7175 .returns(RestResponse .successful[KsqlRestClient .QueryStream ](mockQueryStream(mockResponse)))
7276 .once
73- (mockedKsqlRestClient.makeKsqlRequest _ ).expects(* )
77+ (mockedKsqlRestClient.makeKsqlRequest( _ : String ) ).expects(* )
7478 .returns(RestResponse .erroneous(new KsqlErrorMessage (- 1 , " error" )))
7579 .once
7680 statement.execute(" select * from test" )
7781 }
7882
7983 assertThrows[SQLException ] {
80- (mockedKsqlRestClient.makeQueryRequest _).expects(* )
84+ (mockedKsqlRestClient.makeQueryRequest _).expects(* , * )
8185 .returns(RestResponse .successful[KsqlRestClient .QueryStream ](mockQueryStream(mockResponse)))
8286 .once
83- (mockedKsqlRestClient.makeKsqlRequest _ ).expects(* )
87+ (mockedKsqlRestClient.makeKsqlRequest( _ : String ) ).expects(* )
8488 .returns(RestResponse .successful[KsqlEntityList ](new KsqlEntityList ))
8589 .once
8690 statement.execute(" select * from test" )
@@ -110,21 +114,21 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One
110114 val entityList = new KsqlEntityList
111115 entityList.add(new QueryDescriptionEntity (" select * from test;" , queryDesc))
112116
113- (mockedKsqlRestClient.makeQueryRequest _).expects(* )
117+ (mockedKsqlRestClient.makeQueryRequest _).expects(* , * )
114118 .returns(RestResponse .successful[KsqlRestClient .QueryStream ](mockQueryStream(mockResponse)))
115119 .once
116- (mockedKsqlRestClient.makeKsqlRequest _ ).expects(* )
120+ (mockedKsqlRestClient.makeKsqlRequest( _ : String ) ).expects(* )
117121 .returns(RestResponse .successful[KsqlEntityList ](entityList))
118122 .once
119123 statement.execute(" select * from test" ) should be(true )
120124
121- (mockedKsqlRestClient.makeQueryRequest _).expects(* )
125+ (mockedKsqlRestClient.makeQueryRequest _).expects(* , * )
122126 .returns(RestResponse .successful[KsqlRestClient .QueryStream ](mockQueryStream(mockResponse)))
123127 .once
124- (mockedKsqlRestClient.makeKsqlRequest _ ).expects(* )
128+ (mockedKsqlRestClient.makeKsqlRequest( _ : String ) ).expects(* )
125129 .returns(RestResponse .successful[KsqlEntityList ](entityList))
126130 .once
127- Option (statement.executeQuery(" select * from test;" )) should not be ( None )
131+ Option (statement.executeQuery(" select * from test;" )) should not be None
128132
129133 statement.getMaxRows should be(0 )
130134 statement.getResultSet shouldNot be(None .orNull)
@@ -140,36 +144,36 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One
140144 statement.getWarnings should be(None .orNull)
141145
142146 assertThrows[SQLException ] {
143- (mockedKsqlRestClient.makeQueryRequest _).expects(* )
147+ (mockedKsqlRestClient.makeQueryRequest _).expects(* , * )
144148 .returns(RestResponse .successful[KsqlRestClient .QueryStream ](mockQueryStream(mockResponse)))
145149 .once
146150 val multipleResults = new KsqlEntityList
147151 multipleResults.add(new QueryDescriptionEntity (" select * from test;" , queryDesc))
148152 multipleResults.add(new QueryDescriptionEntity (" select * from test;" , queryDesc))
149- (mockedKsqlRestClient.makeKsqlRequest _ ).expects(* )
153+ (mockedKsqlRestClient.makeKsqlRequest( _ : String ) ).expects(* )
150154 .returns(RestResponse .successful[KsqlEntityList ](multipleResults))
151155 .once
152156 statement.execute(" select * from test" )
153157 }
154158 assertThrows[SQLException ] {
155159 statement.getResultSet
156160 }
157- statement.cancel
161+ statement.cancel()
158162
159163 statement.isClosed should be(false )
160- statement.close
161- statement.close
164+ statement.close()
165+ statement.close()
162166 statement.isClosed should be(true )
163167 assertThrows[SQLException ] {
164168 statement.executeQuery(" select * from test;" )
165169 }
166170 }
167171
168172 " work when printing topics" in {
169- (mockedKsqlRestClient.makePrintTopicRequest _).expects(* )
173+ (mockedKsqlRestClient.makePrintTopicRequest _).expects(* , * )
170174 .returns(RestResponse .successful[InputStream ](new ByteArrayInputStream (" test" .getBytes)))
171175 .once
172- Option (statement.executeQuery(" print 'test'" )) should not be ( None )
176+ Option (statement.executeQuery(" print 'test'" )) should not be None
173177 statement.getResultSet.next should be(true )
174178 statement.getResultSet.getString(1 ) should be(" test" )
175179 }
@@ -180,24 +184,24 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One
180184 def validateCommand (entity : KsqlEntity , headers : List [HeaderField ]): Unit = {
181185 val entityList = new KsqlEntityList
182186 entityList.add(entity)
183- (mockedKsqlRestClient.makeKsqlRequest _ ).expects(* )
187+ (mockedKsqlRestClient.makeKsqlRequest( _ : String ) ).expects(* )
184188 .returns(RestResponse .successful[KsqlEntityList ](entityList))
185189 .once
186190 statement.execute(entity.getStatementText) should be(true )
187191 statement.getResultSet.getMetaData.getColumnCount should be(headers.size)
188- headers.zipWithIndex.map { case (c, index) => {
192+ headers.zipWithIndex.map { case (c, index) =>
189193 statement.getResultSet.getMetaData.getColumnName(index + 1 ) should be(c.name)
190194 statement.getResultSet.getMetaData.getColumnLabel(index + 1 ).toUpperCase should be(c.name)
191195 }
192- }
193196 }
194197
195- val commandStatus = new CommandStatusEntity (" REGISTER TOPIC TEST" , " topic/1/create" , " SUCCESS" , " Success Message" )
198+ val commandStatus = new CommandStatusEntity (" REGISTER TOPIC TEST" , CommandId .fromString(" topic/1/create" ),
199+ new CommandStatus (CommandStatus .Status .SUCCESS , " Success Message" ), null )
196200 val executionPlan = new ExecutionPlan (" DESCRIBE test" )
197201 val functionDescriptionList = new FunctionDescriptionList (" DESCRIBE FUNCTION test;" ,
198202 " TEST" , " Description" , " author" , " version" , " path" ,
199203 List (
200- new FunctionInfo (List (new ArgumentInfo (" arg1" , " INT" , " Description" )).asJava, " BIGINT" , " Description" )
204+ new FunctionInfo (List (new ArgumentInfo (" arg1" , " INT" , " Description" , false )).asJava, " BIGINT" , " Description" )
201205 ).asJava,
202206 FunctionType .scalar
203207 )
@@ -211,7 +215,7 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One
211215 )
212216 val ksqlTopicsList = new KsqlTopicsList (
213217 " SHOW TOPICS;" ,
214- List (new KsqlTopicInfo (" ksqltopic" , " kafkatopic" , DataSourceSerDe .JSON )).asJava
218+ List (new KsqlTopicInfo (" ksqltopic" , " kafkatopic" , Format .JSON )).asJava
215219 )
216220 val propertiesList = new PropertiesList (
217221 " list properties;" ,
@@ -250,15 +254,22 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One
250254 " EXPLAIN select * from test;" ,
251255 List (queryDescription.getQueryDescription).asJava
252256 )
257+ val schema = SchemaBuilder
258+ .struct
259+ .field(" key" , Schema .OPTIONAL_INT64_SCHEMA )
260+ .build()
253261 val sourceDescEntity = new SourceDescriptionEntity (
254262 " DESCRIBE TEST;" ,
255263 new SourceDescription (
256264 new KsqlStream (" sqlExpression" ,
257265 " datasource" ,
258- SchemaBuilder .struct ,
259- SchemaBuilder .struct .field(" key" ),
266+ KsqlSchema .of(schema) ,
267+ KeyField .of( " key " , schema .field(" key" ) ),
260268 new LongColumnTimestampExtractionPolicy (" timestamp" ),
261- new KsqlTopic (" input" , " input" , new KsqlJsonTopicSerDe )),
269+ new KsqlTopic (" input" , " input" , new KsqlJsonSerdeFactory , true ),
270+ new SerdeFactory [String ] {
271+ override def create (): Serde [String ] = Serdes .String ()
272+ }),
262273 true ,
263274 " JSON" ,
264275 List .empty.asJava,
@@ -302,7 +313,7 @@ class KsqlStatementSpec extends WordSpec with Matchers with MockFactory with One
302313 " throw not supported exception if not supported" in {
303314
304315 val resultSet = new StatementNotSupported
305- reflectMethods[StatementNotSupported ](Seq .empty, false , resultSet)
316+ reflectMethods[StatementNotSupported ](methods = Seq .empty, implemented = false , obj = resultSet)
306317 .foreach(method => {
307318 assertThrows[SQLFeatureNotSupportedException ] {
308319 method()
0 commit comments