Skip to content

Commit dea1375

Browse files
committed
[feat-37734][postgresql] postgresql sink support adb-postgresql update.
1 parent 9ae1097 commit dea1375

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

localTest/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,32 @@
9999
<version>1.0-SNAPSHOT</version>
100100
</dependency>
101101

102+
103+
<dependency>
104+
<groupId>com.dtstack.flink</groupId>
105+
<artifactId>sql.postgresql</artifactId>
106+
<version>1.0-SNAPSHOT</version>
107+
</dependency>
108+
109+
<dependency>
110+
<groupId>com.dtstack.flink</groupId>
111+
<artifactId>sql.side.all.postgresql</artifactId>
112+
<version>1.0-SNAPSHOT</version>
113+
</dependency>
114+
115+
<dependency>
116+
<groupId>com.dtstack.flink</groupId>
117+
<artifactId>sql.sink.postgresql</artifactId>
118+
<version>1.0-SNAPSHOT</version>
119+
</dependency>
120+
121+
<dependency>
122+
<groupId>com.dtstack.flink</groupId>
123+
<artifactId>sql.side.async.postgresql</artifactId>
124+
<version>1.0-SNAPSHOT</version>
125+
</dependency>
126+
127+
102128
<dependency>
103129
<groupId>com.dtstack.flink</groupId>
104130
<artifactId>sql.hbase</artifactId>

postgresql/postgresql-sink/src/main/java/com/dtstack/flink/sql/sink/postgresql/PostgresqlDialect.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121

2222
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
23+
import org.apache.commons.lang3.StringUtils;
2324

2425
import java.util.Arrays;
2526
import java.util.Optional;
@@ -48,9 +49,18 @@ public Optional<String> getUpsertStatement(String schema, String tableName, Stri
4849
.collect(Collectors.joining(", "));
4950

5051
String updateClause = Arrays.stream(fieldNames)
52+
.filter(item -> !Arrays.asList(uniqueKeyFields).contains(item))
5153
.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
5254
.collect(Collectors.joining(", "));
5355

56+
if (StringUtils.isBlank(updateClause)) {
57+
return Optional.of(
58+
getInsertIntoStatement(schema, tableName, fieldNames, null)
59+
+ " ON CONFLICT ("
60+
+ uniqueColumns
61+
+ ") DO NOTHING");
62+
}
63+
5464
return Optional.of(getInsertIntoStatement(schema, tableName, fieldNames, null) +
5565
" ON CONFLICT (" + uniqueColumns + ")" +
5666
" DO UPDATE SET " + updateClause

0 commit comments

Comments
 (0)