Skip to content

Commit d3ba6b4

Browse files
committed
fix mqtt pub/sub topic is null
1 parent 3af72af commit d3ba6b4

File tree

9 files changed

+148
-103
lines changed

9 files changed

+148
-103
lines changed

toolkit-common/src/main/java/iot/technology/client/toolkit/common/constants/PubData.java

Lines changed: 15 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,31 +33,29 @@ public class PubData implements Serializable {
3333

3434
private String payload;
3535

36-
public static PubData validate(String data) {
36+
public static boolean validate(String data, TopicAndQos bizDomain) {
3737
ResourceBundle bundle = ResourceBundle.getBundle(StorageConstants.LANG_MESSAGES);
38-
PubData pubData = new PubData();
39-
String topic = "";
40-
int qos = 0;
41-
if (StringUtils.isBlank(data)) {
38+
if (StringUtils.isBlank(data) || !data.contains(" ")) {
4239
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
40+
return false;
4341
}
44-
if (!data.contains("=")) {
45-
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
46-
}
47-
int equalIndex = data.indexOf("=");
42+
int equalIndex = data.indexOf(" ");
4843
String topicAndQos = data.substring(0, equalIndex);
4944
if (StringUtils.isBlank(topicAndQos)) {
5045
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
46+
return false;
5147
}
52-
ObjectUtils.topicAndQosValidator(topicAndQos, topic, qos, bundle);
53-
String payload = data.substring(equalIndex + 1);
54-
if (StringUtils.isBlank(payload)) {
55-
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
48+
boolean validateResult = ObjectUtils.topicAndQosValidator(topicAndQos, bizDomain);
49+
if (validateResult) {
50+
String payload = data.substring(equalIndex + 1);
51+
if (StringUtils.isBlank(payload)) {
52+
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
53+
return false;
54+
}
55+
bizDomain.setPayload(payload);
56+
return true;
5657
}
57-
pubData.setTopic(topic);
58-
pubData.setQos(qos);
59-
pubData.setPayload(payload);
60-
return pubData;
58+
return false;
6159
}
6260

6361
public String getTopic() {

toolkit-common/src/main/java/iot/technology/client/toolkit/common/constants/SubData.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -33,31 +33,23 @@ public class SubData implements Serializable {
3333

3434
private int qos;
3535

36-
public static SubData validate(String data) {
37-
SubData subData = new SubData();
38-
String topic = "";
39-
int qos = 0;
36+
public static boolean validate(String data, TopicAndQos bizDomain) {
4037
ResourceBundle bundle = ResourceBundle.getBundle(StorageConstants.LANG_MESSAGES);
41-
if (StringUtils.isBlank(data)) {
42-
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
43-
}
44-
if ((!data.contains("add") && !data.contains("del")) || !data.contains(" ")) {
38+
if (StringUtils.isBlank(data)
39+
|| (!data.startsWith("add") && !data.startsWith("del"))
40+
|| !data.contains(" ")) {
4541
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
42+
return false;
4643
}
4744
int spaceIndex = data.indexOf(" ");
48-
String operationStr = data.substring(0, spaceIndex);
49-
if ((!operationStr.contains("add") && !operationStr.contains("del"))) {
50-
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
51-
}
52-
subData.setOperation(operationStr);
45+
String operation = data.substring(0, spaceIndex);
46+
bizDomain.setOperation(operation);
5347
String topicAndQos = data.substring(spaceIndex + 1).trim();
5448
if (StringUtils.isBlank(topicAndQos)) {
5549
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
50+
return false;
5651
}
57-
ObjectUtils.topicAndQosValidator(topicAndQos, topic, qos, bundle);
58-
subData.setTopic(topic);
59-
subData.setQos(qos);
60-
return subData;
52+
return ObjectUtils.topicAndQosValidator(topicAndQos, bizDomain);
6153
}
6254

6355

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright © 2019-2023 The Toolkit Authors
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package iot.technology.client.toolkit.common.constants;
17+
18+
import java.io.Serializable;
19+
20+
/**
21+
* @author mushuwei
22+
*/
23+
public class TopicAndQos implements Serializable {
24+
25+
private String operation;
26+
27+
private String topic;
28+
29+
private int qos = 0;
30+
31+
private String payload;
32+
33+
public String getTopic() {
34+
return topic;
35+
}
36+
37+
public void setTopic(String topic) {
38+
this.topic = topic;
39+
}
40+
41+
public int getQos() {
42+
return qos;
43+
}
44+
45+
public void setQos(int qos) {
46+
this.qos = qos;
47+
}
48+
49+
public String getPayload() {
50+
return payload;
51+
}
52+
53+
public void setPayload(String payload) {
54+
this.payload = payload;
55+
}
56+
57+
public String getOperation() {
58+
return operation;
59+
}
60+
61+
public void setOperation(String operation) {
62+
this.operation = operation;
63+
}
64+
}

toolkit-common/src/main/java/iot/technology/client/toolkit/common/utils/ObjectUtils.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package iot.technology.client.toolkit.common.utils;
1717

18+
import iot.technology.client.toolkit.common.constants.StorageConstants;
19+
import iot.technology.client.toolkit.common.constants.TopicAndQos;
1820
import iot.technology.client.toolkit.common.rule.TkNode;
1921

2022
import java.lang.reflect.Field;
@@ -90,29 +92,27 @@ public static Object setValue(Object obj, String propName, String value) {
9092
return obj;
9193
}
9294

93-
public static void topicAndQosValidator(String topicAndQos, String topic, int qos, ResourceBundle bundle) {
95+
public static boolean topicAndQosValidator(String topicAndQos, TopicAndQos domain) {
96+
ResourceBundle bundle = ResourceBundle.getBundle(StorageConstants.LANG_MESSAGES);
9497
if (!topicAndQos.contains(":")) {
95-
topic = topicAndQos;
96-
} else {
97-
int divide = topicAndQos.indexOf(":");
98-
String qosStr = topicAndQos.substring(divide + 1);
99-
String topicStr = topicAndQos.substring(0, divide);
100-
if (StringUtils.isBlank(topicStr)) {
101-
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
102-
}
103-
topic = topicStr;
104-
if (StringUtils.isBlank(qosStr) || !StringUtils.isNumeric(qosStr)) {
105-
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
106-
}
107-
Integer qosValue = Integer.parseInt(qosStr);
108-
if (qosValue.equals(0)
109-
|| qosValue.equals(1)
110-
|| qosValue.equals(2)) {
111-
qos = qosValue;
112-
} else {
113-
System.out.format(ColorUtils.redError(bundle.getString("mqtt.qos.error")));
114-
}
98+
domain.setTopic(topicAndQos);
99+
return true;
100+
}
101+
int divide = topicAndQos.indexOf(":");
102+
String qosStr = topicAndQos.substring(divide + 1);
103+
String topicStr = topicAndQos.substring(0, divide);
104+
if (StringUtils.isBlank(topicStr)) {
105+
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
106+
return false;
107+
}
108+
domain.setTopic(topicStr);
109+
if (StringUtils.isBlank(qosStr) || !StringUtils.isNumeric(qosStr)) {
110+
System.out.format(ColorUtils.redError(bundle.getString("param.error")));
111+
return false;
115112
}
113+
int qosValue = Integer.parseInt(qosStr) < 0 || Integer.parseInt(qosStr) > 2 ? 0 : Integer.parseInt(qosStr);
114+
domain.setQos(qosValue);
115+
return true;
116116
}
117117

118118
public static TkNode initComponent(String node) {

toolkit-mqtt/src/main/java/iot/technology/client/toolkit/mqtt/command/sub/MqttPublishCommand.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,9 @@ public Integer call() throws Exception {
9292
data = reader.readLine(tkNode.nodePrompt());
9393
context.setData(data);
9494
tkNode.check(context);
95-
bizService.printValueToConsole(code, tkNode.getValue(context), context);
96-
ObjectUtils.setValue(domain, code, tkNode.getValue(context));
95+
String codeData = tkNode.getValue(context);
96+
bizService.printValueToConsole(code, codeData, context);
97+
ObjectUtils.setValue(domain, code, codeData);
9798
bizService.mqttProcessorAfterLogic(code, data, domain, true);
9899
code = tkNode.nextNode(context);
99100
} catch (UserInterruptException | EndOfFileException e) {

toolkit-mqtt/src/main/java/iot/technology/client/toolkit/mqtt/service/MqttBizService.java

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -127,13 +127,20 @@ public void mqttProcessorAfterLogic(String code, String data, MqttConfigSettings
127127
break;
128128
}
129129
if (code.equals(MqttSettingsCodeEnum.PUBLISH_MESSAGE.getCode())) {
130-
PubData pubData = PubData.validate(data);
131-
mqttPubLogic(pubData, domain.getClient());
130+
TopicAndQos bizDomain = new TopicAndQos();
131+
boolean validate = PubData.validate(data, bizDomain);
132+
if (validate) {
133+
mqttPubLogic(bizDomain, domain.getClient());
134+
}
132135
break;
133136
}
134137
if (code.equals(MqttSettingsCodeEnum.SUBSCRIBE_MESSAGE.getCode())) {
135-
SubData subData = SubData.validate(data);
136-
mqttSubLogic(subData, domain.getClient());
138+
TopicAndQos bizDomain = new TopicAndQos();
139+
boolean validate = SubData.validate(data, bizDomain);
140+
if (validate) {
141+
mqttSubLogic(bizDomain, domain.getClient());
142+
}
143+
break;
137144
}
138145
if ((code.equals(MqttSettingsCodeEnum.LASTWILLANDTESTAMENT.getCode()) &&
139146
data.toUpperCase().equals(ConfirmCodeEnum.NO.getValue()))
@@ -209,20 +216,21 @@ private MqttClientService connectBroker(MqttClientConfig config) {
209216
return mqttClientService;
210217
}
211218

212-
private void mqttPubLogic(PubData data, MqttClientService client) {
213-
MqttQoS qosLevel = MqttQoS.valueOf(data.getQos());
214-
client.publish(data.getTopic(), Unpooled.wrappedBuffer(data.getPayload().getBytes(StandardCharsets.UTF_8)), qosLevel, false);
219+
private void mqttPubLogic(TopicAndQos bizDomain, MqttClientService client) {
220+
MqttQoS qosLevel = MqttQoS.valueOf(bizDomain.getQos());
221+
client.publish(bizDomain.getTopic(), Unpooled.wrappedBuffer(bizDomain.getPayload().getBytes(StandardCharsets.UTF_8)), qosLevel,
222+
false);
215223
System.out.format(
216-
data.getPayload() + " " + bundle.getString("publishMessage.success") + String.format(EmojiEnum.successEmoji) + "%n");
224+
bizDomain.getPayload() + " " + bundle.getString("publishMessage.success") + String.format(EmojiEnum.successEmoji) + "%n");
217225
}
218226

219-
private void mqttSubLogic(SubData data, MqttClientService client) {
220-
MqttQoS qosLevel = MqttQoS.valueOf(data.getQos());
227+
private void mqttSubLogic(TopicAndQos bizDomain, MqttClientService client) {
228+
MqttQoS qosLevel = MqttQoS.valueOf(bizDomain.getQos());
221229
MqttSubMessageHandler handler = new MqttSubMessageHandler();
222-
if (data.getOperation().equals("add")) {
223-
client.on(data.getTopic(), handler, qosLevel);
230+
if (bizDomain.getOperation().equals("add")) {
231+
client.on(bizDomain.getTopic(), handler, qosLevel);
224232
} else {
225-
client.off(data.getTopic());
233+
client.off(bizDomain.getTopic());
226234
}
227235
}
228236

toolkit-mqtt/src/main/java/iot/technology/client/toolkit/mqtt/service/MqttClientConfig.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,12 @@
1919
import io.netty.handler.ssl.SslContext;
2020
import iot.technology.client.toolkit.mqtt.service.domain.MqttLastWill;
2121

22-
import java.util.Random;
23-
import java.util.concurrent.ThreadLocalRandom;
24-
2522
/**
2623
* @author mushuwei
2724
*/
2825
public final class MqttClientConfig {
2926

3027
private final SslContext sslContext;
31-
private final String randomClientId;
3228

3329
private String clientId;
3430
private int timeoutSeconds = 10;
@@ -51,14 +47,6 @@ public MqttClientConfig() {
5147

5248
public MqttClientConfig(SslContext sslContext) {
5349
this.sslContext = sslContext;
54-
ThreadLocalRandom random = ThreadLocalRandom.current();
55-
String id = "toolkit_mqtt_";
56-
String[] options = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789".split("");
57-
for (int i = 0; i < 8; i++) {
58-
id += options[random.nextInt(options.length)];
59-
}
60-
this.clientId = id;
61-
this.randomClientId = id;
6250
}
6351

6452

@@ -92,11 +80,7 @@ public String getClientId() {
9280
}
9381

9482
public void setClientId(String clientId) {
95-
if (clientId == null) {
96-
this.clientId = randomClientId;
97-
} else {
98-
this.clientId = clientId;
99-
}
83+
this.clientId = clientId;
10084
}
10185

10286
public int getMaxBytesInMessage() {

toolkit-mqtt/src/main/java/iot/technology/client/toolkit/mqtt/service/node/PublishMessageNode.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package iot.technology.client.toolkit.mqtt.service.node;
22

3-
import iot.technology.client.toolkit.common.constants.GlobalConstants;
4-
import iot.technology.client.toolkit.common.constants.MqttSettingsCodeEnum;
5-
import iot.technology.client.toolkit.common.constants.PubData;
6-
import iot.technology.client.toolkit.common.constants.StorageConstants;
3+
import iot.technology.client.toolkit.common.constants.*;
74
import iot.technology.client.toolkit.common.rule.NodeContext;
85
import iot.technology.client.toolkit.common.rule.TkNode;
96
import iot.technology.client.toolkit.common.utils.ColorUtils;
@@ -19,16 +16,19 @@ public class PublishMessageNode implements TkNode {
1916

2017
@Override
2118
public void prePrompt(NodeContext context) {
22-
System.out.format(ColorUtils.greenItalic(bundle.getString("publishMessage.pre.prompt") + " topic:qos=message") + "%n");
23-
System.out.format(ColorUtils.greenItalic(bundle.getString("publishMessage.pre.example") + " hello:0=hello world") + "%n");
19+
System.out.format(ColorUtils.greenItalic(
20+
bundle.getString("publishMessage.pre.prompt") + " topic:qos message or topic message (qos default 0)") + "%n");
21+
System.out.format(
22+
ColorUtils.greenItalic(bundle.getString("publishMessage.pre.example") + " hello:0 hello world") + "%n");
2423

2524
}
2625

2726
@Override
2827
public boolean check(NodeContext context) {
29-
PubData.validate(context.getData());
30-
context.setCheck(true);
31-
return true;
28+
TopicAndQos bizDomain = new TopicAndQos();
29+
boolean validate = PubData.validate(context.getData(), bizDomain);
30+
context.setCheck(validate);
31+
return validate;
3232
}
3333

3434
@Override

toolkit-mqtt/src/main/java/iot/technology/client/toolkit/mqtt/service/node/SubscribeMessageNode.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package iot.technology.client.toolkit.mqtt.service.node;
22

3-
import iot.technology.client.toolkit.common.constants.GlobalConstants;
4-
import iot.technology.client.toolkit.common.constants.MqttSettingsCodeEnum;
5-
import iot.technology.client.toolkit.common.constants.StorageConstants;
6-
import iot.technology.client.toolkit.common.constants.SubData;
3+
import iot.technology.client.toolkit.common.constants.*;
74
import iot.technology.client.toolkit.common.rule.NodeContext;
85
import iot.technology.client.toolkit.common.rule.TkNode;
96
import iot.technology.client.toolkit.common.utils.ColorUtils;
@@ -25,9 +22,10 @@ public void prePrompt(NodeContext context) {
2522

2623
@Override
2724
public boolean check(NodeContext context) {
28-
SubData.validate(context.getData());
29-
context.setCheck(true);
30-
return true;
25+
TopicAndQos bizDomain = new TopicAndQos();
26+
boolean validate = SubData.validate(context.getData(), bizDomain);
27+
context.setCheck(validate);
28+
return validate;
3129
}
3230

3331
@Override

0 commit comments

Comments
 (0)