Commit 7ed710e

Allow dynamic limiter to scale ingest rate below static limit (#851)

1 parent 0cb3e9a commit 7ed710e

4 files changed: +71 −22 lines changed

processor/ratelimitprocessor/README.md (1 addition, 1 deletion)

````diff
@@ -313,7 +313,7 @@ Telemetry and metrics:
 
 ### Throttling rate based on custom logic using `window_configurator`
 
-The `window_configurator` option configures a custom OpenTelemetry Collector extension to dynamically choose a window multiplier for the current period. The value is the extension ID (the extension's configured name). The extension MUST implement the `WindowConfigurator` interface. The multiplier can be used to scale up the rate limit from previous window (by returning a multiplier greater than `1`) or scale down the rate limit from previous window (by returning a multiplier less than `1`, greater than `0`). If the extension returns a negative value then the `default_window_multiplier` will be used. Note that the dynamic limit MUST be at least the configured `static_rate`, ensuring a minimum level of throughput. An example configuration including the window configurator:
+The `window_configurator` option configures a custom OpenTelemetry Collector extension to dynamically choose a window multiplier for the current period. The value is the extension ID (the extension's configured name). The extension MUST implement the `WindowConfigurator` interface. The multiplier can be used to scale up the rate limit from the previous window (by returning a multiplier greater than `1`) or scale down the rate limit from the previous window (by returning a multiplier less than `1`, greater than `0`). The lowest possible value of the rate is `1/s`. If the extension returns a negative value, then the `default_window_multiplier` will be used. An example configuration including the window configurator:
 
 ```yaml
 processors:
````
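The README names the `WindowConfigurator` interface but the diff never shows its shape. Below is a minimal sketch of what a conforming extension could look like, assuming a single hot-path method that takes the unique key and returns the multiplier; the method name `GetWindowMultiplier`, the package name, and the peak-hours logic are illustrative assumptions, not the repository's actual API:

```go
// Hypothetical WindowConfigurator extension. The interface shape is an
// assumption inferred from the README: one fast, hot-path method that
// returns a multiplier for the given unique key, where a negative value
// defers to default_window_multiplier.
package peakhours

import (
	"context"
	"time"

	"go.opentelemetry.io/collector/component"
)

type configurator struct{}

// Start and Shutdown satisfy component.Component so the collector can
// manage the extension's lifecycle.
func (c *configurator) Start(ctx context.Context, host component.Host) error { return nil }
func (c *configurator) Shutdown(ctx context.Context) error                   { return nil }

// GetWindowMultiplier is the assumed interface method. It must be cheap:
// the processor calls it on the hot path for every rate decision.
func (c *configurator) GetWindowMultiplier(ctx context.Context, uniqueKey string) float64 {
	// Throttle to 50% of the previous window's rate during (assumed)
	// peak hours; otherwise fall back to default_window_multiplier.
	if h := time.Now().UTC().Hour(); h >= 9 && h < 17 {
		return 0.5
	}
	return -1
}
```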

processor/ratelimitprocessor/config.go (2 additions, 3 deletions)

```diff
@@ -83,9 +83,8 @@ type DynamicRateLimiting struct {
 	// WindowConfigurator is the component ID of the extension to dynamically
 	// determine the window multiplier. The extension is expected to implement
 	// the `WindowConfigurator` interface. The window configurator is used in
-	// the hot path so it should respond fast. The effective rate cannot go
-	// below the configured static rate limit settings. If the configurator
-	// returns a negative multiplier then the default multiplier will be used.
+	// the hot path so it should respond fast. If the configurator returns a
+	// negative multiplier then the default multiplier will be used.
 	WindowConfigurator component.ID `mapstructure:"window_configurator"`
 }
 
```

processor/ratelimitprocessor/gubernator.go (25 additions, 2 deletions)

```diff
@@ -407,11 +407,21 @@ func (r *gubernatorRateLimiter) getDynamicLimit(ctx context.Context,
 	}
 	// Only record the incoming hits when the current rate is within the allowed
 	// range, otherwise, do not record the hits and return the calculated rate.
-	// MaxAllowed sets a ceiling on the rate with the window duration.
+	// MaxAllowed sets a ceiling on the rate with the window duration. If the
+	// previous period had hits and the window multiplier is suggesting lowering
+	// the ingestion rate then the MaxAllowed will be allowed to go below the
+	// static rate (to as low as `1`). As soon as the window multiplier suggests
+	// increasing the ingestion rate, the MaxAllowed will jump to a minimum of
+	// static rate.
 	//
 	// NOTE(marclop) We may want to add a follow-up static ceiling to avoid
 	// unbounded growth.
-	maxAllowed := math.Max(staticRate, previous*windowMultiplier)
+	var maxAllowed float64
+	if previous > 0 && windowMultiplier <= 1 {
+		maxAllowed = max(1, previous*windowMultiplier)
+	} else {
+		maxAllowed = max(staticRate, previous*windowMultiplier)
+	}
 	// Normalise the current rate assuming no more events will occur during the
 	// rest of the window. This will ensure that we record hits based on the
 	// currently observed hits and NOT based on extrapolated data.
@@ -423,6 +433,19 @@ func (r *gubernatorRateLimiter) getDynamicLimit(ctx context.Context,
 			return -1, err
 		}
 	}
+	if r.logger.Level() == zap.DebugLevel {
+		r.logger.Debug(
+			"Dynamic rate limiting applied",
+			zap.Dict(
+				"ratelimit",
+				zap.String("unique_key", uniqueKey),
+				zap.Float64("multiplier", windowMultiplier),
+				zap.Float64("static_rate", staticRate),
+				zap.Float64("previous_rate", previous),
+				zap.Float64("limit", maxAllowed),
+			),
+		)
+	}
 	return maxAllowed, nil
 }
 
```
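The changed branch is the heart of the commit. Here is a standalone sketch isolating the same selection logic (the `dynamicLimit` name and the sample values are illustrative, not the processor's internals): the floor drops below the static rate only while a previous window had hits and the multiplier throttles; in every other case the static rate remains the floor.

```go
package main

import "fmt"

// dynamicLimit isolates the selection logic added in getDynamicLimit
// (names are illustrative). While the previous window had traffic and the
// multiplier throttles (<= 1), the limit may sink below the static rate,
// floored at 1. Otherwise the static rate is the floor.
func dynamicLimit(staticRate, previous, multiplier float64) float64 {
	if previous > 0 && multiplier <= 1 {
		return max(1, previous*multiplier)
	}
	return max(staticRate, previous*multiplier)
}

func main() {
	const staticRate = 500.0
	fmt.Println(dynamicLimit(staticRate, 0, 0.5))     // 500: no previous traffic, static floor holds
	fmt.Println(dynamicLimit(staticRate, 1000, 0.5))  // 500: throttled, but still at the static rate
	fmt.Println(dynamicLimit(staticRate, 1000, 0.01)) // 10: throttled below the static rate
	fmt.Println(dynamicLimit(staticRate, 10, 2))      // 500: scaling up restores the static floor
}
```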

processor/ratelimitprocessor/gubernator_test.go (43 additions, 16 deletions)

```diff
@@ -734,8 +734,8 @@ func TestGubernatorRateLimiter_WindowConfigurator(t *testing.T) {
 	// Set the windowConfigurator manually
 	rateLimiter.windowConfigurator = &fakeWindowConfigurator{
 		mapping: map[string][]float64{
-			"x-tenant-id:12345": {1.5},           // more ingest data
-			"x-tenant-id:67890": {-1, 2, 2, 0.5}, // throttle ingest data
+			"x-tenant-id:12345": {1.5},                 // more ingest data
+			"x-tenant-id:67890": {-1, 2, 2, 0.5, 0.01}, // throttle ingest data
 		},
 		count: make(map[string]int),
 	}
@@ -747,7 +747,8 @@
 	})
 
 	waitUntilNextPeriod(windowDuration)
-	// 1st non-zero window: establish baseline with static limit
+	// 1st non-zero window: establish baseline with static limit. Since the previous
+	// hits at this point were `0` the limit will always be equal to static limit.
 	actual := int64(staticRatePerSec) * int64(windowDuration) / int64(time.Second)
 	require.NoError(t, rateLimiter.RateLimit(ctx, int(actual)))
 	drainEvents(eventChannel)
@@ -756,11 +757,12 @@
 	// 2nd window: ramp up by 1.5 factor as configured for the tenant
 	actual = int64(100) * int64(windowDuration) / int64(time.Second)
 	require.NoError(t, rateLimiter.RateLimit(ctx, int(actual)))
+	expectedLimit := int64(float64(staticRatePerSec) * 1.5) // 150% of previous hits/s
 	verify := func(evc <-chan gubernator.HitEvent) {
 		t.Helper()
 		event := lastReqLimitEvent(drainEvents(evc), t)
 		assertRequestRateLimitEvent(t, "x-tenant-id:12345", event,
-			actual, 750, 750-actual, gubernator.Status_UNDER_LIMIT,
+			actual, expectedLimit, expectedLimit-actual, gubernator.Status_UNDER_LIMIT,
 		)
 	}
 	verify(eventChannel)
@@ -775,49 +777,73 @@
 		uniqueKey := "x-tenant-id:67890"
 
 		waitUntilNextPeriod(windowDuration)
-		// 1st non-zero window: establish baseline with static limit
+		// 1st non-zero window: establish baseline with static limit. Since the previous
+		// hits at this point were `0` the limit will always be equal to static limit.
 		actual := int64(staticRatePerSec) * int64(windowDuration) / int64(time.Second)
 		require.NoError(t, rateLimiter.RateLimit(ctx, int(actual)))
 		drainEvents(eventChannel)
 
 		waitUntilNextPeriod(windowDuration)
 		// 2nd window: scale up by 2 factor as configured for the tenant
-		actual = int64(1000) * int64(windowDuration) / int64(time.Second)
+		expectedLimit := int64(float64(staticRatePerSec) * 2) // 200% of previous hits/s
+		// maximize the hits so that the rate is fully utilized
+		actual = expectedLimit * int64(windowDuration) / int64(time.Second)
 		require.NoError(t, rateLimiter.RateLimit(ctx, int(actual)))
 		assertRequestRateLimitEvent(
 			t, uniqueKey,
 			lastReqLimitEvent(drainEvents(eventChannel), t),
-			actual, 1000, 1000-actual, gubernator.Status_UNDER_LIMIT,
+			actual, expectedLimit, expectedLimit-actual, gubernator.Status_UNDER_LIMIT,
 		)
 
 		waitUntilNextPeriod(windowDuration)
 		// 3rd window: scale up by 2 factor as configured for the tenant
-		actual = int64(2000) * int64(windowDuration) / int64(time.Second)
+		expectedLimit = expectedLimit * 2 // 200% of previous hits/s
+		// maximize the hits so that the rate is fully utilized
+		actual = expectedLimit * int64(windowDuration) / int64(time.Second)
 		require.NoError(t, rateLimiter.RateLimit(ctx, int(actual)))
 		assertRequestRateLimitEvent(
 			t, uniqueKey,
 			lastReqLimitEvent(drainEvents(eventChannel), t),
-			actual, 2000, 2000-actual, gubernator.Status_UNDER_LIMIT,
+			actual, expectedLimit, expectedLimit-actual, gubernator.Status_UNDER_LIMIT,
 		)
 
 		waitUntilNextPeriod(windowDuration)
 		// 4th window: scale down by 50% as configured for the tenant
-		actual = int64(100) * int64(windowDuration) / int64(time.Second)
+		expectedLimit = int64(float64(expectedLimit) * 0.5) // 50% of previous hits/s
+		// maximize the hits so that the rate is fully utilized
+		actual = expectedLimit * int64(windowDuration) / int64(time.Second)
 		require.NoError(t, rateLimiter.RateLimit(ctx, int(actual)))
 		assertRequestRateLimitEvent(
 			t, uniqueKey,
 			lastReqLimitEvent(drainEvents(eventChannel), t),
-			actual, 1000, 1000-actual, gubernator.Status_UNDER_LIMIT,
+			actual, expectedLimit, expectedLimit-actual, gubernator.Status_UNDER_LIMIT,
 		)
 
-		// wait for a couple of periods to assert static rate as the lower bound
-		waitUntilNextPeriod(2 * windowDuration)
-		actual = int64(100) * int64(windowDuration) / int64(time.Second)
+		waitUntilNextPeriod(windowDuration)
+		// 5th window: scale down by 1% as configured for the tenant and assert
+		// that the rate goes below the static limit
+		expectedLimit = int64(float64(expectedLimit) * 0.01) // 1% of previous hits/s
+		// maximize the hits so that the rate is fully utilized
+		actual = expectedLimit * int64(windowDuration) / int64(time.Second)
+		require.NoError(t, rateLimiter.RateLimit(ctx, int(actual)))
+		assertRequestRateLimitEvent(
+			t, uniqueKey,
+			lastReqLimitEvent(drainEvents(eventChannel), t),
+			actual, expectedLimit, expectedLimit-actual, gubernator.Status_UNDER_LIMIT,
+		)
+
+		// wait for a few periods to assert that the rate goes back to the
+		// static rate: data is NOT sent in the first waiting window, causing the
+		// previous rate to go to zero in the next window, which we then assert.
+		waitUntilNextPeriod(windowDuration)
+		waitUntilNextPeriod(windowDuration)
+		expectedLimit = int64(staticRatePerSec)
+		actual = int64(100) * int64(windowDuration) / int64(time.Second) // send 100 hits
 		require.NoError(t, rateLimiter.RateLimit(ctx, int(actual)))
 		assertRequestRateLimitEvent(
 			t, uniqueKey,
 			lastReqLimitEvent(drainEvents(eventChannel), t),
-			actual, 500, 500-actual, gubernator.Status_UNDER_LIMIT,
+			actual, expectedLimit, expectedLimit-actual, gubernator.Status_UNDER_LIMIT,
 		)
 	})
 	t.Run("unknown tenant, use default multiplier", func(t *testing.T) {
@@ -828,12 +854,13 @@ func TestGubernatorRateLimiter_WindowConfigurator(t *testing.T) {
 		})
 
 		actual := int64(400)
+		expectedLimit := int64(staticRatePerSec)
 		require.NoError(t, rateLimiter.RateLimit(ctx, int(actual)))
 		verify := func(evc <-chan gubernator.HitEvent) {
 			t.Helper()
 			event := lastReqLimitEvent(drainEvents(evc), t)
 			assertRequestRateLimitEvent(t, "x-tenant-id:unknown", event,
-				actual, 500, 500-actual, gubernator.Status_UNDER_LIMIT,
+				actual, expectedLimit, expectedLimit-actual, gubernator.Status_UNDER_LIMIT,
 			)
 		}
 		verify(eventChannel)
```
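As a rough cross-check, the throttled tenant's expected limits (500 → 1000 → 2000 → 1000 → 10, then back to 500) fall out of the same arithmetic. The sketch below reuses the hypothetical `dynamicLimit` helper from the gubernator.go section above, assuming a default multiplier of `1` and full utilization of each window (both assumptions, consistent with the test's comments):

```go
func main() {
	const staticRate = 500.0
	previous := 0.0 // no hits before the 1st non-zero window
	for _, m := range []float64{-1, 2, 2, 0.5, 0.01} {
		if m < 0 {
			m = 1 // stand-in for default_window_multiplier (assumed 1)
		}
		limit := dynamicLimit(staticRate, previous, m)
		fmt.Println(limit) // prints 500, 1000, 2000, 1000, 10
		previous = limit   // the test maximizes hits, fully utilizing each window
	}
	// After two idle windows the previous rate is 0 again, so the limit
	// snaps back to the static rate: max(500, 0) = 500.
	fmt.Println(dynamicLimit(staticRate, 0, 1)) // 500
}
```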
