Skip to content

Commit 4af7bf2

Browse files
committed
Delay re-enablement of channel dispatcher
This gives some leeway for the channel to recover before commands start flowing again. There ought to be a way around using a leeway, but my experiments with queuing properly aren't up to scratch yet. [#119713047]
1 parent 25cc2d6 commit 4af7bf2

File tree

4 files changed

+42
-16
lines changed

4 files changed

+42
-16
lines changed

RMQClient/RMQMultipleChannelAllocator.m

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ @interface RMQMultipleChannelAllocator ()
6565
@property (nonatomic, readwrite) NSMutableDictionary *channels;
6666
@property (nonatomic, readwrite) NSNumber *syncTimeout;
6767
@property (nonatomic, readwrite) RMQProcessInfoNameGenerator *nameGenerator;
68+
@property (nonatomic, readwrite) NSNumber *dispatcherReenableDelay;
6869
@end
6970

7071
@implementation RMQMultipleChannelAllocator
@@ -78,6 +79,7 @@ - (instancetype)initWithChannelSyncTimeout:(NSNumber *)syncTimeout {
7879
self.sender = nil;
7980
self.syncTimeout = syncTimeout;
8081
self.nameGenerator = [RMQProcessInfoNameGenerator new];
82+
self.dispatcherReenableDelay = @1;
8183
}
8284
return self;
8385
}
@@ -148,14 +150,19 @@ - (void)unsafeReleaseChannelNumber:(NSNumber *)channelNumber {
148150
}
149151

150152
- (RMQAllocatedChannel *)allocatedChannel:(NSUInteger)channelNumber {
151-
RMQGCDSerialQueue *commandQueue = [self suspendedDispatchQueue:channelNumber
152-
type:@"commands"];
153-
RMQGCDSerialQueue *recoveryQueue = [self suspendedDispatchQueue:channelNumber
154-
type:@"recovery"];
153+
RMQGCDSerialQueue *commandQueue = [self serialQueue:channelNumber type:@"commands"];
154+
[commandQueue suspend];
155+
RMQGCDSerialQueue *recoveryQueue = [self serialQueue:channelNumber type:@"recovery"];
156+
[recoveryQueue suspend];
157+
RMQGCDSerialQueue *enablementQueue = [self serialQueue:channelNumber type:@"enablement"];
158+
155159
RMQSuspendResumeDispatcher *dispatcher = [[RMQSuspendResumeDispatcher alloc] initWithSender:self.sender
156-
commandQueue:commandQueue];
160+
commandQueue:commandQueue
161+
enablementQueue:enablementQueue
162+
enableDelay:self.dispatcherReenableDelay];
157163
RMQSuspendResumeDispatcher *recoveryDispatcher = [[RMQSuspendResumeDispatcher alloc] initWithSender:self.sender
158164
commandQueue:recoveryQueue];
165+
159166
RMQAllocatedChannel *ch = [[RMQAllocatedChannel alloc] init:@(channelNumber)
160167
contentBodySize:@(self.sender.frameMax.integerValue - RMQEmptyFrameSize)
161168
dispatcher:dispatcher
@@ -175,11 +182,9 @@ - (BOOL)atMaxIndex {
175182
return self.channelNumber == RMQChannelLimit;
176183
}
177184

178-
- (RMQGCDSerialQueue *)suspendedDispatchQueue:(UInt16)channelNumber
179-
type:(NSString *)type {
180-
RMQGCDSerialQueue *serialQueue = [[RMQGCDSerialQueue alloc] initWithName:[NSString stringWithFormat:@"channel %d (%@)", channelNumber, type]];
181-
[serialQueue suspend];
182-
return serialQueue;
185+
- (RMQGCDSerialQueue *)serialQueue:(UInt16)channelNumber
186+
type:(NSString *)type {
187+
return [[RMQGCDSerialQueue alloc] initWithName:[NSString stringWithFormat:@"channel %d (%@)", channelNumber, type]];
183188
}
184189

185190
@end

RMQClient/RMQSuspendResumeDispatcher.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@
5757

5858
@interface RMQSuspendResumeDispatcher : NSObject <RMQDispatcher>
5959

60+
- (instancetype)initWithSender:(id<RMQSender>)sender
61+
commandQueue:(id<RMQLocalSerialQueue>)commandQueue
62+
enablementQueue:(id<RMQLocalSerialQueue>)enablementQueue
63+
enableDelay:(NSNumber *)enableDelay;
64+
6065
- (instancetype)initWithSender:(id<RMQSender>)sender
6166
commandQueue:(id<RMQLocalSerialQueue>)commandQueue;
6267

RMQClient/RMQSuspendResumeDispatcher.m

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ @interface RMQSuspendResumeDispatcher ()
6363
@property (nonatomic, readwrite) id<RMQSender> sender;
6464
@property (nonatomic, readwrite) RMQFramesetValidator *validator;
6565
@property (nonatomic, readwrite) id<RMQLocalSerialQueue> commandQueue;
66+
@property (nonatomic, readwrite) id<RMQLocalSerialQueue> enablementQueue;
67+
@property (nonatomic, readwrite) NSNumber *enableDelay;
6668
@property (nonatomic, readwrite) id<RMQConnectionDelegate> delegate;
6769
@property (nonatomic, readwrite) DispatcherState state;
6870
@property (nonatomic, readwrite) BOOL disabled;
@@ -71,19 +73,28 @@ @interface RMQSuspendResumeDispatcher ()
7173
@implementation RMQSuspendResumeDispatcher
7274

7375
- (instancetype)initWithSender:(id<RMQSender>)sender
74-
commandQueue:(id<RMQLocalSerialQueue>)commandQueue {
76+
commandQueue:(id<RMQLocalSerialQueue>)commandQueue
77+
enablementQueue:(id<RMQLocalSerialQueue>)enablementQueue
78+
enableDelay:(NSNumber *)enableDelay {
7579
self = [super init];
7680
if (self) {
7781
self.channel = nil;
7882
self.sender = sender;
7983
self.validator = [RMQFramesetValidator new];
8084
self.commandQueue = commandQueue;
85+
self.enablementQueue = enablementQueue;
86+
self.enableDelay = enableDelay;
8187
self.state = DispatcherStateOpen;
8288
self.disabled = NO;
8389
}
8490
return self;
8591
}
8692

93+
- (instancetype)initWithSender:(id<RMQSender>)sender
94+
commandQueue:(id<RMQLocalSerialQueue>)commandQueue {
95+
return [self initWithSender:sender commandQueue:commandQueue enablementQueue:nil enableDelay:@0];
96+
}
97+
8798
- (void)activateWithChannel:(id<RMQChannel>)channel
8899
delegate:(id<RMQConnectionDelegate>)delegate {
89100
self.channel = channel;
@@ -175,8 +186,10 @@ - (void)disable {
175186
}
176187

177188
- (void)enable {
178-
self.disabled = NO;
179-
[self.commandQueue resume];
189+
[self.enablementQueue delayedBy:self.enableDelay enqueue:^{
190+
self.disabled = NO;
191+
[self.commandQueue resume];
192+
}];
180193
}
181194

182195
- (void)handleFrameset:(RMQFrameset *)frameset {

RMQClientTests/RMQSuspendResumeDispatcherTest.swift

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,18 @@ class RMQSuspendResumeDispatcherTest: XCTestCase {
6868

6969
func testDisableSuspendsCommandQueueAndPreventsFramesetHandlingFromResuming() {
7070
let q = FakeSerialQueue()
71-
let dispatcher = RMQSuspendResumeDispatcher(sender: SenderSpy(), commandQueue: q)
71+
let eq = FakeSerialQueue()
72+
let dispatcher = RMQSuspendResumeDispatcher(sender: SenderSpy(), commandQueue: q, enablementQueue: eq, enableDelay: 3)
7273

7374
dispatcher.disable()
7475
XCTAssertTrue(q.suspended)
7576

7677
dispatcher.handleFrameset(RMQFrameset(channelNumber: 1, method: MethodFixtures.basicQosOk()))
77-
XCTAssertTrue(q.suspended)
78-
7978
dispatcher.enable()
79+
80+
XCTAssertTrue(q.suspended)
81+
XCTAssertEqual(3, eq.enqueueDelay)
82+
try! eq.step()
8083
XCTAssertFalse(q.suspended)
8184
}
8285

0 commit comments

Comments
 (0)