1919package com .dtstack .flink .sql .source .kafka ;
2020
2121import com .dtstack .flink .sql .format .DeserializationMetricWrapper ;
22+ import com .dtstack .flink .sql .util .ReflectionUtils ;
2223import org .apache .flink .api .common .serialization .DeserializationSchema ;
2324import org .apache .flink .api .common .typeinfo .TypeInformation ;
2425import org .apache .flink .metrics .Gauge ;
2526import org .apache .flink .metrics .MetricGroup ;
2627import org .apache .flink .streaming .connectors .kafka .internal .KafkaConsumerThread ;
2728import org .apache .flink .streaming .connectors .kafka .internals .AbstractFetcher ;
29+ import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartition ;
30+ import org .apache .flink .streaming .connectors .kafka .internals .KafkaTopicPartitionState ;
2831import org .apache .flink .types .Row ;
2932import org .apache .kafka .clients .consumer .KafkaConsumer ;
3033import org .apache .kafka .clients .consumer .internals .SubscriptionState ;
3134import org .apache .kafka .common .TopicPartition ;
3235import org .slf4j .Logger ;
3336import org .slf4j .LoggerFactory ;
3437
35- import java .io .IOException ;
3638import java .lang .reflect .Field ;
37- import java .util .Set ;
38- import java .util .concurrent .atomic .AtomicBoolean ;
39+ import java .util .List ;
3940
4041import static com .dtstack .flink .sql .metric .MetricConstant .DT_PARTITION_GROUP ;
4142import static com .dtstack .flink .sql .metric .MetricConstant .DT_TOPIC_GROUP ;
@@ -52,77 +53,89 @@ public class KafkaDeserializationMetricWrapper extends DeserializationMetricWrap
5253
5354 private static final Logger LOG = LoggerFactory .getLogger (KafkaDeserializationMetricWrapper .class );
5455
55- private AbstractFetcher <Row , ?> fetcher ;
56-
57- private AtomicBoolean firstMsg = new AtomicBoolean (true );
58-
5956 private Calculate calculate ;
6057
6158 public KafkaDeserializationMetricWrapper (TypeInformation <Row > typeInfo , DeserializationSchema <Row > deserializationSchema , Calculate calculate ) {
6259 super (typeInfo , deserializationSchema );
6360 this .calculate = calculate ;
6461 }
6562
66- @ Override
67- protected void beforeDeserialize () throws IOException {
68- super .beforeDeserialize ();
69- if (firstMsg .compareAndSet (true , false )) {
70- try {
71- registerPtMetric (fetcher );
72- } catch (Exception e ) {
73- LOG .error ("register topic partition metric error." , e );
74- }
75- }
76- }
77-
7863 protected void registerPtMetric (AbstractFetcher <Row , ?> fetcher ) throws Exception {
79- Field consumerThreadField = getConsumerThreadField (fetcher );
64+ Field consumerThreadField = ReflectionUtils . getDeclaredField (fetcher , "consumerThread" );
8065 consumerThreadField .setAccessible (true );
8166 KafkaConsumerThread consumerThread = (KafkaConsumerThread ) consumerThreadField .get (fetcher );
8267
8368 Field hasAssignedPartitionsField = consumerThread .getClass ().getDeclaredField ("hasAssignedPartitions" );
8469 hasAssignedPartitionsField .setAccessible (true );
8570
86- //wait until assignedPartitions
87-
88- boolean hasAssignedPartitions = (boolean ) hasAssignedPartitionsField .get (consumerThread );
71+ // get subtask unassigned kafka topic partition
72+ Field subscribedPartitionStatesField = ReflectionUtils .getDeclaredField (fetcher , "subscribedPartitionStates" );
73+ subscribedPartitionStatesField .setAccessible (true );
74+ List <KafkaTopicPartitionState <KafkaTopicPartition >> subscribedPartitionStates = (List <KafkaTopicPartitionState <KafkaTopicPartition >>) subscribedPartitionStatesField .get (fetcher );
8975
90- if (!hasAssignedPartitions ) {
91- throw new RuntimeException ("wait 50 secs, but not assignedPartitions" );
92- }
76+ // init partition lag metric
77+ for (KafkaTopicPartitionState <KafkaTopicPartition > kafkaTopicPartitionState : subscribedPartitionStates ) {
78+ KafkaTopicPartition kafkaTopicPartition = kafkaTopicPartitionState .getKafkaTopicPartition ();
79+ MetricGroup topicMetricGroup = getRuntimeContext ().getMetricGroup ().addGroup (DT_TOPIC_GROUP , kafkaTopicPartition .getTopic ());
9380
94- Field consumerField = consumerThread .getClass ().getDeclaredField ("consumer" );
95- consumerField .setAccessible (true );
96-
97- KafkaConsumer kafkaConsumer = (KafkaConsumer ) consumerField .get (consumerThread );
98- Field subscriptionStateField = kafkaConsumer .getClass ().getDeclaredField ("subscriptions" );
99- subscriptionStateField .setAccessible (true );
100-
101- //topic partitions lag
102- SubscriptionState subscriptionState = (SubscriptionState ) subscriptionStateField .get (kafkaConsumer );
103- Set <TopicPartition > assignedPartitions = subscriptionState .assignedPartitions ();
104-
105- for (TopicPartition topicPartition : assignedPartitions ) {
106- MetricGroup metricGroup = getRuntimeContext ().getMetricGroup ().addGroup (DT_TOPIC_GROUP , topicPartition .topic ())
107- .addGroup (DT_PARTITION_GROUP , topicPartition .partition () + "" );
81+ MetricGroup metricGroup = topicMetricGroup .addGroup (DT_PARTITION_GROUP , kafkaTopicPartition .getPartition () + "" );
10882 metricGroup .gauge (DT_TOPIC_PARTITION_LAG_GAUGE , new Gauge <Long >() {
83+ // tmp variable
84+ boolean initLag = true ;
85+ int partitionIndex ;
86+ SubscriptionState subscriptionState ;
87+ TopicPartition topicPartition ;
88+
10989 @ Override
11090 public Long getValue () {
111- return calculate .calc (subscriptionState , topicPartition );
91+ // first time register metrics
92+ if (initLag ) {
93+ partitionIndex = kafkaTopicPartition .getPartition ();
94+ initLag = false ;
95+ return -1L ;
96+ }
97+ // when kafka topic partition assigned calc metrics
98+ if (subscriptionState == null ) {
99+ try {
100+ Field consumerField = consumerThread .getClass ().getDeclaredField ("consumer" );
101+ consumerField .setAccessible (true );
102+
103+ KafkaConsumer kafkaConsumer = (KafkaConsumer ) consumerField .get (consumerThread );
104+ Field subscriptionStateField = kafkaConsumer .getClass ().getDeclaredField ("subscriptions" );
105+ subscriptionStateField .setAccessible (true );
106+
107+ boolean hasAssignedPartitions = (boolean ) hasAssignedPartitionsField .get (consumerThread );
108+
109+ if (!hasAssignedPartitions ) {
110+ LOG .error ("wait 50 secs, but not assignedPartitions" );
111+ }
112+
113+ subscriptionState = (SubscriptionState ) subscriptionStateField .get (kafkaConsumer );
114+
115+ topicPartition = subscriptionState
116+ .assignedPartitions ()
117+ .stream ()
118+ .filter (x -> x .partition () == partitionIndex )
119+ .findFirst ()
120+ .get ();
121+
122+ } catch (Exception e ) {
123+ LOG .error (e .getMessage ());
124+ }
125+ return -1L ;
126+ } else {
127+ return calculate .calc (subscriptionState , topicPartition );
128+ }
112129 }
113130 });
114131 }
115132 }
116133
117134 public void setFetcher (AbstractFetcher <Row , ?> fetcher ) {
118- this .fetcher = fetcher ;
119- }
120-
121- private Field getConsumerThreadField (AbstractFetcher fetcher ) throws NoSuchFieldException {
122135 try {
123- return fetcher . getClass (). getDeclaredField ( "consumerThread" );
136+ registerPtMetric ( fetcher );
124137 } catch (Exception e ) {
125- return fetcher . getClass (). getSuperclass (). getDeclaredField ( "consumerThread" );
138+ LOG . error ( "register topic partition metric error." , e );
126139 }
127140 }
128141}
0 commit comments