Skip to content

Commit e92e100

Browse files
committed
fix(connection): Block device state updates to Alexa when too many incoming msgs are received
1 parent a7851f6 commit e92e100

File tree

3 files changed

+58
-4
lines changed

3 files changed

+58
-4
lines changed

MqttClient.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ const globalLmiter = new Bottleneck({
77
reservoirIncreaseAmount: 2,
88
reservoirIncreaseInterval: 10000, // must be divisible by 250
99
reservoirIncreaseMaximum: 40,
10-
strategy: Bottleneck.strategy.LEAK,
10+
strategy: Bottleneck.strategy.BLOCK,
1111

1212
// also use maxConcurrent and/or minTime for safety
1313
maxConcurrent: 5,
@@ -32,7 +32,7 @@ function MqttClient (options, callbacksObj) {
3232
reservoirIncreaseAmount: 2,
3333
reservoirIncreaseInterval: 10000, // must be divisible by 250
3434
reservoirIncreaseMaximum: 40,
35-
strategy: Bottleneck.strategy.LEAK,
35+
strategy: Bottleneck.strategy.BLOCK,
3636

3737
// also use maxConcurrent and/or minTime for safety
3838
maxConcurrent: 5,

RateLimiter.js

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
'use strict'
2+
3+
function RateLimiter ({
4+
highWaterMark = 10,
5+
intervalInSec = 5,
6+
onExhaustionCb
7+
} = {}) {
8+
this.highWaterMark = highWaterMark
9+
this.intervalInSec = intervalInSec * 1000
10+
this.exhaustedCallback = onExhaustionCb
11+
this.counter = 0
12+
13+
this.resetCounter = function () {
14+
this.counter = 0
15+
}
16+
17+
this.intervalHandle = setInterval(
18+
this.resetCounter.bind(this),
19+
this.intervalInSec
20+
)
21+
}
22+
23+
RateLimiter.prototype.execute = function (callback) {
24+
this.counter++
25+
26+
if (this.counter <= this.highWaterMark) {
27+
callback()
28+
}
29+
30+
if (this.counter == this.highWaterMark) {
31+
this.exhaustedCallback()
32+
}
33+
}
34+
35+
module.exports = RateLimiter

virtual-device.js

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
const merge = require('deepmerge')
22
const deepEql = require('deep-eql')
3+
const RateLimiter = require('./RateLimiter')
34
const {
45
getValidators,
56
getDecorator,
@@ -23,6 +24,19 @@ module.exports = function (RED) {
2324
const validators = getValidators(config.template)
2425
const decorator = getDecorator(config.template)
2526

27+
let isIncomingMsgProcessingAllowed = true
28+
29+
const rater = new RateLimiter({
30+
highWaterMark: 15,
31+
intervalInSec: 60,
32+
onExhaustionCb: () => {
33+
isIncomingMsgProcessingAllowed = false
34+
console.log(
35+
'Blocking device state sync to Alexa from now on! Quota exhausted!'
36+
)
37+
}
38+
})
39+
2640
const getLocalState = function () {
2741
return { ...localState }
2842
}
@@ -77,10 +91,15 @@ module.exports = function (RED) {
7791
const mergedState = merge(oldLocalState, approvedState)
7892
const newLocalState = { ...mergedState, source: 'device' }
7993

80-
if (!deepEql(oldLocalState, newLocalState)) {
94+
if (
95+
isIncomingMsgProcessingAllowed &&
96+
!deepEql(oldLocalState, newLocalState)
97+
) {
8198
setLocalState(newLocalState)
8299

83-
connectionNode.updateShadow({ nodeId, type: 'desired' })
100+
rater.execute(() =>
101+
connectionNode.updateShadow({ nodeId, type: 'desired' })
102+
)
84103
}
85104

86105
if (config.passthrough && Object.keys(approvedState).length > 0) {

0 commit comments

Comments
 (0)