Skip to content

Commit 03a2ffa

Browse files
committed
fix(extension-driver-ksqldb): review suggestions, add comments, timeout option
1 parent 4e90906 commit 03a2ffa

File tree

5 files changed

+57
-8
lines changed

5 files changed

+57
-8
lines changed

packages/doc/docs/connect/ksqldb.mdx

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
host: www.example.com:8088
2929
username: <username>
3030
password: <password>
31+
timeout: 25000
3132
allow: '*'
3233
```
3334

@@ -37,4 +38,5 @@
3738
| ------- | -------- | --------- | ----------- |
3839
| host | N | http://localhost:8088 | ksqlDB instance. |
3940
| username | N | | The name of the user on whose behalf requests are made. |
40-
| password | N | | The user's password. |
41+
| password | N | | The user's password. |
42+
| timeout | N | 25000 | Request timeout in milliseconds. |

packages/extension-driver-ksqldb/README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,9 @@ This is the KSqlDb driver for VulcanSQL, provided by [Canner](https://cannerdata
2828
# Optional: The name of the user on whose behalf requests are made.
2929
username: '<username>',
3030
# The user's password
31-
password: '<password>'
31+
password: '<password>',
32+
# Optional: Request timeout in milliseconds. Default value: 25000
33+
timeout: 25000
3234
```
3335

3436
## Testing

packages/extension-driver-ksqldb/package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@vulcan-sql/extension-driver-ksqldb",
33
"description": "KSqlDb driver for VulcanSQL",
4-
"version": "0.7.0",
4+
"version": "0.7.1",
55
"type": "commonjs",
66
"publishConfig": {
77
"access": "public"
@@ -23,6 +23,6 @@
2323
},
2424
"license": "MIT",
2525
"peerDependencies": {
26-
"@vulcan-sql/core": "~0.7.0-0"
26+
"@vulcan-sql/core": "~0.7.1-0"
2727
}
2828
}

packages/extension-driver-ksqldb/project.json

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,19 @@
3232
"target": "install-dependencies"
3333
}
3434
]
35+
},
36+
"publish": {
37+
"executor": "@nrwl/workspace:run-commands",
38+
"options": {
39+
"command": "node ../../../tools/scripts/publish.mjs {args.tag} {args.version}",
40+
"cwd": "dist/packages/extension-driver-ksqldb"
41+
},
42+
"dependsOn": [
43+
{
44+
"projects": "self",
45+
"target": "build"
46+
}
47+
]
3548
}
3649
},
3750
"tags": []

packages/extension-driver-ksqldb/src/lib/restfulClient.ts

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ export interface RestfulClientOptions {
5454
host?: string;
5555
username?: string;
5656
password?: string;
57+
timeout?: number;
58+
}
59+
60+
const DEFAULT_OPTIONS: Required<Pick<RestfulClientOptions, 'host'|'timeout'>> = {
61+
host: 'http://localhost:8088',
62+
timeout: 25000,
5763
}
5864

5965
export class RestfulClient {
@@ -67,10 +73,24 @@ export class RestfulClient {
6773
this.connect();
6874
}
6975

76+
/**
77+
* The connect method will create a promise "startSession" method, not really to connect http2 immediately.
78+
* To let users establish a "startSession" promise request only when they need to query or exec a statement.
79+
*/
7080
public connect() {
7181
this.startSession = () =>
7282
new Promise((resolve, reject) => {
73-
this.client = http2.connect(this.options.host || 'http://localhost:8088');
83+
this.client = http2.connect(this.options.host || DEFAULT_OPTIONS.host, {
84+
timeout: this.options.timeout || DEFAULT_OPTIONS.timeout,
85+
});
86+
87+
this.client.setTimeout(this.options.timeout || DEFAULT_OPTIONS.timeout, () => {
88+
if (this.connected === false) {
89+
const timeoutError = new Error("Connection timeout.");
90+
reject(timeoutError);
91+
this.client?.destroy(timeoutError);
92+
}
93+
});
7494

7595
this.client.on('connect', () => {
7696
this.connected = true;
@@ -83,7 +103,7 @@ export class RestfulClient {
83103
});
84104
}
85105

86-
public close(): Promise<void> {
106+
public closeSession(): Promise<void> {
87107
return new Promise((resolve, reject) => {
88108
if (this.client) {
89109
!this.client.destroyed && this.client.destroy();
@@ -116,6 +136,11 @@ export class RestfulClient {
116136
return isRunning;
117137
}
118138

139+
/**
140+
* According to ksqldb restful API: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/query-endpoint
141+
* To run a SELECT statement and stream back the results.
142+
* SELECT statement: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/select-pull-query
143+
*/
119144
public async query({
120145
query,
121146
query_params = {},
@@ -131,6 +156,11 @@ export class RestfulClient {
131156
return res;
132157
}
133158

159+
/**
160+
* According to ksqldb restful API: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/ksql-endpoint
161+
* All statements, except those starting with SELECT and PRINT, can be run on this exec method.
162+
* To run SELECT and PRINT statements use the "query" method instead.
163+
*/
134164
public async exec({
135165
query,
136166
query_params = {},
@@ -153,6 +183,8 @@ export class RestfulClient {
153183
const ksql = query.replace(/\$(\d+)/g, (_, index) => {
154184
const valueIndex = parseInt(index) - 1;
155185
const paramValue = values[valueIndex];
186+
// Because the ksqldb queries are expressed using a strict subset of ANSI SQL.
187+
// It didn't support the string auto conversion, so we need to add the single quote for string value.
156188
return typeof paramValue === 'string' ? `'${paramValue}'` : paramValue;
157189
});
158190

@@ -203,12 +235,12 @@ export class RestfulClient {
203235
} else {
204236
reject(responseData);
205237
}
206-
this.close();
238+
this.closeSession();
207239
});
208240

209241
req.on('error', (error) => {
210242
reject(error);
211-
this.close();
243+
this.closeSession();
212244
});
213245

214246
buffer && req.write(buffer);

0 commit comments

Comments
 (0)