Skip to content
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
.idea/
*.iml
broker/target/
metrics_prometheus/target/
client/target/
parser_commons/target/
netty_parser/target/
Expand Down Expand Up @@ -47,3 +48,4 @@ broker/nbactions-Server.xml
maven-metadata.xml
/.gradle/
/bin/
/metrics_prometheus/nbproject/
4 changes: 4 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
Version 0.19-SNAPSHOT
[feature] Added metrics framework and Prometheus implementation.
- Enable by setting `metrics_provider_class` to `MetricsProviderPrometheus`.
- Metrics endpoint can be found on http://localhost:9400/metrics by default.
- port can be changed with `metrics_endpoint_port`. Set to 0 to disable http endpoint.
[fix] Fixed SegmentedQueues not being cleaned up on session purge. (#833)

Version 0.18:
Expand Down
19 changes: 13 additions & 6 deletions broker/src/main/java/io/moquette/broker/PostOffice.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttSubscriptionOption;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -58,6 +57,8 @@
import java.util.stream.Collectors;

import static io.moquette.broker.Utils.messageId;
import io.moquette.metrics.MetricsManager;
import io.moquette.metrics.MetricsProvider;
import static io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader.from;
import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE;
import static io.netty.handler.codec.mqtt.MqttQoS.EXACTLY_ONCE;
Expand Down Expand Up @@ -211,6 +212,8 @@ public RouteResult ifFailed(Runnable action) {
private final ScheduledExpirationService<ISessionsRepository.Will> willExpirationService;
private final ScheduledExpirationService<ExpirableTopic> retainedMessagesExpirationService;
private final MqttQoS maxServerGrantedQos;
private final MetricsProvider metricsProvider;


static class ExpirableTopic implements Expirable {

Expand All @@ -233,22 +236,23 @@ public Optional<Instant> expireAt() {
* */
PostOffice(ISubscriptionsDirectory subscriptions, IRetainedRepository retainedRepository,
SessionRegistry sessionRegistry, ISessionsRepository sessionRepository, BrokerInterceptor interceptor, Authorizator authorizator,
SessionEventLoopGroup sessionLoops) {
this(subscriptions, retainedRepository, sessionRegistry, sessionRepository, interceptor, authorizator, sessionLoops, Clock.systemDefaultZone());
SessionEventLoopGroup sessionLoops, MetricsProvider metricsProvider) {
this(subscriptions, retainedRepository, sessionRegistry, sessionRepository, interceptor, authorizator, sessionLoops, Clock.systemDefaultZone(), metricsProvider);
}

PostOffice(ISubscriptionsDirectory subscriptions, IRetainedRepository retainedRepository,
SessionRegistry sessionRegistry, ISessionsRepository sessionRepository, BrokerInterceptor interceptor,
Authorizator authorizator,
SessionEventLoopGroup sessionLoops, Clock clock) {
SessionEventLoopGroup sessionLoops, Clock clock, MetricsProvider metricsProvider) {
this(subscriptions, retainedRepository, sessionRegistry, sessionRepository, interceptor, authorizator,
sessionLoops, clock, EXACTLY_ONCE);
sessionLoops, clock, EXACTLY_ONCE, metricsProvider);
}

PostOffice(ISubscriptionsDirectory subscriptions, IRetainedRepository retainedRepository,
SessionRegistry sessionRegistry, ISessionsRepository sessionRepository, BrokerInterceptor interceptor,
Authorizator authorizator,
SessionEventLoopGroup sessionLoops, Clock clock, MqttQoS maxServerGrantedQos) {
SessionEventLoopGroup sessionLoops, Clock clock, MqttQoS maxServerGrantedQos,
MetricsProvider metricsProvider) {
this.authorizator = authorizator;
this.subscriptions = subscriptions;
this.retainedRepository = retainedRepository;
Expand All @@ -258,6 +262,7 @@ public Optional<Instant> expireAt() {
this.sessionLoops = sessionLoops;
this.clock = clock;
this.maxServerGrantedQos = maxServerGrantedQos;
this.metricsProvider = metricsProvider;

this.willExpirationService = new ScheduledExpirationService<>(clock, this::publishWill);
recreateWillExpires(sessionRepository);
Expand Down Expand Up @@ -858,6 +863,7 @@ public int countBatches() {
private RoutingResults publish2Subscribers(String publisherClientId,
Set<String> filterTargetClients, Instant messageExpiry,
MqttPublishMessage msg) {
metricsProvider.addPublish();
final boolean retainPublish = msg.fixedHeader().isRetain();
final Topic topic = new Topic(msg.variableHeader().topicName());
final MqttQoS publishingQos = msg.fixedHeader().qosLevel();
Expand Down Expand Up @@ -945,6 +951,7 @@ private void publishToSession(ByteBuf payload, Topic topic, Subscription sub, Mq
LOG.debug("Sending PUBLISH message to active subscriber CId: {}, topicFilter: {}, qos: {}",
sub.getClientId(), sub.getTopicFilter(), qos);

metricsProvider.addMessage(SessionEventLoop.getThreadQueueId(), qos.value());
Collection<? extends MqttProperties.MqttProperty> existingProperties = msg.variableHeader().properties().listAll();
final MqttProperties.MqttProperty[] properties = prepareSubscriptionProperties(sub, existingProperties);
final SessionRegistry.PublishedMessage publishedMessage =
Expand Down
18 changes: 14 additions & 4 deletions broker/src/main/java/io/moquette/broker/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@
import java.util.concurrent.ScheduledExecutorService;

import static io.moquette.broker.Session.INFINITE_EXPIRY;
import static io.moquette.logging.LoggingUtils.getInterceptorIds;
import static io.moquette.metrics.MetricsUtils.getInterceptorIds;
import io.moquette.metrics.MetricsManager;
import io.moquette.metrics.MetricsProvider;

public class Server {

Expand All @@ -83,6 +85,7 @@ public class Server {
private H2Builder h2Builder;
private SessionRegistry sessions;
private boolean standalone = false;
private MetricsProvider metricsProvider;

public static void main(String[] args) throws IOException {
final Server server = new Server();
Expand Down Expand Up @@ -182,6 +185,8 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
}
LOG.trace("Starting Moquette Server. MQTT message interceptors={}", getInterceptorIds(handlers));

metricsProvider = MetricsManager.createMetricsProvider(config);

scheduler = Executors.newScheduledThreadPool(1);

final String handlerProp = System.getProperty(BrokerConstants.INTERCEPT_HANDLER_PROPERTY_NAME);
Expand Down Expand Up @@ -254,13 +259,13 @@ public void startServer(IConfig config, List<? extends InterceptHandler> handler
}

final int sessionQueueSize = config.intProp(IConfig.SESSION_QUEUE_SIZE, 1024);
final SessionEventLoopGroup loopsGroup = new SessionEventLoopGroup(interceptor, sessionQueueSize);
final SessionEventLoopGroup loopsGroup = new SessionEventLoopGroup(interceptor, sessionQueueSize, metricsProvider);
sessions = new SessionRegistry(subscriptions, sessionsRepository, queueRepository, authorizator, scheduler,
clock, globalSessionExpiry, loopsGroup);
clock, globalSessionExpiry, loopsGroup, metricsProvider);

final MqttQoS serverGrantedQoS = parseMaxGrantedQoS(config);
dispatcher = new PostOffice(subscriptions, retainedRepository, sessions, sessionsRepository, interceptor,
authorizator, loopsGroup, clock, serverGrantedQoS);
authorizator, loopsGroup, clock, serverGrantedQoS, metricsProvider);
final BrokerConfiguration brokerConfig = new BrokerConfiguration(config);
MQTTConnectionFactory connectionFactory = new MQTTConnectionFactory(brokerConfig, authenticator, sessions,
dispatcher);
Expand Down Expand Up @@ -619,6 +624,7 @@ public void stopServer() {

interceptor.stop();
dispatcher.terminate();
metricsProvider.stop();
LOG.info("Moquette integration has been stopped.");
}

Expand All @@ -630,6 +636,10 @@ public int getSslPort() {
return acceptor.getSslPort();
}

public MetricsProvider getMetricsProvider() {
return metricsProvider;
}

/**
* SPI method used by Broker embedded applications to get list of subscribers. Returns null if
* the broker is not started.
Expand Down
31 changes: 26 additions & 5 deletions broker/src/main/java/io/moquette/broker/SessionEventLoop.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.moquette.broker;

import io.moquette.metrics.MetricsManager;
import io.moquette.metrics.MetricsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -12,25 +14,36 @@ final class SessionEventLoop extends Thread {

private final BlockingQueue<FutureTask<String>> taskQueue;
private final boolean flushOnExit;
private final int queueId;
private final MetricsProvider metricsProvider;
/**
* Allows a task to fetch the id of the session queue that is executing it.
*/
private static final ThreadLocal<Integer> threadQueueId = new ThreadLocal<>();

public SessionEventLoop(BlockingQueue<FutureTask<String>> taskQueue) {
this(taskQueue, true);
public SessionEventLoop(BlockingQueue<FutureTask<String>> taskQueue, int queueId, MetricsProvider metricsProvider) {
this(taskQueue, queueId, true, metricsProvider);
}

/**
* @param flushOnExit consume the commands queue before exit.
* */
public SessionEventLoop(BlockingQueue<FutureTask<String>> taskQueue, boolean flushOnExit) {
*
*/
public SessionEventLoop(BlockingQueue<FutureTask<String>> taskQueue, int queueId, boolean flushOnExit, MetricsProvider metricsProvider) {
this.taskQueue = taskQueue;
this.queueId = queueId;
this.flushOnExit = flushOnExit;
this.metricsProvider = metricsProvider;
}

@Override
public void run() {
threadQueueId.set(queueId);
while (!Thread.interrupted() || (Thread.interrupted() && !taskQueue.isEmpty() && flushOnExit)) {
try {
// blocking call
final FutureTask<String> task = this.taskQueue.take();
final FutureTask<String> task = taskQueue.take();
metricsProvider.sessionQueueDec(queueId);
executeTask(task);
} catch (InterruptedException e) {
LOG.info("SessionEventLoop {} interrupted", Thread.currentThread().getName());
Expand All @@ -53,4 +66,12 @@ public static void executeTask(final FutureTask<String> task) {
}
}
}

public static int getThreadQueueId() {
Integer id = threadQueueId.get();
if (id == null) {
return -1;
}
return id;
}
}
14 changes: 11 additions & 3 deletions broker/src/main/java/io/moquette/broker/SessionEventLoopGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.moquette.interception.BrokerInterceptor;
import io.moquette.interception.messages.InterceptExceptionMessage;
import io.moquette.metrics.MetricsManager;
import io.moquette.metrics.MetricsProvider;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -20,15 +22,18 @@ class SessionEventLoopGroup {
private final BlockingQueue<FutureTask<String>>[] sessionQueues;
private final int eventLoops = Runtime.getRuntime().availableProcessors();
private final ConcurrentMap<String, Throwable> loopThrownExceptions = new ConcurrentHashMap<>();
private final MetricsProvider metricsProvider;

SessionEventLoopGroup(BrokerInterceptor interceptor, int sessionQueueSize) {
SessionEventLoopGroup(BrokerInterceptor interceptor, int sessionQueueSize, MetricsProvider metricsProvider) {
this.metricsProvider = metricsProvider;
this.sessionQueues = new BlockingQueue[eventLoops];
metricsProvider.initSessionQueues(eventLoops, sessionQueueSize);
for (int i = 0; i < eventLoops; i++) {
this.sessionQueues[i] = new ArrayBlockingQueue<>(sessionQueueSize);
}
this.sessionExecutors = new SessionEventLoop[eventLoops];
for (int i = 0; i < eventLoops; i++) {
SessionEventLoop newLoop = new SessionEventLoop(this.sessionQueues[i]);
SessionEventLoop newLoop = new SessionEventLoop(this.sessionQueues[i], i, metricsProvider);
newLoop.setName(sessionLoopName(i));
newLoop.setUncaughtExceptionHandler((loopThread, ex) -> {
// executed in session loop thread
Expand Down Expand Up @@ -78,10 +83,13 @@ public PostOffice.RouteResult routeCommand(String clientId, String actionDescrip
SessionEventLoop.executeTask(task);
return PostOffice.RouteResult.success(clientId, cmd.completableFuture());
}
if (this.sessionQueues[targetQueueId].offer(task)) {
final BlockingQueue<FutureTask<String>> targetQueue = this.sessionQueues[targetQueueId];
if (targetQueue.offer(task)) {
metricsProvider.sessionQueueInc(targetQueueId);
return PostOffice.RouteResult.success(clientId, cmd.completableFuture());
} else {
LOG.warn("Session command queue {} is full executing action {}", targetQueueId, actionDescription);
metricsProvider.addSessionQueueOverrun(targetQueueId);
return PostOffice.RouteResult.failed(clientId);
}
}
Expand Down
42 changes: 27 additions & 15 deletions broker/src/main/java/io/moquette/broker/SessionRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;

import static io.moquette.BrokerConstants.INFLIGHT_WINDOW_SIZE;
import static io.moquette.broker.Session.INFINITE_EXPIRY;
import io.moquette.metrics.MetricsManager;
import io.moquette.metrics.MetricsProvider;

public class SessionRegistry {

Expand Down Expand Up @@ -152,8 +153,8 @@ private static int findPublicationExpiryProperty(MqttProperties.MqttProperty[] p
}

private static boolean isPublicationExpiryProperty(MqttProperties.MqttProperty property) {
return property instanceof MqttProperties.IntegerProperty &&
property.propertyId() == MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value();
return property instanceof MqttProperties.IntegerProperty
&& property.propertyId() == MqttProperties.MqttPropertyType.PUBLICATION_EXPIRY_INTERVAL.value();
}

public Instant getMessageExpiry() {
Expand All @@ -162,18 +163,19 @@ public Instant getMessageExpiry() {

@Override
public String toString() {
return "PublishedMessage{" +
"topic=" + topic +
", publishingQos=" + publishingQos +
", payload=" + payload +
", retained=" + retained +
", messageExpiry=" + messageExpiry +
", mqttProperties=" + Arrays.toString(mqttProperties) +
'}';
return "PublishedMessage{"
+ "topic=" + topic
+ ", publishingQos=" + publishingQos
+ ", payload=" + payload
+ ", retained=" + retained
+ ", messageExpiry=" + messageExpiry
+ ", mqttProperties=" + Arrays.toString(mqttProperties)
+ '}';
}
}

public static final class PubRelMarker extends EnqueuedMessage {

@Override
public String toString() {
return "PubRelMarker{}";
Expand Down Expand Up @@ -205,15 +207,17 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr
private final IQueueRepository queueRepository;
private final Authorizator authorizator;
private final Clock clock;
private final MetricsProvider metricsProvider;

// Used in testing
SessionRegistry(ISubscriptionsDirectory subscriptionsDirectory,
ISessionsRepository sessionsRepository,
IQueueRepository queueRepository,
Authorizator authorizator,
ScheduledExecutorService scheduler,
SessionEventLoopGroup loopsGroup) {
this(subscriptionsDirectory, sessionsRepository, queueRepository, authorizator, scheduler, Clock.systemDefaultZone(), INFINITE_EXPIRY, loopsGroup);
SessionEventLoopGroup loopsGroup,
MetricsProvider metricsProvider) {
this(subscriptionsDirectory, sessionsRepository, queueRepository, authorizator, scheduler, Clock.systemDefaultZone(), INFINITE_EXPIRY, loopsGroup, metricsProvider);
}

SessionRegistry(ISubscriptionsDirectory subscriptionsDirectory,
Expand All @@ -222,7 +226,8 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr
Authorizator authorizator,
ScheduledExecutorService scheduler,
Clock clock, int globalExpirySeconds,
SessionEventLoopGroup loopsGroup) {
SessionEventLoopGroup loopsGroup,
MetricsProvider metricsProvider) {
this.subscriptionsDirectory = subscriptionsDirectory;
this.sessionsRepository = sessionsRepository;
this.queueRepository = queueRepository;
Expand All @@ -231,6 +236,7 @@ public SessionCreationResult(Session session, CreationModeEnum mode, boolean alr
this.clock = clock;
this.globalExpirySeconds = globalExpirySeconds;
this.loopsGroup = loopsGroup;
this.metricsProvider = metricsProvider;
recreateSessionPool();
}

Expand Down Expand Up @@ -261,6 +267,7 @@ private void recreateSessionPool() {
queues.remove(session.clientId());
Session rehydrated = new Session(session, false, persistentQueue);
pool.put(session.clientId(), rehydrated);
metricsProvider.addOpenSession();

trackForRemovalOnExpiration(session);
}
Expand All @@ -280,6 +287,7 @@ SessionCreationResult createOrReopenSession(MqttConnectMessage msg, String clien

// publish the session
final Session previous = pool.put(clientId, newSession);
metricsProvider.addOpenSession();
if (previous != null) {
// if this happens mean that another Session Event Loop thread processed a CONNECT message
// with the same clientId. This is a bug because all messages for the same clientId should
Expand Down Expand Up @@ -307,7 +315,10 @@ private SessionCreationResult reopenExistingSession(MqttConnectMessage msg, Stri
purgeSessionState(oldSession);
// publish new session
final Session newSession = createNewSession(msg, clientId);
pool.put(clientId, newSession);
Session previous = pool.put(clientId, newSession);
if (previous != null) {
LOG.error("We're re-opening a session for clientId {} and we purged the old session, but there is still a session in the pool! this is a bug!", clientId);
}

LOG.trace("case 2, oldSession with same CId {} disconnected", clientId);
creationResult = new SessionCreationResult(newSession, CreationModeEnum.CREATED_CLEAN_NEW, true);
Expand Down Expand Up @@ -499,6 +510,7 @@ private void purgeSessionState(Session session) {
void remove(String clientID) {
final Session old = pool.remove(clientID);
if (old != null) {
metricsProvider.removeOpenSession();
// remove from expired tracker if present
sessionExpirationService.untrack(clientID);
loopsGroup.routeCommand(clientID, "Clean up removed session", () -> {
Expand Down
Loading