diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java index 4c90f669f7..417cdea62c 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/ConnectionContext.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.util.Optional; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; import reactor.netty.http.client.HttpClient; /** @@ -52,6 +53,11 @@ public interface ConnectionContext { */ RootProvider getRootProvider(); + /** + * The {@link Scheduler} to use for token operations + */ + Scheduler getTokenScheduler(); + /** * Attempt to explicitly trust the TLS certificate of an endpoint. Implementations can choose whether any actual trusting will happen. * diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java index 0aea075c6f..62ceea663d 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/_DefaultConnectionContext.java @@ -32,6 +32,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; import reactor.netty.http.client.HttpClient; import reactor.netty.resources.ConnectionProvider; import reactor.netty.resources.LoopResources; @@ -74,6 +76,7 @@ abstract class _DefaultConnectionContext implements ConnectionContext { @PreDestroy public final void dispose() { getConnectionProvider().ifPresent(ConnectionProvider::dispose); + getTokenScheduler().dispose(); getThreadPool().dispose(); try { @@ -154,6 +157,12 @@ public Mono trust(String host, int port) { .orElse(Mono.empty()); } + @Override + @Value.Derived + public Scheduler getTokenScheduler() { + return Schedulers.newSingle(String.format("token-provider-%s/%d", getApiHost(), getPort().orElse(DEFAULT_PORT))); + } + /** * Additional configuration for the underlying HttpClient */ diff --git a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java index 09c23487c0..b89dfac02a 100644 --- a/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java +++ b/cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProvider.java @@ -83,6 +83,9 @@ public abstract class AbstractUaaTokenProvider implements TokenProvider { private final ConcurrentMap> refreshTokens = new ConcurrentHashMap<>(1); + private final ConcurrentMap> activeTokenRequests = + new ConcurrentHashMap<>(1); + /** * The client id. Defaults to {@code cf}. */ @@ -297,30 +300,107 @@ private void setAuthorization(HttpHeaders headers) { headers.set(AUTHORIZATION, String.format("Basic %s", encoded)); } - private Mono token(ConnectionContext connectionContext) { - Mono cached = - this.refreshTokens - .getOrDefault(connectionContext, Mono.empty()) - .flatMap( - refreshToken -> - refreshToken(connectionContext, refreshToken) - .doOnSubscribe( - s -> - LOGGER.debug( - "Negotiating using refresh" - + " token"))) - .switchIfEmpty( - primaryToken(connectionContext) + private Mono token(final ConnectionContext connectionContext) { + /* + * Beware of issue #1146! + * There can be both multiple concurrent callers coming here during creation of the Mono + * and there can be concurrent callers during the subscription/execution of the Mono: + * - The latter is very harmful: This may lead to concurrent execution of the logic + * written in requestToken and hence to multiple requests to the UAA server using + * the same *value for the refresh token*! The UAA server will sequentialize them, + * one will go through just as normal and a new refresh token gets issued. + * The UAA server invalidates the old refresh token. The second request then arrives + * with the old refresh token and gets rejected. In an earlier version this led + * to caching the second request, hence to cache an error. This caused deadlocks. + * - The first is only "not nice", if the second issue is resolved: It causes that + * we will request two access tokens from the UAA server shortly after the other, + * but having use appropriate refresh tokens sequentially. The second request + * simply is to be considered waste. + * + * The coding below fixes both issues: It ensures that the execution of the Mono + * is synchronized and it ensures that two threads arriving to fetch the JWT in a + * non-caching situation does not trigger "wasteful" requests to the UAA server. + */ + + /* + * We use Mono.defer to ensure that the execution of the locking not happens + * during creation of the Mono (where it is of little relevance), but + * during the execution/subscription. + */ + return Mono.defer( + () -> { + // Check if there's already an active token request + final Mono existingRequest = + this.activeTokenRequests.get(connectionContext); + if (existingRequest != null) { + LOGGER.debug( + "Reusing existing UAA JWT token request for connection context"); + return existingRequest; + } + + final Mono baseTokenRequest = + createTokenRequest(connectionContext) + .doOnSubscribe( + s -> LOGGER.debug("Starting new UAA JWT token request")) + .doOnSuccess( + token -> + LOGGER.debug( + "UAA JWT token request completed" + + " successfully")) + .doOnError( + error -> + LOGGER.debug( + "UAA JWT token request failed", error)) + .doFinally( + signal -> { + // Clear the active request when done (success or + // error) + this.activeTokenRequests.remove(connectionContext); + }); + + // Apply cache duration from connection context + final Mono newTokenRequest = + connectionContext + .getCacheDuration() + .map(baseTokenRequest::cache) + .orElseGet(baseTokenRequest::cache) + /* + * Ensure execution on single thread. + * This prevents sending requests to the UAA server with expired refresh tokens. + */ + .publishOn(connectionContext.getTokenScheduler()); + + // Store the active request atomically + final Mono actualRequest = + this.activeTokenRequests.putIfAbsent( + connectionContext, newTokenRequest); + if (actualRequest != null) { + // Another thread beat us to it, use their request. This prevents "wasteful" + // requests. + LOGGER.debug( + "Another thread created token request first, using theirs instead"); + return actualRequest; + } + + // We successfully stored our request, use it + return newTokenRequest; + }); + } + + private Mono createTokenRequest(final ConnectionContext connectionContext) { + return this.refreshTokens + .getOrDefault(connectionContext, Mono.empty()) + .flatMap( + refreshToken -> + refreshToken(connectionContext, refreshToken) .doOnSubscribe( s -> LOGGER.debug( - "Negotiating using token" - + " provider"))); - - return connectionContext - .getCacheDuration() - .map(cached::cache) - .orElseGet(cached::cache) + "Negotiating using refresh token"))) + .switchIfEmpty( + primaryToken(connectionContext) + .doOnSubscribe( + s -> LOGGER.debug("Negotiating using token provider"))) .checkpoint(); } diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java new file mode 100644 index 0000000000..7722896314 --- /dev/null +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/AbstractUaaTokenProviderConcurrencyTest.java @@ -0,0 +1,306 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.tokenprovider; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.IOException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import org.cloudfoundry.reactor.ConnectionContext; +import org.cloudfoundry.reactor.DefaultConnectionContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.netty.http.client.HttpClientForm; +import reactor.netty.http.client.HttpClientRequest; + +/** + * Integration-style tests for AbstractUaaTokenProvider that verify the + * corrected behavior for concurrent token requests with expired access tokens. + * + * These tests verify the fix for issue #1146: "Parallel Requests with Expired + * Access Tokens triggering Refresh Token Flow leads to Broken State". + */ +class AbstractUaaTokenProviderConcurrencyTest { + + private static final Logger LOGGER = + LoggerFactory.getLogger(AbstractUaaTokenProviderConcurrencyTest.class); + + private MockUaaServer mockUaaServer; + private ConnectionContext connectionContext; + private TestTokenProvider tokenProvider; + private ExecutorService executorService; + + @BeforeEach + void setUp() throws IOException { + mockUaaServer = new MockUaaServer(); + + // Extract port from URL like "http://localhost:12345/" + final String baseUrl = mockUaaServer.getBaseUrl(); + final int port = Integer.parseInt(baseUrl.split(":")[2].split("/")[0]); + + connectionContext = + DefaultConnectionContext.builder() + .apiHost("localhost") + .port(port) + .secure(false) + .cacheDuration(Duration.ofMillis(100)) // Short cache for testing + .build(); + + tokenProvider = new TestTokenProvider(); + executorService = Executors.newFixedThreadPool(10); + } + + @AfterEach + void tearDown() throws IOException { + if (mockUaaServer != null) { + mockUaaServer.shutdown(); + } + if (executorService != null) { + executorService.shutdown(); + } + } + + /** + * Test that concurrent token requests don't cause broken state. + * This is the main test case for issue #1146. + */ + @Test + void concurrentTokenRequestsWithRefreshTokenRotation() throws Exception { + // Set up initial refresh token + mockUaaServer.setInitialRefreshToken("initial-refresh-token"); + + // Get initial token to establish refresh token + final String initialToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(initialToken).isNotNull(); + + // Reset request count to focus on concurrent requests + mockUaaServer.resetRequestCount(); + + // Invalidate the token to force refresh on next request + tokenProvider.invalidate(connectionContext); + + // Launch multiple concurrent requests + final int concurrentRequests = 5; + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrentRequests; i++) { + final CompletableFuture future = + CompletableFuture.supplyAsync( + () -> { + try { + startLatch.await(5, TimeUnit.SECONDS); + return tokenProvider + .getToken(connectionContext) + .block(Duration.ofSeconds(10)); + } catch (final Exception e) { + LOGGER.error("Error getting token", e); + throw new RuntimeException(e); + } + }, + executorService); + futures.add(future); + } + + // Start all requests simultaneously + startLatch.countDown(); + + // Wait for all requests to complete + final List tokens = new ArrayList<>(); + for (final CompletableFuture future : futures) { + final String token = future.get(15, TimeUnit.SECONDS); + assertThat(token).isNotNull(); + tokens.add(token); + } + + // Verify all tokens are valid (not null/empty) + assertThat(tokens).hasSize(concurrentRequests); + for (final String token : tokens) { + assertThat(token).isNotNull().isNotEmpty(); + } + + // Verify that subsequent requests still work (no broken state) + final String subsequentToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(subsequentToken).isNotNull(); + + LOGGER.info( + "Concurrent test completed successfully. Total UAA requests: {}", + mockUaaServer.getRequestCount()); + } + + /** + * Test that the token provider handles UAA server errors gracefully during + * concurrent requests. + */ + @Test + void concurrentTokenRequestsWithServerErrors() throws Exception { + // Set up initial refresh token + mockUaaServer.setInitialRefreshToken("initial-refresh-token"); + + // Get initial token + final String initialToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(initialToken).isNotNull(); + + // Configure server to fail the first few refresh requests + mockUaaServer.setShouldFailRefreshRequests(true, 2); + + // Invalidate to force refresh + tokenProvider.invalidate(connectionContext); + + // Launch concurrent requests + final int concurrentRequests = 3; + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrentRequests; i++) { + final CompletableFuture future = + CompletableFuture.supplyAsync( + () -> { + try { + startLatch.await(5, TimeUnit.SECONDS); + return tokenProvider + .getToken(connectionContext) + .block(Duration.ofSeconds(10)); + } catch (final Exception e) { + LOGGER.error("Error getting token", e); + return null; // Allow some failures + } + }, + executorService); + futures.add(future); + } + + startLatch.countDown(); + + // Collect results (some may be null due to failures) + final List tokens = new ArrayList<>(); + for (final CompletableFuture future : futures) { + final String token = future.get(15, TimeUnit.SECONDS); + if (token != null) { + tokens.add(token); + } + } + + // At least one request should succeed eventually + assertThat(tokens).isNotEmpty(); + + // Verify system recovers and subsequent requests work + final String recoveryToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(recoveryToken).isNotNull(); + + LOGGER.info( + "Error handling test completed. Successful tokens: {}, Total UAA requests: {}", + tokens.size(), + mockUaaServer.getRequestCount()); + } + + /** + * Test that the token provider properly serializes token requests to prevent + * multiple concurrent UAA requests with the same refresh token. + */ + @Test + void tokenRequestSerialization() throws Exception { + // Set up initial refresh token + mockUaaServer.setInitialRefreshToken("initial-refresh-token"); + + // Get initial token + final String initialToken = + tokenProvider.getToken(connectionContext).block(Duration.ofSeconds(5)); + assertThat(initialToken).isNotNull(); + + final int initialRequestCount = mockUaaServer.getRequestCount(); + + // Invalidate to force refresh + tokenProvider.invalidate(connectionContext); + + // Launch many concurrent requests + final int concurrentRequests = 10; + final CountDownLatch startLatch = new CountDownLatch(1); + final List> futures = new ArrayList<>(); + + for (int i = 0; i < concurrentRequests; i++) { + final CompletableFuture future = + CompletableFuture.supplyAsync( + () -> { + try { + startLatch.await(5, TimeUnit.SECONDS); + return tokenProvider + .getToken(connectionContext) + .block(Duration.ofSeconds(10)); + } catch (final Exception e) { + LOGGER.error("Error getting token", e); + throw new RuntimeException(e); + } + }, + executorService); + futures.add(future); + } + + startLatch.countDown(); + + // Wait for all to complete + for (final CompletableFuture future : futures) { + final String token = future.get(15, TimeUnit.SECONDS); + assertThat(token).isNotNull(); + } + + final int finalRequestCount = mockUaaServer.getRequestCount(); + final int newRequests = finalRequestCount - initialRequestCount; + + // The key assertion: despite many concurrent requests, only a minimal number of + // actual UAA requests should be made due to proper serialization and caching + assertThat(newRequests).isLessThanOrEqualTo(3); // Allow some margin for timing + + LOGGER.info( + "Serialization test completed. Concurrent requests: {}, Actual UAA requests: {}", + concurrentRequests, + newRequests); + } + + /** + * Test token provider implementation for testing purposes. + */ + private static class TestTokenProvider extends AbstractUaaTokenProvider { + + @Override + String getIdentityZoneSubdomain() { + return null; + } + + @Override + void tokenRequestTransformer(final HttpClientRequest request, final HttpClientForm form) { + form.multipart(false) + .attr("client_id", getClientId()) + .attr("client_secret", getClientSecret()) + .attr("grant_type", "password") + .attr("username", "test-user") + .attr("password", "test-password"); + } + } +} diff --git a/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java new file mode 100644 index 0000000000..7c5c36ad34 --- /dev/null +++ b/cloudfoundry-client-reactor/src/test/java/org/cloudfoundry/reactor/tokenprovider/MockUaaServer.java @@ -0,0 +1,247 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.cloudfoundry.reactor.tokenprovider; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.time.Instant; +import java.util.Base64; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import mockwebserver3.Dispatcher; +import mockwebserver3.MockResponse; +import mockwebserver3.MockWebServer; +import mockwebserver3.RecordedRequest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Mock UAA server for testing token provider behavior. + * Simulates UAA refresh token rotation and invalidation behavior. + */ +class MockUaaServer { + + private static final Logger LOGGER = LoggerFactory.getLogger(MockUaaServer.class); + + private final MockWebServer mockWebServer; + private final ObjectMapper objectMapper; + private final AtomicReference currentValidRefreshToken; + private final AtomicInteger requestCount; + private volatile boolean shouldFailRefreshRequests = false; + private volatile int failureCount = 0; + private volatile int maxFailures = 0; + + public MockUaaServer() throws IOException { + this.mockWebServer = new MockWebServer(); + this.objectMapper = new ObjectMapper(); + this.currentValidRefreshToken = new AtomicReference<>(); + this.requestCount = new AtomicInteger(0); + + // Set up dispatcher to handle token requests + this.mockWebServer.setDispatcher(new TokenRequestDispatcher()); + + this.mockWebServer.start(); + } + + public String getBaseUrl() { + return mockWebServer.url("/").toString(); + } + + public void setInitialRefreshToken(final String refreshToken) { + this.currentValidRefreshToken.set(refreshToken); + } + + public void setShouldFailRefreshRequests(final boolean shouldFail) { + this.shouldFailRefreshRequests = shouldFail; + this.failureCount = 0; + } + + public void setShouldFailRefreshRequests(final boolean shouldFail, final int maxFailures) { + this.shouldFailRefreshRequests = shouldFail; + this.maxFailures = maxFailures; + this.failureCount = 0; + } + + public int getRequestCount() { + return requestCount.get(); + } + + public void resetRequestCount() { + requestCount.set(0); + } + + public void shutdown() throws IOException { + mockWebServer.shutdown(); + } + + private MockResponse handleRequest(final RecordedRequest request) throws IOException { + requestCount.incrementAndGet(); + + final String path = request.getPath(); + LOGGER.debug("Received request: {} {}", request.getMethod(), path); + + if ("/v2/info".equals(path)) { + return handleInfoRequest(); + } else if ("/oauth/token".equals(path)) { + return handleTokenRequest(request); + } else { + LOGGER.debug("Unknown path requested: {}", path); + return new MockResponse().setResponseCode(404); + } + } + + private MockResponse handleTokenRequest(final RecordedRequest request) throws IOException { + final String requestBody = request.getBody().readUtf8(); + LOGGER.debug("Received token request: {}", requestBody); + + // Check if this is a refresh token request + if (requestBody.contains("grant_type=refresh_token")) { + return handleRefreshTokenRequest(requestBody); + } + + // Handle other grant types (password, client_credentials, etc.) + return handlePrimaryTokenRequest(); + } + + private MockResponse handleInfoRequest() throws IOException { + LOGGER.debug("Handling /v2/info request"); + + final Map infoResponse = new HashMap<>(); + infoResponse.put("authorization_endpoint", getBaseUrl().replaceAll("/$", "")); + infoResponse.put("token_endpoint", getBaseUrl().replaceAll("/$", "")); + infoResponse.put("app_ssh_endpoint", "ssh.localhost:2222"); + infoResponse.put("app_ssh_host_key_fingerprint", "test-fingerprint"); + infoResponse.put("api_version", "2.165.0"); + infoResponse.put("name", "test-cf"); + infoResponse.put("build", "test-build"); + infoResponse.put("version", 0); + infoResponse.put("description", "Unit Test Cloud Foundry"); + + final String responseBody = objectMapper.writeValueAsString(infoResponse); + LOGGER.debug("Generated info response: {}", responseBody); + + return new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody(responseBody); + } + + /* Warning! synchronized is really necessary here to ensure no concurrent threads are trying to run into a refresh token handling! */ + private synchronized MockResponse handleRefreshTokenRequest(final String requestBody) + throws IOException { + // Check if we should fail requests + if (shouldFailRefreshRequests) { + if (maxFailures == 0 || failureCount < maxFailures) { + failureCount++; + LOGGER.debug( + "Failing refresh token request (failure {} of {})", + failureCount, + maxFailures); + return new MockResponse() + .setResponseCode(401) + .setBody( + "{\"error\":\"invalid_grant\",\"error_description\":\"Invalid" + + " refresh token\"}"); + } else { + // Reset failure mode after max failures reached + shouldFailRefreshRequests = false; + } + } + + // Extract refresh token from request body + final String refreshTokenFromRequest = extractRefreshTokenFromRequest(requestBody); + final String currentValid = currentValidRefreshToken.get(); + + if (currentValid != null && !currentValid.equals(refreshTokenFromRequest)) { + LOGGER.debug( + "Invalid refresh token provided. Expected: {}, Got: {}", + currentValid, + refreshTokenFromRequest); + return new MockResponse() + .setResponseCode(401) + .setBody( + "{\"error\":\"invalid_grant\",\"error_description\":\"Invalid refresh" + + " token\"}"); + } + + // Generate new tokens + return generateTokenResponse(); + } + + private MockResponse handlePrimaryTokenRequest() throws IOException { + // For primary token requests (password grant, etc.), always succeed + return generateTokenResponse(); + } + + private MockResponse generateTokenResponse() throws IOException { + final long now = Instant.now().getEpochSecond(); + final long expiresAt = now + 3600; // 1 hour from now + + // Create a simple JWT-like token (not cryptographically valid, just for testing) + final String header = Base64.getEncoder().encodeToString("{\"alg\":\"none\"}".getBytes()); + final String payload = + Base64.getEncoder() + .encodeToString( + String.format( + "{\"exp\":%d,\"iat\":%d,\"sub\":\"test-user\"}", + expiresAt, now) + .getBytes()); + final String accessToken = header + "." + payload + "."; + + // Generate new refresh token + final String newRefreshToken = "refresh-token-" + System.nanoTime(); + currentValidRefreshToken.set(newRefreshToken); + + final Map response = new HashMap<>(); + response.put("access_token", accessToken); + response.put("token_type", "Bearer"); + response.put("expires_in", 3600); + response.put("refresh_token", newRefreshToken); + response.put("scope", "openid"); + + final String responseBody = objectMapper.writeValueAsString(response); + LOGGER.debug("Generated token response with refresh token: {}", newRefreshToken); + + return new MockResponse() + .setResponseCode(200) + .setHeader("Content-Type", "application/json") + .setBody(responseBody); + } + + private String extractRefreshTokenFromRequest(final String requestBody) { + // Simple extraction of refresh_token parameter + final String[] parts = requestBody.split("&"); + for (final String part : parts) { + if (part.startsWith("refresh_token=")) { + return part.substring("refresh_token=".length()); + } + } + return null; + } + + private class TokenRequestDispatcher extends Dispatcher { + @Override + public MockResponse dispatch(final RecordedRequest request) { + try { + return handleRequest(request); + } catch (final Exception e) { + LOGGER.error("Error handling request", e); + return new MockResponse().setResponseCode(500); + } + } + } +}