大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺
基于java的多设备类型物联网架构实现
前言:19年11月开始从 【金融】行业转 【物联网】,路途坎坷,一个人摸索前进,不过也学到了很多新的东西,交了很多好朋友,在此感谢各位!
以下是一些经验分享,希望能帮到有需要的朋友。
1、架构思路
考虑了很久打算用springboot + mysql 去实现,因为熟悉这个框架,而且能减轻70%的机械性开发工作量,以后改springcloud也方便(注意逻辑实现不然工作量很大)。
物联网和互联网可以说是有共同点的,但是也有很多的不一样。
先说协议,互联网很多都是https或者http,但是物联网这块就不仅仅是这两种协议,会有UDP协议,TCP协议。
上干货:
环境:java+mysql+redis+rabbitMQ+Mqtt
图解:
这个是比较简单的逻辑图,里面的复杂逻辑还是不能说的。
这里面涉及到几个问题,好多物联网设备终端会有心跳,事件数据上来,怎么保证并发?入库的数据唯一?多种设备的数据上行,怎么存储?怎么管理?有多个第三方服务怎么分发数据?
咱们一点点说:
1、根据协议解析数据
终端上行的数据有433协议,有蓝牙的,有zigbee的,还有tcp、http的,还分1代、2代、3代等协议。我用了一个笨方法:根据协议的不同解析,存入不同的表
我整了个枚举类来存放不同的类型,在解析方法里面通过唯一性的一段上行数据去区分(为什么这么做?以为上行数据是16进制的)
HH0F22AEBB8200011100020001BB23AABB8637
“HH0F”就是A协议的特有字符,那这个就入A协议库。
A协议是A设备专用的,那建表就是这样的
A_tag —————— 设备表
A_data ——————设备数据表
A_gateway ——————网关表
那么问题来了,要是10种设备不得30个表了?
做过物联网的都知道,不同类型的设备可能带的属性都是不一样的,有的设备可能就3个:
电量、包序、特征值
但是有的设备可能就不止了,比方说:
电量、包序、心跳、呼吸、体温、动态值。。。。
data表中数据不得爆掉?——我深思熟虑也只是想到通用字段存储,data1~data20
多设备类型的暂时解决了,后面怎么具体操作呢?
2、怎么保证数据不会重复并快速入库
上图我采用了两个服务,一个接受服务,一个处理服务,具体处理方法:
接受服务将受到的数据转base64编码后直接发到消息队列(rabbitMQ),处理服务监听rabbitMQ消息,走解析服务。就是这么简单!
但是,有一点,rabbitMQ要开启ack模式!!!而且可以用负载来做这步!
问题二:设备一多,并发问题就来了,怎么搞?
Semaphore 这可是个好东西
如果来一条数据就insert,哪个数据库都受不了,后来采用批量方式插入:
// ----------------------数据批量入库开始-------------------------
private static List<NnData> listNnObjectSaveDO = new ArrayList<NnData>();
private static Long startTimeNnData = 0L;
//private static int countSend = 0;
private static Semaphore semaphoreNnData = new Semaphore(1);
@Override
public void saveDO(NnData nnData) {
try {
semaphoreNnData.acquire();
if (startTimeNnData == 0L) {
startTimeNnData = System.currentTimeMillis();
}
listNnObjectSaveDO.add(nnData);
if (System.currentTimeMillis() - startTimeNnData > 2000) {
List<NnData> listSaveDOOne = new ArrayList<NnData>();
listSaveDOOne.addAll(listNnObjectSaveDO);
asyncInsertBatch(listSaveDOOne);
listNnObjectSaveDO.clear();
startTimeNnData = 0L;
}
} catch (Exception e) {
e.printStackTrace();
} finally {
semaphoreNnData.release();
}
}
@Async
private void asyncInsertBatch(List<NnData> list) {
this.insertBatch(list);
//countSend = countSend + list.size();
//System.out.println("=================入库数据条数:" + countSend);
}
// ----------------------数据批量入库结束-------------------------
我做过测试,这样的方式远超2000条/秒,有眼尖的朋友看出来了:
System.currentTimeMillis() - startTimeNnData > 2000 //2秒批量入库一次
暂时就到这儿,后续想到我再补充!
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/192118.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...