java高并发下数据入库

java高并发下数据入库java高并发下数据批量入库该服务利用线程池并结合缓存类来处理高并发下数据入库问题,做到实时数据存入redis和数据批量入库,使用的时候需要修改为自己的业务数据,该服务暂时适合下面两种情况:1、达到设置的超时时间。2、达到最大批次。packageio.renren.service.impl;importcom.alibaba.fastjson.JSON;importcom.alibaba.fastjson.JSONArray;importlombok.extern.slf4j.Slf

大家好,又见面了,我是你们的朋友全栈君。

java高并发下数据入库

该服务利用线程池并结合缓存类来处理高并发下数据入库问题,做到实时数据存入redis和数据批量入库,使用的时候需要修改为自己的业务数据,该模块是根据下面的设置进行高并发处理。
1、达到设置的超时时间。
2、达到最大批次。

package io.jack.service.impl;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/** * <pre> * 数据批量入库服务 * </pre> * Created by RuiXing Hou on 2021-08-05. * * @since 1.0 */
@Component
@Slf4j
public class BatchDataStorageService implements InitializingBean
{ 

/** * 最大批次数量 */
@Value("${app.db.maxBatchCount:800}")
private int maxBatchCount;
/** * 最大线程数 */
@Value("${app.db.maxBatchThreads:100}")
private int maxBatchThreads;
/** * 超时时间 */
@Value("${app.db.batchTimeout:3000}")
private int batchTimeout;
/** * 批次数量 */
private int batchCount = 0;
/** * 批次号 */
private static long batchNo = 0;
/** * 线程池定义接口 */
private ExecutorService executorService = null;
/** * 服务器缓存工具类,下面提供源码 */
@Resource
private CacheService cacheService;
/** * 业务接口 */
@Resource
private DeviceRealTimeService deviceRealTimeService;
/** * redis工具类 */
@Resource
private RedisUtils redisUtils;
@Override
public void afterPropertiesSet() { 

this.executorService = Executors.newFixedThreadPool(this.maxBatchThreads, r -> { 

Thread thread = new Thread(r);
if (r instanceof BatchWorker) { 

thread.setName("batch-worker-" + ((BatchWorker) r).batchKey);
}
return thread;
});
}
/** * 需要做高并发处理的类只需要调用该方法 (我用的是rabbitMq) * * @param deviceRealTimeDTO */
public void saveRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { 

final String failedCacheKey = "device:real_time:failed_records";
try { 

String durationKey = "device:real_time:batchDuration" + batchNo;
String batchKey = "device:real_time:batch" + batchNo;
if (!cacheService.exists(durationKey)) { 

cacheService.put(durationKey, System.currentTimeMillis());
new BatchTimeoutCommitThread(batchKey, durationKey, failedCacheKey).start();
}
cacheService.lPush(batchKey, deviceRealTimeDTO);
if (++batchCount >= maxBatchCount) { 

// 达到最大批次,执行入库逻辑
dataStorage(durationKey, batchKey, failedCacheKey);
}
} catch (Exception ex) { 

log.warn("[DB:FAILED] 设备上报记录入批处理集合异常: " + ex.getMessage() + ", DeviceRealTimeDTO: " + JSON.toJSONString(deviceRealTimeDTO), ex);
cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
} finally { 

updateRealTimeData(deviceRealTimeDTO);
}
}
/** * 更新实时数据 * @param deviceRealTimeDTO 业务POJO */
private void updateRealTimeData(DeviceRealTimeDTO deviceRealTimeDTO) { 

redisUtils.set("real_time:"+deviceRealTimeDTO.getDeviceId(), JSONArray.toJSONString(deviceRealTimeDTO));
}
/** * * @param durationKey 持续时间标识 * @param batchKey 批次标识 * @param failedCacheKey 错误标识 */
private void dataStorage(String durationKey, String batchKey, String failedCacheKey) { 

batchNo++;
batchCount = 0;
cacheService.del(durationKey);
if (batchNo >= Long.MAX_VALUE) { 

batchNo = 0;
}
executorService.execute(new BatchWorker(batchKey, failedCacheKey));
}
private class BatchWorker implements Runnable
{ 

private final String failedCacheKey;
private final String batchKey;
public BatchWorker(String batchKey, String failedCacheKey) { 

this.batchKey = batchKey;
this.failedCacheKey = failedCacheKey;
}
@Override
public void run() { 

final List<DeviceRealTimeDTO> deviceRealTimeDTOList = new ArrayList<>();
try { 

DeviceRealTimeDTO deviceRealTimeDTO = cacheService.lPop(batchKey);
while(deviceRealTimeDTO != null) { 

deviceRealTimeDTOList.add(deviceRealTimeDTO);
deviceRealTimeDTO = cacheService.lPop(batchKey);
}
long timeMillis = System.currentTimeMillis();
try { 

List<DeviceRealTimeEntity> deviceRealTimeEntityList = ConvertUtils.sourceToTarget(deviceRealTimeDTOList, DeviceRealTimeEntity.class);
deviceRealTimeService.insertBatch(deviceRealTimeEntityList);
} finally { 

cacheService.del(batchKey);
log.info("[DB:BATCH_WORKER] 批次:" + batchKey + ",保存设备上报记录数:" + deviceRealTimeDTOList.size() + ", 耗时:" + (System.currentTimeMillis() - timeMillis) + "ms");
}
} catch (Exception e) { 

log.warn("[DB:FAILED] 设备上报记录批量入库失败:" + e.getMessage() + ", DeviceRealTimeDTO: " + deviceRealTimeDTOList.size(), e);
for (DeviceRealTimeDTO deviceRealTimeDTO : deviceRealTimeDTOList) { 

cacheService.lPush(failedCacheKey, deviceRealTimeDTO);
}
}
}
}
class BatchTimeoutCommitThread extends Thread { 

private final String batchKey;
private final String durationKey;
private final String failedCacheKey;
public BatchTimeoutCommitThread(String batchKey, String durationKey, String failedCacheKey) { 

this.batchKey = batchKey;
this.durationKey = durationKey;
this.failedCacheKey = failedCacheKey;
this.setName("batch-thread-" + batchKey);
}
public void run() { 

try { 

Thread.sleep(batchTimeout);
} catch (InterruptedException e) { 

log.error("[DB] 内部错误,直接提交:" + e.getMessage());
}
if (cacheService.exists(durationKey)) { 

// 达到最大批次的超时间,执行入库逻辑
dataStorage(durationKey, batchKey, failedCacheKey);
}
}
}
}
package io.jack.service;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@Component
@Scope("singleton")
public class CacheService implements InitializingBean { 

private Map<String, Object> objectCache = new ConcurrentHashMap<>();
private Map<String, AtomicLong> statCache = new ConcurrentHashMap<>();
@Override
public void afterPropertiesSet() { 

statCache.put("terminals", new AtomicLong(0));
statCache.put("connections", new AtomicLong(0));
}
public long incr(String statName) { 

if (!statCache.containsKey(statName))
statCache.put(statName, new AtomicLong(0));
return statCache.get(statName).incrementAndGet();
}
public long decr(String statName) { 

if (!statCache.containsKey(statName))
statCache.put(statName, new AtomicLong(0));
return statCache.get(statName).decrementAndGet();
}
public long stat(String statName) { 

if (!statCache.containsKey(statName))
statCache.put(statName, new AtomicLong(0));
return statCache.get(statName).get();
}
public <T> void put(String key, T object) { 

objectCache.put(key, object);
}
public <T> T get(String key) { 

return (T) objectCache.get(key);
}
public void remove(String key) { 

objectCache.remove(key);
}
public void hSet(String key, String subkey, Object value) { 

synchronized (objectCache) { 

HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
if (submap == null) { 

submap = new HashMap<>();
objectCache.put(key, submap);
}
submap.put(subkey, value);
}
}
public <T> T hGet(String key, String subkey) { 

synchronized (objectCache) { 

HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
if (submap != null) { 

return (T) submap.get(subkey);
}
return null;
}
}
public boolean hExists(String key, String subkey) { 

synchronized (objectCache) { 

HashMap<String, Object> submap = (HashMap<String, Object>) objectCache.get(key);
if (submap != null) { 

return submap.containsKey(subkey);
}
return false;
}
}
public void lPush(String key, Object value) { 

synchronized (objectCache) { 

LinkedList queue = (LinkedList) objectCache.get (key);
if (queue == null) { 

queue = new LinkedList();
objectCache.put(key, queue);
}
queue.addLast(value);
}
}
public <T> T lPop(String key) { 

synchronized (objectCache) { 

LinkedList queue = (LinkedList) objectCache.get (key);
if (queue != null) { 

if (!queue.isEmpty()) { 

return (T)queue.removeLast();
}
objectCache.remove(key);
}
return null;
}
}
public void del(String key) { 

objectCache.remove(key);
}
public boolean exists(String key) { 

return objectCache.containsKey(key);
}
public void dump() { 

}
}
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/144596.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • zookeeper入门(1)「建议收藏」

    zookeeper入门(1)「建议收藏」zookeeper应用场景zookeeper特点zookeeper数据模型Ubuntu配置zookeeper是一个典型的分布式数据一致性解决方案,致力于为分布式应用提供一个高性能,高可用,且具有严格顺序访问控制能力的分布式协调存储服务应用场景维护配置信息分布式锁服务集群管理生成分布式唯一ID维护配置信息如java编程经常遇到配置项,比如数据路连接的url,password等等。通常这些配置文件需要放在服务器上,但需要更改配置文件的时候需要去服务器上更改。但是随着分布式系统的兴起,由于

  • [leetcode]Best Time to Buy and Sell Stock II @ Python

    [leetcode]Best Time to Buy and Sell Stock II @ Python

  • JDK环境变量配置

    JDK环境变量配置一.下载JDK安装包并安装JDK下载链接二.JDK环境变量配置1.右击我的电脑->属性->高级系统设置->环境变量2.在系统变量区域新建一个JAVA_HOME,变量值为上一步JDK安装目录3.编辑PATH变量,新增环境变量%JAVA_HOME%\bin4.新增系统变量CLASSPATH,变量值输入.;%JAVA_HOME%\lib;%JAVA_HOME%\lib\tools.jar(注意最前面有一点)5.连续点击确定保存之后,打开命令提示符窗口,输入“java-v

  • ICP证书_dwcc2018怎么用

    ICP证书_dwcc2018怎么用输入44 21 2 4 84 0100 99 98 972 210000 100005 30 0 0 0 1696RichmanImpossible代码#include<bits/stdc++.h>using namespace std;typedef long long ll;const int N = 1e5 + 10;int a[N];int main(){ int T; cin>>T; while(T -..

  • 自动阅读 到底 能不能赚钱

    自动阅读 到底 能不能赚钱

    2021年11月11日
  • 碟刹和V刹的区别「建议收藏」

    碟刹和V刹的区别「建议收藏」0首先拍死的一个观点就是碟刹比V刹要好,要高档──似乎大部分对于运动自行车陌生新手往往认为碟刹一定比V刹要好,我们听到过这样的话:“都2000多的车了,还没有碟刹”───这样的话真的让人哭笑不得,看看不论是国外的比赛还是国内的专业比赛,如果是晴天的比赛,V刹车还是占了大部分的,当然目前也有碟刹车增多的趋势,但是对于大部分休闲骑行和不参加业余级别比赛的车友的来说,V刹尤其是好些的V刹还是够用的,…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号