DataHub Java接入实时数据

DataHub Java接入实时数据DataHubJava接入实时数据序言问题代码总结序言Datahub的相关介绍和优势,我在这里就不一一赘述,留个官方文档的连接([DataHub官方文档](https://help.aliyun.com/document_detail/47439.html?spm=a2c0j.8235941.654670.ddoc.26d91a22JWAbt9)),大家可以自己去看看。我想在这里记录的是…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

DataHub Java接入实时数据

序言

  Datahub的相关介绍和优势,我在这里就不一一赘述。大家可以自己去看官方文档。我想在这里记录的是我做这个
  需求中遇到的一点问题和它们的解决办法,如果大家有更好的思路和办法,欢迎大家指正。

Jetbrains全家桶1年46,售后保障稳定

问题

1、在项目开始是只运行一次任务代码,即DataHub的接流任务。由于写博客的时间距离我当初写代码的时间比较久了,
我已经记不得这个问题当初具体的详细情景,项目用的spring的框架。在网上也找了很久的办法,最后,总之,我用
了一个取巧的方法,利用spring的定时任务,每十年执行一次,相信应该没有一个项目会连续在服务器上跑十年吧。。。
/** * 一次性定时任务,每十年执行一次 */
@Scheduled(fixedRate = 1000*60*60*24*365*10)
2、第二个问题是关于DataHub的游标问题。在早期的DataHub的产品中并没有提供游标的存储,用户需要自己存储游
标,以便在项目重启后、或接流异常中断以后继续读取数据。当然,目前的DataHub已经支持游标的存储,只需要我们
进行简单的配置。在这里,需要注意一点,一个project下多个topic只要一个游标就可以了。这块可以去看官方的示
例代码,对着改就行了。
3、利用多线程对多个topic进行接流。个人认为,这是我多线程目前为止用的最好的一次了。
4、几个线程池概念,这块有机会还是要重新理一下。
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,
则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照
指定顺序(FIFO, LIFO, 优先级)执行。

代码

package com.bywin.apsarabrain.trafficgis.web.api.road.task;
import com.aliyun.datahub.DatahubClient;
import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.auth.AliyunAccount;
import com.aliyun.datahub.common.data.Field;
import com.aliyun.datahub.common.data.FieldType;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.exception.OffsetSessionChangedException;
import com.aliyun.datahub.exception.SubscriptionOfflineException;
import com.aliyun.datahub.model.*;
import com.bywin.apsarabrain.trafficgis.biz.service.IDwdTfcEvtSftAmapreportRtServ;
import com.bywin.apsarabrain.trafficgis.dal.entity.DwdTfcEvtSftAmapreportRtEntity;
import com.bywin.apsarabrain.trafficgisb.common.util.StringUtil;
import org.codehaus.jackson.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
/** * @Author: zyye * @Date: 2018/10/29 22:00 * @Description: 交通事件(一次性定时任务,每十年执行一次) */
@Component("TrafficEventTask")
public class TrafficEventTask { 

private static final Logger LOGGER = LoggerFactory.getLogger("TrafficEventTask");
@Autowired
private IDwdTfcEvtSftAmapreportRtServ serv;
@Value("${dataHub.accessId}")
private String accessId;
@Value("${dataHub.accessKey}")
private String accessKey;
@Value("${dataHub.endpoint}")
private String endpoint;
@Value("${dataHub.projectName}")
private String projectName;
@Value("${dataHub.topicName}")
private String topicName;
@Value("${dataHub.subId}")
private String subId;
DatahubConfiguration conf;
private DatahubClient client;
/** * 一个基于数组结构的有界阻塞队列,按FIFO原则进行排序 */
static BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);
/** * 线程池 */
static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10,
1, TimeUnit.MINUTES, blockingQueue);
/** * 一次性定时任务,每十年执行一次 */
@Scheduled(fixedRate = 1000*60*60*24*365*10)
public void run() { 

LOGGER.info("--------------run trafficEventTask----------------");
//多台服务器,只需要一台服务器执行定时任务,通过路径判断
if (StringUtil.isRunTask()){ 

LOGGER.info("++++++++++++++++++++runing trafficEventTask++++++++++++++++++++++++");
AliyunAccount account = new AliyunAccount(accessId, accessKey);
conf = new DatahubConfiguration(account, endpoint);
client = new DatahubClient(conf);
//查询有几个shard
ListShardResult listShardResult = client.listShard(projectName, topicName);
//根据shard的数量创建线程消费
for(int i=0;i<listShardResult.getShards().size();i++)
{ 

String shardId = listShardResult.getShards().get(i).getShardId();
Runnable runnable=new TaskWithoutResult(shardId);
threadPoolExecutor.submit(runnable);
}
//threadPoolExecutor.execute(()-> System.out.println(Thread.currentThread().getName()));
//threadPoolExecutor.shutdown();//不会触发中断
//threadPoolExecutor.shutdownNow();//会触发中断
}
}
/** * 无返回值的多线程任务 * @author zyye * */
class TaskWithoutResult implements Runnable
{ 

/** * dataHub的shardId */
private String shardId;
public TaskWithoutResult(String shardId)
{ 

this.shardId=shardId;
}
@Override
public void run(){ 

LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"开始运行");
try { 

task(shardId);
} catch (Exception e) { 

//捕捉中断异常
LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"被中断");
}
LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"结束运行");
}
}
/** * 根据dataHub的shardId从多线程里读取数据 * @param shardId */
private void task(String shardId) { 

LOGGER.info("开始执行dataHub任务!shardId="+shardId);
//获得dataHub上对应的字段
RecordSchema schema = new RecordSchema();
schema.addField(new Field("src_evt_id", FieldType.STRING));
schema.addField(new Field("gmt_create", FieldType.STRING));
schema.addField(new Field("lng", FieldType.DOUBLE));
schema.addField(new Field("lat", FieldType.DOUBLE));
schema.addField(new Field("geohash", FieldType.STRING));
schema.addField(new Field("evt_desc", FieldType.STRING));
schema.addField(new Field("evt_start_time", FieldType.STRING));
schema.addField(new Field("evt_end_time", FieldType.STRING));
schema.addField(new Field("evt_update_time", FieldType.STRING));
schema.addField(new Field("evt_detail", FieldType.STRING));
schema.addField(new Field("evt_type_no", FieldType.BIGINT));
schema.addField(new Field("sub_evt_type_no", FieldType.BIGINT));
schema.addField(new Field("nick_name", FieldType.STRING));
schema.addField(new Field("picture", FieldType.STRING));
schema.addField(new Field("road_name", FieldType.STRING));
schema.addField(new Field("region_shape", FieldType.STRING));
schema.addField(new Field("dt", FieldType.STRING));
schema.addField(new Field("adcode", FieldType.STRING));
try { 

boolean bExit = false;
GetTopicResult topicResult = client.getTopic(projectName, topicName);
// 首先初始化offset上下文
OffsetContext offsetCtx = client.initOffsetContext(projectName, topicName, subId, shardId);
// 开始消费的cursor
String cursor = null;
if (!offsetCtx.hasOffset()) { 

// 之前没有存储过点位,先获取初始点位,比如这里获取当前该shard最早的数据
GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
cursor = cursorResult.getCursor();
} else { 

// 否则,获取当前已消费点位的下一个cursor
cursor = client.getNextOffsetCursor(offsetCtx).getCursor();
}
// System.out.println("Start consume records, begin offset context:" + offsetCtx.toObjectNode().toString()
// + ", cursor:" + cursor);
long recordNum = 0L;
while (!bExit) { 

try { 

GetRecordsResult recordResult = client.getRecords(projectName, topicName, shardId, cursor, 10,
topicResult.getRecordSchema());
List<RecordEntry> records = recordResult.getRecords();
if (records.size() == 0) { 

// 将最后一次消费点位上报
client.commitOffset(offsetCtx);
// System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
// 可以先休眠(30秒)一会,再继续消费新记录
Thread.sleep(1000*30);
// System.out.println("sleep 1s and continue consume records! shard id:" + shardId);
} else { 

//将dataHub的数据序列化以后存到数据库
List<DwdTfcEvtSftAmapreportRtEntity> eventList = new ArrayList<>();
for (RecordEntry record : records) { 

// 处理记录逻辑
DwdTfcEvtSftAmapreportRtEntity dwdTfcEvtSftAmapreportRtEntity = new DwdTfcEvtSftAmapreportRtEntity();
JsonNode jsonNode = record.toJsonNode().get("Data");
// System.out.println(jsonNode.toString());
eventList.add(dwdTfcEvtSftAmapreportRtEntity.jsonNodeToEntity(jsonNode));
// 上报点位,该示例是每处理100条记录上报一次点位
offsetCtx.setOffset(record.getOffset());
recordNum++;
if (recordNum % 100 == 0) { 

client.commitOffset(offsetCtx);
// System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
}
}
serv.batchInsert(eventList);
cursor = recordResult.getNextCursor();
}
} catch (SubscriptionOfflineException e) { 

// 订阅下线,退出
bExit = true;
e.printStackTrace();
} catch (OffsetResetedException e) { 

// 点位被重置,更新offset上下文
client.updateOffsetContext(offsetCtx);
cursor = client.getNextOffsetCursor(offsetCtx).getCursor();
LOGGER.info("Restart consume shard:" + shardId + ", reset offset:"
+ offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
} catch (OffsetSessionChangedException e) { 

// 其他consumer同时消费了该订阅下的相同shard,退出
bExit = true;
e.printStackTrace();
} catch (Exception e) { 

//bExit = true;
//当线程意外中断时,等待一段时间再继续运行,而不是退出
LOGGER.info("thread shardId="+shardId+"is Exception........................");
Thread.sleep(1000*30);
e.printStackTrace();
}
}
} catch (Exception e) { 

e.printStackTrace();
}
}
}

总结

一个人的未来永远不会取决于你目前所在的位置,而是取决你的心想要到达的地方。
我希望这是一个开始,我期待生活中的更多可能性。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/234670.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Pycharm中安装Pygame方法「建议收藏」

    Pycharm中安装Pygame方法「建议收藏」本文转自:https://blog.csdn.net/zhangffyy/article/details/78524592第一步:打开Pycharm第二步:点File-&amp;amp;amp;gt;DefaultSettings-&amp;amp;amp;gt;ProjectInterpreter-&amp;amp;amp;gt;点加号第三步:搜索Pygame-&amp;amp;amp;gt;InstallPackage然后就安装好了,新建一个p

  • python使用教程_新手python入门教程

    python使用教程_新手python入门教程作者:Vamei出处:http://www.cnblogs.com/vamei欢迎转载,也请保留这段声明。谢谢!怎么能快速地掌握Python?这是和朋友闲聊时谈起的问题。Python包含的内容

  • PECP协议_rtcp协议

    PECP协议_rtcp协议  1.PathComputationElementProtocol(PCEP) asasouthboundplugininONOS.路径计算单元协议,ONOS的南向接口协议之一。(来源https://wiki.onosproject.org/display/ONOS/PCEP+Protocol) 2.ONOS是专门面向服务提供商和企业骨干网的开源SDN网…

  • 深入理解双线性插值算法

    深入理解双线性插值算法引言看了好几篇关于双线性插值算法的博文,解释得都不好理解,不过下面这篇博文就解释得很好,以下内容均参考这篇:图像处理+双线性插值法双线性插值算法双线性插值算法是解决什么问题的(原理)?在图像的仿射变换中,很多地方需要用到插值运算,常见的插值运算包括最邻近插值、双线性插值、双三次插值、兰索思插值等方法,OpenCV提供了很多方法,其中,双线性插值由于折中的插值效果和运算速度,运用比较广…

  • phpstorm2021.3.19 激活码破解方法

    phpstorm2021.3.19 激活码破解方法,https://javaforall.cn/100143.html。详细ieda激活码不妨到全栈程序员必看教程网一起来了解一下吧!

  • mybatisplus代码生成器

    官方文档:https://mp.baomidou.com/guide/逆向工程链接:https://pan.baidu.com/s/1FloqrIhI2d1ns4XgvYPIkA目录结构:生成:xml映射文件,mapper接口,service接口与实现,controller类,实体类与AR;XML映射文件可以不与mapper放一个包,可以自己…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号