From 6523df8b4c601684d66489d026c9e3263845c5cd Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 9 May 2024 14:12:05 -0600 Subject: [PATCH 1/9] Add the bones of a hedging system We need a budget and a way of tracking percentiles --- .../dns/discovery/netty/HedgingDnsClient.java | 208 ++++++++++++++++++ 1 file changed, 208 insertions(+) create mode 100644 servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsClient.java diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsClient.java new file mode 100644 index 0000000000..7e9bff4222 --- /dev/null +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsClient.java @@ -0,0 +1,208 @@ +package io.servicetalk.dns.discovery.netty; + +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; + +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static io.servicetalk.utils.internal.NumberUtils.ensurePositive; +import static java.lang.Math.max; +import static java.lang.Math.min; + +final class HedgingWrapper implements Function> { + + private final EventLoop eventLoop; + private final Function> computation; + private final PercentileTracker percentile; + private final Budget budget; + + HedgingWrapper(EventLoop eventLoop, Function> computation) { + this(eventLoop, computation, defaultTracker(), defaultBudget()); + } + + HedgingWrapper(EventLoop eventLoop, Function> computation, PercentileTracker percentile, + Budget budget) { + this.eventLoop = eventLoop; + this.computation = computation; + this.percentile = percentile; + this.budget = budget; + } + + @Override + public Future apply(T t) { + return doApply(t, eventLoop.newPromise()); + } + + private Future doApply(T t, Promise promise) { + // Only 
add tokens for organic requests, not retries. + budget.deposit(); + Future underlyingResult = computation.apply(t); + final long startTime = System.currentTimeMillis(); + final long deadline = startTime + percentile.getValue(); + Future hedgeTimer = eventLoop.schedule(() -> maybeApplyHedge(t, underlyingResult, promise), + deadline, TimeUnit.MILLISECONDS); + underlyingResult.addListener(completedFuture -> { + measureRequest(System.currentTimeMillis() - startTime, completedFuture.isSuccess()); + if (complete(underlyingResult, promise)) { + hedgeTimer.cancel(true); + } + }); + + return promise; + } + + private void maybeApplyHedge(T t, Future original, Promise promise) { + if (budget.withdraw() && !original.isDone()) { + Future backupResult = computation.apply(t); + backupResult.addListener(done -> { + if (complete(backupResult, promise)) { + original.cancel(true); + } + }); + promise.addListener(complete -> backupResult.cancel(true)); + } + } + + private void measureRequest(long durationMs, boolean succeeded) { + if (succeeded) { + percentile.addSample(durationMs); + } + } + + private boolean complete(Future f, Promise p) { + assert f.isDone(); + if (f.isSuccess()) { + return p.trySuccess(f.getNow()); + } else { + return p.tryFailure(f.cause()); + } + } + + private interface PercentileTracker { + void addSample(long sample); + + long getValue(); + } + + private interface Budget { + void deposit(); + + boolean withdraw(); + } + + // TODO: both these implementations rely on access being serialized by the netty event loop. 
+ private static final class DefaultBudgetImpl implements Budget { + + private final int depositAmount; + private final int withDrawAmount; + private final int maxTokens; + private int tokens; + + DefaultBudgetImpl(int depositAmount, int withDrawAmount, int maxTokens) { + this(depositAmount, withDrawAmount, maxTokens, 0); + } + + DefaultBudgetImpl(int depositAmount, int withDrawAmount, int maxTokens, int initialTokens) { + this.depositAmount = depositAmount; + this.withDrawAmount = withDrawAmount; + this.maxTokens = maxTokens; + initialTokens = initialTokens; + } + + + @Override + public void deposit() { + tokens = max(maxTokens, tokens + depositAmount); + } + + @Override + public boolean withdraw() { + if (tokens < withDrawAmount) { + return false; + } else { + tokens -= withDrawAmount; + return true; + } + } + } + + // TODO: we shouldn't need to worry about concurrency if this is all happening in the same netty channel. + private static final class DefaultPercentileTracker implements PercentileTracker { + + // TODO: we need to make the buckets grow exponentially to save space. 
+ private final int[] buckets; + private final double percentile; + private final int sampleThreshold; + private long lastValue; + private int sampleCount; + + public DefaultPercentileTracker(int buckets, double percentile, int sampleThreshold) { + if (percentile < 0 || percentile > 1) { + throw new IllegalArgumentException("Unexpected percentile value: " + percentile); + } + this.buckets = new int[ensurePositive(buckets, "buckets")]; + this.percentile = percentile; + this.sampleThreshold = ensurePositive(sampleThreshold, "sampleThreshold"); + lastValue = Long.MAX_VALUE; + } + + @Override + public void addSample(long value) { + maybeSwap(); + int bucket = valueToBucket(value); + buckets[bucket]++; + sampleCount++; + } + + @Override + public long getValue() { + maybeSwap(); + return lastValue; + } + + + private void maybeSwap() { + if (shouldSwap()) { + lastValue = compute(); + } + } + private boolean shouldSwap() { + return sampleCount >= sampleThreshold; + } + + private long compute() { + long targetCount = (long)(sampleCount * percentile); + sampleCount = 0; + long result = -1; + for (int i = 0; i < buckets.length; i++) { + if (result != -1) { + targetCount -= buckets[i]; + if (targetCount <= 0) { + result = bucketToValue(i); + } + } + buckets[i] = 0; + } + assert result != -1; // we should have found a bucket. 
+ return result; + } + + private long bucketToValue(int bucket) { + return bucket; + } + + private int valueToBucket(long value) { + return (int) max(0, min(buckets.length, value)); + } + } + + private static PercentileTracker defaultTracker() { + return new DefaultPercentileTracker(128, 0.98, 200); + } + + private static Budget defaultBudget() { + return new DefaultBudgetImpl(1, 20, 100); + } +} From 0227d18a7e162a3e871d36320387cdd68834fd3c Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 9 May 2024 17:10:01 -0600 Subject: [PATCH 2/9] Work with the Dns interface --- .../dns/discovery/netty/DefaultDnsClient.java | 6 +- ...lient.java => HedgingDnsNameResolver.java} | 72 ++++++++++++------- 2 files changed, 50 insertions(+), 28 deletions(-) rename servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/{HedgingDnsClient.java => HedgingDnsNameResolver.java} (69%) diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java index 06a5de2f73..2ac47fe10b 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java @@ -47,7 +47,6 @@ import io.netty.resolver.dns.DefaultAuthoritativeDnsServerCache; import io.netty.resolver.dns.DefaultDnsCache; import io.netty.resolver.dns.DefaultDnsCnameCache; -import io.netty.resolver.dns.DnsNameResolver; import io.netty.resolver.dns.DnsNameResolverBuilder; import io.netty.resolver.dns.NameServerComparator; import io.netty.resolver.dns.NoopAuthoritativeDnsServerCache; @@ -75,6 +74,7 @@ import java.util.List; import java.util.Map; import java.util.RandomAccess; +import java.util.function.Function; import java.util.function.IntFunction; import javax.annotation.Nullable; @@ -125,7 +125,7 @@ 
final class DefaultDnsClient implements DnsClient { private static final Cancellable TERMINATED = () -> { }; private final EventLoopAwareNettyIoExecutor nettyIoExecutor; - private final DnsNameResolver resolver; + private final HedgingDnsNameResolver resolver; private final MinTtlCache ttlCache; private final long maxTTLNanos; private final long ttlJitterNanos; @@ -221,7 +221,7 @@ final class DefaultDnsClient implements DnsClient { if (dnsServerAddressStreamProvider != null) { builder.nameServerProvider(toNettyType(dnsServerAddressStreamProvider)); } - resolver = builder.build(); + resolver = new HedgingDnsNameResolver(builder.build(), nettyIoExecutor.eventLoopGroup().next()); } @Override diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java similarity index 69% rename from servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsClient.java rename to servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java index 7e9bff4222..46265f49f1 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsClient.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java @@ -1,59 +1,80 @@ package io.servicetalk.dns.discovery.netty; import io.netty.channel.EventLoop; +import io.netty.handler.codec.dns.DnsQuestion; +import io.netty.handler.codec.dns.DnsRecord; +import io.netty.resolver.dns.DnsNameResolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +import java.io.Closeable; +import java.net.InetAddress; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import static 
io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; import static io.servicetalk.utils.internal.NumberUtils.ensurePositive; import static java.lang.Math.max; import static java.lang.Math.min; -final class HedgingWrapper implements Function> { +final class HedgingDnsNameResolver implements Closeable { + private final DnsNameResolver delegate; private final EventLoop eventLoop; - private final Function> computation; private final PercentileTracker percentile; private final Budget budget; - HedgingWrapper(EventLoop eventLoop, Function> computation) { - this(eventLoop, computation, defaultTracker(), defaultBudget()); + HedgingDnsNameResolver(DnsNameResolver delegate, EventLoop eventLoop) { + this(delegate, eventLoop, defaultTracker(), defaultBudget()); } - HedgingWrapper(EventLoop eventLoop, Function> computation, PercentileTracker percentile, + HedgingDnsNameResolver(DnsNameResolver delegate, EventLoop eventLoop, PercentileTracker percentile, Budget budget) { + this.delegate = delegate; this.eventLoop = eventLoop; - this.computation = computation; this.percentile = percentile; this.budget = budget; } + public Future> resolveAll(DnsQuestion t) { + return applyHedge(delegate::resolveAll, t); + } + + public Future> resolveAll(String t) { + return applyHedge(delegate::resolveAll, t); + } + @Override - public Future apply(T t) { - return doApply(t, eventLoop.newPromise()); + public void close() { + delegate.close(); } - private Future doApply(T t, Promise promise) { - // Only add tokens for organic requests, not retries. + private Future applyHedge(Function> computation, T t) { + // Only add tokens for organic requests and not retries. 
budget.deposit(); Future underlyingResult = computation.apply(t); final long startTime = System.currentTimeMillis(); - final long deadline = startTime + percentile.getValue(); - Future hedgeTimer = eventLoop.schedule(() -> maybeApplyHedge(t, underlyingResult, promise), - deadline, TimeUnit.MILLISECONDS); - underlyingResult.addListener(completedFuture -> { - measureRequest(System.currentTimeMillis() - startTime, completedFuture.isSuccess()); - if (complete(underlyingResult, promise)) { - hedgeTimer.cancel(true); - } - }); - - return promise; + final long deadline = addWithOverflowProtection(startTime, percentile.getValue()); + if (deadline == Long.MAX_VALUE) { + // no need to attempt a hedge that will wait that long: just return the value. + return underlyingResult; + } else { + Promise promise = eventLoop.newPromise(); + Future hedgeTimer = eventLoop.schedule(() -> maybeApplyHedge(computation, t, underlyingResult, promise), + deadline, TimeUnit.MILLISECONDS); + underlyingResult.addListener(completedFuture -> { + measureRequest(System.currentTimeMillis() - startTime, completedFuture.isSuccess()); + if (complete(underlyingResult, promise)) { + hedgeTimer.cancel(true); + } + }); + return promise; + } } - private void maybeApplyHedge(T t, Future original, Promise promise) { + private void maybeApplyHedge( + Function> computation, T t, Future original, Promise promise) { if (budget.withdraw() && !original.isDone()) { Future backupResult = computation.apply(t); backupResult.addListener(done -> { @@ -71,7 +92,7 @@ private void measureRequest(long durationMs, boolean succeeded) { } } - private boolean complete(Future f, Promise p) { + private boolean complete(Future f, Promise p) { assert f.isDone(); if (f.isSuccess()) { return p.trySuccess(f.getNow()); @@ -108,7 +129,7 @@ private static final class DefaultBudgetImpl implements Budget { this.depositAmount = depositAmount; this.withDrawAmount = withDrawAmount; this.maxTokens = maxTokens; - initialTokens = initialTokens; + 
this.tokens = initialTokens; } @@ -186,7 +207,7 @@ private long compute() { buckets[i] = 0; } assert result != -1; // we should have found a bucket. - return result; + return max(1, result); } private long bucketToValue(int bucket) { @@ -203,6 +224,7 @@ private static PercentileTracker defaultTracker() { } private static Budget defaultBudget() { + // 5% extra load and a max burst of 5 hedges. return new DefaultBudgetImpl(1, 20, 100); } } From 8f10df58f4c1211d86d377451bcbc7bb4936830d Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 9 May 2024 17:17:36 -0600 Subject: [PATCH 3/9] Use the io executor as the time source for easy testing --- .../dns/discovery/netty/DefaultDnsClient.java | 2 +- .../netty/HedgingDnsNameResolver.java | 40 ++++++++++++++++--- 2 files changed, 35 insertions(+), 7 deletions(-) diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java index 2ac47fe10b..678f1d6a8b 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java @@ -221,7 +221,7 @@ final class DefaultDnsClient implements DnsClient { if (dnsServerAddressStreamProvider != null) { builder.nameServerProvider(toNettyType(dnsServerAddressStreamProvider)); } - resolver = new HedgingDnsNameResolver(builder.build(), nettyIoExecutor.eventLoopGroup().next()); + resolver = new HedgingDnsNameResolver(builder.build(), nettyIoExecutor, nettyIoExecutor.eventLoopGroup().next()); } @Override diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java index 46265f49f1..b300a99340 100644 --- 
a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java @@ -6,6 +6,8 @@ import io.netty.resolver.dns.DnsNameResolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +import io.servicetalk.concurrent.api.DelegatingExecutor; +import io.servicetalk.concurrent.api.Executor; import java.io.Closeable; import java.net.InetAddress; @@ -17,21 +19,26 @@ import static io.servicetalk.utils.internal.NumberUtils.ensurePositive; import static java.lang.Math.max; import static java.lang.Math.min; +import static java.util.concurrent.TimeUnit.NANOSECONDS; final class HedgingDnsNameResolver implements Closeable { private final DnsNameResolver delegate; + private final Executor executor; + private final EventLoop eventLoop; private final PercentileTracker percentile; private final Budget budget; - HedgingDnsNameResolver(DnsNameResolver delegate, EventLoop eventLoop) { - this(delegate, eventLoop, defaultTracker(), defaultBudget()); + HedgingDnsNameResolver(DnsNameResolver delegate, Executor executor, EventLoop eventLoop) { + this(delegate, executor, eventLoop, defaultTracker(), defaultBudget()); } - HedgingDnsNameResolver(DnsNameResolver delegate, EventLoop eventLoop, PercentileTracker percentile, - Budget budget) { + HedgingDnsNameResolver(DnsNameResolver delegate, Executor executor, EventLoop eventLoop, + PercentileTracker percentile, Budget budget) { this.delegate = delegate; + this.executor = executor instanceof NormalizedTimeSourceExecutor ? 
+ executor : new NormalizedTimeSourceExecutor(executor); this.eventLoop = eventLoop; this.percentile = percentile; this.budget = budget; @@ -50,11 +57,15 @@ public void close() { delegate.close(); } + private long currentTimeMillis() { + return executor.currentTime(TimeUnit.MILLISECONDS); + } + private Future applyHedge(Function> computation, T t) { // Only add tokens for organic requests and not retries. budget.deposit(); Future underlyingResult = computation.apply(t); - final long startTime = System.currentTimeMillis(); + final long startTime = currentTimeMillis(); final long deadline = addWithOverflowProtection(startTime, percentile.getValue()); if (deadline == Long.MAX_VALUE) { // no need to attempt a hedge that will wait that long: just return the value. @@ -64,7 +75,7 @@ private Future applyHedge(Function> computation, T t) { Future hedgeTimer = eventLoop.schedule(() -> maybeApplyHedge(computation, t, underlyingResult, promise), deadline, TimeUnit.MILLISECONDS); underlyingResult.addListener(completedFuture -> { - measureRequest(System.currentTimeMillis() - startTime, completedFuture.isSuccess()); + measureRequest(currentTimeMillis() - startTime, completedFuture.isSuccess()); if (complete(underlyingResult, promise)) { hedgeTimer.cancel(true); } @@ -227,4 +238,21 @@ private static Budget defaultBudget() { // 5% extra load and a max burst of 5 hedges. return new DefaultBudgetImpl(1, 20, 100); } + + // TODO: copied from servicetalk-loadbalancer. 
+ private static final class NormalizedTimeSourceExecutor extends DelegatingExecutor { + + private final long offsetNanos; + + NormalizedTimeSourceExecutor(final Executor delegate) { + super(delegate); + offsetNanos = delegate.currentTime(NANOSECONDS); + } + + @Override + public long currentTime(final TimeUnit unit) { + final long elapsedNanos = delegate().currentTime(NANOSECONDS) - offsetNanos; + return unit.convert(elapsedNanos, NANOSECONDS); + } + } } From 06c90371663154c317e315f46f39b3da6e3f3400 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 10 May 2024 09:27:50 -0600 Subject: [PATCH 4/9] Cleanup; --- .../dns/discovery/netty/DefaultDnsClient.java | 4 +- .../netty/HedgingDnsNameResolver.java | 71 ++++++++++++++++--- 2 files changed, 63 insertions(+), 12 deletions(-) diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java index 678f1d6a8b..8a9d25593a 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java @@ -74,7 +74,6 @@ import java.util.List; import java.util.Map; import java.util.RandomAccess; -import java.util.function.Function; import java.util.function.IntFunction; import javax.annotation.Nullable; @@ -221,7 +220,8 @@ final class DefaultDnsClient implements DnsClient { if (dnsServerAddressStreamProvider != null) { builder.nameServerProvider(toNettyType(dnsServerAddressStreamProvider)); } - resolver = new HedgingDnsNameResolver(builder.build(), nettyIoExecutor, nettyIoExecutor.eventLoopGroup().next()); + resolver = new HedgingDnsNameResolver(builder.build(), nettyIoExecutor, + nettyIoExecutor.eventLoopGroup().next()); } @Override diff --git 
a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java index b300a99340..5717b773aa 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java @@ -1,13 +1,29 @@ +/* + * Copyright © 2024 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ package io.servicetalk.dns.discovery.netty; +import io.servicetalk.concurrent.api.DelegatingExecutor; +import io.servicetalk.concurrent.api.Executor; + import io.netty.channel.EventLoop; import io.netty.handler.codec.dns.DnsQuestion; import io.netty.handler.codec.dns.DnsRecord; import io.netty.resolver.dns.DnsNameResolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; -import io.servicetalk.concurrent.api.DelegatingExecutor; -import io.servicetalk.concurrent.api.Executor; import java.io.Closeable; import java.net.InetAddress; @@ -23,7 +39,7 @@ final class HedgingDnsNameResolver implements Closeable { - private final DnsNameResolver delegate; + private final DnsResolverIface delegate; private final Executor executor; private final EventLoop eventLoop; @@ -31,10 +47,14 @@ final class HedgingDnsNameResolver implements Closeable { private final Budget budget; HedgingDnsNameResolver(DnsNameResolver delegate, Executor executor, EventLoop eventLoop) { + this(new NettyDnsNameResolver(delegate), executor, eventLoop); + } + + HedgingDnsNameResolver(DnsResolverIface delegate, Executor executor, EventLoop eventLoop) { this(delegate, executor, eventLoop, defaultTracker(), defaultBudget()); } - HedgingDnsNameResolver(DnsNameResolver delegate, Executor executor, EventLoop eventLoop, + HedgingDnsNameResolver(DnsResolverIface delegate, Executor executor, EventLoop eventLoop, PercentileTracker percentile, Budget budget) { this.delegate = delegate; this.executor = executor instanceof NormalizedTimeSourceExecutor ? @@ -68,7 +88,7 @@ private Future applyHedge(Function> computation, T t) { final long startTime = currentTimeMillis(); final long deadline = addWithOverflowProtection(startTime, percentile.getValue()); if (deadline == Long.MAX_VALUE) { - // no need to attempt a hedge that will wait that long: just return the value. + // basically forever: just return the value. 
return underlyingResult; } else { Promise promise = eventLoop.newPromise(); @@ -124,7 +144,7 @@ private interface Budget { boolean withdraw(); } - // TODO: both these implementations rely on access being serialized by the netty event loop. + // TODO: both these implementations are un-synchronized and rely on netty using only a single event loop. private static final class DefaultBudgetImpl implements Budget { private final int depositAmount; @@ -143,7 +163,6 @@ private static final class DefaultBudgetImpl implements Budget { this.tokens = initialTokens; } - @Override public void deposit() { tokens = max(maxTokens, tokens + depositAmount); @@ -170,7 +189,7 @@ private static final class DefaultPercentileTracker implements PercentileTracker private long lastValue; private int sampleCount; - public DefaultPercentileTracker(int buckets, double percentile, int sampleThreshold) { + DefaultPercentileTracker(int buckets, double percentile, int sampleThreshold) { if (percentile < 0 || percentile > 1) { throw new IllegalArgumentException("Unexpected percentile value: " + percentile); } @@ -194,18 +213,18 @@ public long getValue() { return lastValue; } - private void maybeSwap() { if (shouldSwap()) { lastValue = compute(); } } + private boolean shouldSwap() { return sampleCount >= sampleThreshold; } private long compute() { - long targetCount = (long)(sampleCount * percentile); + long targetCount = (long) (sampleCount * percentile); sampleCount = 0; long result = -1; for (int i = 0; i < buckets.length; i++) { @@ -255,4 +274,36 @@ public long currentTime(final TimeUnit unit) { return unit.convert(elapsedNanos, NANOSECONDS); } } + + interface DnsResolverIface extends Closeable { + Future> resolveAll(DnsQuestion t); + + Future> resolveAll(String t); + + @Override + void close(); + } + + private static final class NettyDnsNameResolver implements DnsResolverIface { + private final DnsNameResolver resolver; + + NettyDnsNameResolver(final DnsNameResolver resolver) { + this.resolver = 
resolver; + } + + @Override + public Future> resolveAll(DnsQuestion t) { + return resolver.resolveAll(t); + } + + @Override + public Future> resolveAll(String t) { + return resolver.resolveAll(t); + } + + @Override + public void close() { + resolver.close(); + } + } } From e59eed2236e8c1fd7608cdcdfe0a6ffdfd99aa0c Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 10 May 2024 11:03:48 -0600 Subject: [PATCH 5/9] Add some tests --- .../dns/discovery/netty/DefaultDnsClient.java | 3 +- .../netty/HedgingDnsNameResolver.java | 59 +++---- .../discovery/netty/DefaultDnsClientTest.java | 2 +- .../netty/HedgingDnsNameResolverTest.java | 148 ++++++++++++++++++ 4 files changed, 181 insertions(+), 31 deletions(-) create mode 100644 servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolverTest.java diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java index 8a9d25593a..fb870a1b09 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java @@ -220,8 +220,7 @@ final class DefaultDnsClient implements DnsClient { if (dnsServerAddressStreamProvider != null) { builder.nameServerProvider(toNettyType(dnsServerAddressStreamProvider)); } - resolver = new HedgingDnsNameResolver(builder.build(), nettyIoExecutor, - nettyIoExecutor.eventLoopGroup().next()); + resolver = new HedgingDnsNameResolver(builder.build(), nettyIoExecutor); } @Override diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java index 5717b773aa..0c913c6823 100644 --- 
a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java @@ -15,15 +15,17 @@ */ package io.servicetalk.dns.discovery.netty; +import io.servicetalk.concurrent.Cancellable; import io.servicetalk.concurrent.api.DelegatingExecutor; import io.servicetalk.concurrent.api.Executor; -import io.netty.channel.EventLoop; import io.netty.handler.codec.dns.DnsQuestion; import io.netty.handler.codec.dns.DnsRecord; import io.netty.resolver.dns.DnsNameResolver; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Promise; +import io.servicetalk.transport.api.IoExecutor; +import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutor; import java.io.Closeable; import java.net.InetAddress; @@ -32,6 +34,7 @@ import java.util.function.Function; import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; +import static io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors.toEventLoopAwareNettyIoExecutor; import static io.servicetalk.utils.internal.NumberUtils.ensurePositive; import static java.lang.Math.max; import static java.lang.Math.min; @@ -40,36 +43,34 @@ final class HedgingDnsNameResolver implements Closeable { private final DnsResolverIface delegate; - private final Executor executor; + private final EventLoopAwareNettyIoExecutor executor; - private final EventLoop eventLoop; +// private final EventLoop eventLoop; private final PercentileTracker percentile; private final Budget budget; - HedgingDnsNameResolver(DnsNameResolver delegate, Executor executor, EventLoop eventLoop) { - this(new NettyDnsNameResolver(delegate), executor, eventLoop); + HedgingDnsNameResolver(DnsNameResolver delegate, IoExecutor executor) { + this(new NettyDnsNameResolver(delegate), executor); } - HedgingDnsNameResolver(DnsResolverIface delegate, Executor 
executor, EventLoop eventLoop) { - this(delegate, executor, eventLoop, defaultTracker(), defaultBudget()); + HedgingDnsNameResolver(DnsResolverIface delegate, IoExecutor executor) { + this(delegate, executor, defaultTracker(), defaultBudget()); } - HedgingDnsNameResolver(DnsResolverIface delegate, Executor executor, EventLoop eventLoop, + HedgingDnsNameResolver(DnsResolverIface delegate, IoExecutor executor, PercentileTracker percentile, Budget budget) { this.delegate = delegate; - this.executor = executor instanceof NormalizedTimeSourceExecutor ? - executor : new NormalizedTimeSourceExecutor(executor); - this.eventLoop = eventLoop; + this.executor = toEventLoopAwareNettyIoExecutor(executor).next(); this.percentile = percentile; this.budget = budget; } public Future> resolveAll(DnsQuestion t) { - return applyHedge(delegate::resolveAll, t); + return setupHedge(delegate::resolveAllQuestion, t); } public Future> resolveAll(String t) { - return applyHedge(delegate::resolveAll, t); + return setupHedge(delegate::resolveAll, t); } @Override @@ -81,36 +82,38 @@ private long currentTimeMillis() { return executor.currentTime(TimeUnit.MILLISECONDS); } - private Future applyHedge(Function> computation, T t) { + private Future setupHedge(Function> computation, T t) { // Only add tokens for organic requests and not retries. budget.deposit(); Future underlyingResult = computation.apply(t); - final long startTime = currentTimeMillis(); - final long deadline = addWithOverflowProtection(startTime, percentile.getValue()); - if (deadline == Long.MAX_VALUE) { + final long delay = percentile.getValue(); + if (delay == Long.MAX_VALUE) { // basically forever: just return the value. 
return underlyingResult; } else { - Promise promise = eventLoop.newPromise(); - Future hedgeTimer = eventLoop.schedule(() -> maybeApplyHedge(computation, t, underlyingResult, promise), - deadline, TimeUnit.MILLISECONDS); + final long startTimeMs = currentTimeMillis(); + Promise promise = executor.eventLoopGroup().next().newPromise(); + Cancellable hedgeTimer = executor.schedule(() -> tryHedge(computation, t, underlyingResult, promise), + delay, TimeUnit.MILLISECONDS); underlyingResult.addListener(completedFuture -> { - measureRequest(currentTimeMillis() - startTime, completedFuture.isSuccess()); + measureRequest(currentTimeMillis() - startTimeMs, completedFuture.isSuccess()); if (complete(underlyingResult, promise)) { - hedgeTimer.cancel(true); + hedgeTimer.cancel(); } }); return promise; } } - private void maybeApplyHedge( + private void tryHedge( Function> computation, T t, Future original, Promise promise) { - if (budget.withdraw() && !original.isDone()) { + if (!original.isDone() && budget.withdraw()) { Future backupResult = computation.apply(t); + final long startTime = currentTimeMillis(); backupResult.addListener(done -> { if (complete(backupResult, promise)) { original.cancel(true); + measureRequest(currentTimeMillis() - startTime, done.isSuccess()); } }); promise.addListener(complete -> backupResult.cancel(true)); @@ -132,13 +135,13 @@ private boolean complete(Future f, Promise p) { } } - private interface PercentileTracker { + interface PercentileTracker { void addSample(long sample); long getValue(); } - private interface Budget { + interface Budget { void deposit(); boolean withdraw(); @@ -276,7 +279,7 @@ public long currentTime(final TimeUnit unit) { } interface DnsResolverIface extends Closeable { - Future> resolveAll(DnsQuestion t); + Future> resolveAllQuestion(DnsQuestion t); Future> resolveAll(String t); @@ -292,7 +295,7 @@ private static final class NettyDnsNameResolver implements DnsResolverIface { } @Override - public Future> 
resolveAll(DnsQuestion t) { + public Future> resolveAllQuestion(DnsQuestion t) { return resolver.resolveAll(t); } diff --git a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java index 8b50d95d28..f09d96806d 100644 --- a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java +++ b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java @@ -1307,7 +1307,7 @@ private static void assertHasEvent(Collection timerExecutor = ExecutorExtension.withTestExecutor() + .setClassLevel(true); + + @RegisterExtension + static final ExecutorExtension ioExecutor = ExecutorExtension + .withExecutor(() -> createIoExecutor(1)) + .setClassLevel(true); + + HedgingDnsNameResolver.PercentileTracker percentileTracker; + HedgingDnsNameResolver.Budget budget; + + HedgingDnsNameResolver.DnsResolverIface underlying; + HedgingDnsNameResolver resolver; + + void setup() { + if (percentileTracker == null) { + percentileTracker = mock(HedgingDnsNameResolver.PercentileTracker.class); + when(percentileTracker.getValue()).thenReturn(10L); + } + if (budget == null) { + budget = mock(HedgingDnsNameResolver.Budget.class); + when(budget.withdraw()).thenReturn(true); + } + underlying = mock(HedgingDnsNameResolver.DnsResolverIface.class); + // DnsResolverIface delegate, Executor executor, EventLoop eventLoop, + // PercentileTracker percentile, Budget budget + resolver = new HedgingDnsNameResolver(underlying, + new DefaultDnsClientTest.NettyIoExecutorWithTestTimer(ioExecutor.executor(), timerExecutor.executor()), + percentileTracker, budget); + } + + @Test + void requestThatDoesntNeedHedge() throws Exception { + setup(); + Promise> p1 = newPromise(); + when(underlying.resolveAll(any())).thenReturn(p1, null); + Future> results = 
resolver.resolveAll("apple.com"); + assertThat(results.isDone(), equalTo(false)); + advanceTime(1); + List result = new ArrayList<>(); + p1.trySuccess(result); + assertThat(results.get(), equalTo(result)); + verify(budget).deposit(); + verify(budget, never()).withdraw(); + verify(percentileTracker).addSample(1); + } + + @Test + void requestWithHedgingAndFirstWins() throws Exception { + setup(); + Promise> p1 = newPromise(); + Promise> p2 = newPromise(); + when(underlying.resolveAll(any())).thenReturn(p1, p2); + Future> results = resolver.resolveAll("apple.com"); + assertThat(results.isDone(), equalTo(false)); + advanceTime(10); + List result = new ArrayList<>(); + p1.trySuccess(result); + assertThat(results.get(), equalTo(result)); + verify(budget).deposit(); + verify(budget, times(1)).withdraw(); + verify(percentileTracker).addSample(10); + assertThat(p2.isCancelled(), equalTo(true)); + } + + @Test + void requestWithHedgingAndSecondWins() throws Exception { + setup(); + Promise> p1 = newPromise(); + Promise> p2 = newPromise(); + when(underlying.resolveAll(any())).thenReturn(p1, p2); + + Future> results = resolver.resolveAll("apple.com"); + assertThat(results.isDone(), equalTo(false)); + advanceTime(10); + + // Hedging should have started and the new timer should be set. + advanceTime(5); + List result = new ArrayList<>(); + p2.trySuccess(result); + assertThat(results.get(), equalTo(result)); + verify(budget).deposit(); + verify(budget, times(1)).withdraw(); + verify(percentileTracker).addSample(5); // only add the successful sample. 
+ assertThat(p1.isCancelled(), equalTo(true)); + } + + @Test + void requestWhenNoHedgingBudget() throws Exception { + setup(); + when(budget.withdraw()).thenReturn(false); + Promise> p1 = newPromise(); + when(underlying.resolveAll(any())).thenReturn(p1); + + Future> results = resolver.resolveAll("apple.com"); + assertThat(results.isDone(), equalTo(false)); + advanceTime(10); + + verify(budget).deposit(); + verify(budget, times(1)).withdraw(); + verify(underlying, times(1)).resolveAll("apple.com"); + } + + private Promise newPromise() { + return ioExecutor.executor().eventLoopGroup().next().newPromise(); + } + + private static void advanceTime(int advance) throws Exception { + // To make sure that the time is advanced after all prior work on the EventLoop is complete, we advance it from + // the EventLoop too. + ioExecutor.executor().submit(() -> { + LOGGER.debug("Advance time by {}s.", advance); + timerExecutor.executor().advanceTimeBy(advance, MILLISECONDS); + }).toFuture().get(); + } +} From e7b7fff6b37f86a56bb81b4facbb8cbd40ce187f Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 10 May 2024 14:03:27 -0600 Subject: [PATCH 6/9] Cleanup and make it easier to indirect --- .../dns/discovery/netty/DefaultDnsClient.java | 12 ++- .../netty/HedgingDnsNameResolver.java | 78 ++++--------------- .../netty/UnderlyingDnsResolver.java | 43 ++++++++++ .../netty/HedgingDnsNameResolverTest.java | 4 +- 4 files changed, 67 insertions(+), 70 deletions(-) create mode 100644 servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/UnderlyingDnsResolver.java diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java index fb870a1b09..c6e833f431 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java +++ 
b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java @@ -124,7 +124,7 @@ final class DefaultDnsClient implements DnsClient { private static final Cancellable TERMINATED = () -> { }; private final EventLoopAwareNettyIoExecutor nettyIoExecutor; - private final HedgingDnsNameResolver resolver; + private final UnderlyingDnsResolver resolver; private final MinTtlCache ttlCache; private final long maxTTLNanos; private final long ttlJitterNanos; @@ -220,7 +220,13 @@ final class DefaultDnsClient implements DnsClient { if (dnsServerAddressStreamProvider != null) { builder.nameServerProvider(toNettyType(dnsServerAddressStreamProvider)); } - resolver = new HedgingDnsNameResolver(builder.build(), nettyIoExecutor); + if (true /* hedging enabled */) { // need to wire this in. + DnsNameResolverBuilderUtils.consolidateCacheSize(id, builder, 0); + resolver = new HedgingDnsNameResolver( + new UnderlyingDnsResolver.NettyDnsNameResolver(builder.build()), nettyIoExecutor); + } else { + resolver = new UnderlyingDnsResolver.NettyDnsNameResolver(builder.build()); + } } @Override @@ -424,7 +430,7 @@ protected AbstractDnsSubscription newSubscription( @Override protected Future> doDnsQuery(final boolean scheduledQuery) { Promise> promise = nettyIoExecutor.eventLoopGroup().next().newPromise(); - resolver.resolveAll(new DefaultDnsQuestion(name, SRV)) + resolver.resolveAllQuestion(new DefaultDnsQuestion(name, SRV)) .addListener((Future> completedFuture) -> { Throwable cause = completedFuture.cause(); if (cause != null) { diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java index 0c913c6823..078e0caa12 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java +++ 
b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java @@ -16,8 +16,6 @@ package io.servicetalk.dns.discovery.netty; import io.servicetalk.concurrent.Cancellable; -import io.servicetalk.concurrent.api.DelegatingExecutor; -import io.servicetalk.concurrent.api.Executor; import io.netty.handler.codec.dns.DnsQuestion; import io.netty.handler.codec.dns.DnsRecord; @@ -27,25 +25,20 @@ import io.servicetalk.transport.api.IoExecutor; import io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutor; -import java.io.Closeable; import java.net.InetAddress; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Function; -import static io.servicetalk.concurrent.internal.FlowControlUtils.addWithOverflowProtection; import static io.servicetalk.transport.netty.internal.EventLoopAwareNettyIoExecutors.toEventLoopAwareNettyIoExecutor; import static io.servicetalk.utils.internal.NumberUtils.ensurePositive; import static java.lang.Math.max; import static java.lang.Math.min; -import static java.util.concurrent.TimeUnit.NANOSECONDS; -final class HedgingDnsNameResolver implements Closeable { +final class HedgingDnsNameResolver implements UnderlyingDnsResolver { - private final DnsResolverIface delegate; + private final UnderlyingDnsResolver delegate; private final EventLoopAwareNettyIoExecutor executor; - -// private final EventLoop eventLoop; private final PercentileTracker percentile; private final Budget budget; @@ -53,11 +46,11 @@ final class HedgingDnsNameResolver implements Closeable { this(new NettyDnsNameResolver(delegate), executor); } - HedgingDnsNameResolver(DnsResolverIface delegate, IoExecutor executor) { + HedgingDnsNameResolver(UnderlyingDnsResolver delegate, IoExecutor executor) { this(delegate, executor, defaultTracker(), defaultBudget()); } - HedgingDnsNameResolver(DnsResolverIface delegate, IoExecutor executor, + HedgingDnsNameResolver(UnderlyingDnsResolver delegate, 
IoExecutor executor, PercentileTracker percentile, Budget budget) { this.delegate = delegate; this.executor = toEventLoopAwareNettyIoExecutor(executor).next(); @@ -65,10 +58,12 @@ final class HedgingDnsNameResolver implements Closeable { this.budget = budget; } - public Future> resolveAll(DnsQuestion t) { + @Override + public Future> resolveAllQuestion(DnsQuestion t) { return setupHedge(delegate::resolveAllQuestion, t); } + @Override public Future> resolveAll(String t) { return setupHedge(delegate::resolveAll, t); } @@ -96,7 +91,7 @@ private Future setupHedge(Function> computation, T t) { Cancellable hedgeTimer = executor.schedule(() -> tryHedge(computation, t, underlyingResult, promise), delay, TimeUnit.MILLISECONDS); underlyingResult.addListener(completedFuture -> { - measureRequest(currentTimeMillis() - startTimeMs, completedFuture.isSuccess()); + measureRequest(currentTimeMillis() - startTimeMs, completedFuture); if (complete(underlyingResult, promise)) { hedgeTimer.cancel(); } @@ -113,15 +108,17 @@ private void tryHedge( backupResult.addListener(done -> { if (complete(backupResult, promise)) { original.cancel(true); - measureRequest(currentTimeMillis() - startTime, done.isSuccess()); + measureRequest(currentTimeMillis() - startTime, done); } }); promise.addListener(complete -> backupResult.cancel(true)); } } - private void measureRequest(long durationMs, boolean succeeded) { - if (succeeded) { + private void measureRequest(long durationMs, Future future) { + // Cancelled responses don't count but we do consider failed responses because failure + // is a legitimate response. + if (!future.isCancelled()) { percentile.addSample(durationMs); } } @@ -260,53 +257,4 @@ private static Budget defaultBudget() { // 5% extra load and a max burst of 5 hedges. return new DefaultBudgetImpl(1, 20, 100); } - - // TODO: copied from servicetalk-loadbalancer. 
- private static final class NormalizedTimeSourceExecutor extends DelegatingExecutor { - - private final long offsetNanos; - - NormalizedTimeSourceExecutor(final Executor delegate) { - super(delegate); - offsetNanos = delegate.currentTime(NANOSECONDS); - } - - @Override - public long currentTime(final TimeUnit unit) { - final long elapsedNanos = delegate().currentTime(NANOSECONDS) - offsetNanos; - return unit.convert(elapsedNanos, NANOSECONDS); - } - } - - interface DnsResolverIface extends Closeable { - Future> resolveAllQuestion(DnsQuestion t); - - Future> resolveAll(String t); - - @Override - void close(); - } - - private static final class NettyDnsNameResolver implements DnsResolverIface { - private final DnsNameResolver resolver; - - NettyDnsNameResolver(final DnsNameResolver resolver) { - this.resolver = resolver; - } - - @Override - public Future> resolveAllQuestion(DnsQuestion t) { - return resolver.resolveAll(t); - } - - @Override - public Future> resolveAll(String t) { - return resolver.resolveAll(t); - } - - @Override - public void close() { - resolver.close(); - } - } } diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/UnderlyingDnsResolver.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/UnderlyingDnsResolver.java new file mode 100644 index 0000000000..0115017db0 --- /dev/null +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/UnderlyingDnsResolver.java @@ -0,0 +1,43 @@ +package io.servicetalk.dns.discovery.netty; + +import io.netty.handler.codec.dns.DnsQuestion; +import io.netty.handler.codec.dns.DnsRecord; +import io.netty.resolver.dns.DnsNameResolver; +import io.netty.util.concurrent.Future; + +import java.io.Closeable; +import java.net.InetAddress; +import java.util.List; + +interface UnderlyingDnsResolver extends Closeable { + + Future> resolveAllQuestion(DnsQuestion t); + + Future> resolveAll(String t); + + @Override + void 
close(); + + static final class NettyDnsNameResolver implements UnderlyingDnsResolver { + private final DnsNameResolver resolver; + + NettyDnsNameResolver(final DnsNameResolver resolver) { + this.resolver = resolver; + } + + @Override + public Future> resolveAllQuestion(DnsQuestion t) { + return resolver.resolveAll(t); + } + + @Override + public Future> resolveAll(String t) { + return resolver.resolveAll(t); + } + + @Override + public void close() { + resolver.close(); + } + } +} diff --git a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolverTest.java b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolverTest.java index c69207192f..2f3b5645e8 100644 --- a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolverTest.java +++ b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolverTest.java @@ -41,7 +41,7 @@ class HedgingDnsNameResolverTest { HedgingDnsNameResolver.PercentileTracker percentileTracker; HedgingDnsNameResolver.Budget budget; - HedgingDnsNameResolver.DnsResolverIface underlying; + UnderlyingDnsResolver underlying; HedgingDnsNameResolver resolver; void setup() { @@ -53,7 +53,7 @@ void setup() { budget = mock(HedgingDnsNameResolver.Budget.class); when(budget.withdraw()).thenReturn(true); } - underlying = mock(HedgingDnsNameResolver.DnsResolverIface.class); + underlying = mock(UnderlyingDnsResolver.class); // DnsResolverIface delegate, Executor executor, EventLoop eventLoop, // PercentileTracker percentile, Budget budget resolver = new HedgingDnsNameResolver(underlying, From 32e6ead3a57f081ae7f1983dc8dad05d267bd810 Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Fri, 10 May 2024 15:41:14 -0600 Subject: [PATCH 7/9] Start hacking together tests --- .../dns/discovery/netty/DefaultDnsClient.java | 5 +- .../netty/HedgingDnsNameResolver.java | 29 +++++++++++ 
.../discovery/netty/DefaultDnsClientTest.java | 48 +++++++++++++++++++ .../dns/discovery/netty/TestRecordStore.java | 34 +++++++++++-- 4 files changed, 111 insertions(+), 5 deletions(-) diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java index c6e833f431..11f65d0364 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java @@ -223,7 +223,10 @@ final class DefaultDnsClient implements DnsClient { if (true /* hedging enabled */) { // need to wire this in. DnsNameResolverBuilderUtils.consolidateCacheSize(id, builder, 0); resolver = new HedgingDnsNameResolver( - new UnderlyingDnsResolver.NettyDnsNameResolver(builder.build()), nettyIoExecutor); +// new UnderlyingDnsResolver.NettyDnsNameResolver(builder.build()), nettyIoExecutor); + // TODO: this is just for hacking together tests. 
+ new UnderlyingDnsResolver.NettyDnsNameResolver(builder.build()), nettyIoExecutor, + HedgingDnsNameResolver.constantTracker(100), HedgingDnsNameResolver.alwaysBudget()); } else { resolver = new UnderlyingDnsResolver.NettyDnsNameResolver(builder.build()); } diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java index 078e0caa12..a79036a723 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java @@ -103,6 +103,7 @@ private Future setupHedge(Function> computation, T t) { private void tryHedge( Function> computation, T t, Future original, Promise promise) { if (!original.isDone() && budget.withdraw()) { + System.out.println("" + System.currentTimeMillis() + ": sending backup request."); Future backupResult = computation.apply(t); final long startTime = currentTimeMillis(); backupResult.addListener(done -> { @@ -257,4 +258,32 @@ private static Budget defaultBudget() { // 5% extra load and a max burst of 5 hedges. 
return new DefaultBudgetImpl(1, 20, 100); } + + static PercentileTracker constantTracker(int value) { + return new PercentileTracker() { + @Override + public void addSample(long sample) { + // noop + } + + @Override + public long getValue() { + return value; + } + }; + } + + static Budget alwaysBudget() { + return new Budget() { + @Override + public void deposit() { + // noop + } + + @Override + public boolean withdraw() { + return true; + } + }; + } } diff --git a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java index f09d96806d..ee80f361ab 100644 --- a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java +++ b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java @@ -43,6 +43,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.net.DatagramPacket; +import java.net.DatagramSocket; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; @@ -170,6 +173,51 @@ static Stream missingRecordStatus() { return Stream.of(ServiceDiscovererEvent.Status.EXPIRED, ServiceDiscovererEvent.Status.UNAVAILABLE); } + @Test + void hedging() throws Exception { + // should be bound now. 
+ DatagramSocket datagramSocket = new DatagramSocket(new InetSocketAddress("127.0.0.1", 5657)); + Thread t = new Thread(() -> { + while (true) { + byte[] buf = new byte[2048]; + DatagramPacket packet = new DatagramPacket(buf, buf.length); + try { + datagramSocket.receive(packet); + } catch (IOException ex) { + System.out.println("Exception: " + ex); + return; + } + String packetStr = new String(buf, 0, packet.getLength()); + System.out.println("" + System.currentTimeMillis() + ": Received packet- " + packetStr); + packet.getLength(); + } + }); + t.start(); + + setup(builder -> builder.dnsServerAddressStreamProvider( + new SingletonDnsServerAddressStreamProvider((InetSocketAddress) datagramSocket.getLocalSocketAddress()))); +// recordStore.addStall("unknown.com", latch); + + final String targetDomain1 = "sd.domain.com"; + final String ip1 = nextIp(); + + recordStore.addIPv4Address(targetDomain1, DEFAULT_TTL, ip1); + + TestPublisherSubscriber> subscriber = dnsQuery(targetDomain1); + Subscription subscription = subscriber.awaitSubscription(); + subscription.request(Long.MAX_VALUE); + + Thread.sleep(100); // just add an actual delay so our println messages don't stack atop one another. 
+ advanceTime(); + + List> signals = subscriber.takeOnNext(1); + assertHasEvent(signals, ip1, AVAILABLE); + + + t.interrupt(); + } + + @Test void singleSrvSingleADiscover() throws Exception { setup(); diff --git a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/TestRecordStore.java b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/TestRecordStore.java index 0cdff5c0c6..dbe14047cc 100644 --- a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/TestRecordStore.java +++ b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/TestRecordStore.java @@ -34,6 +34,7 @@ import java.util.Objects; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; import javax.annotation.Nullable; import static java.util.Collections.emptySet; @@ -90,6 +91,11 @@ public int hashCode() { } private final Set failSet = new HashSet<>(); + private final Map stalledRecords = new HashMap<>(); + + public synchronized void addStall(final String dnsRecordName, CountDownLatch latch) { + stalledRecords.put(dnsRecordName, latch); + } public synchronized void addFail(final ServFail fail) { failSet.add(fail); @@ -220,12 +226,32 @@ private boolean removeRecords(ResourceRecord rr, List recordList @Nullable @Override - public synchronized Set getRecords(final QuestionRecord questionRecord) throws DnsException { + public Set getRecords(final QuestionRecord questionRecord) throws DnsException { final String domain = questionRecord.getDomainName(); - if (failSet.contains(ServFail.of(questionRecord))) { - throw new DnsException(SERVER_FAILURE); + + // TODO: the blocking doesn't work as expected because we can't get any more messages through for the + // backup request. 
+ final CountDownLatch latch; + synchronized (this) { + latch = stalledRecords.remove(domain); + } + if (latch != null) { + try { + latch.await(); + } catch (InterruptedException cause) { + DnsException ex = new DnsException(SERVER_FAILURE); + ex.initCause(cause); + throw ex; + } + } + final Map> recordsToReturn; + synchronized (this) { + if (failSet.contains(ServFail.of(questionRecord))) { + throw new DnsException(SERVER_FAILURE); + } + recordsToReturn = recordsToReturnByDomain.get(domain); } - final Map> recordsToReturn = recordsToReturnByDomain.get(domain); + LOGGER.debug("Getting {} records for {}", questionRecord.getRecordType(), domain); if (recordsToReturn != null) { final List recordsForType = recordsToReturn.get(questionRecord.getRecordType()); From b1859c76add2641a5797b6612ed061ebfe31eb0a Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Mon, 13 May 2024 09:17:54 -0600 Subject: [PATCH 8/9] WIP --- .../discovery/netty/DefaultDnsClientTest.java | 47 +++++++++---------- 1 file changed, 23 insertions(+), 24 deletions(-) diff --git a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java index ee80f361ab..c5f743e2e0 100644 --- a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java +++ b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/DefaultDnsClientTest.java @@ -176,32 +176,34 @@ static Stream missingRecordStatus() { @Test void hedging() throws Exception { // should be bound now. 
- DatagramSocket datagramSocket = new DatagramSocket(new InetSocketAddress("127.0.0.1", 5657)); - Thread t = new Thread(() -> { - while (true) { - byte[] buf = new byte[2048]; - DatagramPacket packet = new DatagramPacket(buf, buf.length); - try { - datagramSocket.receive(packet); - } catch (IOException ex) { - System.out.println("Exception: " + ex); - return; - } - String packetStr = new String(buf, 0, packet.getLength()); - System.out.println("" + System.currentTimeMillis() + ": Received packet- " + packetStr); - packet.getLength(); - } - }); - t.start(); - - setup(builder -> builder.dnsServerAddressStreamProvider( - new SingletonDnsServerAddressStreamProvider((InetSocketAddress) datagramSocket.getLocalSocketAddress()))); -// recordStore.addStall("unknown.com", latch); +// DatagramSocket datagramSocket = new DatagramSocket(new InetSocketAddress("127.0.0.1", 5657)); +// Thread t = new Thread(() -> { +// while (true) { +// byte[] buf = new byte[2048]; +// DatagramPacket packet = new DatagramPacket(buf, buf.length); +// try { +// datagramSocket.receive(packet); +// } catch (IOException ex) { +// System.out.println("Exception: " + ex); +// return; +// } +// String packetStr = new String(buf, 0, packet.getLength()); +// System.out.println("" + System.currentTimeMillis() + ": Received packet- " + packetStr); +// packet.getLength(); +// } +// }); +// t.start(); + +// setup(builder -> builder.dnsServerAddressStreamProvider( +// new SequentialDnsServerAddressStreamProvider((InetSocketAddress) datagramSocket.getLocalSocketAddress(), dnsServer.localAddress()))); + setup(); final String targetDomain1 = "sd.domain.com"; final String ip1 = nextIp(); recordStore.addIPv4Address(targetDomain1, DEFAULT_TTL, ip1); + CountDownLatch latch = new CountDownLatch(1); + recordStore.addStall(targetDomain1, latch); TestPublisherSubscriber> subscriber = dnsQuery(targetDomain1); Subscription subscription = subscriber.awaitSubscription(); @@ -212,9 +214,6 @@ void hedging() throws Exception { 
List> signals = subscriber.takeOnNext(1); assertHasEvent(signals, ip1, AVAILABLE); - - - t.interrupt(); } From 3fdd86c9a71a96245155818cd442a917dcefa39e Mon Sep 17 00:00:00 2001 From: Bryce Anderson Date: Thu, 6 Jun 2024 11:03:57 -0600 Subject: [PATCH 9/9] Add a moving variance computation --- .../dns/discovery/netty/DefaultDnsClient.java | 2 +- .../netty/HedgingDnsNameResolver.java | 97 +++++-------------- .../dns/discovery/netty/MovingVariance.java | 64 ++++++++++++ .../discovery/netty/MovingVarianceTest.java | 55 +++++++++++ 4 files changed, 144 insertions(+), 74 deletions(-) create mode 100644 servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/MovingVariance.java create mode 100644 servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/MovingVarianceTest.java diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java index 030233a38f..fcaa9f100c 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/DefaultDnsClient.java @@ -230,7 +230,7 @@ final class DefaultDnsClient implements DnsClient { // new UnderlyingDnsResolver.NettyDnsNameResolver(builder.build()), nettyIoExecutor); // TODO: this is just for hacking together tests. 
new UnderlyingDnsResolver.NettyDnsNameResolver(builder.build()), nettyIoExecutor, - HedgingDnsNameResolver.constantTracker(100), HedgingDnsNameResolver.alwaysBudget()); + HedgingDnsNameResolver.constantTracker(100), HedgingDnsNameResolver.alwaysAllowBudget()); } else { resolver = new UnderlyingDnsResolver.NettyDnsNameResolver(builder.build()); } diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java index c012871407..c1e0a24e86 100644 --- a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/HedgingDnsNameResolver.java @@ -150,7 +150,28 @@ interface Budget { boolean withdraw(); } - // TODO: both these implementations are un-synchronized and rely on netty using only a single event loop. + private static final class DefaultPercentileTracker implements PercentileTracker { + + private final MovingVariance movingVariance; + private final double multiple; + + DefaultPercentileTracker(final double multiple, final int historySize) { + movingVariance = new MovingVariance(historySize); + this.multiple = multiple; + } + + @Override + public void addSample(long sample) { + int clipped = Math.max(0, sample > Integer.MAX_VALUE ? Integer.MAX_VALUE : (int) sample); + movingVariance.addSample(clipped); + } + + @Override + public long getValue() { + return Math.round(movingVariance.mean() + movingVariance.stdev() * multiple); + } + } + private static final class DefaultBudgetImpl implements Budget { private final int depositAmount; @@ -185,78 +206,8 @@ public boolean withdraw() { } } - // TODO: we shouldn't need to worry about concurrency if this is all happening in the same netty channel. 
- private static final class DefaultPercentileTracker implements PercentileTracker { - - // TODO: we need to make the buckets grow exponentially to save space. - private final int[] buckets; - private final double percentile; - private final int sampleThreshold; - private long lastValue; - private int sampleCount; - - DefaultPercentileTracker(int buckets, double percentile, int sampleThreshold) { - if (percentile < 0 || percentile > 1) { - throw new IllegalArgumentException("Unexpected percentile value: " + percentile); - } - this.buckets = new int[ensurePositive(buckets, "buckets")]; - this.percentile = percentile; - this.sampleThreshold = ensurePositive(sampleThreshold, "sampleThreshold"); - lastValue = Long.MAX_VALUE; - } - - @Override - public void addSample(long value) { - maybeSwap(); - int bucket = valueToBucket(value); - buckets[bucket]++; - sampleCount++; - } - - @Override - public long getValue() { - maybeSwap(); - return lastValue; - } - - private void maybeSwap() { - if (shouldSwap()) { - lastValue = compute(); - } - } - - private boolean shouldSwap() { - return sampleCount >= sampleThreshold; - } - - private long compute() { - long targetCount = (long) (sampleCount * percentile); - sampleCount = 0; - long result = -1; - for (int i = 0; i < buckets.length; i++) { - if (result != -1) { - targetCount -= buckets[i]; - if (targetCount <= 0) { - result = bucketToValue(i); - } - } - buckets[i] = 0; - } - assert result != -1; // we should have found a bucket. 
- return max(1, result); - } - - private long bucketToValue(int bucket) { - return bucket; - } - - private int valueToBucket(long value) { - return (int) max(0, min(buckets.length, value)); - } - } - private static PercentileTracker defaultTracker() { - return new DefaultPercentileTracker(128, 0.98, 200); + return new DefaultPercentileTracker(3.0, 256); } private static Budget defaultBudget() { @@ -278,7 +229,7 @@ public long getValue() { }; } - static Budget alwaysBudget() { + static Budget alwaysAllowBudget() { return new Budget() { @Override public void deposit() { diff --git a/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/MovingVariance.java b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/MovingVariance.java new file mode 100644 index 0000000000..8f4e7527a2 --- /dev/null +++ b/servicetalk-dns-discovery-netty/src/main/java/io/servicetalk/dns/discovery/netty/MovingVariance.java @@ -0,0 +1,64 @@ +package io.servicetalk.dns.discovery.netty; + +import java.util.Arrays; + +final class MovingVariance { + + private final int size; + private final double invSize; + private final double invSizeBySizeMinus1; + + // We initialize with the assumption that all previous samples were equal to `initialMean`. This lets us know + // sum(x[i]) == size * initialMean and Var(x[n]) == 0, but until `size` samples arrive the seeds skew the results. 
+ private final int[] xi; + private int ii; + private long sumXi; + private long varianceSizeSizeMinus1; + + MovingVariance(final int size) { + this(size, Integer.MAX_VALUE); + } + + MovingVariance(final int size, final int initialMean) { + if (size < 2) { + throw new IllegalArgumentException("Must allow at least two samples"); + } + this.size = size; + this.invSize = 1.0 / size; + this.invSizeBySizeMinus1 = 1.0 / (size * (size - 1)); + this.xi = new int[size]; + Arrays.fill(xi, initialMean); + sumXi = ((long) initialMean) * size; + } + + public double mean() { + return sumXi * invSize; + } + + public double variance() { + return varianceSizeSizeMinus1 * invSizeBySizeMinus1; + } + + public double stdev() { + return Math.sqrt(variance()); + } + + public void addSample(int sample) { + // Widen sample to a long so that we don't have to worry about overflows. + final long xn = sample; + final int i = getIndex(); + final long x0 = xi[i]; + xi[i] = sample; + final long oldSumXi = sumXi; + sumXi += xn - x0; + varianceSizeSizeMinus1 = varianceSizeSizeMinus1 + (size * (xn + x0) - sumXi - oldSumXi) * (xn - x0); + } + + private int getIndex() { + final int result = ii; + if (++ii == size) { + ii = 0; + } + return result; + } +} diff --git a/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/MovingVarianceTest.java b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/MovingVarianceTest.java new file mode 100644 index 0000000000..b574aa1c75 --- /dev/null +++ b/servicetalk-dns-discovery-netty/src/test/java/io/servicetalk/dns/discovery/netty/MovingVarianceTest.java @@ -0,0 +1,55 @@ +package io.servicetalk.dns.discovery.netty; + +import org.junit.jupiter.api.RepeatedTest; +import org.junit.jupiter.api.Test; + +import java.util.concurrent.ThreadLocalRandom; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.closeTo; +import static org.hamcrest.Matchers.equalTo; + +class 
MovingVarianceTest { + + @Test + void twoSamples() { + MovingVariance m = new MovingVariance(2); + m.addSample(1); + m.addSample(1); + assertThat(m.variance(), equalTo(0.0)); + } + + @RepeatedTest(1000) + void jquikish() { + ThreadLocalRandom r = ThreadLocalRandom.current(); + final int size = r.nextInt(2, 100); + MovingVariance m = new MovingVariance(size); + int[] samples = new int[size]; + for (int i = 0; i < size; i++) { + int xi = r.nextInt(-100, 100); + samples[i] = xi; + m.addSample(xi); + } + + assertThat(m.variance(), closeTo(variance(samples), 0.0001)); + assertThat(m.mean(), closeTo(mean(samples), 0.0001)); + } + + private double variance(int[] values) { + final double mean = mean(values); + double accumulator = 0; + for (int value : values) { + double diff = value - mean; + accumulator += diff * diff; + } + return accumulator / (values.length - 1); + } + + double mean(int[] values) { + long sum = 0; + for (double v : values) { + sum += v; + } + return ((double) sum) / values.length; + } +}