
Commit b378a0e

Merge branch 'feat_where_cn_4_2' into '1.10_test_4.2.x'

Feat where cn 4 2. See merge request dt-insight-engine/flinkStreamSQL!275

2 parents: 7270c10 + 97f69c1

File tree

38 files changed: +1043 −262 lines

core/src/main/java/com/dtstack/flink/sql/side/BaseAsyncReqRow.java

Lines changed: 7 additions & 3 deletions
@@ -224,9 +224,13 @@ private Map<String, Object> parseInputParam(BaseRow input) {
         for (int i = 0; i < sideInfo.getEqualValIndex().size(); i++) {
             Integer conValIndex = sideInfo.getEqualValIndex().get(i);
             Object equalObj = genericRow.getField(conValIndex);
-            if (equalObj == null) {
-                return inputParams;
-            }
+            // comment by tiezhu
+            // Suppose the SQL has three join keys [a, b, c] and key [b] is null: inputParams
+            // would then hold only key [a], with [b, c] missing, so the RDB side table lacks
+            // values when substituting the WHERE condition and the lookup SQL fails.
+            // if (equalObj == null) {
+            //     return inputParams;
+            // }
             String columnName = sideInfo.getEqualFieldList().get(i);
             inputParams.put(columnName, equalObj);
         }
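
Why removing the early return matters: the lookup WHERE clause is built from every equal-join field, so a parameter map that silently drops null keys leaves later placeholders unbound. A minimal sketch of the before/after behavior (hypothetical field names, with a plain map standing in for the real row; not the project's actual lookup code):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WhereSubstitutionSketch {
    public static void main(String[] args) {
        // Three equal-join keys; key "b" happens to be null in the input row.
        List<String> equalFieldList = Arrays.asList("a", "b", "c");
        Map<String, Object> row = new HashMap<>();
        row.put("a", 1);
        row.put("b", null);
        row.put("c", 3);

        // Old behavior: bail out on the first null, so "c" is dropped along with "b".
        Map<String, Object> oldParams = new HashMap<>();
        for (String field : equalFieldList) {
            Object value = row.get(field);
            if (value == null) {
                break; // stands in for the removed "return inputParams;"
            }
            oldParams.put(field, value);
        }

        // New behavior: keep one entry per key, null or not.
        Map<String, Object> newParams = new HashMap<>();
        for (String field : equalFieldList) {
            newParams.put(field, row.get(field));
        }

        // A clause like "a = ? AND b = ? AND c = ?" needs three values;
        // the old map supplies only one, so binding the lookup SQL fails.
        System.out.println(oldParams); // {a=1}
        System.out.println(newParams); // {a=1, b=null, c=3}
    }
}
```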

core/src/main/java/com/dtstack/flink/sql/side/SidePredicatesParser.java

Lines changed: 43 additions & 8 deletions
@@ -22,6 +22,7 @@
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlInsert;
 import org.apache.calcite.sql.SqlJoin;
@@ -35,7 +36,9 @@
 import java.util.List;
 import java.util.Map;

-import static org.apache.calcite.sql.SqlKind.*;
+import static org.apache.calcite.sql.SqlKind.IDENTIFIER;
+import static org.apache.calcite.sql.SqlKind.LITERAL;
+import static org.apache.calcite.sql.SqlKind.OR;

 /**
  *
@@ -47,6 +50,7 @@
 public class SidePredicatesParser {

     private FlinkPlanner flinkPlanner = new FlinkPlanner();
+    private static final String QUOTE = "'";

     public void fillPredicatesForSideTable(String exeSql, Map<String, AbstractSideTableInfo> sideTableMap) throws SqlParseException {
         SqlNode sqlNode = flinkPlanner.getParser().parse(exeSql);
@@ -140,26 +144,57 @@ private void extractPredicateInfo(SqlNode whereNode, List<PredicateInfo> predica
         }
     }

-    private void fillPredicateInfoToList(SqlBasicCall whereNode, List<PredicateInfo> predicatesInfoList, String operatorName, SqlKind operatorKind,
-                                         int fieldIndex, int conditionIndex) {
+    private void fillPredicateInfoToList(
+            SqlBasicCall whereNode,
+            List<PredicateInfo> predicatesInfoList,
+            String operatorName,
+            SqlKind operatorKind,
+            int fieldIndex,
+            int conditionIndex) {
         SqlNode sqlNode = whereNode.getOperands()[fieldIndex];
         if (sqlNode.getKind() == SqlKind.IDENTIFIER) {
             SqlIdentifier fieldFullPath = (SqlIdentifier) sqlNode;
             if (fieldFullPath.names.size() == 2) {
                 String ownerTable = fieldFullPath.names.get(0);
                 String fieldName = fieldFullPath.names.get(1);
-                String content = (operatorKind == SqlKind.BETWEEN) ? whereNode.getOperands()[conditionIndex].toString() + " AND " +
-                        whereNode.getOperands()[2].toString() : whereNode.getOperands()[conditionIndex].toString();

-                if (StringUtils.containsIgnoreCase(content,SqlKind.CASE.toString())) {
+                String condition;
+                SqlNode[] conditionNodes = whereNode.getOperands();
+                SqlNode literal = conditionNodes[conditionIndex];
+                if (operatorKind == SqlKind.BETWEEN) {
+                    condition = literal.toString() + " AND " + conditionNodes[2].toString();
+                } else {
+                    if (literal instanceof SqlCharStringLiteral) {
+                        condition = removeCoding((SqlCharStringLiteral) literal);
+                    } else {
+                        condition = literal.toString();
+                    }
+                }
+
+                if (StringUtils.containsIgnoreCase(condition, SqlKind.CASE.toString())) {
                     return;
                 }

-                PredicateInfo predicateInfo = PredicateInfo.builder().setOperatorName(operatorName).setOperatorKind(operatorKind.toString())
-                        .setOwnerTable(ownerTable).setFieldName(fieldName).setCondition(content).build();
+                PredicateInfo predicateInfo =
+                        PredicateInfo.builder()
+                                .setOperatorName(operatorName)
+                                .setOperatorKind(operatorKind.toString())
+                                .setOwnerTable(ownerTable)
+                                .setFieldName(fieldName)
+                                .setCondition(condition)
+                                .build();
                 predicatesInfoList.add(predicateInfo);
             }
         }
     }

+    /**
+     * Strip the _UTF16 prefix and keep only the characters themselves; otherwise
+     * Chinese text is mis-encoded. For example, _UTF16'甲' becomes '甲'.
+     *
+     * @param stringLiteral
+     * @return
+     */
+    private String removeCoding(SqlCharStringLiteral stringLiteral) {
+        return QUOTE + stringLiteral.getNlsString().getValue() + QUOTE;
+    }
 }
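
For context on removeCoding: Calcite prints a non-Latin character literal with its character-set prefix, so pushing literal.toString() into a dimension-table WHERE clause produces SQL like name = _UTF16'甲', which the target database rejects or fails to match. A small sketch of the difference (assumes only Calcite on the classpath; not part of this commit):

```java
import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.parser.SqlParserPos;

public class RemoveCodingSketch {
    public static void main(String[] args) {
        // A char literal carrying an explicit UTF16 charset, as the parser
        // produces for Chinese text in the SQL.
        SqlCharStringLiteral literal =
                SqlLiteral.createCharString("甲", "UTF16", SqlParserPos.ZERO);

        // toString() keeps the charset prefix: _UTF16'甲'
        System.out.println(literal.toString());

        // Unwrapping the NlsString and re-quoting yields plain '甲',
        // which is what removeCoding returns.
        System.out.println("'" + literal.getNlsString().getValue() + "'");
    }
}
```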

core/src/main/java/com/dtstack/flink/sql/util/MathUtil.java

Lines changed: 2 additions & 0 deletions
@@ -129,6 +129,8 @@ public static Double getDoubleVal(Object obj) {
             return ((BigDecimal) obj).doubleValue();
         } else if (obj instanceof Integer) {
             return ((Integer) obj).doubleValue();
+        } else if (obj instanceof BigInteger) {
+            return ((BigInteger) obj).doubleValue();
         }

         throw new RuntimeException("not support type of " + obj.getClass() + " convert to Double.");
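
The new branch simply delegates to BigInteger.doubleValue(), mirroring the existing BigDecimal and Integer cases. A quick standalone check (values chosen to show the usual double precision limit, not taken from the project):

```java
import java.math.BigInteger;

public class BigIntegerToDouble {
    public static void main(String[] args) {
        // Small values convert exactly.
        System.out.println(new BigInteger("42").doubleValue()); // 42.0

        // Beyond 2^53 a double can no longer represent every integer,
        // so 2^53 + 1 rounds back down to 2^53.
        System.out.println(new BigInteger("9007199254740993").doubleValue()); // 9.007199254740992E15
    }
}
```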

core/src/test/java/com/dtstack/flink/sql/side/SidePredicatesParserTest.java

Lines changed: 15 additions & 2 deletions
@@ -2,10 +2,13 @@

 import com.google.common.collect.Lists;
 import org.apache.calcite.sql.SqlBasicCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.util.NlsString;
 import org.apache.flink.calcite.shaded.com.google.common.collect.ImmutableList;
+import org.junit.Assert;
 import org.junit.Test;

 import java.lang.reflect.Method;
@@ -17,10 +20,9 @@
 public class SidePredicatesParserTest {

     private SidePredicatesParser sidePredicatesParser = new SidePredicatesParser();
-

     @Test
-    public void fillPredicateInfoToList() throws Exception {
+    public void testFillPredicateInfoToList() throws Exception {
         Method method = SidePredicatesParser.class.getDeclaredMethod("fillPredicateInfoToList", SqlBasicCall.class, List.class, String.class, SqlKind.class, int.class, int.class);
         method.setAccessible(true);
         SqlBasicCall sqlBasicCall = mock(SqlBasicCall.class);
@@ -33,4 +35,15 @@ public void fillPredicateInfoToList() throws Exception {
         method.invoke(sidePredicatesParser, sqlBasicCall, Lists.newArrayList(), "a", SqlKind.IN, 0, 0);
     }

+    @Test
+    public void testRemoveCoding() throws Exception {
+        Method method = SidePredicatesParser.class.getDeclaredMethod("removeCoding", SqlCharStringLiteral.class);
+        method.setAccessible(true);
+        SqlCharStringLiteral stringLiteral = mock(SqlCharStringLiteral.class);
+        when(stringLiteral.getNlsString()).thenReturn(new NlsString("甲", "UTF16", null));
+
+        String val = (String) method.invoke(sidePredicatesParser, stringLiteral);
+        Assert.assertEquals("'甲'", val);
+    }
+
 }

docs/plugin/kafkaSource.md

Lines changed: 18 additions & 0 deletions
@@ -44,6 +44,7 @@ CREATE TABLE tableName(
 |topic | name of the topic to read |||
 |topicIsPattern | whether the topic name is a regular-expression pattern (true&#124;false) |no| false
 |offsetReset | initial offset position in the topic [latest&#124;earliest&#124;explicit offset values ({"0":12312,"1":12321,"2":12312}, i.e. {"partition_no":offset_value})]||latest|
+|offsetEnd | offset position at which the job stops |||
 |parallelism | parallelism setting ||1|
 |sourcedatatype | data format: avro, csv, json, or dt_nest. dt_nest, the default, is a JSON parser that handles nested JSON; the others support only non-nested formats ||dt_nest|
 |schemaInfo | schema used when sourcedatatype is avro |||
@@ -104,6 +105,23 @@ CREATE TABLE MyTable(
     parallelism ='1',
     sourcedatatype ='json' # optional
 );
+
+CREATE TABLE two
+(
+    id int,
+    name string,
+    message string
+) WITH (
+    type = 'kafka11',
+    bootstrapServers = 'kudu1:9092,kudu2:9092,kudu3:9092',
+    zookeeperQuorum = 'kudu1:2181,kudu2:2181,kudu3:2181/kafka',
+    offsetReset = '{"0": 0,"1": 0,"2":0}',
+    -- offsetReset = '{"0": 34689}',
+    -- offsetReset = 'earliest',
+    offsetEnd = '{"0": 100,"1": 100,"2":100}',
+    -- offsetEnd = '{"0": 34789}',
+    topic = 'kafka'
+);
 ```
 ## 6. Support for parsing nested JSON and data-type fields

elasticsearch5-xh/elasticsearch5-xh-sink/src/main/java/com/dtstack/flink/sql/sink/elasticsearch/table/ElasticsearchTableInfo.java

Lines changed: 1 addition & 1 deletion
@@ -166,7 +166,7 @@ public boolean check() {
                 Strings.isNullOrEmpty(keytab) &&
                 Strings.isNullOrEmpty(krb5conf);

-        Preconditions.checkState(allNotSet, "xh's elasticsearch type of kerberos file is required");
+        Preconditions.checkState(!allNotSet, "xh's elasticsearch type of kerberos file is required");

         return true;
     }
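
This one-character fix corrects an inverted precondition: Guava's Preconditions.checkState throws IllegalStateException when its argument is false, so the old checkState(allNotSet, ...) failed exactly when the kerberos files had been supplied. A minimal illustration of the corrected logic (standalone sketch, not the plugin's full check):

```java
import com.google.common.base.Preconditions;

public class KerberosCheckSketch {
    public static void main(String[] args) {
        boolean allNotSet = false; // principal/keytab/krb5conf were all provided

        // Fixed check: passes when the files are set, throws when they are missing.
        Preconditions.checkState(!allNotSet, "kerberos file is required");

        // The old form, checkState(allNotSet, ...), would throw here instead,
        // rejecting precisely the configurations that were complete.
        System.out.println("kerberos configuration accepted");
    }
}
```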
