Skip to content

Commit 3fe6e9b

Browse files
author
Dinesh Narayanan
committed
Authentication Changes: IAuthenticator.checkValid should return CompletableFuture<Boolean>
1 parent 6ee0137 commit 3fe6e9b

17 files changed

+264
-123
lines changed

broker/src/main/java/io/moquette/BrokerConstants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public final class BrokerConstants {
2222

2323
public static final String INTERCEPT_HANDLER_PROPERTY_NAME = "intercept.handler";
2424
public static final String BROKER_INTERCEPTOR_THREAD_POOL_SIZE = "intercept.thread_pool.size";
25+
public static final String AUTH_THREAD_POOL_SIZE = "auth.intercept.thread_pool.size";
2526
public static final String PERSISTENT_STORE_PROPERTY_NAME = "persistent_store";
2627
public static final String AUTOSAVE_INTERVAL_PROPERTY_NAME = "autosave_interval";
2728
public static final String PASSWORD_FILE_PROPERTY_NAME = "password_file";

broker/src/main/java/io/moquette/broker/MQTTConnection.java

Lines changed: 54 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@
1515
*/
1616
package io.moquette.broker;
1717

18-
import io.moquette.broker.subscriptions.Topic;
1918
import io.moquette.broker.security.IAuthenticator;
19+
import io.moquette.broker.subscriptions.Topic;
2020
import io.netty.buffer.ByteBuf;
2121
import io.netty.channel.Channel;
2222
import io.netty.channel.ChannelFuture;
@@ -28,15 +28,18 @@
2828

2929
import java.net.InetSocketAddress;
3030
import java.nio.charset.StandardCharsets;
31-
import java.util.*;
31+
import java.util.List;
32+
import java.util.UUID;
33+
import java.util.concurrent.CompletableFuture;
3234
import java.util.concurrent.TimeUnit;
3335
import java.util.concurrent.atomic.AtomicInteger;
3436

3537
import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
3638
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
3739
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
3840
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
39-
import static io.netty.handler.codec.mqtt.MqttQoS.*;
41+
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
42+
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
4043

4144
final class MQTTConnection {
4245

@@ -158,27 +161,36 @@ void processConnect(MqttConnectMessage msg) {
158161
username, channel);
159162
}
160163

161-
if (!login(msg, clientId)) {
162-
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
163-
channel.close().addListener(CLOSE_ON_FAILURE);
164-
return;
165-
}
166-
167-
try {
168-
LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
169-
sessionRegistry.bindToSession(this, msg, clientId);
170-
171-
initializeKeepAliveTimeout(channel, msg, clientId);
172-
setupInflightResender(channel);
173-
174-
NettyUtils.clientID(channel, clientId);
175-
LOG.trace("CONNACK sent, channel: {}", channel);
176-
postOffice.dispatchConnection(msg);
177-
LOG.trace("dispatch connection: {}", msg.toString());
178-
} catch (SessionCorruptedException scex) {
179-
LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", clientId, channel);
180-
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
181-
}
164+
final String newClientId = clientId;
165+
CompletableFuture<Boolean> future = login(msg, newClientId);
166+
future.whenComplete((status, t) -> {
167+
if (t == null) {
168+
if (status) {
169+
try {
170+
LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
171+
sessionRegistry.bindToSession(this, msg, newClientId);
172+
173+
initializeKeepAliveTimeout(channel, msg, newClientId);
174+
setupInflightResender(channel);
175+
176+
NettyUtils.clientID(channel, newClientId);
177+
LOG.trace("CONNACK sent, channel: {}", channel);
178+
postOffice.dispatchConnection(msg);
179+
LOG.trace("dispatch connection: {}", msg.toString());
180+
} catch (SessionCorruptedException scex) {
181+
LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", newClientId, channel);
182+
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
183+
}
184+
} else {
185+
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
186+
channel.close().addListener(CLOSE_ON_FAILURE);
187+
}
188+
} else {
189+
LOG.warn("MQTT connection for client ID {} cannot be created, channel: {}. Error message {}", newClientId, channel, t.getMessage());
190+
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
191+
channel.close().addListener(CLOSE_ON_FAILURE);
192+
}
193+
});
182194
}
183195

184196
private void setupInflightResender(Channel channel) {
@@ -222,27 +234,36 @@ private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean ses
222234
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
223235
}
224236

225-
private boolean login(MqttConnectMessage msg, final String clientId) {
237+
private CompletableFuture<Boolean> login(MqttConnectMessage msg, final String clientId) {
226238
// handle user authentication
227239
if (msg.variableHeader().hasUserName()) {
228240
byte[] pwd = null;
229241
if (msg.variableHeader().hasPassword()) {
230242
pwd = msg.payload().password().getBytes(StandardCharsets.UTF_8);
231243
} else if (!brokerConfig.isAllowAnonymous()) {
232244
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
233-
return false;
245+
return CompletableFuture.completedFuture(false);
234246
}
235247
final String login = msg.payload().userName();
236-
if (!authenticator.checkValid(clientId, login, pwd)) {
237-
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
238-
return false;
239-
}
240-
NettyUtils.userName(channel, login);
248+
249+
return authenticator.checkValid(clientId, login, pwd).handleAsync((status, t) -> {
250+
if (t == null) {
251+
LOG.info("Authenticator has succeeded the MQTT credentials CId={}, username={}", clientId, login); // TODO delete me
252+
if (status) NettyUtils.userName(channel, login);
253+
else
254+
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
255+
256+
return status;
257+
} else {
258+
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}. Error message: {}", clientId, login, t.getMessage());
259+
return false;
260+
}
261+
});
241262
} else if (!brokerConfig.isAllowAnonymous()) {
242263
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
243-
return false;
264+
return CompletableFuture.completedFuture(false);
244265
}
245-
return true;
266+
return CompletableFuture.completedFuture(true);
246267
}
247268

248269
void handleConnectionLost() {

broker/src/main/java/io/moquette/broker/PostOffice.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,9 @@ void dispatchConnectionLost(String clientId,String userName){
310310
}
311311

312312
void flushInFlight(MQTTConnection mqttConnection) {
313-
Session targetSession = sessionRegistry.retrieve(mqttConnection.getClientId());
314-
targetSession.flushAllQueuedMessages();
313+
if(mqttConnection.getClientId() != null) {
314+
Session targetSession = sessionRegistry.retrieve(mqttConnection.getClientId());
315+
targetSession.flushAllQueuedMessages();
316+
}
315317
}
316318
}

broker/src/main/java/io/moquette/broker/Server.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ private IAuthenticator initializeAuthenticator(IAuthenticator authenticator, ICo
232232
if (passwdPath.isEmpty()) {
233233
authenticator = new AcceptAllAuthenticator();
234234
} else {
235-
authenticator = new ResourceAuthenticator(resourceLoader, passwdPath);
235+
authenticator = new ResourceAuthenticator(resourceLoader, passwdPath, props);
236236
}
237237
LOG.info("An {} authenticator instance will be used", authenticator.getClass().getName());
238238
}

broker/src/main/java/io/moquette/broker/Utils.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import io.netty.handler.codec.mqtt.MqttMessage;
2121
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
2222
import java.util.Map;
23+
import java.util.concurrent.ExecutorService;
24+
import java.util.concurrent.TimeUnit;
2325

2426
/**
2527
* Utility static methods, like Map get with default value, or elvis operator.
@@ -46,6 +48,24 @@ public static byte[] readBytesAndRewind(ByteBuf payload) {
4648
return payloadContent;
4749
}
4850

51+
public static void shutdownAndAwaitTermination(ExecutorService pool) {
52+
pool.shutdown(); // Disable new tasks from being submitted
53+
try {
54+
// Wait a while for existing tasks to terminate
55+
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
56+
pool.shutdownNow(); // Cancel currently executing tasks
57+
// Wait a while for tasks to respond to being cancelled
58+
if (!pool.awaitTermination(30, TimeUnit.SECONDS))
59+
System.err.println("Pool did not terminate");
60+
}
61+
} catch (InterruptedException ie) {
62+
// (Re-)Cancel if current thread also interrupted
63+
pool.shutdownNow();
64+
// Preserve interrupt status
65+
Thread.currentThread().interrupt();
66+
}
67+
}
68+
4969
private Utils() {
5070
}
5171
}

broker/src/main/java/io/moquette/broker/security/AcceptAllAuthenticator.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package io.moquette.broker.security;
1818

19+
import java.util.concurrent.CompletableFuture;
20+
1921
public class AcceptAllAuthenticator implements IAuthenticator {
2022

2123
@Override
22-
public boolean checkValid(String clientId, String username, byte[] password) {
23-
return true;
24+
public CompletableFuture<Boolean> checkValid(String clientId, String username, byte[] password) {
25+
return CompletableFuture.completedFuture(true);
2426
}
2527
}

broker/src/main/java/io/moquette/broker/security/DBAuthenticator.java

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.zaxxer.hikari.HikariDataSource;
2020
import io.moquette.BrokerConstants;
21+
import io.moquette.broker.Utils;
2122
import io.moquette.broker.config.IConfig;
2223
import org.apache.commons.codec.binary.Hex;
2324
import org.slf4j.Logger;
@@ -29,6 +30,9 @@
2930
import java.sql.PreparedStatement;
3031
import java.sql.ResultSet;
3132
import java.sql.SQLException;
33+
import java.util.concurrent.CompletableFuture;
34+
import java.util.concurrent.ExecutorService;
35+
import java.util.concurrent.Executors;
3236

3337
/**
3438
* Load user credentials from a SQL database. sql driver must be provided at runtime
@@ -38,14 +42,16 @@ public class DBAuthenticator implements IAuthenticator {
3842
private static final Logger LOG = LoggerFactory.getLogger(DBAuthenticator.class);
3943

4044
private final MessageDigest messageDigest;
45+
private final ExecutorService executor;
4146
private HikariDataSource dataSource;
4247
private String sqlQuery;
4348

4449
public DBAuthenticator(IConfig conf) {
4550
this(conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_DRIVER, ""),
4651
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_URL, ""),
4752
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_QUERY, ""),
48-
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_DIGEST, ""));
53+
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_DIGEST, ""),
54+
Integer.parseInt(conf.getProperty(BrokerConstants.AUTH_THREAD_POOL_SIZE, "1")));
4955
}
5056

5157
/**
@@ -59,11 +65,14 @@ public DBAuthenticator(IConfig conf) {
5965
* : sql query like : "SELECT PASSWORD FROM USER WHERE LOGIN=?"
6066
* @param digestMethod
6167
* : password encoding algorithm : "MD5", "SHA-1", "SHA-256"
68+
* @param authExecutorPoolSize
69+
* : auth executor pool size. Defaults to 1.
6270
*/
63-
public DBAuthenticator(String driver, String jdbcUrl, String sqlQuery, String digestMethod) {
71+
public DBAuthenticator(String driver, String jdbcUrl, String sqlQuery, String digestMethod, int authExecutorPoolSize) {
6472
this.sqlQuery = sqlQuery;
6573
this.dataSource = new HikariDataSource();
6674
this.dataSource.setJdbcUrl(jdbcUrl);
75+
this.executor = Executors.newFixedThreadPool(authExecutorPoolSize);
6776

6877
try {
6978
this.messageDigest = MessageDigest.getInstance(digestMethod);
@@ -74,46 +83,52 @@ public DBAuthenticator(String driver, String jdbcUrl, String sqlQuery, String di
7483
}
7584

7685
@Override
77-
public synchronized boolean checkValid(String clientId, String username, byte[] password) {
78-
// Check Username / Password in DB using sqlQuery
79-
if (username == null || password == null) {
80-
LOG.info("username or password was null");
81-
return false;
82-
}
83-
84-
ResultSet resultSet = null;
85-
PreparedStatement preparedStatement = null;
86-
Connection conn = null;
87-
try {
88-
conn = this.dataSource.getConnection();
89-
90-
preparedStatement = conn.prepareStatement(this.sqlQuery);
91-
preparedStatement.setString(1, username);
92-
resultSet = preparedStatement.executeQuery();
93-
if (resultSet.next()) {
94-
final String foundPwq = resultSet.getString(1);
95-
messageDigest.update(password);
96-
byte[] digest = messageDigest.digest();
97-
String encodedPasswd = new String(Hex.encodeHex(digest));
98-
return foundPwq.equals(encodedPasswd);
86+
public synchronized CompletableFuture<Boolean> checkValid(String clientId, String username, byte[] password) {
87+
return CompletableFuture.supplyAsync(() -> {
88+
// Check Username / Password in DB using sqlQuery
89+
if (username == null || password == null) {
90+
LOG.info("username or password was null");
91+
return false;
9992
}
100-
} catch (SQLException sqlex) {
101-
LOG.error("Error quering DB for username: {}", username, sqlex);
102-
} finally {
93+
94+
ResultSet resultSet = null;
95+
PreparedStatement preparedStatement = null;
96+
Connection conn = null;
10397
try {
104-
if (resultSet != null) {
105-
resultSet.close();
106-
}
107-
if (preparedStatement != null) {
108-
preparedStatement.close();
98+
conn = this.dataSource.getConnection();
99+
100+
preparedStatement = conn.prepareStatement(this.sqlQuery);
101+
preparedStatement.setString(1, username);
102+
resultSet = preparedStatement.executeQuery();
103+
if (resultSet.next()) {
104+
final String foundPwq = resultSet.getString(1);
105+
messageDigest.update(password);
106+
byte[] digest = messageDigest.digest();
107+
String encodedPasswd = new String(Hex.encodeHex(digest));
108+
return foundPwq.equals(encodedPasswd);
109109
}
110-
if (conn != null) {
111-
conn.close();
110+
} catch (SQLException sqlex) {
111+
LOG.error("Error quering DB for username: {}", username, sqlex);
112+
} finally {
113+
try {
114+
if (resultSet != null) {
115+
resultSet.close();
116+
}
117+
if (preparedStatement != null) {
118+
preparedStatement.close();
119+
}
120+
if (conn != null) {
121+
conn.close();
122+
}
123+
} catch (SQLException e) {
124+
LOG.error("Error releasing connection to the datasource", username, e);
112125
}
113-
} catch (SQLException e) {
114-
LOG.error("Error releasing connection to the datasource", username, e);
115126
}
116-
}
117-
return false;
127+
return false;
128+
}, executor);
129+
}
130+
131+
public void cleanup() {
132+
Utils.shutdownAndAwaitTermination(executor);
118133
}
119134
}

broker/src/main/java/io/moquette/broker/security/FileAuthenticator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package io.moquette.broker.security;
1818

1919
import io.moquette.broker.config.FileResourceLoader;
20+
import io.moquette.broker.config.IConfig;
2021

2122
/**
2223
* Load user credentials from a text file. Each line of the file is formatted as
@@ -35,7 +36,7 @@
3536
*/
3637
public class FileAuthenticator extends ResourceAuthenticator {
3738

38-
public FileAuthenticator(String parent, String filePath) {
39-
super(new FileResourceLoader(parent), filePath);
39+
public FileAuthenticator(String parent, String filePath, IConfig config) {
40+
super(new FileResourceLoader(parent), filePath, config);
4041
}
4142
}

broker/src/main/java/io/moquette/broker/security/IAuthenticator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616

1717
package io.moquette.broker.security;
1818

19+
import java.util.concurrent.CompletableFuture;
20+
1921
/**
2022
* username and password checker
2123
*/
2224
public interface IAuthenticator {
2325

24-
boolean checkValid(String clientId, String username, byte[] password);
26+
CompletableFuture<Boolean> checkValid(String clientId, String username, byte[] password);
2527
}

0 commit comments

Comments
 (0)