Skip to content

Commit

Permalink
✨ 统一规范日志
Browse files Browse the repository at this point in the history
  • Loading branch information
li-xunhuan committed Nov 28, 2024
1 parent 7289838 commit 3c2f6b6
Showing 1 changed file with 31 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,11 @@ private static void connAckByReturnCode(String clientId, String uniqueId, Channe
.returnCode(returnCode)
.sessionPresent(false)
.build();
Tio.send(context, message);
if (MqttConnectReasonCode.CONNECTION_ACCEPTED == returnCode) {
logger.info("Connect successful, clientId: {} uniqueId:{}", clientId, uniqueId);
boolean result = Tio.send(context, message);
if (returnCode.isAccepted()) {
logger.info("Connect successful, clientId: {} uniqueId:{} result:{}", clientId, uniqueId, result);
} else {
logger.error("Connect error - clientId: {} uniqueId:{} returnCode:{}", clientId, uniqueId, returnCode);
logger.error("Connect error - clientId: {} uniqueId:{} returnCode:{} result:{}", clientId, uniqueId, returnCode, result);
}
}

Expand Down Expand Up @@ -261,23 +261,23 @@ public void processPublish(ChannelContext context, MqttPublishMessage message) {

@Override
public void processPubAck(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
int packetId = variableHeader.messageId();
String clientId = context.getBsId();
logger.debug("PubAck - clientId:{}, messageId:{}", clientId, messageId);
MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
logger.debug("PubAck - clientId:{}, packetId:{}", clientId, packetId);
MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, packetId);
if (pendingPublish == null) {
return;
}
pendingPublish.onPubAckReceived();
sessionManager.removePendingPublish(clientId, messageId);
sessionManager.removePendingPublish(clientId, packetId);
}

@Override
public void processPubRec(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
String clientId = context.getBsId();
int messageId = variableHeader.messageId();
logger.debug("PubRec - clientId:{}, messageId:{}", clientId, messageId);
MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
int packetId = variableHeader.messageId();
logger.debug("PubRec - clientId:{}, packetId:{}", clientId, packetId);
MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, packetId);
if (pendingPublish == null) {
return;
}
Expand All @@ -289,48 +289,48 @@ public void processPubRec(ChannelContext context, MqttMessageIdVariableHeader va
pendingPublish.startPubRelRetransmissionTimer(taskService, context);

boolean result = Tio.send(context, pubRelMessage);
logger.debug("Publish - PubRel send clientId:{} packetId:{} result:{}", clientId, messageId, result);
logger.debug("Publish - PubRel send clientId:{} packetId:{} result:{}", clientId, packetId, result);
}

@Override
public void processPubRel(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
String clientId = context.getBsId();
int messageId = variableHeader.messageId();
logger.debug("PubRel - clientId:{}, messageId:{}", clientId, messageId);
MqttPendingQos2Publish pendingQos2Publish = sessionManager.getPendingQos2Publish(clientId, messageId);
int packetId = variableHeader.messageId();
logger.debug("PubRel - clientId:{}, packetId:{}", clientId, packetId);
MqttPendingQos2Publish pendingQos2Publish = sessionManager.getPendingQos2Publish(clientId, packetId);
if (pendingQos2Publish != null) {
MqttPublishMessage incomingPublish = pendingQos2Publish.getIncomingPublish();
String topicName = incomingPublish.variableHeader().topicName();
MqttFixedHeader incomingFixedHeader = incomingPublish.fixedHeader();
MqttQoS mqttQoS = incomingFixedHeader.qosLevel();
invokeListenerForPublish(context, clientId, mqttQoS, topicName, incomingPublish);
pendingQos2Publish.onPubRelReceived();
sessionManager.removePendingQos2Publish(clientId, messageId);
sessionManager.removePendingQos2Publish(clientId, packetId);
}
MqttMessage message = MqttMessageFactory.newMessage(
new MqttFixedHeader(MqttMessageType.PUBCOMP, false, MqttQoS.QOS0, false, 0),
MqttMessageIdVariableHeader.from(messageId), null);
MqttMessageIdVariableHeader.from(packetId), null);

boolean result = Tio.send(context, message);
logger.debug("Publish - PubComp send clientId:{} packetId:{} result:{}", clientId, messageId, result);
logger.debug("Publish - PubComp send clientId:{} packetId:{} result:{}", clientId, packetId, result);
}

@Override
public void processPubComp(ChannelContext context, MqttMessageIdVariableHeader variableHeader) {
int messageId = variableHeader.messageId();
int packetId = variableHeader.messageId();
String clientId = context.getBsId();
logger.debug("PubComp - clientId:{}, messageId:{}", clientId, messageId);
MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, messageId);
logger.debug("PubComp - clientId:{}, packetId:{}", clientId, packetId);
MqttPendingPublish pendingPublish = sessionManager.getPendingPublish(clientId, packetId);
if (pendingPublish != null) {
pendingPublish.onPubCompReceived();
sessionManager.removePendingPublish(clientId, messageId);
sessionManager.removePendingPublish(clientId, packetId);
}
}

@Override
public void processSubscribe(ChannelContext context, MqttSubscribeMessage message) {
String clientId = context.getBsId();
int messageId = message.variableHeader().messageId();
int packetId = message.variableHeader().messageId();
// 1. 校验订阅的 topicFilter
List<MqttTopicSubscription> topicSubscriptionList = message.payload().topicSubscriptions();
List<MqttQoS> grantedQosList = new ArrayList<>();
Expand All @@ -345,22 +345,22 @@ public void processSubscribe(ChannelContext context, MqttSubscribeMessage messag
// 校验是否可以订阅
if (enableSubscribeValidator && !subscribeValidator.verifyTopicFilter(context, clientId, topicFilter, mqttQoS)) {
grantedQosList.add(MqttQoS.FAILURE);
logger.error("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} valid failed messageId:{}", clientId, topicFilter, mqttQoS, messageId);
logger.error("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} valid failed packetId:{}", clientId, topicFilter, mqttQoS, packetId);
} else {
grantedQosList.add(mqttQoS);
subscribedTopicList.add(topicFilter);
sessionManager.addSubscribe(topicFilter, clientId, mqttQoS.value());
logger.info("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} messageId:{}", clientId, topicFilter, mqttQoS, messageId);
logger.info("Subscribe - clientId:{} topicFilter:{} mqttQoS:{} packetId:{}", clientId, topicFilter, mqttQoS, packetId);
publishSubscribedEvent(context, clientId, topicFilter, mqttQoS);
}
}
// 3. 返回 ack
MqttMessage subAckMessage = MqttMessageBuilders.subAck()
.addGrantedQosList(grantedQosList)
.packetId(messageId)
.packetId(packetId)
.build();
boolean result = Tio.send(context, subAckMessage);
logger.debug("Subscribe - Aco send clientId:{} packetId:{} result:{}", clientId, messageId, result);
logger.info("Subscribe - SubAck send clientId:{} subscribedTopicList:{} packetId:{} result:{}", clientId, subscribedTopicList, packetId, result);
// 4. 发送保留消息
for (String topic : subscribedTopicList) {
executor.submit(() -> {
Expand Down Expand Up @@ -398,18 +398,18 @@ private void publishSubscribedEvent(ChannelContext context, String clientId, Str
@Override
public void processUnSubscribe(ChannelContext context, MqttUnsubscribeMessage message) {
String clientId = context.getBsId();
int messageId = message.variableHeader().messageId();
int packetId = message.variableHeader().messageId();
List<String> topicFilterList = message.payload().topics();
for (String topicFilter : topicFilterList) {
sessionManager.removeSubscribe(topicFilter, clientId);
publishUnsubscribedEvent(context, clientId, topicFilter);
}
logger.info("UnSubscribe - clientId:{} Topic:{} messageId:{}", clientId, topicFilterList, messageId);
logger.info("UnSubscribe - clientId:{} Topic:{} packetId:{}", clientId, topicFilterList, packetId);
MqttMessage unSubMessage = MqttMessageBuilders.unsubAck()
.packetId(messageId)
.packetId(packetId)
.build();
boolean result = Tio.send(context, unSubMessage);
logger.debug("UnSubscribe - Ack send clientId:{} result:{}", clientId, result);
logger.debug("UnSubscribe - UnSubAck send clientId:{} result:{}", clientId, result);
}

/**
Expand Down

0 comments on commit 3c2f6b6

Please sign in to comment.