Skip to content

Commit 4a2ce3b

Browse files
committed
[improve][proxy] Refactor broker and proxy auth
1 parent cc030d6 commit 4a2ce3b

File tree

9 files changed

+548
-282
lines changed

9 files changed

+548
-282
lines changed

pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/BinaryAuthSession.java

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ public class BinaryAuthSession {
5151
private final BinaryAuthContext context;
5252

5353
private AuthResult defaultAuthResult;
54+
private boolean supportsAuthRefresh;
5455

5556
public BinaryAuthSession(@NonNull BinaryAuthContext context) {
5657
this.context = context;
@@ -59,6 +60,8 @@ public BinaryAuthSession(@NonNull BinaryAuthContext context) {
5960
public CompletableFuture<AuthResult> doAuthentication() {
6061
var connect = context.getCommandConnect();
6162
try {
63+
supportsAuthRefresh = connect.getFeatureFlags().hasSupportsAuthRefresh() && connect.getFeatureFlags()
64+
.isSupportsAuthRefresh();
6265
var authData = connect.hasAuthData() ? connect.getAuthData() : emptyArray;
6366
var clientData = AuthData.of(authData);
6467
// init authentication
@@ -267,6 +270,47 @@ private CompletableFuture<Void> authenticateOriginalData() {
267270
}, context.getExecutor());
268271
}
269272

273+
public boolean isExpired() {
274+
if (originalAuthState != null) {
275+
return originalAuthState.isExpired();
276+
}
277+
return authState.isExpired();
278+
}
279+
280+
public boolean supportsAuthenticationRefresh(){
281+
if (originalPrincipal != null && originalAuthState == null) {
282+
// This case is only checked when the authState is expired because we've reached a point where
283+
// authentication needs to be refreshed, but the protocol does not support it unless the proxy forwards
284+
// the originalAuthData.
285+
log.info(
286+
"[{}] Cannot revalidate user credential when using proxy and"
287+
+ " not forwarding the credentials.",
288+
context.getRemoteAddress());
289+
return false;
290+
}
291+
292+
if (!supportsAuthRefresh) {
293+
log.warn("[{}] Client doesn't support auth credentials refresh",
294+
context.getRemoteAddress());
295+
return false;
296+
}
297+
298+
return true;
299+
}
300+
301+
public AuthResult refreshAuthentication() throws AuthenticationException {
302+
if (originalAuthState != null) {
303+
return AuthResult.builder()
304+
.authData(originalAuthState.refreshAuthentication())
305+
.authMethod(originalAuthMethod)
306+
.build();
307+
}
308+
return AuthResult.builder()
309+
.authData(authState.refreshAuthentication())
310+
.authMethod(authMethod)
311+
.build();
312+
}
313+
270314
@Builder
271315
@Getter
272316
public static class AuthResult {

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java

Lines changed: 65 additions & 76 deletions
Large diffs are not rendered by default.

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ServerCnxTest.java

Lines changed: 81 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -680,6 +680,7 @@ public void testAuthChallengePrincipalChangeFails() throws Exception {
680680
Object responseConnected = getResponse();
681681
assertTrue(responseConnected instanceof CommandConnected);
682682
assertEquals(serverCnx.getState(), State.Connected);
683+
assertEquals(serverCnx.getAuthRole(), "pass.client");
683684
assertEquals(serverCnx.getPrincipal(), "pass.client");
684685
assertTrue(serverCnx.isActive());
685686

@@ -1273,7 +1274,7 @@ private class ClientChannel implements Closeable {
12731274
4),
12741275
serverCnx);
12751276
public ClientChannel() {
1276-
serverCnx.setAuthRole("");
1277+
serverCnx.clearBinaryAuthSession();
12771278
}
12781279
public void close(){
12791280
if (channel != null && channel.isActive()) {
@@ -1681,6 +1682,51 @@ public void testProducerOnNotOwnedTopic() throws Exception {
16811682
channel.finish();
16821683
}
16831684

1685+
private PulsarAuthorizationProvider injectAuth() throws Exception {
1686+
svcConfig.setAuthorizationEnabled(true);
1687+
AuthorizationService authorizationService =
1688+
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
1689+
Field providerField = AuthorizationService.class.getDeclaredField("provider");
1690+
providerField.setAccessible(true);
1691+
PulsarAuthorizationProvider authorizationProvider =
1692+
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
1693+
pulsar.getPulsarResources());
1694+
providerField.set(authorizationService, authorizationProvider);
1695+
doReturn(authorizationService).when(brokerService).getAuthorizationService();
1696+
svcConfig.setAuthorizationEnabled(true);
1697+
1698+
AuthenticationService authenticationService = mock(AuthenticationService.class);
1699+
// use a dummy authentication provider
1700+
AuthenticationProvider authenticationProvider = new AuthenticationProvider() {
1701+
@Override
1702+
public void initialize(ServiceConfiguration config) throws IOException {
1703+
1704+
}
1705+
1706+
@Override
1707+
public String authenticate(AuthenticationDataSource authData) throws AuthenticationException {
1708+
return "role";
1709+
}
1710+
1711+
@Override
1712+
public String getAuthMethodName() {
1713+
return "dummy";
1714+
}
1715+
1716+
@Override
1717+
public void close() throws IOException {
1718+
1719+
}
1720+
};
1721+
1722+
String authMethodName = authenticationProvider.getAuthMethodName();
1723+
when(brokerService.getAuthenticationService()).thenReturn(authenticationService);
1724+
when(authenticationService.getAuthenticationProvider(authMethodName)).thenReturn(authenticationProvider);
1725+
svcConfig.setAuthenticationEnabled(true);
1726+
1727+
return authorizationProvider;
1728+
}
1729+
16841730
@Test(timeOut = 30000)
16851731
public void testProducerCommandWithAuthorizationPositive() throws Exception {
16861732
AuthorizationService authorizationService = mock(AuthorizationService.class);
@@ -1709,32 +1755,27 @@ public void testProducerCommandWithAuthorizationPositive() throws Exception {
17091755

17101756
@Test(timeOut = 30000)
17111757
public void testNonExistentTopic() throws Exception {
1712-
AuthorizationService authorizationService =
1713-
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
1714-
doReturn(authorizationService).when(brokerService).getAuthorizationService();
1715-
svcConfig.setAuthorizationEnabled(true);
1716-
svcConfig.setAuthorizationEnabled(true);
1717-
Field providerField = AuthorizationService.class.getDeclaredField("provider");
1718-
providerField.setAccessible(true);
1719-
PulsarAuthorizationProvider authorizationProvider =
1720-
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
1721-
pulsar.getPulsarResources());
1722-
providerField.set(authorizationService, authorizationProvider);
1758+
resetChannel();
1759+
PulsarAuthorizationProvider authorizationProvider = injectAuth();
17231760
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
17241761
.isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
17251762

1763+
// Connect
1764+
ByteBuf connectCommand = Commands.newConnect("dummy", "", null);
1765+
BinaryAuthSession binaryAuthSession =
1766+
spyBinaryAuthSession(brokerService.getAuthenticationService(), connectCommand.copy(), svcConfig);
1767+
when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
1768+
channel.writeInbound(connectCommand);
1769+
Object response = getResponse();
1770+
assertTrue(response instanceof CommandConnected);
1771+
17261772
// Test producer creation
1727-
resetChannel();
1728-
setChannelConnected();
17291773
ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */,
17301774
"prod-name", Collections.emptyMap(), false);
17311775
channel.writeInbound(newProducerCmd);
17321776
assertTrue(getResponse() instanceof CommandError);
1733-
channel.finish();
17341777

17351778
// Test consumer creation
1736-
resetChannel();
1737-
setChannelConnected();
17381779
ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, //
17391780
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
17401781
"test" /* consumer name */, 0);
@@ -1745,17 +1786,9 @@ public void testNonExistentTopic() throws Exception {
17451786

17461787
@Test(timeOut = 30000)
17471788
public void testClusterAccess() throws Exception {
1748-
svcConfig.setAuthorizationEnabled(true);
1749-
AuthorizationService authorizationService =
1750-
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
1751-
Field providerField = AuthorizationService.class.getDeclaredField("provider");
1752-
providerField.setAccessible(true);
1753-
PulsarAuthorizationProvider authorizationProvider =
1754-
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
1755-
pulsar.getPulsarResources());
1756-
providerField.set(authorizationService, authorizationProvider);
1757-
doReturn(authorizationService).when(brokerService).getAuthorizationService();
1758-
svcConfig.setAuthorizationEnabled(true);
1789+
resetChannel();
1790+
1791+
PulsarAuthorizationProvider authorizationProvider = injectAuth();
17591792
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
17601793
.isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
17611794
doReturn(CompletableFuture.completedFuture(false)).when(authorizationProvider)
@@ -1764,15 +1797,20 @@ public void testClusterAccess() throws Exception {
17641797
.checkPermission(any(TopicName.class), Mockito.anyString(),
17651798
any(AuthAction.class));
17661799

1767-
resetChannel();
1768-
setChannelConnected();
1800+
// Connect
1801+
ByteBuf connectCommand = Commands.newConnect("dummy", "", null);
1802+
BinaryAuthSession binaryAuthSession =
1803+
spyBinaryAuthSession(brokerService.getAuthenticationService(), connectCommand.copy(), svcConfig);
1804+
when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
1805+
channel.writeInbound(connectCommand);
1806+
Object response = getResponse();
1807+
assertTrue(response instanceof CommandConnected);
1808+
17691809
ByteBuf clientCommand = Commands.newProducer(successTopicName, 1 /* producer id */, 1 /* request id */,
17701810
"prod-name", Collections.emptyMap(), false);
17711811
channel.writeInbound(clientCommand);
17721812
assertTrue(getResponse() instanceof CommandProducerSuccess);
17731813

1774-
resetChannel();
1775-
setChannelConnected();
17761814
clientCommand = Commands.newProducer(topicWithNonLocalCluster, 1 /* producer id */, 1 /* request id */,
17771815
"prod-name", Collections.emptyMap(), false);
17781816
channel.writeInbound(clientCommand);
@@ -1782,22 +1820,21 @@ public void testClusterAccess() throws Exception {
17821820

17831821
@Test(timeOut = 30000)
17841822
public void testNonExistentTopicSuperUserAccess() throws Exception {
1785-
AuthorizationService authorizationService =
1786-
spyWithClassAndConstructorArgs(AuthorizationService.class, svcConfig, pulsar.getPulsarResources());
1787-
doReturn(authorizationService).when(brokerService).getAuthorizationService();
1788-
svcConfig.setAuthorizationEnabled(true);
1789-
Field providerField = AuthorizationService.class.getDeclaredField("provider");
1790-
providerField.setAccessible(true);
1791-
PulsarAuthorizationProvider authorizationProvider =
1792-
spyWithClassAndConstructorArgs(PulsarAuthorizationProvider.class, svcConfig,
1793-
pulsar.getPulsarResources());
1794-
providerField.set(authorizationService, authorizationProvider);
1823+
resetChannel();
1824+
PulsarAuthorizationProvider authorizationProvider = injectAuth();
17951825
doReturn(CompletableFuture.completedFuture(true)).when(authorizationProvider)
17961826
.isSuperUser(Mockito.anyString(), Mockito.any(), Mockito.any());
17971827

1828+
// Connect
1829+
ByteBuf connectCommand = Commands.newConnect("dummy", "", null);
1830+
BinaryAuthSession binaryAuthSession =
1831+
spyBinaryAuthSession(brokerService.getAuthenticationService(), connectCommand.copy(), svcConfig);
1832+
when(brokerService.getAuthenticationService().createBinaryAuthSession(any())).thenReturn(binaryAuthSession);
1833+
channel.writeInbound(connectCommand);
1834+
Object response = getResponse();
1835+
assertTrue(response instanceof CommandConnected);
1836+
17981837
// Test producer creation
1799-
resetChannel();
1800-
setChannelConnected();
18011838
ByteBuf newProducerCmd = Commands.newProducer(nonExistentTopicName, 1 /* producer id */, 1 /* request id */,
18021839
"prod-name", Collections.emptyMap(), false);
18031840
channel.writeInbound(newProducerCmd);
@@ -1806,11 +1843,8 @@ public void testNonExistentTopicSuperUserAccess() throws Exception {
18061843
PersistentTopic topicRef = (PersistentTopic) brokerService.getTopicReference(nonExistentTopicName).get();
18071844
assertNotNull(topicRef);
18081845
assertEquals(topicRef.getProducers().size(), 1);
1809-
channel.finish();
18101846

18111847
// Test consumer creation
1812-
resetChannel();
1813-
setChannelConnected();
18141848
ByteBuf newSubscribeCmd = Commands.newSubscribe(nonExistentTopicName, //
18151849
successSubName, 1 /* consumer id */, 1 /* request id */, SubType.Exclusive, 0,
18161850
"test" /* consumer name */, 0 /* avoid reseting cursor */);
@@ -2916,7 +2950,6 @@ protected void resetChannel() throws Exception {
29162950
channel.close().get();
29172951
}
29182952
serverCnx = new ServerCnx(pulsar);
2919-
serverCnx.setAuthRole("");
29202953
channel = new EmbeddedChannel(new LengthFieldBasedFrameDecoder(
29212954
maxMessageSize,
29222955
0,

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/DirectProxyHandler.java

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,17 @@
4343
import io.netty.handler.timeout.ReadTimeoutHandler;
4444
import io.netty.util.CharsetUtil;
4545
import java.net.InetSocketAddress;
46+
import java.nio.charset.StandardCharsets;
4647
import java.util.Arrays;
4748
import java.util.Map;
4849
import java.util.concurrent.ConcurrentHashMap;
4950
import java.util.concurrent.TimeUnit;
5051
import lombok.Getter;
5152
import lombok.SneakyThrows;
5253
import org.apache.pulsar.PulsarVersion;
54+
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
55+
import org.apache.pulsar.broker.authentication.AuthenticationState;
56+
import org.apache.pulsar.broker.authentication.BinaryAuthSession;
5357
import org.apache.pulsar.client.api.Authentication;
5458
import org.apache.pulsar.client.api.AuthenticationDataProvider;
5559
import org.apache.pulsar.client.api.PulsarClientException;
@@ -100,9 +104,23 @@ public DirectProxyHandler(ProxyService service, ProxyConnection proxyConnection)
100104
this.inboundChannel = proxyConnection.ctx().channel();
101105
this.proxyConnection = proxyConnection;
102106
this.inboundChannelRequestsRate = new Rate();
103-
this.originalPrincipal = proxyConnection.clientAuthRole;
104-
this.clientAuthData = proxyConnection.clientAuthData;
105-
this.clientAuthMethod = proxyConnection.clientAuthMethod;
107+
BinaryAuthSession binaryAuthSession = proxyConnection.getBinaryAuthSession();
108+
if (binaryAuthSession != null) {
109+
AuthenticationState originalAuthState = binaryAuthSession.getOriginalAuthState();
110+
boolean forwardOriginal =
111+
originalAuthState != null && service.getConfiguration().isForwardAuthorizationCredentials();
112+
AuthenticationDataSource authDataSource = forwardOriginal ? binaryAuthSession.getOriginalAuthData() :
113+
binaryAuthSession.getAuthenticationData();
114+
clientAuthData = AuthData.of(authDataSource.getCommandData().getBytes(StandardCharsets.UTF_8));
115+
clientAuthMethod =
116+
forwardOriginal ? binaryAuthSession.getOriginalAuthMethod() : binaryAuthSession.getAuthMethod();
117+
originalPrincipal =
118+
forwardOriginal ? binaryAuthSession.getOriginalPrincipal() : binaryAuthSession.getAuthRole();
119+
} else {
120+
originalPrincipal = null;
121+
clientAuthData = null;
122+
clientAuthMethod = null;
123+
}
106124
this.tlsEnabledWithBroker = service.getConfiguration().isTlsEnabledWithBroker();
107125
this.tlsHostnameVerificationEnabled = service.getConfiguration().isTlsHostnameVerificationEnabled();
108126
this.onHandshakeCompleteAction = proxyConnection::cancelKeepAliveTask;

0 commit comments

Comments
 (0)