Skip to content

Commit 1fbbaca

Browse files
lhotarinodece
andcommitted
[fix][client] Fix getPendingQueueSize for PartitionedTopicProducerStatsRecorderImpl: avoid NPE and implement aggregation (apache#24830)
Co-authored-by: Zixuan Liu <nodeces@gmail.com> (cherry picked from commit a43762b)
1 parent 5664272 commit 1fbbaca

File tree

3 files changed

+34
-1
lines changed

3 files changed

+34
-1
lines changed

pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedTopicProducerStatsRecorderImpl.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ public class PartitionedTopicProducerStatsRecorderImpl extends ProducerStatsReco
3535
private final DoubleAdder sendMsgsRateAggregate;
3636
private final DoubleAdder sendBytesRateAggregate;
3737
private int partitions = 0;
38+
private int pendingQueueSize;
3839

3940
public PartitionedTopicProducerStatsRecorderImpl() {
4041
super();
@@ -46,6 +47,7 @@ public PartitionedTopicProducerStatsRecorderImpl() {
4647
void reset() {
4748
super.reset();
4849
partitions = 0;
50+
pendingQueueSize = 0;
4951
}
5052

5153
void updateCumulativeStats(String partition, ProducerStats stats) {
@@ -58,6 +60,7 @@ void updateCumulativeStats(String partition, ProducerStats stats) {
5860
sendMsgsRateAggregate.add(stats.getSendMsgsRate());
5961
sendBytesRateAggregate.add(stats.getSendBytesRate());
6062
partitions++;
63+
pendingQueueSize += stats.getPendingQueueSize();
6164
}
6265

6366
@Override
@@ -75,5 +78,10 @@ public Map<String, ProducerStats> getPartitionStats() {
7578
return partitionStats;
7679
}
7780

81+
@Override
82+
public int getPendingQueueSize() {
83+
return pendingQueueSize;
84+
}
85+
7886
private static final Logger log = LoggerFactory.getLogger(PartitionedTopicProducerStatsRecorderImpl.class);
7987
}

pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -332,7 +332,7 @@ public double getSendLatencyMillisMax() {
332332

333333
@Override
334334
public int getPendingQueueSize() {
335-
return producer.getPendingQueueSize();
335+
return producer != null ? producer.getPendingQueueSize() : 0;
336336
}
337337

338338
public void cancelStatsTimeout() {

pulsar-client/src/test/java/org/apache/pulsar/client/impl/ProducerStatsRecorderImplTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,4 +89,29 @@ public void testPartitionTopicAggegationStats() {
8989
assertTrue(recorder2.getSendBytesRate() > 0);
9090
assertTrue(recorder2.getSendMsgsRate() > 0);
9191
}
92+
93+
@Test
94+
public void testPartitionedTopicProducerStatsPendingQueueSizeDoesntNPE() {
95+
PartitionedTopicProducerStatsRecorderImpl recorder = new PartitionedTopicProducerStatsRecorderImpl();
96+
assertEquals(recorder.getPendingQueueSize(), 0);
97+
}
98+
99+
@Test
100+
public void testProducerStatsPendingQueueSizeDoesntNPE() {
101+
ProducerStatsRecorderImpl recorder = new ProducerStatsRecorderImpl();
102+
assertEquals(recorder.getPendingQueueSize(), 0);
103+
}
104+
105+
@Test
106+
public void testPartitionedTopicProducerStatsPendingQueueSizeAggregated() {
107+
PartitionedTopicProducerStatsRecorderImpl recorder = new PartitionedTopicProducerStatsRecorderImpl();
108+
109+
ProducerStatsRecorderImpl individualStats = spy(new ProducerStatsRecorderImpl());
110+
when(individualStats.getPendingQueueSize()).thenReturn(1);
111+
recorder.updateCumulativeStats("1", individualStats);
112+
recorder.updateCumulativeStats("2", individualStats);
113+
recorder.updateCumulativeStats("3", individualStats);
114+
115+
assertEquals(recorder.getPendingQueueSize(), 3);
116+
}
92117
}

0 commit comments

Comments
 (0)