1818
1919package com .dtstack .flink .sql .source .kafka .sample ;
2020
21+ import com .dtstack .flink .sql .source .kafka .throwable .KafkaSamplingUnavailableException ;
2122import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartition ;
2223import org .apache .kafka .clients .consumer .ConsumerConfig ;
2324import org .apache .kafka .clients .consumer .KafkaConsumer ;
@@ -41,7 +42,7 @@ default OffsetMap seekOffset(Properties props, String topic) {
4142 try (KafkaConsumer <?, ?> consumer = new KafkaConsumer <>(props )) {
4243 OffsetMap offsetMap = fetchOffset (consumer , topic );
4344
44- judgeKafkaSampleIsAvailable (offsetMap );
45+ judgeKafkaSampleIsAvailable (offsetMap , topic );
4546
4647 return offsetMap ;
4748 }
@@ -53,7 +54,7 @@ default OffsetMap seekOffset(Properties props, String topic) {
5354 *
5455 * @param offsetMap offset map
5556 */
56- default void judgeKafkaSampleIsAvailable (OffsetMap offsetMap ) {
57+ default void judgeKafkaSampleIsAvailable (OffsetMap offsetMap , String topic ) {
5758 boolean kafkaSampleIsAvailable = false ;
5859 Map <KafkaTopicPartition , Long > latest = offsetMap .getLatest ();
5960 Map <KafkaTopicPartition , Long > earliest = offsetMap .getEarliest ();
@@ -68,8 +69,10 @@ default void judgeKafkaSampleIsAvailable(OffsetMap offsetMap) {
6869 }
6970
7071 if (!kafkaSampleIsAvailable ) {
71- throw new RuntimeException (
72- "Kafka sample is unavailable because there is no data in all partitions" );
72+ throw new KafkaSamplingUnavailableException (
73+ String .format (
74+ "Kafka sampling of [%s] is unavailable because there is no data in all partitions" ,
75+ topic ));
7376 }
7477 }
7578
0 commit comments