From 7ad23db7826f31ec2da5c4c9ea2cf6fbf8d8f176 Mon Sep 17 00:00:00 2001 From: Nico Schmoigl Date: Sat, 9 Aug 2025 16:25:40 +0200 Subject: [PATCH 1/8] Base proposal to fix #1146 --- .../AbstractUaaTokenProvider.java | 83 ++++++++++++++----- 1 file changed, 60 insertions(+), 23 deletions(-) diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java index 09c23487c0..3a80459ade 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java @@ -37,6 +37,8 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import org.cloudfoundry.Nullable; import org.cloudfoundry.reactor.ConnectionContext; import org.cloudfoundry.reactor.TokenProvider; @@ -83,6 +85,12 @@ public abstract class AbstractUaaTokenProvider implements TokenProvider { private final ConcurrentMap> refreshTokens = new ConcurrentHashMap<>(1); + private final ConcurrentMap tokenSchedulers = + new ConcurrentHashMap<>(1); + + private final ConcurrentMap> activeTokenRequests = + new ConcurrentHashMap<>(1); + /** * The client id. Defaults to {@code cf}. */ @@ -297,30 +305,59 @@ private void setAuthorization(HttpHeaders headers) { headers.set(AUTHORIZATION, String.format("Basic %s", encoded)); } - private Mono token(ConnectionContext connectionContext) { - Mono cached = - this.refreshTokens - .getOrDefault(connectionContext, Mono.empty()) - .flatMap( - refreshToken -> - refreshToken(connectionContext, refreshToken) - .doOnSubscribe( - s -> - LOGGER.debug( - "Negotiating using refresh" - + " token"))) - .switchIfEmpty( - primaryToken(connectionContext) - .doOnSubscribe( - s -> - LOGGER.debug( - "Negotiating using token" - + " provider"))); + private Mono token(final ConnectionContext connectionContext) { + // Get or create a single-threaded scheduler for this connection context + final Scheduler tokenScheduler = this.tokenSchedulers.computeIfAbsent( + connectionContext, + ctx -> Schedulers.newSingle("token-" + ctx.hashCode()) + ); + + return Mono.defer(() -> { + // Check if there's already an active token request + final Mono existingRequest = this.activeTokenRequests.get(connectionContext); + if (existingRequest != null) { + LOGGER.debug("Reusing existing token request for connection context"); + return existingRequest; + } + + // Create new token request with proper synchronization and cache duration + final Mono baseTokenRequest = createTokenRequest(connectionContext) + .publishOn(tokenScheduler) // Ensure execution on single thread + .doOnSubscribe(s -> LOGGER.debug("Starting new token request")) + .doOnSuccess(token -> LOGGER.debug("Token request completed successfully")) + .doOnError(error -> LOGGER.debug("Token request failed", error)) + .doFinally(signal -> { + // Clear the active request when done (success or error) + this.activeTokenRequests.remove(connectionContext); + LOGGER.debug("Cleared active token request for connection context"); + }); + + // Apply cache duration from connection context + final Mono newTokenRequest = connectionContext + .getCacheDuration() + .map(baseTokenRequest::cache) + .orElseGet(baseTokenRequest::cache); + + // Store the active request atomically + final Mono actualRequest = this.activeTokenRequests.putIfAbsent(connectionContext, newTokenRequest); + if (actualRequest != null) { + // Another thread beat us to it, use their request + LOGGER.debug("Another thread created token request first, using theirs"); + return actualRequest; + } + + // We successfully stored our request, use it + return newTokenRequest; + }); + } - return connectionContext - .getCacheDuration() - .map(cached::cache) - .orElseGet(cached::cache) + private Mono createTokenRequest(final ConnectionContext connectionContext) { + return this.refreshTokens + .getOrDefault(connectionContext, Mono.empty()) + .flatMap(refreshToken -> refreshToken(connectionContext, refreshToken) + .doOnSubscribe(s -> LOGGER.debug("Negotiating using refresh token"))) + .switchIfEmpty(primaryToken(connectionContext) + .doOnSubscribe(s -> LOGGER.debug("Negotiating using token provider"))) .checkpoint(); } From a3886d7fb7fa2df8716a19cac87439d7f0747618 Mon Sep 17 00:00:00 2001 From: Nico Schmoigl Date: Sat, 9 Aug 2025 16:33:50 +0200 Subject: [PATCH 2/8] Noting down the intention of the fix --- .../AbstractUaaTokenProvider.java | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java index 3a80459ade..9ea926b183 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java @@ -306,21 +306,47 @@ private void setAuthorization(HttpHeaders headers) { } private Mono token(final ConnectionContext connectionContext) { + /* + * Beware of issue #1146! + * There can be both multiple concurrent callers coming here during creation of the Mono + * and there can be concurrent callers during the subscription/execution of the Mono: + * - The latter is very harmful: This may lead to concurrent execution of the logic + * written in requestToken and hence to multiple requests to the UAA server using + * the same *value for the refresh token*! The UAA server will sequentialize them, + * one will go through just as normal and a new refresh token gets issued. + * The UAA server invalidates the old refresh token. The second request then arrives + * with the old refresh token and gets rejected. In an earlier version this led + * to caching the second request, hence to cache an error. This caused deadlocks. + * - The first is only "not nice", if the second issue is resolved: It causes that + * we will request two access tokens from the UAA server shortly after the other, + * but having use appropriate refresh tokens sequentially. The second request + * simply is to be considered waste. + * + * The coding below fixes both issues: It ensures that the execution of the Mono + * is synchronized and it ensures that two threads arriving to fetch the JWT in a + * non-caching situation does not trigger "wasteful" requests to the UAA server. + */ + // Get or create a single-threaded scheduler for this connection context final Scheduler tokenScheduler = this.tokenSchedulers.computeIfAbsent( connectionContext, ctx -> Schedulers.newSingle("token-" + ctx.hashCode()) ); + /* + * We use Mono.defer to ensure that the execution of the locking not happens + * during creation of the Mono (where it is of little relevance), but + * during the execution/subscription. + */ return Mono.defer(() -> { // Check if there's already an active token request final Mono existingRequest = this.activeTokenRequests.get(connectionContext); if (existingRequest != null) { - LOGGER.debug("Reusing existing token request for connection context"); + LOGGER.debug("Reusing existing UAA JWT token request for connection context"); return existingRequest; } - // Create new token request with proper synchronization and cache duration + // Create new token request with proper synchronization final Mono baseTokenRequest = createTokenRequest(connectionContext) .publishOn(tokenScheduler) // Ensure execution on single thread .doOnSubscribe(s -> LOGGER.debug("Starting new token request")) From b0a41a82d8eed2451e4e1c91437d20f787315fde Mon Sep 17 00:00:00 2001 From: Nico Schmoigl Date: Sat, 9 Aug 2025 20:53:06 +0200 Subject: [PATCH 3/8] Move single-thread Scheduler to ConnectionContext --- .../cloudfoundry/reactor/ConnectionContext.java | 6 ++++++ .../reactor/_DefaultConnectionContext.java | 9 +++++++++ .../tokenprovider/AbstractUaaTokenProvider.java | 15 ++------------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java index 4c90f669f7..417cdea62c 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.util.Optional; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.netty.http.client.HttpClient; /** @@ -52,6 +53,11 @@ public interface ConnectionContext { */ RootProvider getRootProvider(); + /** + * The {@link Scheduler} to use for token operations + */ + Scheduler getTokenScheduler(); + /** * Attempt to explicitly trust the TLS certificate of an endpoint. Implementations can choose whether any actual trusting will happen. * diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java index 0aea075c6f..62ceea663d 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java @@ -32,6 +32,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; @@ -74,6 +76,7 @@ abstract class _DefaultConnectionContext implements ConnectionContext { @PreDestroy public final void dispose() { getConnectionProvider().ifPresent(ConnectionProvider::dispose); + getTokenScheduler().dispose(); getThreadPool().dispose(); try { @@ -154,6 +157,12 @@ public Mono trust(String host, int port) { .orElse(Mono.empty()); } + @Override + @Value.Derived + public Scheduler getTokenScheduler() { + return Schedulers.newSingle(String.format("token-provider-%s/%d", getApiHost(), getPort().orElse(DEFAULT_PORT))); + } + /** * Additional configuration for the underlying HttpClient */ diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java index 9ea926b183..fcdb4c12e0 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java @@ -37,8 +37,6 @@ import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; -import reactor.core.scheduler.Scheduler; -import reactor.core.scheduler.Schedulers; import org.cloudfoundry.Nullable; import org.cloudfoundry.reactor.ConnectionContext; import org.cloudfoundry.reactor.TokenProvider; @@ -85,9 +83,6 @@ public abstract class AbstractUaaTokenProvider implements TokenProvider { private final ConcurrentMap> refreshTokens = new ConcurrentHashMap<>(1); - private final ConcurrentMap tokenSchedulers = - new ConcurrentHashMap<>(1); - private final ConcurrentMap> activeTokenRequests = new ConcurrentHashMap<>(1); @@ -323,16 +318,10 @@ private Mono token(final ConnectionContext connectionContext) { * simply is to be considered waste. * * The coding below fixes both issues: It ensures that the execution of the Mono - * is synchronized and it ensures that two threads arriving to fetch the JWT in a + * is synchronized and it ensures that two threads arriving to fetch the JWT in a * non-caching situation does not trigger "wasteful" requests to the UAA server. */ - // Get or create a single-threaded scheduler for this connection context - final Scheduler tokenScheduler = this.tokenSchedulers.computeIfAbsent( - connectionContext, - ctx -> Schedulers.newSingle("token-" + ctx.hashCode()) - ); - /* * We use Mono.defer to ensure that the execution of the locking not happens * during creation of the Mono (where it is of little relevance), but @@ -348,7 +337,7 @@ private Mono token(final ConnectionContext connectionContext) { // Create new token request with proper synchronization final Mono baseTokenRequest = createTokenRequest(connectionContext) - .publishOn(tokenScheduler) // Ensure execution on single thread + .publishOn(connectionContext.getTokenScheduler()) // Ensure execution on single thread .doOnSubscribe(s -> LOGGER.debug("Starting new token request")) .doOnSuccess(token -> LOGGER.debug("Token request completed successfully")) .doOnError(error -> LOGGER.debug("Token request failed", error)) From 9526f35b2e8cb3b9623dab9a8ef40a521a0dd587 Mon Sep 17 00:00:00 2001 From: Nico Schmoigl Date: Sat, 9 Aug 2025 21:05:03 +0200 Subject: [PATCH 4/8] Improve comments/documentation --- .../AbstractUaaTokenProvider.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java index fcdb4c12e0..5a498c4263 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java @@ -335,16 +335,18 @@ private Mono token(final ConnectionContext connectionContext) { return existingRequest; } - // Create new token request with proper synchronization final Mono baseTokenRequest = createTokenRequest(connectionContext) - .publishOn(connectionContext.getTokenScheduler()) // Ensure execution on single thread - .doOnSubscribe(s -> LOGGER.debug("Starting new token request")) - .doOnSuccess(token -> LOGGER.debug("Token request completed successfully")) - .doOnError(error -> LOGGER.debug("Token request failed", error)) + /* + * Ensure execution on single thread. + * This prevents sending requests to the UAA server with expired refresh tokens. + */ + .publishOn(connectionContext.getTokenScheduler()) + .doOnSubscribe(s -> LOGGER.debug("Starting new UAA JWT token request")) + .doOnSuccess(token -> LOGGER.debug("UAA JWT token request completed successfully")) + .doOnError(error -> LOGGER.debug("UAA JWT token request failed", error)) .doFinally(signal -> { // Clear the active request when done (success or error) this.activeTokenRequests.remove(connectionContext); - LOGGER.debug("Cleared active token request for connection context"); }); // Apply cache duration from connection context @@ -356,8 +358,8 @@ private Mono token(final ConnectionContext connectionContext) { // Store the active request atomically final Mono actualRequest = this.activeTokenRequests.putIfAbsent(connectionContext, newTokenRequest); if (actualRequest != null) { - // Another thread beat us to it, use their request - LOGGER.debug("Another thread created token request first, using theirs"); + // Another thread beat us to it, use their request. This prevents "wasteful" requests. + LOGGER.debug("Another thread created token request first, using theirs instead"); return actualRequest; } From 908e5f60557be67f40ffd8908cd82efb7e92cbbd Mon Sep 17 00:00:00 2001 From: Nico Schmoigl Date: Sat, 9 Aug 2025 21:36:09 +0200 Subject: [PATCH 5/8] Initial implementation of unit test for concurrency issue --- ...stractUaaTokenProviderConcurrencyTest.java | 280 ++++++++++++++++++ .../reactor/tokenprovider/MockUaaServer.java | 199 +++++++++++++ 2 files changed, 479 insertions(+) create mode 100644 cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java create mode 100644 cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java new file mode 100644 index 0000000000..5624372173 --- /dev/null +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java @@ -0,0 +1,280 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.tokenprovider; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import org.cloudfoundry.reactor.ConnectionContext; +import org.cloudfoundry.reactor.DefaultConnectionContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.netty.http.client.HttpClientForm; +import reactor.netty.http.client.HttpClientRequest; + +/** + * Integration-style tests for AbstractUaaTokenProvider that verify the + * corrected behavior for concurrent token requests with expired access tokens. + * + * These tests verify the fix for issue #1146: "Parallel Requests with Expired + * Access Tokens triggering Refresh Token Flow leads to Broken State". + */ +class AbstractUaaTokenProviderConcurrencyTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(AbstractUaaTokenProviderConcurrencyTest.class); + + private MockUaaServer mockUaaServer; + private ConnectionContext connectionContext; + private TestTokenProvider tokenProvider; + private ExecutorService executorService; + + @BeforeEach + void setUp() throws IOException { + mockUaaServer = new MockUaaServer(); + + // Extract port from URL like "http://localhost:12345/" + final String baseUrl = mockUaaServer.getBaseUrl(); + final int port = Integer.parseInt(baseUrl.split(":")[2].split("/")[0]); + + connectionContext = DefaultConnectionContext.builder() + .apiHost("localhost") + .port(port) + .secure(false) + .cacheDuration(Duration.ofMillis(100)) // Short cache for testing + .build(); + + tokenProvider = new TestTokenProvider(); + executorService = Executors.newFixedThreadPool(10); + } + + @AfterEach + void tearDown() throws IOException { + if (mockUaaServer != null) { + mockUaaServer.shutdown(); + } + if (executorService != null) { + executorService.shutdown(); + } + } + + /** + * Test that concurrent token requests don't cause broken state. + * This is the main test case for issue #1146. + */ + @Test + void concurrentTokenRequestsWithRefreshTokenRotation() throws Exception { + // Set up initial refresh token + mockUaaServer.setInitialRefreshToken("initial-refresh-token"); + + // Get initial token to establish refresh token + final String initialToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(initialToken).isNotNull(); + + // Reset request count to focus on concurrent requests + mockUaaServer.resetRequestCount(); + + // Invalidate the token to force refresh on next request + tokenProvider.invalidate(connectionContext); + + // Launch multiple concurrent requests + final int concurrentRequests = 5; + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrentRequests; i++) { + final CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + startLatch.await(5, TimeUnit.SECONDS); + return tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(10)); + } catch (final Exception e) { + LOGGER.error("Error getting token", e); + throw new RuntimeException(e); + } + }, executorService); + futures.add(future); + } + + // Start all requests simultaneously + startLatch.countDown(); + + // Wait for all requests to complete + final List tokens = new ArrayList<>(); + for (final CompletableFuture future : futures) { + final String token = future.get(15, TimeUnit.SECONDS); + assertThat(token).isNotNull(); + tokens.add(token); + } + + // Verify all tokens are valid (not null/empty) + assertThat(tokens).hasSize(concurrentRequests); + for (final String token : tokens) { + assertThat(token).isNotNull().isNotEmpty(); + } + + // Verify that subsequent requests still work (no broken state) + final String subsequentToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(subsequentToken).isNotNull(); + + LOGGER.info("Concurrent test completed successfully. Total UAA requests: {}", mockUaaServer.getRequestCount()); + } + + /** + * Test that the token provider handles UAA server errors gracefully during + * concurrent requests. + */ + @Test + void concurrentTokenRequestsWithServerErrors() throws Exception { + // Set up initial refresh token + mockUaaServer.setInitialRefreshToken("initial-refresh-token"); + + // Get initial token + final String initialToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(initialToken).isNotNull(); + + // Configure server to fail the first few refresh requests + mockUaaServer.setShouldFailRefreshRequests(true, 2); + + // Invalidate to force refresh + tokenProvider.invalidate(connectionContext); + + // Launch concurrent requests + final int concurrentRequests = 3; + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrentRequests; i++) { + final CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + startLatch.await(5, TimeUnit.SECONDS); + return tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(10)); + } catch (final Exception e) { + LOGGER.error("Error getting token", e); + return null; // Allow some failures + } + }, executorService); + futures.add(future); + } + + startLatch.countDown(); + + // Collect results (some may be null due to failures) + final List tokens = new ArrayList<>(); + for (final CompletableFuture future : futures) { + final String token = future.get(15, TimeUnit.SECONDS); + if (token != null) { + tokens.add(token); + } + } + + // At least one request should succeed eventually + assertThat(tokens).isNotEmpty(); + + // Verify system recovers and subsequent requests work + final String recoveryToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(recoveryToken).isNotNull(); + + LOGGER.info("Error handling test completed. Successful tokens: {}, Total UAA requests: {}", + tokens.size(), mockUaaServer.getRequestCount()); + } + + /** + * Test that the token provider properly serializes token requests to prevent + * multiple concurrent UAA requests with the same refresh token. + */ + @Test + void tokenRequestSerialization() throws Exception { + // Set up initial refresh token + mockUaaServer.setInitialRefreshToken("initial-refresh-token"); + + // Get initial token + final String initialToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(initialToken).isNotNull(); + + final int initialRequestCount = mockUaaServer.getRequestCount(); + + // Invalidate to force refresh + tokenProvider.invalidate(connectionContext); + + // Launch many concurrent requests + final int concurrentRequests = 10; + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrentRequests; i++) { + final CompletableFuture future = CompletableFuture.supplyAsync(() -> { + try { + startLatch.await(5, TimeUnit.SECONDS); + return tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(10)); + } catch (final Exception e) { + LOGGER.error("Error getting token", e); + throw new RuntimeException(e); + } + }, executorService); + futures.add(future); + } + + startLatch.countDown(); + + // Wait for all to complete + for (final CompletableFuture future : futures) { + final String token = future.get(15, TimeUnit.SECONDS); + assertThat(token).isNotNull(); + } + + final int finalRequestCount = mockUaaServer.getRequestCount(); + final int newRequests = finalRequestCount - initialRequestCount; + + // The key assertion: despite many concurrent requests, only a minimal number of + // actual UAA requests should be made due to proper serialization and caching + assertThat(newRequests).isLessThanOrEqualTo(3); // Allow some margin for timing + + LOGGER.info("Serialization test completed. Concurrent requests: {}, Actual UAA requests: {}", + concurrentRequests, newRequests); + } + + /** + * Test token provider implementation for testing purposes. + */ + private static class TestTokenProvider extends AbstractUaaTokenProvider { + + @Override + String getIdentityZoneSubdomain() { + return null; + } + + @Override + void tokenRequestTransformer(final HttpClientRequest request, final HttpClientForm form) { + form.multipart(false) + .attr("client_id", getClientId()) + .attr("client_secret", getClientSecret()) + .attr("grant_type", "password") + .attr("username", "test-user") + .attr("password", "test-password"); + } + } +} diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java new file mode 100644 index 0000000000..750569edcc --- /dev/null +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java @@ -0,0 +1,199 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.tokenprovider; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.time.Instant; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import mockwebserver3.Dispatcher; +import mockwebserver3.MockResponse; +import mockwebserver3.MockWebServer; +import mockwebserver3.RecordedRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mock UAA server for testing token provider behavior. + * Simulates UAA refresh token rotation and invalidation behavior. + */ +class MockUaaServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(MockUaaServer.class); + + private final MockWebServer mockWebServer; + private final ObjectMapper objectMapper; + private final AtomicReference currentValidRefreshToken; + private final AtomicInteger requestCount; + private volatile boolean shouldFailRefreshRequests = false; + private volatile int failureCount = 0; + private volatile int maxFailures = 0; + + public MockUaaServer() throws IOException { + this.mockWebServer = new MockWebServer(); + this.objectMapper = new ObjectMapper(); + this.currentValidRefreshToken = new AtomicReference<>(); + this.requestCount = new AtomicInteger(0); + + // Set up dispatcher to handle token requests + this.mockWebServer.setDispatcher(new TokenRequestDispatcher()); + + this.mockWebServer.start(); + } + + public String getBaseUrl() { + return mockWebServer.url("/").toString(); + } + + public void setInitialRefreshToken(final String refreshToken) { + this.currentValidRefreshToken.set(refreshToken); + } + + public void setShouldFailRefreshRequests(final boolean shouldFail) { + this.shouldFailRefreshRequests = shouldFail; + this.failureCount = 0; + } + + public void setShouldFailRefreshRequests(final boolean shouldFail, final int maxFailures) { + this.shouldFailRefreshRequests = shouldFail; + this.maxFailures = maxFailures; + this.failureCount = 0; + } + + public int getRequestCount() { + return requestCount.get(); + } + + public void resetRequestCount() { + requestCount.set(0); + } + + public void shutdown() throws IOException { + mockWebServer.shutdown(); + } + + private MockResponse handleTokenRequest(final RecordedRequest request) throws IOException { + requestCount.incrementAndGet(); + + if (!"/oauth/token".equals(request.getPath())) { + return new MockResponse().setResponseCode(404); + } + + final String requestBody = request.getBody().readUtf8(); + LOGGER.debug("Received token request: {}", requestBody); + + // Check if this is a refresh token request + if (requestBody.contains("grant_type=refresh_token")) { + return handleRefreshTokenRequest(requestBody); + } + + // Handle other grant types (password, client_credentials, etc.) + return handlePrimaryTokenRequest(); + } + + /* Warning! synchronized is really necessary here to ensure no concurrent threads are trying to run into a refresh token handling! */ + private synchronized MockResponse handleRefreshTokenRequest(final String requestBody) throws IOException { + // Check if we should fail requests + if (shouldFailRefreshRequests) { + if (maxFailures == 0 || failureCount < maxFailures) { + failureCount++; + LOGGER.debug("Failing refresh token request (failure {} of {})", failureCount, maxFailures); + return new MockResponse() + .setResponseCode(401) + .setBody("{\"error\":\"invalid_grant\",\"error_description\":\"Invalid refresh token\"}"); + } else { + // Reset failure mode after max failures reached + shouldFailRefreshRequests = false; + } + } + + // Extract refresh token from request body + final String refreshTokenFromRequest = extractRefreshTokenFromRequest(requestBody); + final String currentValid = currentValidRefreshToken.get(); + + if (currentValid != null && !currentValid.equals(refreshTokenFromRequest)) { + LOGGER.debug("Invalid refresh token provided. Expected: {}, Got: {}", currentValid, refreshTokenFromRequest); + return new MockResponse() + .setResponseCode(401) + .setBody("{\"error\":\"invalid_grant\",\"error_description\":\"Invalid refresh token\"}"); + } + + // Generate new tokens + return generateTokenResponse(); + } + + private MockResponse handlePrimaryTokenRequest() throws IOException { + // For primary token requests (password grant, etc.), always succeed + return generateTokenResponse(); + } + + private MockResponse generateTokenResponse() throws IOException { + final long now = Instant.now().getEpochSecond(); + final long expiresAt = now + 3600; // 1 hour from now + + // Create a simple JWT-like token (not cryptographically valid, just for testing) + final String header = Base64.getEncoder().encodeToString("{\"alg\":\"none\"}".getBytes()); + final String payload = Base64.getEncoder().encodeToString( + String.format("{\"exp\":%d,\"iat\":%d,\"sub\":\"test-user\"}", expiresAt, now).getBytes() + ); + final String accessToken = header + "." + payload + "."; + + // Generate new refresh token + final String newRefreshToken = "refresh-token-" + System.nanoTime(); + currentValidRefreshToken.set(newRefreshToken); + + final Map response = new HashMap<>(); + response.put("access_token", accessToken); + response.put("token_type", "Bearer"); + response.put("expires_in", 3600); + response.put("refresh_token", newRefreshToken); + response.put("scope", "openid"); + + final String responseBody = objectMapper.writeValueAsString(response); + LOGGER.debug("Generated token response with refresh token: {}", newRefreshToken); + + return new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody(responseBody); + } + + private String extractRefreshTokenFromRequest(final String requestBody) { + // Simple extraction of refresh_token parameter + final String[] parts = requestBody.split("&"); + for (final String part : parts) { + if (part.startsWith("refresh_token=")) { + return part.substring("refresh_token=".length()); + } + } + return null; + } + + private class TokenRequestDispatcher extends Dispatcher { + @Override + public MockResponse dispatch(final RecordedRequest request) { + try { + return handleTokenRequest(request); + } catch (final Exception e) { + LOGGER.error("Error handling request", e); + return new MockResponse().setResponseCode(500); + } + } + } +} From 58cfaf14e4c406397b9c61aa34136eddb2735f5f Mon Sep 17 00:00:00 2001 From: Nico Schmoigl Date: Sat, 9 Aug 2025 22:18:57 +0200 Subject: [PATCH 6/8] Unit Test require /v2/info mocking as well --- .../reactor/tokenprovider/MockUaaServer.java | 41 +++++++++++++++++-- 1 file changed, 37 insertions(+), 4 deletions(-) diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java index 750569edcc..9ac1ee590f 100644 --- a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java @@ -88,13 +88,23 @@ public void shutdown() throws IOException { mockWebServer.shutdown(); } - private MockResponse handleTokenRequest(final RecordedRequest request) throws IOException { + private MockResponse handleRequest(final RecordedRequest request) throws IOException { requestCount.incrementAndGet(); - if (!"/oauth/token".equals(request.getPath())) { + final String path = request.getPath(); + LOGGER.debug("Received request: {} {}", request.getMethod(), path); + + if ("/v2/info".equals(path)) { + return handleInfoRequest(); + } else if ("/oauth/token".equals(path)) { + return handleTokenRequest(request); + } else { + LOGGER.debug("Unknown path requested: {}", path); return new MockResponse().setResponseCode(404); } - + } + + private MockResponse handleTokenRequest(final RecordedRequest request) throws IOException { final String requestBody = request.getBody().readUtf8(); LOGGER.debug("Received token request: {}", requestBody); @@ -107,6 +117,29 @@ private MockResponse handleTokenRequest(final RecordedRequest request) throws IO return handlePrimaryTokenRequest(); } + private MockResponse handleInfoRequest() throws IOException { + LOGGER.debug("Handling /v2/info request"); + + final Map infoResponse = new HashMap<>(); + infoResponse.put("authorization_endpoint", getBaseUrl().replaceAll("/$", "")); + infoResponse.put("token_endpoint", getBaseUrl().replaceAll("/$", "")); + infoResponse.put("app_ssh_endpoint", "ssh.localhost:2222"); + infoResponse.put("app_ssh_host_key_fingerprint", "test-fingerprint"); + infoResponse.put("api_version", "2.165.0"); + infoResponse.put("name", "test-cf"); + infoResponse.put("build", "test-build"); + infoResponse.put("version", 0); + infoResponse.put("description", "Unit Test Cloud Foundry"); + + final String responseBody = objectMapper.writeValueAsString(infoResponse); + LOGGER.debug("Generated info response: {}", responseBody); + + return new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody(responseBody); + } + /* Warning! synchronized is really necessary here to ensure no concurrent threads are trying to run into a refresh token handling! */ private synchronized MockResponse handleRefreshTokenRequest(final String requestBody) throws IOException { // Check if we should fail requests @@ -189,7 +222,7 @@ private class TokenRequestDispatcher extends Dispatcher { @Override public MockResponse dispatch(final RecordedRequest request) { try { - return handleTokenRequest(request); + return handleRequest(request); } catch (final Exception e) { LOGGER.error("Error handling request", e); return new MockResponse().setResponseCode(500); From a2e8f14efc752eed38406b10d344b1782e0f3996 Mon Sep 17 00:00:00 2001 From: Nico Schmoigl Date: Sun, 10 Aug 2025 21:25:56 +0200 Subject: [PATCH 7/8] Ensure that caching is also done in a single thread --- .../tokenprovider/AbstractUaaTokenProvider.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java index 5a498c4263..c00cf919bc 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java @@ -336,11 +336,6 @@ private Mono token(final ConnectionContext connectionContext) { } final Mono baseTokenRequest = createTokenRequest(connectionContext) - /* - * Ensure execution on single thread. - * This prevents sending requests to the UAA server with expired refresh tokens. - */ - .publishOn(connectionContext.getTokenScheduler()) .doOnSubscribe(s -> LOGGER.debug("Starting new UAA JWT token request")) .doOnSuccess(token -> LOGGER.debug("UAA JWT token request completed successfully")) .doOnError(error -> LOGGER.debug("UAA JWT token request failed", error)) @@ -353,7 +348,12 @@ private Mono token(final ConnectionContext connectionContext) { final Mono newTokenRequest = connectionContext .getCacheDuration() .map(baseTokenRequest::cache) - .orElseGet(baseTokenRequest::cache); + .orElseGet(baseTokenRequest::cache) + /* + * Ensure execution on single thread. + * This prevents sending requests to the UAA server with expired refresh tokens. + */ + .publishOn(connectionContext.getTokenScheduler()); // Store the active request atomically final Mono actualRequest = this.activeTokenRequests.putIfAbsent(connectionContext, newTokenRequest); From ca1a02071ab3e582a4dc0d1cb8a722a904f09338 Mon Sep 17 00:00:00 2001 From: Nico Schmoigl Date: Tue, 19 Aug 2025 19:46:49 +0200 Subject: [PATCH 8/8] Spotless-ification --- .../AbstractUaaTokenProvider.java | 118 ++-- ...stractUaaTokenProviderConcurrencyTest.java | 586 +++++++++--------- .../reactor/tokenprovider/MockUaaServer.java | 479 +++++++------- 3 files changed, 625 insertions(+), 558 deletions(-) diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java index c00cf919bc..b89dfac02a 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java @@ -307,16 +307,16 @@ private Mono token(final ConnectionContext connectionContext) { * and there can be concurrent callers during the subscription/execution of the Mono: * - The latter is very harmful: This may lead to concurrent execution of the logic * written in requestToken and hence to multiple requests to the UAA server using - * the same *value for the refresh token*! The UAA server will sequentialize them, + * the same *value for the refresh token*! The UAA server will sequentialize them, * one will go through just as normal and a new refresh token gets issued. - * The UAA server invalidates the old refresh token. The second request then arrives + * The UAA server invalidates the old refresh token. The second request then arrives * with the old refresh token and gets rejected. In an earlier version this led * to caching the second request, hence to cache an error. This caused deadlocks. * - The first is only "not nice", if the second issue is resolved: It causes that * we will request two access tokens from the UAA server shortly after the other, * but having use appropriate refresh tokens sequentially. The second request * simply is to be considered waste. - * + * * The coding below fixes both issues: It ensures that the execution of the Mono * is synchronized and it ensures that two threads arriving to fetch the JWT in a * non-caching situation does not trigger "wasteful" requests to the UAA server. @@ -327,54 +327,80 @@ private Mono token(final ConnectionContext connectionContext) { * during creation of the Mono (where it is of little relevance), but * during the execution/subscription. */ - return Mono.defer(() -> { - // Check if there's already an active token request - final Mono existingRequest = this.activeTokenRequests.get(connectionContext); - if (existingRequest != null) { - LOGGER.debug("Reusing existing UAA JWT token request for connection context"); - return existingRequest; - } - - final Mono baseTokenRequest = createTokenRequest(connectionContext) - .doOnSubscribe(s -> LOGGER.debug("Starting new UAA JWT token request")) - .doOnSuccess(token -> LOGGER.debug("UAA JWT token request completed successfully")) - .doOnError(error -> LOGGER.debug("UAA JWT token request failed", error)) - .doFinally(signal -> { - // Clear the active request when done (success or error) - this.activeTokenRequests.remove(connectionContext); - }); - - // Apply cache duration from connection context - final Mono newTokenRequest = connectionContext - .getCacheDuration() - .map(baseTokenRequest::cache) - .orElseGet(baseTokenRequest::cache) - /* - * Ensure execution on single thread. - * This prevents sending requests to the UAA server with expired refresh tokens. - */ - .publishOn(connectionContext.getTokenScheduler()); - - // Store the active request atomically - final Mono actualRequest = this.activeTokenRequests.putIfAbsent(connectionContext, newTokenRequest); - if (actualRequest != null) { - // Another thread beat us to it, use their request. This prevents "wasteful" requests. - LOGGER.debug("Another thread created token request first, using theirs instead"); - return actualRequest; - } - - // We successfully stored our request, use it - return newTokenRequest; - }); + return Mono.defer( + () -> { + // Check if there's already an active token request + final Mono existingRequest = + this.activeTokenRequests.get(connectionContext); + if (existingRequest != null) { + LOGGER.debug( + "Reusing existing UAA JWT token request for connection context"); + return existingRequest; + } + + final Mono baseTokenRequest = + createTokenRequest(connectionContext) + .doOnSubscribe( + s -> LOGGER.debug("Starting new UAA JWT token request")) + .doOnSuccess( + token -> + LOGGER.debug( + "UAA JWT token request completed" + + " successfully")) + .doOnError( + error -> + LOGGER.debug( + "UAA JWT token request failed", error)) + .doFinally( + signal -> { + // Clear the active request when done (success or + // error) + this.activeTokenRequests.remove(connectionContext); + }); + + // Apply cache duration from connection context + final Mono newTokenRequest = + connectionContext + .getCacheDuration() + .map(baseTokenRequest::cache) + .orElseGet(baseTokenRequest::cache) + /* + * Ensure execution on single thread. + * This prevents sending requests to the UAA server with expired refresh tokens. + */ + .publishOn(connectionContext.getTokenScheduler()); + + // Store the active request atomically + final Mono actualRequest = + this.activeTokenRequests.putIfAbsent( + connectionContext, newTokenRequest); + if (actualRequest != null) { + // Another thread beat us to it, use their request. This prevents "wasteful" + // requests. + LOGGER.debug( + "Another thread created token request first, using theirs instead"); + return actualRequest; + } + + // We successfully stored our request, use it + return newTokenRequest; + }); } private Mono createTokenRequest(final ConnectionContext connectionContext) { return this.refreshTokens .getOrDefault(connectionContext, Mono.empty()) - .flatMap(refreshToken -> refreshToken(connectionContext, refreshToken) - .doOnSubscribe(s -> LOGGER.debug("Negotiating using refresh token"))) - .switchIfEmpty(primaryToken(connectionContext) - .doOnSubscribe(s -> LOGGER.debug("Negotiating using token provider"))) + .flatMap( + refreshToken -> + refreshToken(connectionContext, refreshToken) + .doOnSubscribe( + s -> + LOGGER.debug( + "Negotiating using refresh token"))) + .switchIfEmpty( + primaryToken(connectionContext) + .doOnSubscribe( + s -> LOGGER.debug("Negotiating using token provider"))) .checkpoint(); } diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java index 5624372173..7722896314 100644 --- a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java @@ -1,280 +1,306 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.cloudfoundry.reactor.tokenprovider; - -import static org.assertj.core.api.Assertions.assertThat; - -import java.io.IOException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; - -import org.cloudfoundry.reactor.ConnectionContext; -import org.cloudfoundry.reactor.DefaultConnectionContext; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import reactor.netty.http.client.HttpClientForm; -import reactor.netty.http.client.HttpClientRequest; - -/** - * Integration-style tests for AbstractUaaTokenProvider that verify the - * corrected behavior for concurrent token requests with expired access tokens. - * - * These tests verify the fix for issue #1146: "Parallel Requests with Expired - * Access Tokens triggering Refresh Token Flow leads to Broken State". - */ -class AbstractUaaTokenProviderConcurrencyTest { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractUaaTokenProviderConcurrencyTest.class); - - private MockUaaServer mockUaaServer; - private ConnectionContext connectionContext; - private TestTokenProvider tokenProvider; - private ExecutorService executorService; - - @BeforeEach - void setUp() throws IOException { - mockUaaServer = new MockUaaServer(); - - // Extract port from URL like "http://localhost:12345/" - final String baseUrl = mockUaaServer.getBaseUrl(); - final int port = Integer.parseInt(baseUrl.split(":")[2].split("/")[0]); - - connectionContext = DefaultConnectionContext.builder() - .apiHost("localhost") - .port(port) - .secure(false) - .cacheDuration(Duration.ofMillis(100)) // Short cache for testing - .build(); - - tokenProvider = new TestTokenProvider(); - executorService = Executors.newFixedThreadPool(10); - } - - @AfterEach - void tearDown() throws IOException { - if (mockUaaServer != null) { - mockUaaServer.shutdown(); - } - if (executorService != null) { - executorService.shutdown(); - } - } - - /** - * Test that concurrent token requests don't cause broken state. - * This is the main test case for issue #1146. - */ - @Test - void concurrentTokenRequestsWithRefreshTokenRotation() throws Exception { - // Set up initial refresh token - mockUaaServer.setInitialRefreshToken("initial-refresh-token"); - - // Get initial token to establish refresh token - final String initialToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); - assertThat(initialToken).isNotNull(); - - // Reset request count to focus on concurrent requests - mockUaaServer.resetRequestCount(); - - // Invalidate the token to force refresh on next request - tokenProvider.invalidate(connectionContext); - - // Launch multiple concurrent requests - final int concurrentRequests = 5; - final CountDownLatch startLatch = new CountDownLatch(1); - final List> futures = new ArrayList<>(); - - for (int i = 0; i < concurrentRequests; i++) { - final CompletableFuture future = CompletableFuture.supplyAsync(() -> { - try { - startLatch.await(5, TimeUnit.SECONDS); - return tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(10)); - } catch (final Exception e) { - LOGGER.error("Error getting token", e); - throw new RuntimeException(e); - } - }, executorService); - futures.add(future); - } - - // Start all requests simultaneously - startLatch.countDown(); - - // Wait for all requests to complete - final List tokens = new ArrayList<>(); - for (final CompletableFuture future : futures) { - final String token = future.get(15, TimeUnit.SECONDS); - assertThat(token).isNotNull(); - tokens.add(token); - } - - // Verify all tokens are valid (not null/empty) - assertThat(tokens).hasSize(concurrentRequests); - for (final String token : tokens) { - assertThat(token).isNotNull().isNotEmpty(); - } - - // Verify that subsequent requests still work (no broken state) - final String subsequentToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); - assertThat(subsequentToken).isNotNull(); - - LOGGER.info("Concurrent test completed successfully. Total UAA requests: {}", mockUaaServer.getRequestCount()); - } - - /** - * Test that the token provider handles UAA server errors gracefully during - * concurrent requests. - */ - @Test - void concurrentTokenRequestsWithServerErrors() throws Exception { - // Set up initial refresh token - mockUaaServer.setInitialRefreshToken("initial-refresh-token"); - - // Get initial token - final String initialToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); - assertThat(initialToken).isNotNull(); - - // Configure server to fail the first few refresh requests - mockUaaServer.setShouldFailRefreshRequests(true, 2); - - // Invalidate to force refresh - tokenProvider.invalidate(connectionContext); - - // Launch concurrent requests - final int concurrentRequests = 3; - final CountDownLatch startLatch = new CountDownLatch(1); - final List> futures = new ArrayList<>(); - - for (int i = 0; i < concurrentRequests; i++) { - final CompletableFuture future = CompletableFuture.supplyAsync(() -> { - try { - startLatch.await(5, TimeUnit.SECONDS); - return tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(10)); - } catch (final Exception e) { - LOGGER.error("Error getting token", e); - return null; // Allow some failures - } - }, executorService); - futures.add(future); - } - - startLatch.countDown(); - - // Collect results (some may be null due to failures) - final List tokens = new ArrayList<>(); - for (final CompletableFuture future : futures) { - final String token = future.get(15, TimeUnit.SECONDS); - if (token != null) { - tokens.add(token); - } - } - - // At least one request should succeed eventually - assertThat(tokens).isNotEmpty(); - - // Verify system recovers and subsequent requests work - final String recoveryToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); - assertThat(recoveryToken).isNotNull(); - - LOGGER.info("Error handling test completed. Successful tokens: {}, Total UAA requests: {}", - tokens.size(), mockUaaServer.getRequestCount()); - } - - /** - * Test that the token provider properly serializes token requests to prevent - * multiple concurrent UAA requests with the same refresh token. - */ - @Test - void tokenRequestSerialization() throws Exception { - // Set up initial refresh token - mockUaaServer.setInitialRefreshToken("initial-refresh-token"); - - // Get initial token - final String initialToken = tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); - assertThat(initialToken).isNotNull(); - - final int initialRequestCount = mockUaaServer.getRequestCount(); - - // Invalidate to force refresh - tokenProvider.invalidate(connectionContext); - - // Launch many concurrent requests - final int concurrentRequests = 10; - final CountDownLatch startLatch = new CountDownLatch(1); - final List> futures = new ArrayList<>(); - - for (int i = 0; i < concurrentRequests; i++) { - final CompletableFuture future = CompletableFuture.supplyAsync(() -> { - try { - startLatch.await(5, TimeUnit.SECONDS); - return tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(10)); - } catch (final Exception e) { - LOGGER.error("Error getting token", e); - throw new RuntimeException(e); - } - }, executorService); - futures.add(future); - } - - startLatch.countDown(); - - // Wait for all to complete - for (final CompletableFuture future : futures) { - final String token = future.get(15, TimeUnit.SECONDS); - assertThat(token).isNotNull(); - } - - final int finalRequestCount = mockUaaServer.getRequestCount(); - final int newRequests = finalRequestCount - initialRequestCount; - - // The key assertion: despite many concurrent requests, only a minimal number of - // actual UAA requests should be made due to proper serialization and caching - assertThat(newRequests).isLessThanOrEqualTo(3); // Allow some margin for timing - - LOGGER.info("Serialization test completed. Concurrent requests: {}, Actual UAA requests: {}", - concurrentRequests, newRequests); - } - - /** - * Test token provider implementation for testing purposes. - */ - private static class TestTokenProvider extends AbstractUaaTokenProvider { - - @Override - String getIdentityZoneSubdomain() { - return null; - } - - @Override - void tokenRequestTransformer(final HttpClientRequest request, final HttpClientForm form) { - form.multipart(false) - .attr("client_id", getClientId()) - .attr("client_secret", getClientSecret()) - .attr("grant_type", "password") - .attr("username", "test-user") - .attr("password", "test-password"); - } - } -} +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.tokenprovider; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.cloudfoundry.reactor.ConnectionContext; +import org.cloudfoundry.reactor.DefaultConnectionContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.netty.http.client.HttpClientForm; +import reactor.netty.http.client.HttpClientRequest; + +/** + * Integration-style tests for AbstractUaaTokenProvider that verify the + * corrected behavior for concurrent token requests with expired access tokens. + * + * These tests verify the fix for issue #1146: "Parallel Requests with Expired + * Access Tokens triggering Refresh Token Flow leads to Broken State". + */ +class AbstractUaaTokenProviderConcurrencyTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(AbstractUaaTokenProviderConcurrencyTest.class); + + private MockUaaServer mockUaaServer; + private ConnectionContext connectionContext; + private TestTokenProvider tokenProvider; + private ExecutorService executorService; + + @BeforeEach + void setUp() throws IOException { + mockUaaServer = new MockUaaServer(); + + // Extract port from URL like "http://localhost:12345/" + final String baseUrl = mockUaaServer.getBaseUrl(); + final int port = Integer.parseInt(baseUrl.split(":")[2].split("/")[0]); + + connectionContext = + DefaultConnectionContext.builder() + .apiHost("localhost") + .port(port) + .secure(false) + .cacheDuration(Duration.ofMillis(100)) // Short cache for testing + .build(); + + tokenProvider = new TestTokenProvider(); + executorService = Executors.newFixedThreadPool(10); + } + + @AfterEach + void tearDown() throws IOException { + if (mockUaaServer != null) { + mockUaaServer.shutdown(); + } + if (executorService != null) { + executorService.shutdown(); + } + } + + /** + * Test that concurrent token requests don't cause broken state. + * This is the main test case for issue #1146. + */ + @Test + void concurrentTokenRequestsWithRefreshTokenRotation() throws Exception { + // Set up initial refresh token + mockUaaServer.setInitialRefreshToken("initial-refresh-token"); + + // Get initial token to establish refresh token + final String initialToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(initialToken).isNotNull(); + + // Reset request count to focus on concurrent requests + mockUaaServer.resetRequestCount(); + + // Invalidate the token to force refresh on next request + tokenProvider.invalidate(connectionContext); + + // Launch multiple concurrent requests + final int concurrentRequests = 5; + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrentRequests; i++) { + final CompletableFuture future = + CompletableFuture.supplyAsync( + () -> { + try { + startLatch.await(5, TimeUnit.SECONDS); + return tokenProvider + .getToken(connectionContext) + .block(Duration.ofSeconds(10)); + } catch (final Exception e) { + LOGGER.error("Error getting token", e); + throw new RuntimeException(e); + } + }, + executorService); + futures.add(future); + } + + // Start all requests simultaneously + startLatch.countDown(); + + // Wait for all requests to complete + final List tokens = new ArrayList<>(); + for (final CompletableFuture future : futures) { + final String token = future.get(15, TimeUnit.SECONDS); + assertThat(token).isNotNull(); + tokens.add(token); + } + + // Verify all tokens are valid (not null/empty) + assertThat(tokens).hasSize(concurrentRequests); + for (final String token : tokens) { + assertThat(token).isNotNull().isNotEmpty(); + } + + // Verify that subsequent requests still work (no broken state) + final String subsequentToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(subsequentToken).isNotNull(); + + LOGGER.info( + "Concurrent test completed successfully. Total UAA requests: {}", + mockUaaServer.getRequestCount()); + } + + /** + * Test that the token provider handles UAA server errors gracefully during + * concurrent requests. + */ + @Test + void concurrentTokenRequestsWithServerErrors() throws Exception { + // Set up initial refresh token + mockUaaServer.setInitialRefreshToken("initial-refresh-token"); + + // Get initial token + final String initialToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(initialToken).isNotNull(); + + // Configure server to fail the first few refresh requests + mockUaaServer.setShouldFailRefreshRequests(true, 2); + + // Invalidate to force refresh + tokenProvider.invalidate(connectionContext); + + // Launch concurrent requests + final int concurrentRequests = 3; + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrentRequests; i++) { + final CompletableFuture future = + CompletableFuture.supplyAsync( + () -> { + try { + startLatch.await(5, TimeUnit.SECONDS); + return tokenProvider + .getToken(connectionContext) + .block(Duration.ofSeconds(10)); + } catch (final Exception e) { + LOGGER.error("Error getting token", e); + return null; // Allow some failures + } + }, + executorService); + futures.add(future); + } + + startLatch.countDown(); + + // Collect results (some may be null due to failures) + final List tokens = new ArrayList<>(); + for (final CompletableFuture future : futures) { + final String token = future.get(15, TimeUnit.SECONDS); + if (token != null) { + tokens.add(token); + } + } + + // At least one request should succeed eventually + assertThat(tokens).isNotEmpty(); + + // Verify system recovers and subsequent requests work + final String recoveryToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(recoveryToken).isNotNull(); + + LOGGER.info( + "Error handling test completed. Successful tokens: {}, Total UAA requests: {}", + tokens.size(), + mockUaaServer.getRequestCount()); + } + + /** + * Test that the token provider properly serializes token requests to prevent + * multiple concurrent UAA requests with the same refresh token. + */ + @Test + void tokenRequestSerialization() throws Exception { + // Set up initial refresh token + mockUaaServer.setInitialRefreshToken("initial-refresh-token"); + + // Get initial token + final String initialToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(initialToken).isNotNull(); + + final int initialRequestCount = mockUaaServer.getRequestCount(); + + // Invalidate to force refresh + tokenProvider.invalidate(connectionContext); + + // Launch many concurrent requests + final int concurrentRequests = 10; + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrentRequests; i++) { + final CompletableFuture future = + CompletableFuture.supplyAsync( + () -> { + try { + startLatch.await(5, TimeUnit.SECONDS); + return tokenProvider + .getToken(connectionContext) + .block(Duration.ofSeconds(10)); + } catch (final Exception e) { + LOGGER.error("Error getting token", e); + throw new RuntimeException(e); + } + }, + executorService); + futures.add(future); + } + + startLatch.countDown(); + + // Wait for all to complete + for (final CompletableFuture future : futures) { + final String token = future.get(15, TimeUnit.SECONDS); + assertThat(token).isNotNull(); + } + + final int finalRequestCount = mockUaaServer.getRequestCount(); + final int newRequests = finalRequestCount - initialRequestCount; + + // The key assertion: despite many concurrent requests, only a minimal number of + // actual UAA requests should be made due to proper serialization and caching + assertThat(newRequests).isLessThanOrEqualTo(3); // Allow some margin for timing + + LOGGER.info( + "Serialization test completed. Concurrent requests: {}, Actual UAA requests: {}", + concurrentRequests, + newRequests); + } + + /** + * Test token provider implementation for testing purposes. + */ + private static class TestTokenProvider extends AbstractUaaTokenProvider { + + @Override + String getIdentityZoneSubdomain() { + return null; + } + + @Override + void tokenRequestTransformer(final HttpClientRequest request, final HttpClientForm form) { + form.multipart(false) + .attr("client_id", getClientId()) + .attr("client_secret", getClientSecret()) + .attr("grant_type", "password") + .attr("username", "test-user") + .attr("password", "test-password"); + } + } +} diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java index 9ac1ee590f..7c5c36ad34 100644 --- a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java @@ -1,232 +1,247 @@ -/* - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.cloudfoundry.reactor.tokenprovider; - -import com.fasterxml.jackson.databind.ObjectMapper; -import java.io.IOException; -import java.time.Instant; -import java.util.Base64; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; -import mockwebserver3.Dispatcher; -import mockwebserver3.MockResponse; -import mockwebserver3.MockWebServer; -import mockwebserver3.RecordedRequest; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Mock UAA server for testing token provider behavior. - * Simulates UAA refresh token rotation and invalidation behavior. - */ -class MockUaaServer { - - private static final Logger LOGGER = LoggerFactory.getLogger(MockUaaServer.class); - - private final MockWebServer mockWebServer; - private final ObjectMapper objectMapper; - private final AtomicReference currentValidRefreshToken; - private final AtomicInteger requestCount; - private volatile boolean shouldFailRefreshRequests = false; - private volatile int failureCount = 0; - private volatile int maxFailures = 0; - - public MockUaaServer() throws IOException { - this.mockWebServer = new MockWebServer(); - this.objectMapper = new ObjectMapper(); - this.currentValidRefreshToken = new AtomicReference<>(); - this.requestCount = new AtomicInteger(0); - - // Set up dispatcher to handle token requests - this.mockWebServer.setDispatcher(new TokenRequestDispatcher()); - - this.mockWebServer.start(); - } - - public String getBaseUrl() { - return mockWebServer.url("/").toString(); - } - - public void setInitialRefreshToken(final String refreshToken) { - this.currentValidRefreshToken.set(refreshToken); - } - - public void setShouldFailRefreshRequests(final boolean shouldFail) { - this.shouldFailRefreshRequests = shouldFail; - this.failureCount = 0; - } - - public void setShouldFailRefreshRequests(final boolean shouldFail, final int maxFailures) { - this.shouldFailRefreshRequests = shouldFail; - this.maxFailures = maxFailures; - this.failureCount = 0; - } - - public int getRequestCount() { - return requestCount.get(); - } - - public void resetRequestCount() { - requestCount.set(0); - } - - public void shutdown() throws IOException { - mockWebServer.shutdown(); - } - - private MockResponse handleRequest(final RecordedRequest request) throws IOException { - requestCount.incrementAndGet(); - - final String path = request.getPath(); - LOGGER.debug("Received request: {} {}", request.getMethod(), path); - - if ("/v2/info".equals(path)) { - return handleInfoRequest(); - } else if ("/oauth/token".equals(path)) { - return handleTokenRequest(request); - } else { - LOGGER.debug("Unknown path requested: {}", path); - return new MockResponse().setResponseCode(404); - } - } - - private MockResponse handleTokenRequest(final RecordedRequest request) throws IOException { - final String requestBody = request.getBody().readUtf8(); - LOGGER.debug("Received token request: {}", requestBody); - - // Check if this is a refresh token request - if (requestBody.contains("grant_type=refresh_token")) { - return handleRefreshTokenRequest(requestBody); - } - - // Handle other grant types (password, client_credentials, etc.) - return handlePrimaryTokenRequest(); - } - - private MockResponse handleInfoRequest() throws IOException { - LOGGER.debug("Handling /v2/info request"); - - final Map infoResponse = new HashMap<>(); - infoResponse.put("authorization_endpoint", getBaseUrl().replaceAll("/$", "")); - infoResponse.put("token_endpoint", getBaseUrl().replaceAll("/$", "")); - infoResponse.put("app_ssh_endpoint", "ssh.localhost:2222"); - infoResponse.put("app_ssh_host_key_fingerprint", "test-fingerprint"); - infoResponse.put("api_version", "2.165.0"); - infoResponse.put("name", "test-cf"); - infoResponse.put("build", "test-build"); - infoResponse.put("version", 0); - infoResponse.put("description", "Unit Test Cloud Foundry"); - - final String responseBody = objectMapper.writeValueAsString(infoResponse); - LOGGER.debug("Generated info response: {}", responseBody); - - return new MockResponse() - .setResponseCode(200) - .setHeader("Content-Type", "application/json") - .setBody(responseBody); - } - - /* Warning! synchronized is really necessary here to ensure no concurrent threads are trying to run into a refresh token handling! */ - private synchronized MockResponse handleRefreshTokenRequest(final String requestBody) throws IOException { - // Check if we should fail requests - if (shouldFailRefreshRequests) { - if (maxFailures == 0 || failureCount < maxFailures) { - failureCount++; - LOGGER.debug("Failing refresh token request (failure {} of {})", failureCount, maxFailures); - return new MockResponse() - .setResponseCode(401) - .setBody("{\"error\":\"invalid_grant\",\"error_description\":\"Invalid refresh token\"}"); - } else { - // Reset failure mode after max failures reached - shouldFailRefreshRequests = false; - } - } - - // Extract refresh token from request body - final String refreshTokenFromRequest = extractRefreshTokenFromRequest(requestBody); - final String currentValid = currentValidRefreshToken.get(); - - if (currentValid != null && !currentValid.equals(refreshTokenFromRequest)) { - LOGGER.debug("Invalid refresh token provided. Expected: {}, Got: {}", currentValid, refreshTokenFromRequest); - return new MockResponse() - .setResponseCode(401) - .setBody("{\"error\":\"invalid_grant\",\"error_description\":\"Invalid refresh token\"}"); - } - - // Generate new tokens - return generateTokenResponse(); - } - - private MockResponse handlePrimaryTokenRequest() throws IOException { - // For primary token requests (password grant, etc.), always succeed - return generateTokenResponse(); - } - - private MockResponse generateTokenResponse() throws IOException { - final long now = Instant.now().getEpochSecond(); - final long expiresAt = now + 3600; // 1 hour from now - - // Create a simple JWT-like token (not cryptographically valid, just for testing) - final String header = Base64.getEncoder().encodeToString("{\"alg\":\"none\"}".getBytes()); - final String payload = Base64.getEncoder().encodeToString( - String.format("{\"exp\":%d,\"iat\":%d,\"sub\":\"test-user\"}", expiresAt, now).getBytes() - ); - final String accessToken = header + "." + payload + "."; - - // Generate new refresh token - final String newRefreshToken = "refresh-token-" + System.nanoTime(); - currentValidRefreshToken.set(newRefreshToken); - - final Map response = new HashMap<>(); - response.put("access_token", accessToken); - response.put("token_type", "Bearer"); - response.put("expires_in", 3600); - response.put("refresh_token", newRefreshToken); - response.put("scope", "openid"); - - final String responseBody = objectMapper.writeValueAsString(response); - LOGGER.debug("Generated token response with refresh token: {}", newRefreshToken); - - return new MockResponse() - .setResponseCode(200) - .setHeader("Content-Type", "application/json") - .setBody(responseBody); - } - - private String extractRefreshTokenFromRequest(final String requestBody) { - // Simple extraction of refresh_token parameter - final String[] parts = requestBody.split("&"); - for (final String part : parts) { - if (part.startsWith("refresh_token=")) { - return part.substring("refresh_token=".length()); - } - } - return null; - } - - private class TokenRequestDispatcher extends Dispatcher { - @Override - public MockResponse dispatch(final RecordedRequest request) { - try { - return handleRequest(request); - } catch (final Exception e) { - LOGGER.error("Error handling request", e); - return new MockResponse().setResponseCode(500); - } - } - } -} +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.tokenprovider; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.time.Instant; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import mockwebserver3.Dispatcher; +import mockwebserver3.MockResponse; +import mockwebserver3.MockWebServer; +import mockwebserver3.RecordedRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mock UAA server for testing token provider behavior. + * Simulates UAA refresh token rotation and invalidation behavior. + */ +class MockUaaServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(MockUaaServer.class); + + private final MockWebServer mockWebServer; + private final ObjectMapper objectMapper; + private final AtomicReference currentValidRefreshToken; + private final AtomicInteger requestCount; + private volatile boolean shouldFailRefreshRequests = false; + private volatile int failureCount = 0; + private volatile int maxFailures = 0; + + public MockUaaServer() throws IOException { + this.mockWebServer = new MockWebServer(); + this.objectMapper = new ObjectMapper(); + this.currentValidRefreshToken = new AtomicReference<>(); + this.requestCount = new AtomicInteger(0); + + // Set up dispatcher to handle token requests + this.mockWebServer.setDispatcher(new TokenRequestDispatcher()); + + this.mockWebServer.start(); + } + + public String getBaseUrl() { + return mockWebServer.url("/").toString(); + } + + public void setInitialRefreshToken(final String refreshToken) { + this.currentValidRefreshToken.set(refreshToken); + } + + public void setShouldFailRefreshRequests(final boolean shouldFail) { + this.shouldFailRefreshRequests = shouldFail; + this.failureCount = 0; + } + + public void setShouldFailRefreshRequests(final boolean shouldFail, final int maxFailures) { + this.shouldFailRefreshRequests = shouldFail; + this.maxFailures = maxFailures; + this.failureCount = 0; + } + + public int getRequestCount() { + return requestCount.get(); + } + + public void resetRequestCount() { + requestCount.set(0); + } + + public void shutdown() throws IOException { + mockWebServer.shutdown(); + } + + private MockResponse handleRequest(final RecordedRequest request) throws IOException { + requestCount.incrementAndGet(); + + final String path = request.getPath(); + LOGGER.debug("Received request: {} {}", request.getMethod(), path); + + if ("/v2/info".equals(path)) { + return handleInfoRequest(); + } else if ("/oauth/token".equals(path)) { + return handleTokenRequest(request); + } else { + LOGGER.debug("Unknown path requested: {}", path); + return new MockResponse().setResponseCode(404); + } + } + + private MockResponse handleTokenRequest(final RecordedRequest request) throws IOException { + final String requestBody = request.getBody().readUtf8(); + LOGGER.debug("Received token request: {}", requestBody); + + // Check if this is a refresh token request + if (requestBody.contains("grant_type=refresh_token")) { + return handleRefreshTokenRequest(requestBody); + } + + // Handle other grant types (password, client_credentials, etc.) + return handlePrimaryTokenRequest(); + } + + private MockResponse handleInfoRequest() throws IOException { + LOGGER.debug("Handling /v2/info request"); + + final Map infoResponse = new HashMap<>(); + infoResponse.put("authorization_endpoint", getBaseUrl().replaceAll("/$", "")); + infoResponse.put("token_endpoint", getBaseUrl().replaceAll("/$", "")); + infoResponse.put("app_ssh_endpoint", "ssh.localhost:2222"); + infoResponse.put("app_ssh_host_key_fingerprint", "test-fingerprint"); + infoResponse.put("api_version", "2.165.0"); + infoResponse.put("name", "test-cf"); + infoResponse.put("build", "test-build"); + infoResponse.put("version", 0); + infoResponse.put("description", "Unit Test Cloud Foundry"); + + final String responseBody = objectMapper.writeValueAsString(infoResponse); + LOGGER.debug("Generated info response: {}", responseBody); + + return new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody(responseBody); + } + + /* Warning! synchronized is really necessary here to ensure no concurrent threads are trying to run into a refresh token handling! */ + private synchronized MockResponse handleRefreshTokenRequest(final String requestBody) + throws IOException { + // Check if we should fail requests + if (shouldFailRefreshRequests) { + if (maxFailures == 0 || failureCount < maxFailures) { + failureCount++; + LOGGER.debug( + "Failing refresh token request (failure {} of {})", + failureCount, + maxFailures); + return new MockResponse() + .setResponseCode(401) + .setBody( + "{\"error\":\"invalid_grant\",\"error_description\":\"Invalid" + + " refresh token\"}"); + } else { + // Reset failure mode after max failures reached + shouldFailRefreshRequests = false; + } + } + + // Extract refresh token from request body + final String refreshTokenFromRequest = extractRefreshTokenFromRequest(requestBody); + final String currentValid = currentValidRefreshToken.get(); + + if (currentValid != null && !currentValid.equals(refreshTokenFromRequest)) { + LOGGER.debug( + "Invalid refresh token provided. Expected: {}, Got: {}", + currentValid, + refreshTokenFromRequest); + return new MockResponse() + .setResponseCode(401) + .setBody( + "{\"error\":\"invalid_grant\",\"error_description\":\"Invalid refresh" + + " token\"}"); + } + + // Generate new tokens + return generateTokenResponse(); + } + + private MockResponse handlePrimaryTokenRequest() throws IOException { + // For primary token requests (password grant, etc.), always succeed + return generateTokenResponse(); + } + + private MockResponse generateTokenResponse() throws IOException { + final long now = Instant.now().getEpochSecond(); + final long expiresAt = now + 3600; // 1 hour from now + + // Create a simple JWT-like token (not cryptographically valid, just for testing) + final String header = Base64.getEncoder().encodeToString("{\"alg\":\"none\"}".getBytes()); + final String payload = + Base64.getEncoder() + .encodeToString( + String.format( + "{\"exp\":%d,\"iat\":%d,\"sub\":\"test-user\"}", + expiresAt, now) + .getBytes()); + final String accessToken = header + "." + payload + "."; + + // Generate new refresh token + final String newRefreshToken = "refresh-token-" + System.nanoTime(); + currentValidRefreshToken.set(newRefreshToken); + + final Map response = new HashMap<>(); + response.put("access_token", accessToken); + response.put("token_type", "Bearer"); + response.put("expires_in", 3600); + response.put("refresh_token", newRefreshToken); + response.put("scope", "openid"); + + final String responseBody = objectMapper.writeValueAsString(response); + LOGGER.debug("Generated token response with refresh token: {}", newRefreshToken); + + return new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody(responseBody); + } + + private String extractRefreshTokenFromRequest(final String requestBody) { + // Simple extraction of refresh_token parameter + final String[] parts = requestBody.split("&"); + for (final String part : parts) { + if (part.startsWith("refresh_token=")) { + return part.substring("refresh_token=".length()); + } + } + return null; + } + + private class TokenRequestDispatcher extends Dispatcher { + @Override + public MockResponse dispatch(final RecordedRequest request) { + try { + return handleRequest(request); + } catch (final Exception e) { + LOGGER.error("Error handling request", e); + return new MockResponse().setResponseCode(500); + } + } + } +}