java连接MQTT服务器(Springboot整合MQTT)

java连接MQTT服务器(Springboot整合MQTT)目录一、业务场景二、本文只讲解java连接MQTT服务器进行数据处理一、业务场景硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收、解析、业务处理、存储入库、数据展示。MQTT是基于发布(Publish)/订阅(Subscribe)模式来进行通信及数据交换的。二、本文只讲解java连接MQTT服务器进行数据处理…

大家好,又见面了,我是你们的朋友全栈君。

目录

一、业务场景

二、本文只讲解java连接MQTT服务器进行数据处理


一、业务场景

 硬件采集的数据传入EMQX平台(采用MQTT协议),java通过代码连接MQTT服务器,进行采集数据接收、解析、业务处理、存储入库、数据展示。

MQTT 是基于 发布(Publish)/订阅(Subscribe) 模式来进行通信及数据交换的。

二、本文只讲解java连接MQTT服务器进行数据处理

1、新建springboot项目,pom文件中直接引入下面的mqtt依赖

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-mqtt</artifactId>
        </dependency>

2、 编写MQTT工具类

package com.siborui.dc.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * MQTT工具类操作
 *
 * @author Mr.Qu
 * @since v1.1.0 2020-01-10
 */
@Slf4j
@Component
public class MQTTConnect {

    private String HOST = "tcp://127.0.0.1:1883"; //mqtt服务器的地址和端口号
    private final String clientId = "DC" + (int) (Math.random() * 100000000);
    private MqttClient mqttClient;

    /**
     * 客户端connect连接mqtt服务器
     *
     * @param userName     用户名
     * @param passWord     密码
     * @param mqttCallback 回调函数
     **/
    public void setMqttClient(String userName, String passWord, MqttCallback mqttCallback) throws MqttException {
        MqttConnectOptions options = mqttConnectOptions(userName, passWord);
        if (mqttCallback == null) {
            mqttClient.setCallback(new Callback());
        } else {
            mqttClient.setCallback(mqttCallback);
        }
        mqttClient.connect(options);
    }

    /**
     * MQTT连接参数设置
     */
    private MqttConnectOptions mqttConnectOptions(String userName, String passWord) throws MqttException {
        mqttClient = new MqttClient(HOST, clientId, new MemoryPersistence());
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName(userName);
        options.setPassword(passWord.toCharArray());
        options.setConnectionTimeout(10);///默认:30
        options.setAutomaticReconnect(true);//默认:false
        options.setCleanSession(false);//默认:true
        //options.setKeepAliveInterval(20);//默认:60
        return options;
    }

    /**
     * 关闭MQTT连接
     */
    public void close() throws MqttException {
        mqttClient.close();
        mqttClient.disconnect();
    }

    /**
     * 向某个主题发布消息 默认qos:1
     *
     * @param topic:发布的主题
     * @param msg:发布的消息
     */
    public void pub(String topic, String msg) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        //mqttMessage.setQos(2);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 向某个主题发布消息
     *
     * @param topic: 发布的主题
     * @param msg:   发布的消息
     * @param qos:   消息质量    Qos:0、1、2
     */
    public void pub(String topic, String msg, int qos) throws MqttException {
        MqttMessage mqttMessage = new MqttMessage();
        mqttMessage.setQos(qos);
        mqttMessage.setPayload(msg.getBytes());
        MqttTopic mqttTopic = mqttClient.getTopic(topic);
        MqttDeliveryToken token = mqttTopic.publish(mqttMessage);
        token.waitForCompletion();
    }

    /**
     * 订阅某一个主题 ,此方法默认的的Qos等级为:1
     *
     * @param topic 主题
     */
    public void sub(String topic) throws MqttException {
        mqttClient.subscribe(topic);
    }

    /**
     * 订阅某一个主题,可携带Qos
     *
     * @param topic 所要订阅的主题
     * @param qos   消息质量:0、1、2
     */
    public void sub(String topic, int qos) throws MqttException {
        mqttClient.subscribe(topic, qos);
    }

    /**
     * main函数自己测试用
     */
    public static void main(String[] args) throws MqttException {
        MQTTConnect mqttConnect = new MQTTConnect();
        mqttConnect.setMqttClient("admin", "public", new Callback());
        mqttConnect.sub("com/iot/init");
        mqttConnect.pub("com/iot/init", "Mr.Qu" + (int) (Math.random() * 100000000));
    }
}

3、编写MQTT的回调函数

package com.siborui.dc.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttMessage;

/**
 * 常规MQTT回调函数
 *
 * @author Mr.Qu
 * @since 2020/1/9 16:26
 */
@Slf4j
public class Callback implements MqttCallback {

    /**
     * MQTT 断开连接会执行此方法
     */
    @Override
    public void connectionLost(Throwable throwable) {
        log.info("断开了MQTT连接 :{}", throwable.getMessage());
        log.error(throwable.getMessage(), throwable);
    }

    /**
     * publish发布成功后会执行到这里
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
        log.info("发布消息成功");
    }

    /**
     * subscribe订阅后得到的消息会执行到这里
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        //  TODO    此处可以将订阅得到的消息进行业务处理、数据存储
        log.info("收到来自 " + topic + " 的消息:{}", new String(message.getPayload()));
    }
}

4、由于业务场景需要,在项目启动时,监听MQTT主题Topic,编写MQTT监听器

package com.siborui.dc.mqtt;

import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;

import com.siborui.dc.mqtt.Callback;

/**
 * 项目启动 监听主题
 *
 * @author Mr.Qu
 * @since 2020/1/10
 */
@Slf4j
@Component
public class MQTTListener implements ApplicationListener<ContextRefreshedEvent> {

    private final MQTTConnect server;

    @Autowired
    public MQTTListener(MQTTConnect server) {
        this.server = server;
    }

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        try {
            server.setMqttClient("admin", "public", new Callback());
            server.sub("com/iot/init");
        } catch (MqttException e) {
            log.error(e.getMessage(), e);
        }
    }
}


5、源码传送门

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/140440.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 手把手教你做出数据可视化项目(七)可视化图表数据动态获取及界面跳转[通俗易懂]

    手把手教你做出数据可视化项目(七)可视化图表数据动态获取及界面跳转[通俗易懂]数据可视化前言:https://blog.csdn.net/diviner_s/article/details/115933789ApacheEcharts简介:https://blog.csdn.net/diviner_s/article/details/115934089项目最终效果图:此篇博客为自己学习pink老师的课后完成的项目的总结与记录,仅供交流参考。版权所有,转载请标注原作者!使用echarts技术做的可视图,此外其项目包含的技术有html、css、js、jquerry、aja.

  • ⭐️UI自动化控制微信发送或转发图片消息✨

    ⭐️UI自动化控制微信发送或转发图片消息✨在前面《UI自动化轻松解决微信手工群发消息的烦恼》一文中,我演示了如果使用python发送文本消息。前段时间有群友询问:虽然之前觉得太简单懒得做,但今天周末又想起这个问题,考虑到很多网友不知道如何实现,所以我今天再简单演示一下。uiautomation控制微信发送图片????其实原理非常简单,之前无非是将需要发送的文本复制到剪切板再粘贴,那么其实对于图片也一样,我们只需要将需要发送的图片放入剪切板即可,其他地方逻辑都一样。那么如何将图片放入剪切板呢?uiautomation已经提供了SetCli

  • docker 安装confluence 6.3.4 破解

    docker 安装confluence 6.3.4 破解

  • 异步FIFO理解[通俗易懂]

    一、异步FIFO与同步FIFO的区别 二、关键点及解决方法 三、深度的计算 四、整体结构图(style#1ifyouhavesawSNUGuserguide)SimulationandSynthesisTechniquesforAsynchronous的网盘链接链接:http://pan.baidu.com/s/1ntsqGjR密码:scf

  • 小熊工厂 bt[通俗易懂]

    小熊工厂 bt[通俗易懂]Welcometomyblog!小熊工厂 软件大小:20986KB软件语言:简体中文软件类别:国产软件/共享版/趣味软件应用平台:Win9x/NT/2000/XP/2003界面预览:无插件情况: 投诉更新时间:2006-09-2214:55:33下载次数:4426推荐等级:联系人:ricky408163.com开发商:小熊工厂

  • ZigBee协议栈简介

    ZigBee协议栈简介文章目录Zigbee协议栈简介如何理解Zigbee协议栈如何使用Zigbee协议栈Zigbee协议栈简介  Zigbee协议分为2部分:IEEE802.15.4定义了PHY(物理层)和MAC(介质访问层)技术规范。Zigbee联盟定义了NWK(网络层)、APS(应用程序支持层)、APL(应用层)技术规范。  Zigbee协议栈就是将各个层定义的协议都集合在一起,以函数的形式实现,并给用户提供API,用户可以直接调用。如何理解Zigbee协议栈  TI推出的ZigBee2007协议栈也

发表回复

您的电子邮箱地址不会被公开。

评论列表(1条)

  • silencezheng
    silencezheng 2022年8月25日 上午9:55

    请问硬件设备如何与mqtt建立连接呢?

关注全栈程序员社区公众号