Skip to content

Commit 7dcb368

Browse files
author
admitrov
committed
feat: enhance Kafka container configuration and add JWT decoder bean
1 parent 03e4e4d commit 7dcb368

File tree

7 files changed

+105
-111
lines changed

7 files changed

+105
-111
lines changed

embedded-kafka/src/main/java/com/playtika/testcontainer/kafka/configuration/SchemaRegistryContainerConfiguration.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
package com.playtika.testcontainer.kafka.configuration;
22

33
import com.playtika.testcontainer.common.utils.ContainerUtils;
4+
import com.playtika.testcontainer.kafka.properties.KafkaConfigurationProperties;
45
import com.playtika.testcontainer.kafka.properties.SchemaRegistryConfigurationProperties;
56
import lombok.extern.slf4j.Slf4j;
67
import org.springframework.beans.factory.annotation.Qualifier;
7-
import org.springframework.beans.factory.annotation.Value;
88
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
99
import org.springframework.boot.context.properties.EnableConfigurationProperties;
1010
import org.springframework.context.annotation.Bean;
@@ -16,6 +16,8 @@
1616
import java.util.LinkedHashMap;
1717

1818
import static com.playtika.testcontainer.common.utils.ContainerUtils.configureCommonsAndStart;
19+
import static com.playtika.testcontainer.kafka.configuration.KafkaContainerConfiguration.KAFKA_HOST_NAME;
20+
import static com.playtika.testcontainer.kafka.properties.KafkaConfigurationProperties.KAFKA_BEAN_NAME;
1921
import static com.playtika.testcontainer.kafka.properties.SchemaRegistryConfigurationProperties.SCHEMA_REGISTRY_BEAN_NAME;
2022
import static org.testcontainers.utility.MountableFile.forClasspathResource;
2123

@@ -30,9 +32,12 @@ public class SchemaRegistryContainerConfiguration {
3032
@Bean(name = SCHEMA_REGISTRY_BEAN_NAME, destroyMethod = "stop")
3133
public GenericContainer<?> schemaRegistry(
3234
SchemaRegistryConfigurationProperties properties,
33-
@Value("${embedded.kafka.containerBrokerList}") String kafkaContainerBrokerList,
35+
@Qualifier(KAFKA_BEAN_NAME) GenericContainer<?> kafka,
36+
KafkaConfigurationProperties kafkaProperties,
3437
Network network) {
3538

39+
String kafkaContainerBrokerList = String.format("%s:%d", KAFKA_HOST_NAME, kafkaProperties.getContainerBrokerPort());
40+
3641
GenericContainer<?> schemaRegistry = new GenericContainer<>(ContainerUtils.getDockerImageName(properties))
3742
.withCreateContainerCmdModifier(cmd -> cmd.withHostName(SCHEMA_REGISTRY_HOST_NAME))
3843
.withEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", "PLAINTEXT://" + kafkaContainerBrokerList)

embedded-keycloak/src/test/java/com/playtika/testcontainer/keycloak/spring/SpringTestApplication.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
1010
import org.springframework.security.config.annotation.web.configurers.AbstractHttpConfigurer;
1111
import org.springframework.security.config.http.SessionCreationPolicy;
12+
import org.springframework.security.oauth2.jwt.JwtDecoder;
13+
import org.springframework.security.oauth2.jwt.NimbusJwtDecoder;
1214
import org.springframework.security.web.SecurityFilterChain;
1315
import org.springframework.web.bind.annotation.GetMapping;
1416
import org.springframework.web.bind.annotation.RestController;
@@ -22,6 +24,14 @@ public class SpringTestApplication {
2224
@Value("${testing.keycloak.client}")
2325
private String client;
2426

27+
@Bean
28+
public JwtDecoder jwtDecoder(@Value("${embedded.keycloak.host}") String host,
29+
@Value("${embedded.keycloak.http-port}") String port,
30+
@Value("${testing.keycloak.realm}") String realm) {
31+
String issuerUri = String.format("http://%s:%s/realms/%s", host, port, realm);
32+
return NimbusJwtDecoder.withIssuerLocation(issuerUri).build();
33+
}
34+
2535
@Bean
2636
SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
2737
return http.authorizeHttpRequests(requests -> requests.requestMatchers("/api/**").fullyAuthenticated())

embedded-keycloak/src/test/resources/application.yml

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,6 @@ spring:
22
output:
33
ansi:
44
enabled: always
5-
security:
6-
oauth2:
7-
resourceserver:
8-
jwt:
9-
issuer-uri: http://${embedded.keycloak.host}:${embedded.keycloak.http-port}/realms/${testing.keycloak.realm}
105

116
logging:
127
level:

embedded-native-kafka/pom.xml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@
4646
<groupId>org.apache.kafka</groupId>
4747
<artifactId>kafka-clients</artifactId>
4848
</dependency>
49+
<dependency>
50+
<groupId>org.springframework</groupId>
51+
<artifactId>spring-test</artifactId>
52+
</dependency>
4953
<dependency>
5054
<groupId>org.springframework.boot</groupId>
5155
<artifactId>spring-boot-starter-web</artifactId>
@@ -92,4 +96,4 @@
9296
</plugin>
9397
</plugins>
9498
</build>
95-
</project>
99+
</project>

embedded-native-kafka/src/main/java/com/playtika/testcontainer/nativekafka/configuration/NativeKafkaContainerConfiguration.java

Lines changed: 23 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
import com.playtika.testcontainer.nativekafka.NativeKafkaTopicsConfigurer;
44
import com.playtika.testcontainer.nativekafka.properties.NativeKafkaConfigurationProperties;
55
import lombok.extern.slf4j.Slf4j;
6+
import org.springframework.beans.factory.annotation.Qualifier;
67
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
78
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
89
import org.springframework.boot.context.properties.EnableConfigurationProperties;
910
import org.springframework.context.annotation.Bean;
1011
import org.springframework.context.annotation.Configuration;
11-
import org.springframework.core.env.ConfigurableEnvironment;
12-
import org.springframework.core.env.MapPropertySource;
12+
import org.springframework.test.context.DynamicPropertyRegistrar;
1313
import org.testcontainers.containers.BindMode;
1414
import org.testcontainers.containers.GenericContainer;
1515
import org.testcontainers.containers.Network;
@@ -26,7 +26,6 @@
2626
import java.nio.file.attribute.PosixFilePermissions;
2727
import java.time.LocalDateTime;
2828
import java.time.format.DateTimeFormatter;
29-
import java.util.LinkedHashMap;
3029
import java.util.Set;
3130
import java.util.stream.Stream;
3231

@@ -52,7 +51,6 @@ public Network nativeKafkaNetwork() {
5251
@Bean(name = NATIVE_KAFKA_BEAN_NAME, destroyMethod = "stop")
5352
public GenericContainer<?> nativeKafka(
5453
NativeKafkaConfigurationProperties nativeKafkaProperties,
55-
ConfigurableEnvironment environment,
5654
Network network) {
5755

5856
DockerImageName nativeKafkaImageName = DockerImageName.parse(nativeKafkaProperties.getDefaultDockerImage())
@@ -69,20 +67,37 @@ public GenericContainer<?> nativeKafka(
6967
// Configure and start the container using common utilities
7068
nativeKafka = (KafkaContainer) configureCommonsAndStart(nativeKafka, nativeKafkaProperties, log);
7169

72-
// Register environment properties
73-
registerNativeKafkaEnvironment(nativeKafka, environment, nativeKafkaProperties);
74-
7570
return nativeKafka;
7671
}
7772

7873
@Bean
7974
@ConditionalOnMissingBean
8075
public NativeKafkaTopicsConfigurer nativeKafkaTopicsConfigurer(
81-
GenericContainer<?> nativeKafka,
76+
@Qualifier(NATIVE_KAFKA_BEAN_NAME) GenericContainer<?> nativeKafka,
8277
NativeKafkaConfigurationProperties nativeKafkaProperties) {
8378
return new NativeKafkaTopicsConfigurer(nativeKafka, nativeKafkaProperties);
8479
}
8580

81+
@Bean
82+
public DynamicPropertyRegistrar nativeKafkaDynamicPropertyRegistrar(
83+
@Qualifier(NATIVE_KAFKA_BEAN_NAME) GenericContainer<?> nativeKafka,
84+
NativeKafkaConfigurationProperties nativeKafkaProperties) {
85+
return registry -> {
86+
String bootstrapServers = ((KafkaContainer) nativeKafka).getBootstrapServers();
87+
String host = nativeKafka.getHost();
88+
Integer port = nativeKafka.getMappedPort(nativeKafkaProperties.getKafkaPort());
89+
90+
registry.add("embedded.kafka.bootstrapServers", () -> bootstrapServers);
91+
registry.add("embedded.kafka.brokerList", () -> bootstrapServers);
92+
registry.add("embedded.kafka.networkAlias", () -> NATIVE_KAFKA_HOST_NAME);
93+
registry.add("embedded.kafka.host", () -> host);
94+
registry.add("embedded.kafka.port", () -> port);
95+
96+
log.info("Started native kafka broker. Connection details: bootstrapServers={}, host={}, port={}, networkAlias={}",
97+
bootstrapServers, host, port, NATIVE_KAFKA_HOST_NAME);
98+
};
99+
}
100+
86101
private void configureFileSystemBind(NativeKafkaConfigurationProperties nativeKafkaProperties, KafkaContainer nativeKafka) {
87102
NativeKafkaConfigurationProperties.FileSystemBind fileSystemBind = nativeKafkaProperties.getFileSystemBind();
88103
if (fileSystemBind.isEnabled()) {
@@ -96,24 +111,6 @@ private void configureFileSystemBind(NativeKafkaConfigurationProperties nativeKa
96111
}
97112
}
98113

99-
private void registerNativeKafkaEnvironment(GenericContainer<?> nativeKafka,
100-
ConfigurableEnvironment environment,
101-
NativeKafkaConfigurationProperties nativeKafkaProperties) {
102-
LinkedHashMap<String, Object> map = new LinkedHashMap<>();
103-
104-
String bootstrapServers = ((KafkaContainer) nativeKafka).getBootstrapServers();
105-
map.put("embedded.kafka.bootstrapServers", bootstrapServers);
106-
map.put("embedded.kafka.brokerList", bootstrapServers);
107-
map.put("embedded.kafka.networkAlias", NATIVE_KAFKA_HOST_NAME);
108-
map.put("embedded.kafka.host", nativeKafka.getHost());
109-
map.put("embedded.kafka.port", nativeKafka.getMappedPort(nativeKafkaProperties.getKafkaPort()));
110-
111-
MapPropertySource propertySource = new MapPropertySource("embeddedKafkaInfo", map);
112-
113-
log.info("Started native kafka broker. Connection details: {}", map);
114-
115-
environment.getPropertySources().addFirst(propertySource);
116-
}
117114

118115
private void createPathAndParentOrMakeWritable(Path path) {
119116
Stream.of(path.getParent(), path).forEach(p -> {

embedded-native-kafka/src/test/java/com/playtika/testcontainer/nativekafka/configuration/NativeKafkaContainerConfigurationTest.java

Lines changed: 10 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
import org.mockito.Mock;
1111
import org.mockito.MockedStatic;
1212
import org.mockito.junit.jupiter.MockitoExtension;
13-
import org.springframework.core.env.ConfigurableEnvironment;
14-
import org.springframework.core.env.MutablePropertySources;
1513
import org.testcontainers.containers.GenericContainer;
1614
import org.testcontainers.containers.Network;
1715
import org.testcontainers.kafka.KafkaContainer;
@@ -25,7 +23,6 @@
2523
import static org.mockito.ArgumentMatchers.any;
2624
import static org.mockito.ArgumentMatchers.eq;
2725
import static org.mockito.Mockito.mockStatic;
28-
import static org.mockito.Mockito.verify;
2926
import static org.mockito.Mockito.when;
3027

3128
@ExtendWith(MockitoExtension.class)
@@ -35,9 +32,6 @@ class NativeKafkaContainerConfigurationTest {
3532
@Mock
3633
private NativeKafkaConfigurationProperties properties;
3734

38-
@Mock
39-
private ConfigurableEnvironment environment;
40-
4135
@Mock
4236
private Network network;
4337

@@ -47,9 +41,6 @@ class NativeKafkaContainerConfigurationTest {
4741
@Mock
4842
private GenericContainer<?> genericContainer;
4943

50-
@Mock
51-
private MutablePropertySources mutablePropertySources;
52-
5344
@TempDir
5445
private Path tempDir;
5546

@@ -72,7 +63,6 @@ void shouldCreateNetworkWithProperConfiguration() {
7263
@Test
7364
@DisplayName("should create kafka container with proper docker image and network configuration")
7465
void shouldCreateKafkaContainerWithProperConfiguration() {
75-
when(environment.getPropertySources()).thenReturn(mutablePropertySources);
7666
when(properties.getDefaultDockerImage()).thenReturn("apache/kafka-native:4.0.0");
7767
when(properties.getFileSystemBind()).thenReturn(new NativeKafkaConfigurationProperties.FileSystemBind());
7868
when(properties.getKafkaPort()).thenReturn(9092);
@@ -87,7 +77,7 @@ void shouldCreateKafkaContainerWithProperConfiguration() {
8777
when(kafkaContainer.getHost()).thenReturn("localhost");
8878
when(kafkaContainer.getMappedPort(9092)).thenReturn(9092);
8979

90-
GenericContainer<?> result = configuration.nativeKafka(properties, environment, network);
80+
GenericContainer<?> result = configuration.nativeKafka(properties, network);
9181

9282
assertThat(result).isEqualTo(kafkaContainer);
9383
containerUtilsMock.verify(() -> configureCommonsAndStart(any(KafkaContainer.class), eq(properties), any()));
@@ -100,7 +90,6 @@ void shouldConfigureFileSystemBindWhenEnabled() throws IOException {
10090
NativeKafkaConfigurationProperties.FileSystemBind fileSystemBind =
10191
new NativeKafkaConfigurationProperties.FileSystemBind(true, tempDir.toString());
10292

103-
when(environment.getPropertySources()).thenReturn(mutablePropertySources);
10493
when(properties.getDefaultDockerImage()).thenReturn("apache/kafka-native:4.0.0");
10594
when(properties.getFileSystemBind()).thenReturn(fileSystemBind);
10695
when(properties.getKafkaPort()).thenReturn(9092);
@@ -115,7 +104,7 @@ void shouldConfigureFileSystemBindWhenEnabled() throws IOException {
115104
when(kafkaContainer.getHost()).thenReturn("localhost");
116105
when(kafkaContainer.getMappedPort(9092)).thenReturn(9092);
117106

118-
GenericContainer<?> result = configuration.nativeKafka(properties, environment, network);
107+
GenericContainer<?> result = configuration.nativeKafka(properties, network);
119108

120109
assertThat(result).isEqualTo(kafkaContainer);
121110

@@ -130,7 +119,6 @@ void shouldNotConfigureFileSystemBindWhenDisabled() {
130119
NativeKafkaConfigurationProperties.FileSystemBind fileSystemBind =
131120
new NativeKafkaConfigurationProperties.FileSystemBind(false, tempDir.toString());
132121

133-
when(environment.getPropertySources()).thenReturn(mutablePropertySources);
134122
when(properties.getDefaultDockerImage()).thenReturn("apache/kafka-native:4.0.0");
135123
when(properties.getFileSystemBind()).thenReturn(fileSystemBind);
136124
when(properties.getKafkaPort()).thenReturn(9092);
@@ -145,17 +133,16 @@ void shouldNotConfigureFileSystemBindWhenDisabled() {
145133
when(kafkaContainer.getHost()).thenReturn("localhost");
146134
when(kafkaContainer.getMappedPort(9092)).thenReturn(9092);
147135

148-
GenericContainer<?> result = configuration.nativeKafka(properties, environment, network);
136+
GenericContainer<?> result = configuration.nativeKafka(properties, network);
149137

150138
assertThat(result).isEqualTo(kafkaContainer);
151139
assertThat(Files.exists(tempDir.resolve("embedded-native-kafka-data"))).isFalse();
152140
}
153141
}
154142

155143
@Test
156-
@DisplayName("should register environment properties correctly")
157-
void shouldRegisterEnvironmentPropertiesCorrectly() {
158-
when(environment.getPropertySources()).thenReturn(mutablePropertySources);
144+
@DisplayName("should register properties correctly")
145+
void shouldRegisterPropertiesCorrectly() {
159146
when(properties.getDefaultDockerImage()).thenReturn("apache/kafka-native:4.0.0");
160147
when(properties.getFileSystemBind()).thenReturn(new NativeKafkaConfigurationProperties.FileSystemBind());
161148
when(properties.getKafkaPort()).thenReturn(9092);
@@ -170,9 +157,7 @@ void shouldRegisterEnvironmentPropertiesCorrectly() {
170157
when(kafkaContainer.getHost()).thenReturn("localhost");
171158
when(kafkaContainer.getMappedPort(9092)).thenReturn(12345);
172159

173-
configuration.nativeKafka(properties, environment, network);
174-
175-
verify(environment).getPropertySources();
160+
configuration.nativeKafka(properties, network);
176161
}
177162
}
178163

@@ -194,7 +179,6 @@ void shouldHandleDirectoryCreationWithProperPermissions() {
194179
NativeKafkaConfigurationProperties.FileSystemBind fileSystemBind =
195180
new NativeKafkaConfigurationProperties.FileSystemBind(true, testPath.toString());
196181

197-
when(environment.getPropertySources()).thenReturn(mutablePropertySources);
198182
when(properties.getDefaultDockerImage()).thenReturn("apache/kafka-native:4.0.0");
199183
when(properties.getFileSystemBind()).thenReturn(fileSystemBind);
200184
when(properties.getKafkaPort()).thenReturn(9092);
@@ -209,7 +193,7 @@ void shouldHandleDirectoryCreationWithProperPermissions() {
209193
when(kafkaContainer.getHost()).thenReturn("localhost");
210194
when(kafkaContainer.getMappedPort(9092)).thenReturn(9092);
211195

212-
configuration.nativeKafka(properties, environment, network);
196+
configuration.nativeKafka(properties, network);
213197

214198
assertThat(Files.exists(testPath)).isTrue();
215199
assertThat(Files.isDirectory(testPath)).isTrue();
@@ -226,7 +210,6 @@ void shouldHandleParentDirectoryCreation() throws IOException {
226210
NativeKafkaConfigurationProperties.FileSystemBind fileSystemBind =
227211
new NativeKafkaConfigurationProperties.FileSystemBind(true, testPath.toString());
228212

229-
when(environment.getPropertySources()).thenReturn(mutablePropertySources);
230213
when(properties.getDefaultDockerImage()).thenReturn("apache/kafka-native:4.0.0");
231214
when(properties.getFileSystemBind()).thenReturn(fileSystemBind);
232215
when(properties.getKafkaPort()).thenReturn(9092);
@@ -241,7 +224,7 @@ void shouldHandleParentDirectoryCreation() throws IOException {
241224
when(kafkaContainer.getHost()).thenReturn("localhost");
242225
when(kafkaContainer.getMappedPort(9092)).thenReturn(9092);
243226

244-
configuration.nativeKafka(properties, environment, network);
227+
configuration.nativeKafka(properties, network);
245228

246229
assertThat(Files.exists(parentPath)).isTrue();
247230
assertThat(Files.exists(testPath)).isTrue();
@@ -264,7 +247,6 @@ void shouldHandleExistingDirectoryScenario() throws IOException {
264247
NativeKafkaConfigurationProperties.FileSystemBind fileSystemBind =
265248
new NativeKafkaConfigurationProperties.FileSystemBind(true, existingPath.toString());
266249

267-
when(environment.getPropertySources()).thenReturn(mutablePropertySources);
268250
when(properties.getDefaultDockerImage()).thenReturn("apache/kafka-native:4.0.0");
269251
when(properties.getFileSystemBind()).thenReturn(fileSystemBind);
270252
when(properties.getKafkaPort()).thenReturn(9092);
@@ -279,9 +261,9 @@ void shouldHandleExistingDirectoryScenario() throws IOException {
279261
when(kafkaContainer.getHost()).thenReturn("localhost");
280262
when(kafkaContainer.getMappedPort(9092)).thenReturn(9092);
281263

282-
configuration.nativeKafka(properties, environment, network);
264+
configuration.nativeKafka(properties, network);
283265

284266
assertThat(Files.exists(existingPath)).isTrue();
285267
}
286268
}
287-
}
269+
}

0 commit comments

Comments
 (0)