Skip to content

Commit e2790a2

Browse files
authored
Merge pull request #255 from Canner/feature/ksqldb-driver
Feature: Support KSqlDb driver
2 parents 57773b2 + 03a2ffa commit e2790a2

25 files changed

+1231
-0
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# KsqlDB
2+
3+
## Installation
4+
5+
1. Install the package:
6+
7+
**If you are developing with binary, the package is already bundled in the binary. You can skip this step.**
8+
9+
```bash
10+
npm i @vulcan-sql/extension-driver-ksqldb
11+
```
12+
13+
2. Update your `vulcan.yaml` file to enable the extension:
14+
15+
```yaml
16+
extensions:
17+
...
18+
// highlight-next-line
19+
ksqldb: '@vulcan-sql/extension-driver-ksqldb' # Add this line
20+
```
21+
22+
3. Create a new profile in your `profiles.yaml` file or in the designated profile paths. For example:
23+
24+
```yaml
25+
- name: ksql # profile name
26+
type: ksqldb
27+
connection:
28+
host: www.example.com:8088
29+
username: <username>
30+
password: <password>
31+
timeout: 25000
32+
allow: '*'
33+
```
34+
35+
## Configuration
36+
37+
| Name | Required | Default | Description |
38+
| ------- | -------- | --------- | ----------- |
39+
| host | N | http://localhost:8088 | ksqlDB instance. |
40+
| username | N | | The name of the user on whose behalf requests are made. |
41+
| password | N | | The user's password. |
42+
| timeout | N | 25000 | Request timeout in milliseconds. |

packages/doc/sidebars.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,10 @@ const sidebars = {
7676
type: 'doc',
7777
id: 'connectors/clickhouse',
7878
},
79+
{
80+
type: 'doc',
81+
id: 'connect/ksqldb',
82+
},
7983
],
8084
},
8185
{
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
{
2+
"extends": ["../../.eslintrc.json"],
3+
"ignorePatterns": ["!**/*"],
4+
"overrides": [
5+
{
6+
"files": ["*.ts", "*.tsx", "*.js", "*.jsx"],
7+
"rules": {}
8+
},
9+
{
10+
"files": ["*.ts", "*.tsx"],
11+
"rules": {}
12+
},
13+
{
14+
"files": ["*.js", "*.jsx"],
15+
"rules": {}
16+
}
17+
]
18+
}
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# extension-driver-ksqldb
2+
3+
This is the KSqlDb driver for VulcanSQL, provided by [Canner](https://cannerdata.com/).
4+
5+
## Installation
6+
7+
1. Install the package:
8+
9+
```bash
10+
npm i @vulcan-sql/extension-driver-ksqldb
11+
```
12+
13+
2. Update your `vulcan.yaml` file to enable the extension:
14+
15+
```yaml
16+
extensions:
17+
ksqldb: '@vulcan-sql/extension-driver-ksqldb'
18+
```
19+
20+
3. Create a new profile in your `profiles.yaml` file or in the designated profile paths. For more information, please refer to the [KsqlDb documentation](https://ksqldb.io/) for the available arguments.
21+
22+
```yaml
23+
- name: ksql # Profile name
24+
type: ksqldb
25+
connection:
26+
# Optional: KSqlDb instance URL. Default is http://localhost:8088.
27+
host: 'www.example.com:8088'
28+
# Optional: The name of the user on whose behalf requests are made.
29+
username: '<username>',
30+
# The user's password
31+
password: '<password>',
32+
# Optional: Request timeout in milliseconds. Default value: 25000
33+
timeout: 25000
34+
```
35+
36+
## Testing
37+
38+
To run tests for the `extension-driver-ksqldb` module, use the following command:
39+
40+
```bash
41+
nx test extension-driver-ksqldb
42+
```
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
module.exports = {
2+
displayName: 'extension-driver-ksqldb',
3+
preset: '../../jest.preset.ts',
4+
// Use node environment to avoid facing "TypeError: The "listener" argument must be of type function. Received an instance of Object" error
5+
// when using ksqldb client executing query in the jest environment.
6+
testEnvironment: 'node',
7+
globals: {
8+
'ts-jest': {
9+
tsconfig: '<rootDir>/tsconfig.spec.json',
10+
},
11+
},
12+
transform: {
13+
'^.+\\.[tj]s$': 'ts-jest',
14+
},
15+
moduleFileExtensions: ['ts', 'js', 'html'],
16+
coverageDirectory: '../../coverage/packages/extension-driver-ksqldb',
17+
};
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
{
2+
"name": "@vulcan-sql/extension-driver-ksqldb",
3+
"description": "KSqlDb driver for VulcanSQL",
4+
"version": "0.7.1",
5+
"type": "commonjs",
6+
"publishConfig": {
7+
"access": "public"
8+
},
9+
"keywords": [
10+
"vulcan",
11+
"vulcan-sql",
12+
"data",
13+
"sql",
14+
"database",
15+
"data-warehouse",
16+
"data-lake",
17+
"api-builder",
18+
"ksqldb"
19+
],
20+
"repository": {
21+
"type": "git",
22+
"url": "https://github.com/Canner/vulcan.git"
23+
},
24+
"license": "MIT",
25+
"peerDependencies": {
26+
"@vulcan-sql/core": "~0.7.1-0"
27+
}
28+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
{
2+
"root": "packages/extension-driver-ksqldb",
3+
"sourceRoot": "packages/extension-driver-ksqldb/src",
4+
"targets": {
5+
"build": {
6+
"executor": "@nrwl/js:tsc",
7+
"outputs": ["{options.outputPath}"],
8+
"options": {
9+
"outputPath": "dist/packages/extension-driver-ksqldb",
10+
"main": "packages/extension-driver-ksqldb/src/index.ts",
11+
"tsConfig": "packages/extension-driver-ksqldb/tsconfig.lib.json",
12+
"assets": ["packages/extension-driver-ksqldb/*.md"]
13+
}
14+
},
15+
"lint": {
16+
"executor": "@nrwl/linter:eslint",
17+
"outputs": ["{options.outputFile}"],
18+
"options": {
19+
"lintFilePatterns": ["packages/extension-driver-ksqldb/**/*.ts"]
20+
}
21+
},
22+
"test": {
23+
"executor": "@nrwl/jest:jest",
24+
"outputs": ["coverage/packages/extension-driver-ksqldb"],
25+
"options": {
26+
"jestConfig": "packages/extension-driver-ksqldb/jest.config.ts",
27+
"passWithNoTests": true
28+
},
29+
"dependsOn": [
30+
{
31+
"projects": "self",
32+
"target": "install-dependencies"
33+
}
34+
]
35+
},
36+
"publish": {
37+
"executor": "@nrwl/workspace:run-commands",
38+
"options": {
39+
"command": "node ../../../tools/scripts/publish.mjs {args.tag} {args.version}",
40+
"cwd": "dist/packages/extension-driver-ksqldb"
41+
},
42+
"dependsOn": [
43+
{
44+
"projects": "self",
45+
"target": "build"
46+
}
47+
]
48+
}
49+
},
50+
"tags": []
51+
}
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export * from './lib/ksqldbDataSource';
2+
import { KSQLDBDataSource } from './lib/ksqldbDataSource';
3+
export default [KSQLDBDataSource];
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import {
2+
DataColumn,
3+
DataResult,
4+
DataSource,
5+
ExecuteOptions,
6+
InternalError,
7+
RequestParameter,
8+
VulcanExtensionId,
9+
} from '@vulcan-sql/core';
10+
import { Stream } from 'stream';
11+
import { buildSQL, convertSchemaToColumns } from './sqlBuilder';
12+
import { mapFromKsqlDbType } from './typeMapper';
13+
import {
14+
RestfulClient,
15+
RestfulClientOptions,
16+
QueryResponse,
17+
Header,
18+
Row,
19+
FinalMessage,
20+
} from './restfulClient';
21+
22+
@VulcanExtensionId('ksqldb')
23+
export class KSQLDBDataSource extends DataSource<any, any> {
24+
private logger = this.getLogger();
25+
private clientMapping = new Map<
26+
string,
27+
{ client: RestfulClient; options?: RestfulClientOptions }
28+
>();
29+
public override async onActivate() {
30+
const profiles = this.getProfiles().values();
31+
for (const profile of profiles) {
32+
this.logger.debug(
33+
`Initializing profile: ${profile.name} using ksqldb driver`
34+
);
35+
const options = {
36+
...profile.connection!,
37+
};
38+
const client = new RestfulClient(options);
39+
this.clientMapping.set(profile.name, { client, options });
40+
41+
// Testing connection
42+
const isRunning = await client.checkConnectionRunning();
43+
if (!isRunning) {
44+
throw new Error('KsqlDb server is not running');
45+
}
46+
47+
this.logger.debug(`Profile ${profile.name} initialized`);
48+
}
49+
}
50+
51+
public async prepare({ parameterIndex }: RequestParameter) {
52+
return `$${parameterIndex}`;
53+
}
54+
55+
public async execute({
56+
statement: sql,
57+
bindParams,
58+
profileName,
59+
operations,
60+
}: ExecuteOptions): Promise<DataResult> {
61+
this.checkProfileExist(profileName);
62+
const { client } = this.clientMapping.get(profileName)!;
63+
64+
const params = Object.fromEntries(bindParams);
65+
try {
66+
const builtSQL = buildSQL(sql, operations);
67+
const data = await client.query({
68+
query: builtSQL,
69+
query_params: params,
70+
});
71+
return await this.getResultFromRestfulResponse(data);
72+
} catch (e) {
73+
this.logger.debug(
74+
`Errors occurred, release connection from ${profileName}`
75+
);
76+
throw e;
77+
}
78+
}
79+
80+
private async getResultFromRestfulResponse(data: QueryResponse[]) {
81+
const dataRowStream = new Stream.Readable({
82+
objectMode: true,
83+
read: () => null,
84+
// automatically destroy() the stream when it emits 'finish' or errors. Node > 10.16
85+
autoDestroy: true,
86+
});
87+
88+
const headerData = (data[0] as Header).header;
89+
const columns: DataColumn[] = convertSchemaToColumns(headerData.schema);
90+
// add the data row to the stream
91+
for (const innerData of data) {
92+
// format the ksqldb table response to VulcanSQL Data API
93+
// https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-rest-api/query-endpoint/#example-table-response
94+
if ((innerData as Row).row) {
95+
const rowColumns = (innerData as Row).row.columns;
96+
const outputData = rowColumns.reduce((result, value, index) => {
97+
return { ...result, [columns[index].name]: value };
98+
}, {});
99+
dataRowStream.push(outputData);
100+
}
101+
102+
// the end of query result
103+
if ((innerData as FinalMessage).finalMessage) {
104+
dataRowStream.push(null);
105+
}
106+
}
107+
108+
return {
109+
getColumns: () => {
110+
return columns.map((column) => ({
111+
name: column.name || '',
112+
// Convert KsqlDb type to FieldDataType supported by VulcanSQL for generating the response schema in the specification, see: https://github.com/Canner/vulcan-sql/pull/78#issuecomment-1621532674
113+
type: mapFromKsqlDbType(column.type || ''),
114+
}));
115+
},
116+
getData: () => dataRowStream,
117+
};
118+
}
119+
120+
private checkProfileExist(profileName: string) {
121+
if (!this.clientMapping.has(profileName)) {
122+
throw new InternalError(`Profile instance ${profileName} not found`);
123+
}
124+
}
125+
}

0 commit comments

Comments
 (0)