Skip to content

Commit 6ee0137

Browse files
committed
Avoid slow responses due to missed queue drain after a client sent out all its messages
1 parent fc1b603 commit 6ee0137

File tree

4 files changed

+21
-2
lines changed

4 files changed

+21
-2
lines changed

broker/src/main/java/io/moquette/broker/MQTTConnection.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -406,8 +406,7 @@ void sendIfWritableElseDrop(MqttMessage msg) {
406406
ChannelFuture channelFuture;
407407
if (brokerConfig.isImmediateBufferFlush()) {
408408
channelFuture = channel.writeAndFlush(msg);
409-
}
410-
else {
409+
} else {
411410
channelFuture = channel.write(msg);
412411
}
413412
channelFuture.addListener(FIRE_EXCEPTION_ON_FAILURE);
@@ -501,4 +500,9 @@ public String toString() {
501500
InetSocketAddress remoteAddress() {
502501
return (InetSocketAddress) channel.remoteAddress();
503502
}
503+
504+
public void readCompleted() {
505+
// TODO drain all messages in target's session in-flight message queue
506+
postOffice.flushInFlight(this);
507+
}
504508
}

broker/src/main/java/io/moquette/broker/NewNettyMQTTHandler.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,12 @@ public void operationComplete(ChannelFuture future) {
7070
}
7171
}
7272

73+
@Override
74+
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
75+
final MQTTConnection mqttConnection = mqttConnection(ctx.channel());
76+
mqttConnection.readCompleted();
77+
}
78+
7379
@Override
7480
public void channelActive(ChannelHandlerContext ctx) {
7581
MQTTConnection connection = connectionFactory.create(ctx.channel());

broker/src/main/java/io/moquette/broker/PostOffice.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,4 +308,9 @@ void dispatchDisconnection(String clientId,String userName){
308308
void dispatchConnectionLost(String clientId,String userName){
309309
interceptor.notifyClientConnectionLost(clientId,userName);
310310
}
311+
312+
void flushInFlight(MQTTConnection mqttConnection) {
313+
Session targetSession = sessionRegistry.retrieve(mqttConnection.getClientId());
314+
targetSession.flushAllQueuedMessages();
315+
}
311316
}

broker/src/main/java/io/moquette/broker/Session.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -281,6 +281,10 @@ void pubAckReceived(int ackPacketId) {
281281
drainQueueToConnection();
282282
}
283283

284+
public void flushAllQueuedMessages() {
285+
drainQueueToConnection();
286+
}
287+
284288
public void resendInflightNotAcked() {
285289
Collection<InFlightPacket> expired = new ArrayList<>(INFLIGHT_WINDOW_SIZE);
286290
inflightTimeouts.drainTo(expired);

0 commit comments

Comments
 (0)