Skip to content

Commit 525a5bd

Browse files
lexsfacebook-github-bot
authored andcommitted
Allow sending/receiving frames without having a frame processor
Summary: If we unset the frame processor we can no longer send terminal frames. However if we unset it we risk crashing if we receive any frames while it's unset (this happens outside the stack frame due to a thread hop). Instead we explicitly support the case of no frame processor set by keeping a queue of incoming frames and flushing those when the processor is back. Reviewed By: yschimke Differential Revision: D4921303 fbshipit-source-id: bc3da38242540348318a1d754e64eba8c95639af
1 parent fdab0d6 commit 525a5bd

File tree

4 files changed

+120
-16
lines changed

4 files changed

+120
-16
lines changed

src/FrameTransport.cpp

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,17 @@ void FrameTransport::setFrameProcessor(
5555
}
5656

5757
drainOutputFramesQueue();
58+
if (frameProcessor_) {
59+
while (!pendingReads_.empty()) {
60+
auto frame = std::move(pendingReads_.front());
61+
pendingReads_.pop_front();
62+
frameProcessor_->processFrame(std::move(frame));
63+
}
64+
if (pendingTerminal_) {
65+
terminateFrameProcessor(std::move(*pendingTerminal_));
66+
pendingTerminal_ = folly::none;
67+
}
68+
}
5869
}
5970

6071
void FrameTransport::close(folly::exception_wrapper ex) {
@@ -101,11 +112,10 @@ void FrameTransport::onSubscribe(
101112
void FrameTransport::onNext(std::unique_ptr<folly::IOBuf> frame) noexcept {
102113
std::lock_guard<std::recursive_mutex> lock(mutex_);
103114

104-
if (connection_) {
105-
// if *this is not closed and is pulling frames, it should have
106-
// frameProcessor
107-
CHECK(frameProcessor_);
115+
if (connection_ && frameProcessor_) {
108116
frameProcessor_->processFrame(std::move(frame));
117+
} else {
118+
pendingReads_.emplace_back(std::move(frame));
109119
}
110120
}
111121

@@ -115,6 +125,10 @@ void FrameTransport::terminateFrameProcessor(folly::exception_wrapper ex) {
115125
std::shared_ptr<FrameProcessor> frameProcessor;
116126
{
117127
std::lock_guard<std::recursive_mutex> lock(mutex_);
128+
if (!frameProcessor_) {
129+
pendingTerminal_ = std::move(ex);
130+
return;
131+
}
118132
frameProcessor = std::move(frameProcessor_);
119133
}
120134

@@ -159,9 +173,9 @@ void FrameTransport::cancel() noexcept {
159173
void FrameTransport::outputFrameOrEnqueue(std::unique_ptr<folly::IOBuf> frame) {
160174
std::lock_guard<std::recursive_mutex> lock(mutex_);
161175

162-
// we don't want to be sending frames when frameProcessor_ is not set because
163-
// we wont have a way to process error/terminating signals
164-
if (connection_ && frameProcessor_) {
176+
// We allow sending frames even without a frame processor so it's possible
177+
// to send terminal frames without expecting anything in return
178+
if (connection_) {
165179
drainOutputFramesQueue();
166180
if (pendingWrites_.empty() && writeAllowance_.tryAcquire()) {
167181
// TODO: temporary disabling VLOG as we don't know the correct
@@ -187,7 +201,7 @@ void FrameTransport::outputFrameOrEnqueue(std::unique_ptr<folly::IOBuf> frame) {
187201
void FrameTransport::drainOutputFramesQueue() {
188202
std::lock_guard<std::recursive_mutex> lock(mutex_);
189203

190-
if (connection_ && frameProcessor_) {
204+
if (connection_) {
191205
// Drain the queue or the allowance.
192206
while (!pendingWrites_.empty() && writeAllowance_.tryAcquire()) {
193207
auto frame = std::move(pendingWrites_.front());

src/FrameTransport.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@
55
#include <deque>
66
#include <memory>
77
#include <mutex>
8+
9+
#include <folly/Optional.h>
10+
#include <folly/ExceptionWrapper.h>
11+
812
#include "src/AllowanceSemaphore.h"
913
#include "src/Common.h"
1014
#include "src/FrameProcessor.h"
@@ -77,5 +81,7 @@ class FrameTransport :
7781
std::shared_ptr<Subscription> connectionInputSub_;
7882

7983
std::deque<std::unique_ptr<folly::IOBuf>> pendingWrites_;
84+
std::deque<std::unique_ptr<folly::IOBuf>> pendingReads_;
85+
folly::Optional<folly::exception_wrapper> pendingTerminal_;
8086
};
8187
} // reactivesocket

src/ServerConnectionAcceptor.cpp

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -131,17 +131,16 @@ void ServerConnectionAcceptor::processFrame(
131131
break;
132132
}
133133

134+
removeConnection(transport);
134135
auto triedResume =
135136
connectionHandler->resumeSocket(transport, std::move(resumeParams));
136-
if (triedResume) {
137-
connections_.erase(transport);
138-
} else {
137+
if (!triedResume) {
139138
transport->outputFrameOrEnqueue(frameSerializer->serializeOut(
140139
Frame_ERROR::connectionError("can not resume")));
141140
transport->close(std::runtime_error("can not resume"));
142-
connections_.erase(transport);
143141
}
144-
} break;
142+
break;
143+
}
145144

146145
case FrameType::CANCEL:
147146
case FrameType::ERROR:

test/ServerConnectionAcceptorTest.cpp

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ class MockConnectionHandler : public ConnectionHandler {
3939
void(std::shared_ptr<FrameTransport>, folly::exception_wrapper ex));
4040
};
4141

42-
class MockFrameProcessor : public FrameProcessor {
43-
void processFrame(std::unique_ptr<folly::IOBuf>) override {};
44-
void onTerminal(folly::exception_wrapper) override {};
42+
struct MockFrameProcessor : public FrameProcessor {
43+
MOCK_METHOD1(processFrame, void(std::unique_ptr<folly::IOBuf>));
44+
MOCK_METHOD1(onTerminal, void(folly::exception_wrapper));
4545
};
4646

4747
class ServerConnectionAcceptorTest : public Test {
@@ -221,3 +221,88 @@ TEST_F(ServerConnectionAcceptorTest, VerifyTransport) {
221221
clientOutput_->onComplete();
222222
transport_->close(folly::exception_wrapper());
223223
}
224+
225+
TEST_F(ServerConnectionAcceptorTest, VerifyAsyncProcessorFrame) {
226+
ResumeParameters resumeParams(
227+
ResumeIdentificationToken::generateNew(),
228+
1,
229+
2,
230+
FrameSerializer::getCurrentProtocolVersion());
231+
std::shared_ptr<FrameTransport> transport_;
232+
EXPECT_CALL(*handler_, resumeSocket(_, _))
233+
.WillOnce(Invoke(
234+
[&](std::shared_ptr<FrameTransport> transport,
235+
ResumeParameters params) -> bool {
236+
// Only capture the transport, wait with setting the processor.
237+
transport_ = transport;
238+
return true;
239+
}));
240+
241+
auto frameSerializer = FrameSerializer::createCurrentVersion();
242+
acceptor_.accept(std::move(serverConnection_), handler_);
243+
clientOutput_->onNext(frameSerializer->serializeOut(Frame_RESUME(
244+
resumeParams.token,
245+
resumeParams.serverPosition,
246+
resumeParams.clientPosition,
247+
FrameSerializer::getCurrentProtocolVersion())));
248+
249+
// The transport won't have a processor now, try sending a frame
250+
clientOutput_->onNext(frameSerializer->serializeOut(Frame_REQUEST_FNF(
251+
1,
252+
FrameFlags::EMPTY,
253+
Payload())));
254+
255+
auto processor = std::make_shared<NiceMock<MockFrameProcessor>>();
256+
EXPECT_CALL(*processor, onTerminal(_))
257+
.Times(Exactly(0));
258+
EXPECT_CALL(*processor, processFrame(_))
259+
.WillOnce(Invoke([&](std::unique_ptr<folly::IOBuf> frame) {
260+
Frame_REQUEST_FNF fnfFrame;
261+
EXPECT_TRUE(frameSerializer->deserializeFrom(fnfFrame, std::move(frame)));
262+
}));
263+
264+
transport_->setFrameProcessor(processor);
265+
266+
// Teardown will cause a terminal callback
267+
Mock::VerifyAndClearExpectations(processor.get());
268+
clientOutput_->onComplete();
269+
transport_->close(folly::exception_wrapper());
270+
}
271+
272+
TEST_F(ServerConnectionAcceptorTest, VerifyAsyncProcessorTerminal) {
273+
ResumeParameters resumeParams(
274+
ResumeIdentificationToken::generateNew(),
275+
1,
276+
2,
277+
FrameSerializer::getCurrentProtocolVersion());
278+
std::shared_ptr<FrameTransport> transport_;
279+
EXPECT_CALL(*handler_, resumeSocket(_, _))
280+
.WillOnce(Invoke(
281+
[&](std::shared_ptr<FrameTransport> transport,
282+
ResumeParameters params) -> bool {
283+
// Only capture the transport, wait with setting the processor.
284+
transport_ = transport;
285+
return true;
286+
}));
287+
288+
auto frameSerializer = FrameSerializer::createCurrentVersion();
289+
acceptor_.accept(std::move(serverConnection_), handler_);
290+
clientOutput_->onNext(frameSerializer->serializeOut(Frame_RESUME(
291+
resumeParams.token,
292+
resumeParams.serverPosition,
293+
resumeParams.clientPosition,
294+
FrameSerializer::getCurrentProtocolVersion())));
295+
296+
// The transport won't have a processor now, try sending terminal.
297+
clientOutput_->onError(std::runtime_error("too bad"));
298+
299+
auto processor = std::make_shared<StrictMock<MockFrameProcessor>>();
300+
EXPECT_CALL(*processor, onTerminal(_))
301+
.WillOnce(Invoke([&](folly::exception_wrapper ex) {
302+
EXPECT_THAT(ex.what().toStdString(), HasSubstr("too bad"));
303+
}));
304+
305+
transport_->setFrameProcessor(processor);
306+
307+
transport_->close(folly::exception_wrapper());
308+
}

0 commit comments

Comments
 (0)