Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions broker/src/main/java/io/moquette/BrokerConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public final class BrokerConstants {

public static final String INTERCEPT_HANDLER_PROPERTY_NAME = "intercept.handler";
public static final String BROKER_INTERCEPTOR_THREAD_POOL_SIZE = "intercept.thread_pool.size";
public static final String AUTH_THREAD_POOL_SIZE = "auth.intercept.thread_pool.size";
public static final String PERSISTENT_STORE_PROPERTY_NAME = "persistent_store";
public static final String AUTOSAVE_INTERVAL_PROPERTY_NAME = "autosave_interval";
public static final String PASSWORD_FILE_PROPERTY_NAME = "password_file";
Expand Down
86 changes: 53 additions & 33 deletions broker/src/main/java/io/moquette/broker/MQTTConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package io.moquette.broker;

import io.moquette.broker.subscriptions.Topic;
import io.moquette.broker.security.IAuthenticator;
import io.moquette.broker.subscriptions.Topic;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -28,15 +28,18 @@

import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.*;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static io.netty.channel.ChannelFutureListener.CLOSE_ON_FAILURE;
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.*;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.*;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_LEAST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;

final class MQTTConnection {

Expand Down Expand Up @@ -158,27 +161,36 @@ void processConnect(MqttConnectMessage msg) {
username, channel);
}

if (!login(msg, clientId)) {
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
channel.close().addListener(CLOSE_ON_FAILURE);
return;
}

try {
LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
sessionRegistry.bindToSession(this, msg, clientId);

initializeKeepAliveTimeout(channel, msg, clientId);
setupInflightResender(channel);

NettyUtils.clientID(channel, clientId);
LOG.trace("CONNACK sent, channel: {}", channel);
postOffice.dispatchConnection(msg);
LOG.trace("dispatch connection: {}", msg.toString());
} catch (SessionCorruptedException scex) {
LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", clientId, channel);
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
final String newClientId = clientId;
CompletableFuture<Boolean> future = login(msg, newClientId);
future.whenComplete((status, t) -> {
if (t == null) {
if (status) {
try {
LOG.trace("Binding MQTTConnection (channel: {}) to session", channel);
sessionRegistry.bindToSession(this, msg, newClientId);

initializeKeepAliveTimeout(channel, msg, newClientId);
setupInflightResender(channel);

NettyUtils.clientID(channel, newClientId);
LOG.trace("CONNACK sent, channel: {}", channel);
postOffice.dispatchConnection(msg);
LOG.trace("dispatch connection: {}", msg.toString());
} catch (SessionCorruptedException scex) {
LOG.warn("MQTT session for client ID {} cannot be created, channel: {}", newClientId, channel);
abortConnection(CONNECTION_REFUSED_SERVER_UNAVAILABLE);
}
} else {
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
channel.close().addListener(CLOSE_ON_FAILURE);
}
} else {
LOG.warn("MQTT connection for client ID {} cannot be created, channel: {}. Error message {}", newClientId, channel, t.getMessage());
abortConnection(CONNECTION_REFUSED_BAD_USER_NAME_OR_PASSWORD);
channel.close().addListener(CLOSE_ON_FAILURE);
}
});
}

private void setupInflightResender(Channel channel) {
Expand Down Expand Up @@ -222,27 +234,35 @@ private MqttConnAckMessage connAck(MqttConnectReturnCode returnCode, boolean ses
return new MqttConnAckMessage(mqttFixedHeader, mqttConnAckVariableHeader);
}

private boolean login(MqttConnectMessage msg, final String clientId) {
private CompletableFuture<Boolean> login(MqttConnectMessage msg, final String clientId) {
// handle user authentication
if (msg.variableHeader().hasUserName()) {
byte[] pwd = null;
if (msg.variableHeader().hasPassword()) {
pwd = msg.payload().password().getBytes(StandardCharsets.UTF_8);
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.error("Client didn't supply any password and MQTT anonymous mode is disabled CId={}", clientId);
return false;
return CompletableFuture.completedFuture(false);
}
final String login = msg.payload().userName();
if (!authenticator.checkValid(clientId, login, pwd)) {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);
return false;
}
NettyUtils.userName(channel, login);

return authenticator.checkValid(clientId, login, pwd).handleAsync((status, t) -> {
if (t == null) {
if (status) NettyUtils.userName(channel, login);
else
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}", clientId, login);

return status;
} else {
LOG.error("Authenticator has rejected the MQTT credentials CId={}, username={}. Error message: {}", clientId, login, t.getMessage());
return false;
}
});
} else if (!brokerConfig.isAllowAnonymous()) {
LOG.error("Client didn't supply any credentials and MQTT anonymous mode is disabled. CId={}", clientId);
return false;
return CompletableFuture.completedFuture(false);
}
return true;
return CompletableFuture.completedFuture(true);
}

void handleConnectionLost() {
Expand Down
6 changes: 4 additions & 2 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,9 @@ void dispatchConnectionLost(String clientId,String userName){
}

void flushInFlight(MQTTConnection mqttConnection) {
Session targetSession = sessionRegistry.retrieve(mqttConnection.getClientId());
targetSession.flushAllQueuedMessages();
if(mqttConnection.getClientId() != null) {
Session targetSession = sessionRegistry.retrieve(mqttConnection.getClientId());
targetSession.flushAllQueuedMessages();
}
}
}
2 changes: 1 addition & 1 deletion broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private IAuthenticator initializeAuthenticator(IAuthenticator authenticator, ICo
if (passwdPath.isEmpty()) {
authenticator = new AcceptAllAuthenticator();
} else {
authenticator = new ResourceAuthenticator(resourceLoader, passwdPath);
authenticator = new ResourceAuthenticator(resourceLoader, passwdPath, props);
}
LOG.info("An {} authenticator instance will be used", authenticator.getClass().getName());
}
Expand Down
20 changes: 20 additions & 0 deletions broker/src/main/java/io/moquette/broker/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

/**
* Utility static methods, like Map get with default value, or elvis operator.
Expand All @@ -46,6 +48,24 @@ public static byte[] readBytesAndRewind(ByteBuf payload) {
return payloadContent;
}

public static void shutdownAndAwaitTermination(ExecutorService pool) {
pool.shutdown(); // Disable new tasks from being submitted
try {
// Wait a while for existing tasks to terminate
if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
pool.shutdownNow(); // Cancel currently executing tasks
// Wait a while for tasks to respond to being cancelled
if (!pool.awaitTermination(30, TimeUnit.SECONDS))
System.err.println("Pool did not terminate");
}
} catch (InterruptedException ie) {
// (Re-)Cancel if current thread also interrupted
pool.shutdownNow();
// Preserve interrupt status
Thread.currentThread().interrupt();
}
}

private Utils() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package io.moquette.broker.security;

import java.util.concurrent.CompletableFuture;

public class AcceptAllAuthenticator implements IAuthenticator {

@Override
public boolean checkValid(String clientId, String username, byte[] password) {
return true;
public CompletableFuture<Boolean> checkValid(String clientId, String username, byte[] password) {
return CompletableFuture.completedFuture(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.zaxxer.hikari.HikariDataSource;
import io.moquette.BrokerConstants;
import io.moquette.broker.Utils;
import io.moquette.broker.config.IConfig;
import org.apache.commons.codec.binary.Hex;
import org.slf4j.Logger;
Expand All @@ -29,6 +30,9 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Load user credentials from a SQL database. sql driver must be provided at runtime
Expand All @@ -38,14 +42,16 @@ public class DBAuthenticator implements IAuthenticator {
private static final Logger LOG = LoggerFactory.getLogger(DBAuthenticator.class);

private final MessageDigest messageDigest;
private final ExecutorService executor;
private HikariDataSource dataSource;
private String sqlQuery;

public DBAuthenticator(IConfig conf) {
this(conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_DRIVER, ""),
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_URL, ""),
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_QUERY, ""),
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_DIGEST, ""));
conf.getProperty(BrokerConstants.DB_AUTHENTICATOR_DIGEST, ""),
Integer.parseInt(conf.getProperty(BrokerConstants.AUTH_THREAD_POOL_SIZE, "1")));
}

/**
Expand All @@ -59,11 +65,14 @@ public DBAuthenticator(IConfig conf) {
* : sql query like : "SELECT PASSWORD FROM USER WHERE LOGIN=?"
* @param digestMethod
* : password encoding algorithm : "MD5", "SHA-1", "SHA-256"
* @param authExecutorPoolSize
* : auth executor pool size. Defaults to 1.
*/
public DBAuthenticator(String driver, String jdbcUrl, String sqlQuery, String digestMethod) {
public DBAuthenticator(String driver, String jdbcUrl, String sqlQuery, String digestMethod, int authExecutorPoolSize) {
this.sqlQuery = sqlQuery;
this.dataSource = new HikariDataSource();
this.dataSource.setJdbcUrl(jdbcUrl);
this.executor = Executors.newFixedThreadPool(authExecutorPoolSize);

try {
this.messageDigest = MessageDigest.getInstance(digestMethod);
Expand All @@ -74,46 +83,52 @@ public DBAuthenticator(String driver, String jdbcUrl, String sqlQuery, String di
}

@Override
public synchronized boolean checkValid(String clientId, String username, byte[] password) {
// Check Username / Password in DB using sqlQuery
if (username == null || password == null) {
LOG.info("username or password was null");
return false;
}

ResultSet resultSet = null;
PreparedStatement preparedStatement = null;
Connection conn = null;
try {
conn = this.dataSource.getConnection();

preparedStatement = conn.prepareStatement(this.sqlQuery);
preparedStatement.setString(1, username);
resultSet = preparedStatement.executeQuery();
if (resultSet.next()) {
final String foundPwq = resultSet.getString(1);
messageDigest.update(password);
byte[] digest = messageDigest.digest();
String encodedPasswd = new String(Hex.encodeHex(digest));
return foundPwq.equals(encodedPasswd);
public synchronized CompletableFuture<Boolean> checkValid(String clientId, String username, byte[] password) {
return CompletableFuture.supplyAsync(() -> {
// Check Username / Password in DB using sqlQuery
if (username == null || password == null) {
LOG.info("username or password was null");
return false;
}
} catch (SQLException sqlex) {
LOG.error("Error quering DB for username: {}", username, sqlex);
} finally {

ResultSet resultSet = null;
PreparedStatement preparedStatement = null;
Connection conn = null;
try {
if (resultSet != null) {
resultSet.close();
}
if (preparedStatement != null) {
preparedStatement.close();
conn = this.dataSource.getConnection();

preparedStatement = conn.prepareStatement(this.sqlQuery);
preparedStatement.setString(1, username);
resultSet = preparedStatement.executeQuery();
if (resultSet.next()) {
final String foundPwq = resultSet.getString(1);
messageDigest.update(password);
byte[] digest = messageDigest.digest();
String encodedPasswd = new String(Hex.encodeHex(digest));
return foundPwq.equals(encodedPasswd);
}
if (conn != null) {
conn.close();
} catch (SQLException sqlex) {
LOG.error("Error quering DB for username: {}", username, sqlex);
} finally {
try {
if (resultSet != null) {
resultSet.close();
}
if (preparedStatement != null) {
preparedStatement.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException e) {
LOG.error("Error releasing connection to the datasource", username, e);
}
} catch (SQLException e) {
LOG.error("Error releasing connection to the datasource", username, e);
}
}
return false;
return false;
}, executor);
}

public void cleanup() {
Utils.shutdownAndAwaitTermination(executor);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package io.moquette.broker.security;

import io.moquette.broker.config.FileResourceLoader;
import io.moquette.broker.config.IConfig;

/**
* Load user credentials from a text file. Each line of the file is formatted as
Expand All @@ -35,7 +36,7 @@
*/
public class FileAuthenticator extends ResourceAuthenticator {

public FileAuthenticator(String parent, String filePath) {
super(new FileResourceLoader(parent), filePath);
public FileAuthenticator(String parent, String filePath, IConfig config) {
super(new FileResourceLoader(parent), filePath, config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package io.moquette.broker.security;

import java.util.concurrent.CompletableFuture;

/**
* username and password checker
*/
public interface IAuthenticator {

boolean checkValid(String clientId, String username, byte[] password);
CompletableFuture<Boolean> checkValid(String clientId, String username, byte[] password);
}
Loading