Commit e4e7412

Author: gituser (committed)
Merge branch '1.10_test_4.2.x' into 1.10_release_4.2.x
2 parents ca2df61 + 981933e

File tree: 22 files changed, +902 −223 lines changed

core/src/main/java/com/dtstack/flink/sql/side/BaseSideInfo.java

Lines changed: 2 additions & 6 deletions

@@ -34,7 +34,6 @@
 import org.apache.flink.table.runtime.typeutils.BaseRowTypeInfo;

 import java.io.Serializable;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;

@@ -93,20 +92,17 @@ public void parseSelectFields(JoinInfo joinInfo){
         String sideTableName = joinInfo.getSideTableName();
         String nonSideTableName = joinInfo.getNonSideTable();
         List<String> fields = Lists.newArrayList();
-        int sideTableFieldIndex;
+        int sideTableFieldIndex = 0;

         for( int i=0; i<outFieldInfoList.size(); i++){
             FieldInfo fieldInfo = outFieldInfoList.get(i);
             if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
                 String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
                 fields.add(sideFieldName);
-                sideTableFieldIndex = Arrays.asList(sideTableInfo.getFields()).indexOf(sideFieldName);
-                if (sideTableFieldIndex == -1){
-                    throw new RuntimeException(String.format("unknown filed {%s} in sideTable {%s} ", sideFieldName, sideTableName));
-                }
                 sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
                 sideFieldIndex.put(i, sideTableFieldIndex);
                 sideFieldNameIndex.put(i, sideFieldName);
+                sideTableFieldIndex++;
             }else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
                 int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
                 inFieldIndex.put(i, nonSideIndex);
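Note on this hunk: the key used for sideSelectFieldsType and sideFieldIndex is no longer the field's position in the side table's declared schema (the Arrays.asList(...).indexOf lookup with its unknown-field check) but a counter that increases once per selected side field. A minimal standalone sketch of the new numbering, using hypothetical field names that are not part of the commit:

public class SelectionOrderIndexSketch {
    public static void main(String[] args) {
        // Hypothetical selected side fields, in the order they appear in the outer select.
        String[] selectedSideFields = {"city", "id", "name"};

        int sideTableFieldIndex = 0;
        for (String field : selectedSideFields) {
            // Each selected side field gets the next free index: city -> 0, id -> 1, name -> 2,
            // regardless of where the field sits in the side table's declared schema.
            System.out.println(field + " -> " + sideTableFieldIndex++);
        }
    }
}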

elasticsearch7/elasticsearch7-side/elasticsearch7-async-side/src/main/java/com/dtstack/flink/sql/side/elasticsearch7/Elasticsearch7AsyncSideInfo.java

Lines changed: 35 additions & 0 deletions

@@ -28,6 +28,7 @@
 import org.apache.calcite.sql.SqlNode;
 import org.apache.flink.api.java.typeutils.RowTypeInfo;

+import java.util.Arrays;
 import java.util.List;

 /**
@@ -43,6 +44,40 @@ public Elasticsearch7AsyncSideInfo(RowTypeInfo rowTypeInfo, JoinInfo joinInfo, L
         super(rowTypeInfo, joinInfo, outFieldInfoList, sideTableInfo);
     }

+    @Override
+    public void parseSelectFields(JoinInfo joinInfo){
+        String sideTableName = joinInfo.getSideTableName();
+        String nonSideTableName = joinInfo.getNonSideTable();
+        List<String> fields = Lists.newArrayList();
+        int sideTableFieldIndex;
+
+        for( int i=0; i<outFieldInfoList.size(); i++){
+            FieldInfo fieldInfo = outFieldInfoList.get(i);
+            if(fieldInfo.getTable().equalsIgnoreCase(sideTableName)){
+                String sideFieldName = sideTableInfo.getPhysicalFields().getOrDefault(fieldInfo.getFieldName(), fieldInfo.getFieldName());
+                fields.add(sideFieldName);
+                sideTableFieldIndex = Arrays.asList(sideTableInfo.getFields()).indexOf(sideFieldName);
+                if (sideTableFieldIndex == -1){
+                    throw new RuntimeException(String.format("unknown filed {%s} in sideTable {%s} ", sideFieldName, sideTableName));
+                }
+                sideSelectFieldsType.put(sideTableFieldIndex, getTargetFieldType(fieldInfo.getFieldName()));
+                sideFieldIndex.put(i, sideTableFieldIndex);
+                sideFieldNameIndex.put(i, sideFieldName);
+            }else if(fieldInfo.getTable().equalsIgnoreCase(nonSideTableName)){
+                int nonSideIndex = rowTypeInfo.getFieldIndex(fieldInfo.getFieldName());
+                inFieldIndex.put(i, nonSideIndex);
+            }else{
+                throw new RuntimeException("unknown table " + fieldInfo.getTable());
+            }
+        }
+
+        if(fields.size() == 0){
+            throw new RuntimeException("select non field from table " + sideTableName);
+        }
+
+        sideSelectFields = String.join(",", fields);
+    }
+
     @Override
     public void buildEqualInfo(JoinInfo joinInfo, AbstractSideTableInfo sideTableInfo) {
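The override re-instates, for the Elasticsearch 7 async side only, the lookup that the base class just dropped: the index comes from the field's position in sideTableInfo.getFields(), and an unknown field still fails fast. A small standalone sketch of that schema-position lookup, with hypothetical names that are not part of the commit:

import java.util.Arrays;
import java.util.List;

public class SchemaPositionIndexSketch {
    public static void main(String[] args) {
        // Hypothetical side-table schema and a selected field.
        List<String> schemaFields = Arrays.asList("id", "name", "city");
        String sideFieldName = "city";

        // Index taken from the field's position in the declared schema: city -> 2.
        int sideTableFieldIndex = schemaFields.indexOf(sideFieldName);
        if (sideTableFieldIndex == -1) {
            throw new RuntimeException("unknown field " + sideFieldName);
        }
        System.out.println(sideFieldName + " -> " + sideTableFieldIndex);
    }
}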

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaConsumerFactory.java

Lines changed: 3 additions & 50 deletions

@@ -20,15 +20,10 @@

 import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
 import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
-import com.dtstack.flink.sql.format.dtnest.DtNestRowDeserializationSchema;
-import com.dtstack.flink.sql.format.FormatType;
+import com.dtstack.flink.sql.source.kafka.deserialization.DeserializationSchemaFactory;
+import com.dtstack.flink.sql.source.kafka.deserialization.KafkaDeserializationMetricWrapper;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
-import org.apache.flink.formats.json.DTJsonRowDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.types.Row;

@@ -50,50 +45,8 @@ protected DeserializationMetricWrapper createDeserializationMetricWrapper(KafkaS
                                                                               TypeInformation<Row> typeInformation,
                                                                               Calculate calculate) {
         return new KafkaDeserializationMetricWrapper(typeInformation,
-                createDeserializationSchema(kafkaSourceTableInfo, typeInformation),
+                DeserializationSchemaFactory.createDeserializationSchema(kafkaSourceTableInfo, typeInformation),
                 calculate,
                 DirtyDataManager.newInstance(kafkaSourceTableInfo.getDirtyProperties()));
     }
-
-    private DeserializationSchema<Row> createDeserializationSchema(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
-        DeserializationSchema<Row> deserializationSchema = null;
-        if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-            deserializationSchema = new DtNestRowDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(),
-                    kafkaSourceTableInfo.getFieldExtraInfoList(),kafkaSourceTableInfo.getCharsetName());
-        } else if (FormatType.JSON.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-
-            if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {
-                deserializationSchema = new DTJsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
-            } else if (typeInformation != null && typeInformation.getArity() != 0) {
-                deserializationSchema = new DTJsonRowDeserializationSchema(typeInformation);
-            } else {
-                throw new IllegalArgumentException("sourceDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema)or TypeInformation<Row>");
-            }
-
-        } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-
-            if (StringUtils.isBlank(kafkaSourceTableInfo.getFieldDelimiter())) {
-                throw new IllegalArgumentException("sourceDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
-            }
-
-            final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(typeInformation);
-            deserSchemaBuilder.setFieldDelimiter(kafkaSourceTableInfo.getFieldDelimiter().toCharArray()[0]);
-            deserializationSchema = deserSchemaBuilder.build();
-
-        } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-
-            if (StringUtils.isBlank(kafkaSourceTableInfo.getSchemaString())) {
-                throw new IllegalArgumentException("sourceDataType:" + FormatType.AVRO.name() + " must set schemaString");
-            }
-
-            deserializationSchema = new AvroRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
-        }
-
-        if (null == deserializationSchema) {
-            throw new UnsupportedOperationException("FormatType:" + kafkaSourceTableInfo.getSourceDataType());
-        }
-
-        return deserializationSchema;
-    }
-
 }
New file: DeserializationSchemaFactory.java (package com.dtstack.flink.sql.source.kafka.deserialization)

Lines changed: 100 additions & 0 deletions

@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.dtstack.flink.sql.source.kafka.deserialization;
+
+import com.dtstack.flink.sql.format.FormatType;
+import com.dtstack.flink.sql.format.dtnest.DtNestRowDeserializationSchema;
+import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
+import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
+import org.apache.flink.formats.json.DTJsonRowDeserializationSchema;
+import org.apache.flink.types.Row;
+
+/**
+ * Date: 2021/05/25 Company: www.dtstack.com
+ *
+ * @author tiezhu
+ */
+public class DeserializationSchemaFactory {
+
+    public static DeserializationSchema<Row> createDeserializationSchema(
+            KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
+        DeserializationSchema<Row> deserializationSchema = null;
+        if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+            deserializationSchema =
+                    new DtNestRowDeserializationSchema(
+                            typeInformation,
+                            kafkaSourceTableInfo.getPhysicalFields(),
+                            kafkaSourceTableInfo.getFieldExtraInfoList(),
+                            kafkaSourceTableInfo.getCharsetName());
+        } else if (FormatType.JSON
+                .name()
+                .equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+
+            if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {
+                deserializationSchema =
+                        new DTJsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
+            } else if (typeInformation != null && typeInformation.getArity() != 0) {
+                deserializationSchema = new DTJsonRowDeserializationSchema(typeInformation);
+            } else {
+                throw new IllegalArgumentException(
+                        "sourceDataType:"
+                                + FormatType.JSON.name()
+                                + " must set schemaString(JSON Schema)or TypeInformation<Row>");
+            }
+
+        } else if (FormatType.CSV
+                .name()
+                .equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+
+            if (StringUtils.isBlank(kafkaSourceTableInfo.getFieldDelimiter())) {
+                throw new IllegalArgumentException(
+                        "sourceDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
+            }
+
+            final CsvRowDeserializationSchema.Builder deserSchemaBuilder =
+                    new CsvRowDeserializationSchema.Builder(typeInformation);
+            deserSchemaBuilder.setFieldDelimiter(
+                    kafkaSourceTableInfo.getFieldDelimiter().toCharArray()[0]);
+            deserializationSchema = deserSchemaBuilder.build();
+
+        } else if (FormatType.AVRO
+                .name()
+                .equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
+
+            if (StringUtils.isBlank(kafkaSourceTableInfo.getSchemaString())) {
+                throw new IllegalArgumentException(
+                        "sourceDataType:" + FormatType.AVRO.name() + " must set schemaString");
+            }
+
+            deserializationSchema =
+                    new AvroRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
+        }
+
+        if (null == deserializationSchema) {
+            throw new UnsupportedOperationException(
+                    "FormatType:" + kafkaSourceTableInfo.getSourceDataType());
+        }
+
+        return deserializationSchema;
+    }
+}
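A minimal sketch of calling the new factory, assuming the dtstack classes above are on the classpath; it mirrors the call site in AbstractKafkaConsumerFactory#createDeserializationMetricWrapper from the previous diff, and the class and method names of the sketch itself are illustrative only:

import com.dtstack.flink.sql.source.kafka.deserialization.DeserializationSchemaFactory;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.types.Row;

class DeserializationFactorySketch {
    // The factory picks DT_NEST / JSON / CSV / AVRO from tableInfo.getSourceDataType()
    // and throws UnsupportedOperationException for any other format.
    static DeserializationSchema<Row> build(KafkaSourceTableInfo tableInfo,
                                            TypeInformation<Row> typeInfo) {
        return DeserializationSchemaFactory.createDeserializationSchema(tableInfo, typeInfo);
    }
}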

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/DtKafkaDeserializationSchemaWrapper.java renamed to kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/DtKafkaDeserializationSchemaWrapper.java

Lines changed: 24 additions & 8 deletions

@@ -16,14 +16,16 @@
  * limitations under the License.
  */

-package com.dtstack.flink.sql.source.kafka;
+package com.dtstack.flink.sql.source.kafka.deserialization;

+import com.dtstack.flink.sql.source.kafka.sample.OffsetMap;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.kafka.clients.consumer.ConsumerRecord;

 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

@@ -34,15 +36,28 @@
  */
 public class DtKafkaDeserializationSchemaWrapper<T> extends KafkaDeserializationSchemaWrapper<T> {

-    private final Map<KafkaTopicPartition, Long> specificEndOffsets;
-
     private final List<Integer> endPartition = new ArrayList<>();

-    public DtKafkaDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema,
-                                               Map<KafkaTopicPartition, Long> specificEndOffsets) {
+    private Map<KafkaTopicPartition, Long> specificEndOffsets;

+    public DtKafkaDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
         super(deserializationSchema);
-        this.specificEndOffsets = specificEndOffsets;
+    }
+
+    public void setSpecificEndOffsets(OffsetMap offsetMap) {
+        Map<KafkaTopicPartition, Long> latest = offsetMap.getLatest();
+        Map<KafkaTopicPartition, Long> earliest = offsetMap.getEarliest();
+
+        this.specificEndOffsets = new HashMap<>(latest);
+
+        // Drop partitions that hold no data, so the job does not keep waiting on them.
+        latest.keySet().forEach(
+                partition -> {
+                    if (latest.get(partition).equals(earliest.get(partition))) {
+                        specificEndOffsets.remove(partition);
+                    }
+                }
+        );
     }

     @Override
@@ -53,9 +68,9 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
         }
         if (specificEndOffsets != null) {
             Long endOffset = specificEndOffsets.get(topicPartition);
-            if (endOffset != null && record.offset() >= endOffset) {
+            if (endOffset != null && record.offset() >= endOffset - 1) {
                 endPartition.add(record.partition());
-                return null;
+                return super.deserialize(record);
             }
         }

@@ -65,6 +80,7 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
     public boolean isEndOfStream(T nextElement) {
         boolean isEnd =
                 specificEndOffsets != null
+                        && !specificEndOffsets.isEmpty()
                        && endPartition.size() == specificEndOffsets.size();
         return super.isEndOfStream(nextElement) || isEnd;
     }
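Two behavioural points in this diff worth spelling out: setSpecificEndOffsets drops partitions whose earliest and latest offsets are equal (empty partitions) so isEndOfStream can ever become true, and the end check now fires at endOffset - 1 while still deserializing that last record instead of returning null. A small standalone sketch of the off-by-one change, using hypothetical offsets and assuming "latest" follows Kafka end-offset semantics (the next offset to be produced):

public class EndOffsetSketch {
    public static void main(String[] args) {
        // Hypothetical partition whose captured "latest" offset is 5, so existing records sit at offsets 0..4.
        long endOffset = 5L;

        for (long offset = 0; offset < endOffset; offset++) {
            // New check: fires on offset 4, which is still deserialized and emitted.
            // The old check (offset >= endOffset) could only fire on records produced later,
            // and dropped them by returning null.
            boolean last = offset >= endOffset - 1;
            System.out.println("offset " + offset + (last ? " -> emit and mark partition finished" : " -> emit"));
        }
    }
}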

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java renamed to kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/KafkaDeserializationMetricWrapper.java

Lines changed: 7 additions & 6 deletions

@@ -16,10 +16,11 @@
  * limitations under the License.
  */

-package com.dtstack.flink.sql.source.kafka;
+package com.dtstack.flink.sql.source.kafka.deserialization;

 import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
 import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
+import com.dtstack.flink.sql.source.kafka.Calculate;
 import com.dtstack.flink.sql.util.ReflectionUtils;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -54,13 +55,13 @@ public class KafkaDeserializationMetricWrapper extends DeserializationMetricWrap

     private static final Logger LOG = LoggerFactory.getLogger(KafkaDeserializationMetricWrapper.class);

-    private Calculate calculate;
+    private final Calculate calculate;

     public KafkaDeserializationMetricWrapper(
-            TypeInformation<Row> typeInfo
-            , DeserializationSchema<Row> deserializationSchema
-            , Calculate calculate
-            , DirtyDataManager dirtyDataManager) {
+            TypeInformation<Row> typeInfo,
+            DeserializationSchema<Row> deserializationSchema,
+            Calculate calculate,
+            DirtyDataManager dirtyDataManager) {
         super(typeInfo, deserializationSchema, dirtyDataManager);
         this.calculate = calculate;
     }
