
Commit be62b81

[feat-1720][kafka] Added Kafka sampling inspection feature, with Kerberos support.
1 parent a5c3e05 commit be62b81

File tree

19 files changed: +827 additions, −217 deletions

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/AbstractKafkaConsumerFactory.java

Lines changed: 3 additions & 50 deletions
@@ -20,15 +20,10 @@
 import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
 import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
-import com.dtstack.flink.sql.format.dtnest.DtNestRowDeserializationSchema;
-import com.dtstack.flink.sql.format.FormatType;
+import com.dtstack.flink.sql.source.kafka.deserialization.DeserializationSchemaFactory;
+import com.dtstack.flink.sql.source.kafka.deserialization.KafkaDeserializationMetricWrapper;
 import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
-import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
-import org.apache.flink.formats.json.DTJsonRowDeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
 import org.apache.flink.types.Row;

@@ -50,50 +45,8 @@ protected DeserializationMetricWrapper createDeserializationMetricWrapper(KafkaS
             TypeInformation<Row> typeInformation,
             Calculate calculate) {
         return new KafkaDeserializationMetricWrapper(typeInformation,
-                createDeserializationSchema(kafkaSourceTableInfo, typeInformation),
+                DeserializationSchemaFactory.createDeserializationSchema(kafkaSourceTableInfo, typeInformation),
                 calculate,
                 DirtyDataManager.newInstance(kafkaSourceTableInfo.getDirtyProperties()));
     }
-
-    private DeserializationSchema<Row> createDeserializationSchema(KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
-        DeserializationSchema<Row> deserializationSchema = null;
-        if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-            deserializationSchema = new DtNestRowDeserializationSchema(typeInformation, kafkaSourceTableInfo.getPhysicalFields(),
-                    kafkaSourceTableInfo.getFieldExtraInfoList(),kafkaSourceTableInfo.getCharsetName());
-        } else if (FormatType.JSON.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-
-            if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {
-                deserializationSchema = new DTJsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
-            } else if (typeInformation != null && typeInformation.getArity() != 0) {
-                deserializationSchema = new DTJsonRowDeserializationSchema(typeInformation);
-            } else {
-                throw new IllegalArgumentException("sourceDataType:" + FormatType.JSON.name() + " must set schemaString(JSON Schema)or TypeInformation<Row>");
-            }
-
-        } else if (FormatType.CSV.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-
-            if (StringUtils.isBlank(kafkaSourceTableInfo.getFieldDelimiter())) {
-                throw new IllegalArgumentException("sourceDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
-            }
-
-            final CsvRowDeserializationSchema.Builder deserSchemaBuilder = new CsvRowDeserializationSchema.Builder(typeInformation);
-            deserSchemaBuilder.setFieldDelimiter(kafkaSourceTableInfo.getFieldDelimiter().toCharArray()[0]);
-            deserializationSchema = deserSchemaBuilder.build();
-
-        } else if (FormatType.AVRO.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
-
-            if (StringUtils.isBlank(kafkaSourceTableInfo.getSchemaString())) {
-                throw new IllegalArgumentException("sourceDataType:" + FormatType.AVRO.name() + " must set schemaString");
-            }
-
-            deserializationSchema = new AvroRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
-        }
-
-        if (null == deserializationSchema) {
-            throw new UnsupportedOperationException("FormatType:" + kafkaSourceTableInfo.getSourceDataType());
-        }
-
-        return deserializationSchema;
-    }
 }
DeserializationSchemaFactory.java (new file; package com.dtstack.flink.sql.source.kafka.deserialization)

Lines changed: 100 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka.deserialization;

import com.dtstack.flink.sql.format.FormatType;
import com.dtstack.flink.sql.format.dtnest.DtNestRowDeserializationSchema;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.AvroRowDeserializationSchema;
import org.apache.flink.formats.csv.CsvRowDeserializationSchema;
import org.apache.flink.formats.json.DTJsonRowDeserializationSchema;
import org.apache.flink.types.Row;

/**
 * Date: 2021/05/25 Company: www.dtstack.com
 *
 * @author tiezhu
 */
public class DeserializationSchemaFactory {

    public static DeserializationSchema<Row> createDeserializationSchema(
            KafkaSourceTableInfo kafkaSourceTableInfo, TypeInformation<Row> typeInformation) {
        DeserializationSchema<Row> deserializationSchema = null;
        if (FormatType.DT_NEST.name().equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {
            deserializationSchema =
                    new DtNestRowDeserializationSchema(
                            typeInformation,
                            kafkaSourceTableInfo.getPhysicalFields(),
                            kafkaSourceTableInfo.getFieldExtraInfoList(),
                            kafkaSourceTableInfo.getCharsetName());
        } else if (FormatType.JSON
                .name()
                .equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {

            if (StringUtils.isNotBlank(kafkaSourceTableInfo.getSchemaString())) {
                deserializationSchema =
                        new DTJsonRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
            } else if (typeInformation != null && typeInformation.getArity() != 0) {
                deserializationSchema = new DTJsonRowDeserializationSchema(typeInformation);
            } else {
                throw new IllegalArgumentException(
                        "sourceDataType:"
                                + FormatType.JSON.name()
                                + " must set schemaString(JSON Schema)or TypeInformation<Row>");
            }

        } else if (FormatType.CSV
                .name()
                .equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {

            if (StringUtils.isBlank(kafkaSourceTableInfo.getFieldDelimiter())) {
                throw new IllegalArgumentException(
                        "sourceDataType:" + FormatType.CSV.name() + " must set fieldDelimiter");
            }

            final CsvRowDeserializationSchema.Builder deserSchemaBuilder =
                    new CsvRowDeserializationSchema.Builder(typeInformation);
            deserSchemaBuilder.setFieldDelimiter(
                    kafkaSourceTableInfo.getFieldDelimiter().toCharArray()[0]);
            deserializationSchema = deserSchemaBuilder.build();

        } else if (FormatType.AVRO
                .name()
                .equalsIgnoreCase(kafkaSourceTableInfo.getSourceDataType())) {

            if (StringUtils.isBlank(kafkaSourceTableInfo.getSchemaString())) {
                throw new IllegalArgumentException(
                        "sourceDataType:" + FormatType.AVRO.name() + " must set schemaString");
            }

            deserializationSchema =
                    new AvroRowDeserializationSchema(kafkaSourceTableInfo.getSchemaString());
        }

        if (null == deserializationSchema) {
            throw new UnsupportedOperationException(
                    "FormatType:" + kafkaSourceTableInfo.getSourceDataType());
        }

        return deserializationSchema;
    }
}
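For context, the sketch below shows how the new factory can be called on its own; the no-arg KafkaSourceTableInfo constructor and its bean-style setters are assumptions (only the getters appear in this diff), so treat it as illustrative rather than as code from the commit.

// Illustrative sketch only: the KafkaSourceTableInfo constructor and setters used
// below are assumed, not shown in this diff.
import com.dtstack.flink.sql.format.FormatType;
import com.dtstack.flink.sql.source.kafka.deserialization.DeserializationSchemaFactory;
import com.dtstack.flink.sql.source.kafka.table.KafkaSourceTableInfo;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

public class DeserializationSchemaFactoryExample {
    public static void main(String[] args) {
        KafkaSourceTableInfo tableInfo = new KafkaSourceTableInfo();  // assumed constructor
        tableInfo.setSourceDataType(FormatType.CSV.name());           // assumed setter
        tableInfo.setFieldDelimiter(",");                             // assumed setter

        // Two-column row type; for CSV the factory routes to CsvRowDeserializationSchema.
        RowTypeInfo rowType = new RowTypeInfo(Types.STRING, Types.INT);

        DeserializationSchema<Row> schema =
                DeserializationSchemaFactory.createDeserializationSchema(tableInfo, rowType);
        System.out.println(schema.getClass().getSimpleName());
    }
}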

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/DtKafkaDeserializationSchemaWrapper.java renamed to kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/DtKafkaDeserializationSchemaWrapper.java

Lines changed: 24 additions & 8 deletions
@@ -16,14 +16,16 @@
  * limitations under the License.
  */

-package com.dtstack.flink.sql.source.kafka;
+package com.dtstack.flink.sql.source.kafka.deserialization;

+import com.dtstack.flink.sql.source.kafka.sample.OffsetMap;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
 import org.apache.kafka.clients.consumer.ConsumerRecord;

 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;

@@ -34,15 +36,28 @@
  */
 public class DtKafkaDeserializationSchemaWrapper<T> extends KafkaDeserializationSchemaWrapper<T> {

-    private final Map<KafkaTopicPartition, Long> specificEndOffsets;
-
     private final List<Integer> endPartition = new ArrayList<>();

-    public DtKafkaDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema,
-                                               Map<KafkaTopicPartition, Long> specificEndOffsets) {
+    private Map<KafkaTopicPartition, Long> specificEndOffsets;

+    public DtKafkaDeserializationSchemaWrapper(DeserializationSchema<T> deserializationSchema) {
         super(deserializationSchema);
-        this.specificEndOffsets = specificEndOffsets;
+    }
+
+    public void setSpecificEndOffsets(OffsetMap offsetMap) {
+        Map<KafkaTopicPartition, Long> latest = offsetMap.getLatest();
+        Map<KafkaTopicPartition, Long> earliest = offsetMap.getEarliest();
+
+        this.specificEndOffsets = new HashMap<>(latest);
+
+        // Remove partitions that hold no data, so the task does not keep waiting for them
+        latest.keySet().forEach(
+                partition -> {
+                    if (latest.get(partition).equals(earliest.get(partition))) {
+                        specificEndOffsets.remove(partition);
+                    }
+                }
+        );
     }

     @Override
@@ -53,9 +68,9 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
         }
         if (specificEndOffsets != null) {
             Long endOffset = specificEndOffsets.get(topicPartition);
-            if (endOffset != null && record.offset() >= endOffset) {
+            if (endOffset != null && record.offset() >= endOffset - 1) {
                 endPartition.add(record.partition());
-                return null;
+                return super.deserialize(record);
             }
         }

@@ -65,6 +80,7 @@ public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
     public boolean isEndOfStream(T nextElement) {
         boolean isEnd =
                 specificEndOffsets != null
+                        && !specificEndOffsets.isEmpty()
                         && endPartition.size() == specificEndOffsets.size();
         return super.isEndOfStream(nextElement) || isEnd;
     }
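Two behavioral points stand out in this diff: end offsets are no longer fixed at construction time but injected later from an OffsetMap, with empty partitions (earliest equals latest) dropped so the sampling task never waits on them; and the stop condition now triggers at endOffset - 1 while still returning the deserialized record, so the last message of each partition is kept in the sample instead of being discarded. A minimal wiring sketch under those semantics follows, using the OffsetFetcher interface added later in this commit; the helper class itself is illustrative, not part of the commit.

import com.dtstack.flink.sql.source.kafka.deserialization.DtKafkaDeserializationSchemaWrapper;
import com.dtstack.flink.sql.source.kafka.sample.OffsetFetcher;
import com.dtstack.flink.sql.source.kafka.sample.OffsetMap;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.types.Row;

import java.util.Properties;

public class SamplingWrapperWiring {

    // Builds a wrapper that stops consuming once every non-empty partition has been
    // read up to the offset snapshot taken by the fetcher.
    static DtKafkaDeserializationSchemaWrapper<Row> buildSamplingWrapper(
            DeserializationSchema<Row> rowSchema,
            OffsetFetcher fetcher,
            Properties kafkaProps,
            String topic) {
        DtKafkaDeserializationSchemaWrapper<Row> wrapper =
                new DtKafkaDeserializationSchemaWrapper<>(rowSchema);

        // Snapshot earliest/latest offsets, then hand them to the wrapper; partitions
        // with no data are removed inside setSpecificEndOffsets.
        OffsetMap offsetMap = fetcher.seekOffset(kafkaProps, topic);
        wrapper.setSpecificEndOffsets(offsetMap);
        return wrapper;
    }
}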

kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/KafkaDeserializationMetricWrapper.java renamed to kafka-base/kafka-base-source/src/main/java/com/dtstack/flink/sql/source/kafka/deserialization/KafkaDeserializationMetricWrapper.java

Lines changed: 7 additions & 6 deletions
@@ -16,10 +16,11 @@
  * limitations under the License.
  */

-package com.dtstack.flink.sql.source.kafka;
+package com.dtstack.flink.sql.source.kafka.deserialization;

 import com.dtstack.flink.sql.dirtyManager.manager.DirtyDataManager;
 import com.dtstack.flink.sql.format.DeserializationMetricWrapper;
+import com.dtstack.flink.sql.source.kafka.Calculate;
 import com.dtstack.flink.sql.util.ReflectionUtils;
 import org.apache.flink.api.common.serialization.DeserializationSchema;
 import org.apache.flink.api.common.typeinfo.TypeInformation;

@@ -54,13 +55,13 @@ public class KafkaDeserializationMetricWrapper extends DeserializationMetricWrap

     private static final Logger LOG = LoggerFactory.getLogger(KafkaDeserializationMetricWrapper.class);

-    private Calculate calculate;
+    private final Calculate calculate;

     public KafkaDeserializationMetricWrapper(
-            TypeInformation<Row> typeInfo
-            , DeserializationSchema<Row> deserializationSchema
-            , Calculate calculate
-            , DirtyDataManager dirtyDataManager) {
+            TypeInformation<Row> typeInfo,
+            DeserializationSchema<Row> deserializationSchema,
+            Calculate calculate,
+            DirtyDataManager dirtyDataManager) {
         super(typeInfo, deserializationSchema, dirtyDataManager);
         this.calculate = calculate;
     }
OffsetFetcher.java (new file; package com.dtstack.flink.sql.source.kafka.sample)

Lines changed: 88 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.dtstack.flink.sql.source.kafka.sample;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

import java.util.Map;
import java.util.Properties;

/**
 * @author tiezhu
 * @since 2021/6/15 Tuesday
 */
public interface OffsetFetcher {

    OffsetMap fetchOffset(KafkaConsumer<?, ?> consumer, String topic);

    default OffsetMap seekOffset(Properties props, String topic) {

        setByteDeserializer(props);

        try (KafkaConsumer<?, ?> consumer = new KafkaConsumer<>(props)) {
            OffsetMap offsetMap = fetchOffset(consumer, topic);

            judgeKafkaSampleIsAvailable(offsetMap);

            return offsetMap;
        }
    }

    /**
     * Judge whether each Kafka partition has data to consume. If no partition has data to
     * consume, the sampling task is not available.
     *
     * @param offsetMap offset map
     */
    default void judgeKafkaSampleIsAvailable(OffsetMap offsetMap) {
        boolean kafkaSampleIsAvailable = false;
        Map<KafkaTopicPartition, Long> latest = offsetMap.getLatest();
        Map<KafkaTopicPartition, Long> earliest = offsetMap.getEarliest();

        for (KafkaTopicPartition partition : latest.keySet()) {
            Long earliestOffset = earliest.get(partition);
            Long latestOffset = latest.get(partition);

            if (!latestOffset.equals(earliestOffset)) {
                kafkaSampleIsAvailable = true;
            }
        }

        if (!kafkaSampleIsAvailable) {
            throw new RuntimeException(
                    "Kafka sample is unavailable because there is no data in all partitions");
        }
    }

    /**
     * Makes sure that the ByteArrayDeserializer is registered in the Kafka properties.
     *
     * @param props The Kafka properties to register the serializer in.
     */
    default void setByteDeserializer(Properties props) {

        final String deSerName = ByteArrayDeserializer.class.getName();

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
    }
}
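No concrete OffsetFetcher appears in this section of the commit, so the sketch below is a hedged guess at what one could look like, built only on the standard KafkaConsumer offset APIs. The class name LatestOffsetFetcher, OffsetMap's no-arg constructor, and its setEarliest/setLatest methods are assumptions; only the OffsetMap getters appear in this diff.

package com.dtstack.flink.sql.source.kafka.sample;

import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class LatestOffsetFetcher implements OffsetFetcher {   // hypothetical name

    @Override
    public OffsetMap fetchOffset(KafkaConsumer<?, ?> consumer, String topic) {
        // Discover all partitions of the topic.
        List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());

        // Snapshot the earliest and latest offsets per partition.
        Map<TopicPartition, Long> earliest = consumer.beginningOffsets(partitions);
        Map<TopicPartition, Long> latest = consumer.endOffsets(partitions);

        // Re-key by Flink's KafkaTopicPartition, as OffsetMap exposes in its getters.
        Map<KafkaTopicPartition, Long> earliestByPartition = new HashMap<>();
        Map<KafkaTopicPartition, Long> latestByPartition = new HashMap<>();
        for (TopicPartition tp : partitions) {
            KafkaTopicPartition key = new KafkaTopicPartition(tp.topic(), tp.partition());
            earliestByPartition.put(key, earliest.get(tp));
            latestByPartition.put(key, latest.get(tp));
        }

        OffsetMap offsetMap = new OffsetMap();           // assumed no-arg constructor
        offsetMap.setEarliest(earliestByPartition);      // assumed setter
        offsetMap.setLatest(latestByPartition);          // assumed setter
        return offsetMap;
    }
}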
