Skip to content

Commit 8228f8c

Browse files
committed
Count still queued messages per JsonRpcConnection
1 parent 73c24a3 commit 8228f8c

File tree

2 files changed

+9
-0
lines changed

2 files changed

+9
-0
lines changed

lib/remote/jsonrpcconnection.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ void JsonRpcConnection::WriteOutgoingMessages(boost::asio::yield_context yc)
154154
}
155155

156156
size_t bytesSent = JsonRpc::SendRawMessage(m_Stream, message, yc);
157+
m_PendingOutgoingMessages--;
157158

158159
if (m_Endpoint) {
159160
m_Endpoint->AddMessageSent(bytesSent);
@@ -230,6 +231,7 @@ void JsonRpcConnection::SendRawMessage(const String& message)
230231

231232
m_OutgoingMessagesQueue.emplace_back(message);
232233
m_OutgoingMessagesQueued.Set();
234+
m_PendingOutgoingMessages++;
233235
});
234236
}
235237

@@ -241,6 +243,7 @@ void JsonRpcConnection::SendMessageInternal(const Dictionary::Ptr& message)
241243

242244
m_OutgoingMessagesQueue.emplace_back(JsonEncode(message));
243245
m_OutgoingMessagesQueued.Set();
246+
m_PendingOutgoingMessages++;
244247
}
245248

246249
void JsonRpcConnection::Disconnect()

lib/remote/jsonrpcconnection.hpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ class JsonRpcConnection final : public Object
5454
Shared<AsioTlsStream>::Ptr GetStream() const;
5555
ConnectionRole GetRole() const;
5656

57+
auto GetPendingOutgoingMessages() const noexcept
58+
{
59+
return m_PendingOutgoingMessages.load();
60+
}
61+
5762
void Disconnect();
5863

5964
void SendMessage(const Dictionary::Ptr& request);
@@ -76,6 +81,7 @@ class JsonRpcConnection final : public Object
7681
boost::asio::io_context::strand m_IoStrand;
7782
std::vector<String> m_OutgoingMessagesQueue;
7883
AsioConditionVariable m_OutgoingMessagesQueued;
84+
Atomic<decltype(m_OutgoingMessagesQueue)::size_type, std::memory_order_relaxed> m_PendingOutgoingMessages {0};
7985
AsioConditionVariable m_WriterDone;
8086
Atomic<bool> m_ShuttingDown;
8187
boost::asio::deadline_timer m_CheckLivenessTimer, m_HeartbeatTimer;

0 commit comments

Comments
 (0)