Skip to content

Commit 1b532fe

Browse files
author
gituser
committed
Merge branch '1.10_release_4.1.x' into 1.10_release_4.2.x
2 parents 60d3676 + 0b0fe8c commit 1b532fe

File tree

2 files changed

+36
-0
lines changed

2 files changed

+36
-0
lines changed

localTest/pom.xml

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,32 @@
129129
<version>1.0-SNAPSHOT</version>
130130
</dependency>
131131

132+
133+
<dependency>
134+
<groupId>com.dtstack.flink</groupId>
135+
<artifactId>sql.postgresql</artifactId>
136+
<version>1.0-SNAPSHOT</version>
137+
</dependency>
138+
139+
<dependency>
140+
<groupId>com.dtstack.flink</groupId>
141+
<artifactId>sql.side.all.postgresql</artifactId>
142+
<version>1.0-SNAPSHOT</version>
143+
</dependency>
144+
145+
<dependency>
146+
<groupId>com.dtstack.flink</groupId>
147+
<artifactId>sql.sink.postgresql</artifactId>
148+
<version>1.0-SNAPSHOT</version>
149+
</dependency>
150+
151+
<dependency>
152+
<groupId>com.dtstack.flink</groupId>
153+
<artifactId>sql.side.async.postgresql</artifactId>
154+
<version>1.0-SNAPSHOT</version>
155+
</dependency>
156+
157+
132158
<dependency>
133159
<groupId>com.dtstack.flink</groupId>
134160
<artifactId>sql.hbase</artifactId>

postgresql/postgresql-sink/src/main/java/com/dtstack/flink/sql/sink/postgresql/PostgresqlDialect.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121

2222
import com.dtstack.flink.sql.sink.rdb.dialect.JDBCDialect;
23+
import org.apache.commons.lang3.StringUtils;
2324

2425
import java.util.Arrays;
2526
import java.util.Optional;
@@ -48,9 +49,18 @@ public Optional<String> getUpsertStatement(String schema, String tableName, Stri
4849
.collect(Collectors.joining(", "));
4950

5051
String updateClause = Arrays.stream(fieldNames)
52+
.filter(item -> !Arrays.asList(uniqueKeyFields).contains(item))
5153
.map(f -> quoteIdentifier(f) + "=EXCLUDED." + quoteIdentifier(f))
5254
.collect(Collectors.joining(", "));
5355

56+
if (StringUtils.isBlank(updateClause)) {
57+
return Optional.of(
58+
getInsertIntoStatement(schema, tableName, fieldNames, null)
59+
+ " ON CONFLICT ("
60+
+ uniqueColumns
61+
+ ") DO NOTHING");
62+
}
63+
5464
return Optional.of(getInsertIntoStatement(schema, tableName, fieldNames, null) +
5565
" ON CONFLICT (" + uniqueColumns + ")" +
5666
" DO UPDATE SET " + updateClause

0 commit comments

Comments
 (0)