Skip to content

Commit 6c8feaf

Browse files
committed
fix sql type mapping
1 parent ea93999 commit 6c8feaf

File tree

1 file changed

+8
-6
lines changed

1 file changed

+8
-6
lines changed

src/main/java/com/databend/kafka/connect/sink/DatabendClient.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -439,7 +439,7 @@ public boolean tableExists(
439439
String[] tableTypes = tableTypes(metadata, this.tableTypes);
440440
String tableTypeDisplay = displayableTableTypes(tableTypes, "/");
441441
glog.info("Checking {} dialect for existence of {} {}", this, tableTypeDisplay, tableId);
442-
glog.info("catalogName is {}, schemaName is {}, tableName is {}", tableId.catalogName(),tableId.schemaName(), tableId.tableName());
442+
glog.info("catalogName is {}, schemaName is {}, tableName is {}", tableId.catalogName(), tableId.schemaName(), tableId.tableName());
443443
try (ResultSet rs = connection.getMetaData().getTables(
444444
tableId.catalogName(),
445445
tableId.schemaName(),
@@ -1622,15 +1622,17 @@ protected String getSqlType(SinkRecordField field) {
16221622
case INT64:
16231623
return "BIGINT";
16241624
case FLOAT32:
1625-
return "REAL";
1625+
return "FLOAT";
16261626
case FLOAT64:
1627-
return "DOUBLE PRECISION";
1627+
return "DOUBLE";
16281628
case BOOLEAN:
16291629
return "BOOLEAN";
16301630
case STRING:
1631-
return "TEXT";
1632-
case BYTES:
1633-
return "BYTEA";
1631+
return "STRING";
1632+
case MAP:
1633+
return "STRING";
1634+
case STRUCT:
1635+
return "VARIANT";
16341636
case ARRAY:
16351637
SinkRecordField childField = new SinkRecordField(
16361638
field.schema().valueSchema(),

0 commit comments

Comments
 (0)