大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全家桶1年46,售后保障稳定
序言
Datahub的相关介绍和优势,我在这里就不一一赘述。大家可以自己去看官方文档。我想在这里记录的是我做这个
需求中遇到的一点问题和它们的解决办法,如果大家有更好的思路和办法,欢迎大家指正。
问题
1、在项目开始是只运行一次任务代码,即DataHub的接流任务。由于写博客的时间距离我当初写代码的时间比较久了,
我已经记不得这个问题当初具体的详细情景,项目用的spring的框架。在网上也找了很久的办法,最后,总之,我用
了一个取巧的方法,利用spring的定时任务,每十年执行一次,相信应该没有一个项目会连续在服务器上跑十年吧。。。
/** * 一次性定时任务,每十年执行一次 */
@Scheduled(fixedRate = 1000*60*60*24*365*10)
2、第二个问题是关于DataHub的游标问题。在早期的DataHub的产品中并没有提供游标的存储,用户需要自己存储游
标,以便在项目重启后、或接流异常中断以后继续读取数据。当然,目前的DataHub已经支持游标的存储,只需要我们
进行简单的配置。在这里,需要注意一点,一个project下多个topic只要一个游标就可以了。这块可以去看官方的示
例代码,对着改就行了。
3、利用多线程对多个topic进行接流。个人认为,这是我多线程目前为止用的最好的一次了。
4、几个线程池概念,这块有机会还是要重新理一下。
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,
则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照
指定顺序(FIFO, LIFO, 优先级)执行。
代码
package com.bywin.apsarabrain.trafficgis.web.api.road.task;
import com.aliyun.datahub.DatahubClient;
import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.auth.AliyunAccount;
import com.aliyun.datahub.common.data.Field;
import com.aliyun.datahub.common.data.FieldType;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.exception.OffsetSessionChangedException;
import com.aliyun.datahub.exception.SubscriptionOfflineException;
import com.aliyun.datahub.model.*;
import com.bywin.apsarabrain.trafficgis.biz.service.IDwdTfcEvtSftAmapreportRtServ;
import com.bywin.apsarabrain.trafficgis.dal.entity.DwdTfcEvtSftAmapreportRtEntity;
import com.bywin.apsarabrain.trafficgisb.common.util.StringUtil;
import org.codehaus.jackson.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/** * @Author: zyye * @Date: 2018/10/29 22:00 * @Description: 交通事件(一次性定时任务,每十年执行一次) */
@Component("TrafficEventTask")
public class TrafficEventTask {
private static final Logger LOGGER = LoggerFactory.getLogger("TrafficEventTask");
@Autowired
private IDwdTfcEvtSftAmapreportRtServ serv;
@Value("${dataHub.accessId}")
private String accessId;
@Value("${dataHub.accessKey}")
private String accessKey;
@Value("${dataHub.endpoint}")
private String endpoint;
@Value("${dataHub.projectName}")
private String projectName;
@Value("${dataHub.topicName}")
private String topicName;
@Value("${dataHub.subId}")
private String subId;
DatahubConfiguration conf;
private DatahubClient client;
/** * 一个基于数组结构的有界阻塞队列,按FIFO原则进行排序 */
static BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);
/** * 线程池 */
static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10,
1, TimeUnit.MINUTES, blockingQueue);
/** * 一次性定时任务,每十年执行一次 */
@Scheduled(fixedRate = 1000*60*60*24*365*10)
public void run() {
LOGGER.info("--------------run trafficEventTask----------------");
//多台服务器,只需要一台服务器执行定时任务,通过路径判断
if (StringUtil.isRunTask()){
LOGGER.info("++++++++++++++++++++runing trafficEventTask++++++++++++++++++++++++");
AliyunAccount account = new AliyunAccount(accessId, accessKey);
conf = new DatahubConfiguration(account, endpoint);
client = new DatahubClient(conf);
//查询有几个shard
ListShardResult listShardResult = client.listShard(projectName, topicName);
//根据shard的数量创建线程消费
for(int i=0;i<listShardResult.getShards().size();i++)
{
String shardId = listShardResult.getShards().get(i).getShardId();
Runnable runnable=new TaskWithoutResult(shardId);
threadPoolExecutor.submit(runnable);
}
//threadPoolExecutor.execute(()-> System.out.println(Thread.currentThread().getName()));
//threadPoolExecutor.shutdown();//不会触发中断
//threadPoolExecutor.shutdownNow();//会触发中断
}
}
/** * 无返回值的多线程任务 * @author zyye * */
class TaskWithoutResult implements Runnable
{
/** * dataHub的shardId */
private String shardId;
public TaskWithoutResult(String shardId)
{
this.shardId=shardId;
}
@Override
public void run(){
LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"开始运行");
try {
task(shardId);
} catch (Exception e) {
//捕捉中断异常
LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"被中断");
}
LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"结束运行");
}
}
/** * 根据dataHub的shardId从多线程里读取数据 * @param shardId */
private void task(String shardId) {
LOGGER.info("开始执行dataHub任务!shardId="+shardId);
//获得dataHub上对应的字段
RecordSchema schema = new RecordSchema();
schema.addField(new Field("src_evt_id", FieldType.STRING));
schema.addField(new Field("gmt_create", FieldType.STRING));
schema.addField(new Field("lng", FieldType.DOUBLE));
schema.addField(new Field("lat", FieldType.DOUBLE));
schema.addField(new Field("geohash", FieldType.STRING));
schema.addField(new Field("evt_desc", FieldType.STRING));
schema.addField(new Field("evt_start_time", FieldType.STRING));
schema.addField(new Field("evt_end_time", FieldType.STRING));
schema.addField(new Field("evt_update_time", FieldType.STRING));
schema.addField(new Field("evt_detail", FieldType.STRING));
schema.addField(new Field("evt_type_no", FieldType.BIGINT));
schema.addField(new Field("sub_evt_type_no", FieldType.BIGINT));
schema.addField(new Field("nick_name", FieldType.STRING));
schema.addField(new Field("picture", FieldType.STRING));
schema.addField(new Field("road_name", FieldType.STRING));
schema.addField(new Field("region_shape", FieldType.STRING));
schema.addField(new Field("dt", FieldType.STRING));
schema.addField(new Field("adcode", FieldType.STRING));
try {
boolean bExit = false;
GetTopicResult topicResult = client.getTopic(projectName, topicName);
// 首先初始化offset上下文
OffsetContext offsetCtx = client.initOffsetContext(projectName, topicName, subId, shardId);
// 开始消费的cursor
String cursor = null;
if (!offsetCtx.hasOffset()) {
// 之前没有存储过点位,先获取初始点位,比如这里获取当前该shard最早的数据
GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
cursor = cursorResult.getCursor();
} else {
// 否则,获取当前已消费点位的下一个cursor
cursor = client.getNextOffsetCursor(offsetCtx).getCursor();
}
// System.out.println("Start consume records, begin offset context:" + offsetCtx.toObjectNode().toString()
// + ", cursor:" + cursor);
long recordNum = 0L;
while (!bExit) {
try {
GetRecordsResult recordResult = client.getRecords(projectName, topicName, shardId, cursor, 10,
topicResult.getRecordSchema());
List<RecordEntry> records = recordResult.getRecords();
if (records.size() == 0) {
// 将最后一次消费点位上报
client.commitOffset(offsetCtx);
// System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
// 可以先休眠(30秒)一会,再继续消费新记录
Thread.sleep(1000*30);
// System.out.println("sleep 1s and continue consume records! shard id:" + shardId);
} else {
//将dataHub的数据序列化以后存到数据库
List<DwdTfcEvtSftAmapreportRtEntity> eventList = new ArrayList<>();
for (RecordEntry record : records) {
// 处理记录逻辑
DwdTfcEvtSftAmapreportRtEntity dwdTfcEvtSftAmapreportRtEntity = new DwdTfcEvtSftAmapreportRtEntity();
JsonNode jsonNode = record.toJsonNode().get("Data");
// System.out.println(jsonNode.toString());
eventList.add(dwdTfcEvtSftAmapreportRtEntity.jsonNodeToEntity(jsonNode));
// 上报点位,该示例是每处理100条记录上报一次点位
offsetCtx.setOffset(record.getOffset());
recordNum++;
if (recordNum % 100 == 0) {
client.commitOffset(offsetCtx);
// System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
}
}
serv.batchInsert(eventList);
cursor = recordResult.getNextCursor();
}
} catch (SubscriptionOfflineException e) {
// 订阅下线,退出
bExit = true;
e.printStackTrace();
} catch (OffsetResetedException e) {
// 点位被重置,更新offset上下文
client.updateOffsetContext(offsetCtx);
cursor = client.getNextOffsetCursor(offsetCtx).getCursor();
LOGGER.info("Restart consume shard:" + shardId + ", reset offset:"
+ offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
} catch (OffsetSessionChangedException e) {
// 其他consumer同时消费了该订阅下的相同shard,退出
bExit = true;
e.printStackTrace();
} catch (Exception e) {
//bExit = true;
//当线程意外中断时,等待一段时间再继续运行,而不是退出
LOGGER.info("thread shardId="+shardId+"is Exception........................");
Thread.sleep(1000*30);
e.printStackTrace();
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
总结
一个人的未来永远不会取决于你目前所在的位置,而是取决你的心想要到达的地方。
我希望这是一个开始,我期待生活中的更多可能性。
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/234670.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...