DataHub Java接入实时数据

DataHub Java接入实时数据DataHubJava接入实时数据序言问题代码总结序言Datahub的相关介绍和优势,我在这里就不一一赘述,留个官方文档的连接([DataHub官方文档](https://help.aliyun.com/document_detail/47439.html?spm=a2c0j.8235941.654670.ddoc.26d91a22JWAbt9)),大家可以自己去看看。我想在这里记录的是…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全家桶1年46,售后保障稳定

DataHub Java接入实时数据

序言

  Datahub的相关介绍和优势,我在这里就不一一赘述。大家可以自己去看官方文档。我想在这里记录的是我做这个
  需求中遇到的一点问题和它们的解决办法,如果大家有更好的思路和办法,欢迎大家指正。

Jetbrains全家桶1年46,售后保障稳定

问题

1、在项目开始是只运行一次任务代码,即DataHub的接流任务。由于写博客的时间距离我当初写代码的时间比较久了,
我已经记不得这个问题当初具体的详细情景,项目用的spring的框架。在网上也找了很久的办法,最后,总之,我用
了一个取巧的方法,利用spring的定时任务,每十年执行一次,相信应该没有一个项目会连续在服务器上跑十年吧。。。
/** * 一次性定时任务,每十年执行一次 */
@Scheduled(fixedRate = 1000*60*60*24*365*10)
2、第二个问题是关于DataHub的游标问题。在早期的DataHub的产品中并没有提供游标的存储,用户需要自己存储游
标,以便在项目重启后、或接流异常中断以后继续读取数据。当然,目前的DataHub已经支持游标的存储,只需要我们
进行简单的配置。在这里,需要注意一点,一个project下多个topic只要一个游标就可以了。这块可以去看官方的示
例代码,对着改就行了。
3、利用多线程对多个topic进行接流。个人认为,这是我多线程目前为止用的最好的一次了。
4、几个线程池概念,这块有机会还是要重新理一下。
Java通过Executors提供四种线程池,分别为:
newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,
则新建线程。
newFixedThreadPool 创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
newScheduledThreadPool 创建一个定长线程池,支持定时及周期性任务执行。
newSingleThreadExecutor 创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照
指定顺序(FIFO, LIFO, 优先级)执行。

代码

package com.bywin.apsarabrain.trafficgis.web.api.road.task;

import com.aliyun.datahub.DatahubClient;
import com.aliyun.datahub.DatahubConfiguration;
import com.aliyun.datahub.auth.AliyunAccount;
import com.aliyun.datahub.common.data.Field;
import com.aliyun.datahub.common.data.FieldType;
import com.aliyun.datahub.common.data.RecordSchema;
import com.aliyun.datahub.exception.OffsetResetedException;
import com.aliyun.datahub.exception.OffsetSessionChangedException;
import com.aliyun.datahub.exception.SubscriptionOfflineException;
import com.aliyun.datahub.model.*;
import com.bywin.apsarabrain.trafficgis.biz.service.IDwdTfcEvtSftAmapreportRtServ;
import com.bywin.apsarabrain.trafficgis.dal.entity.DwdTfcEvtSftAmapreportRtEntity;
import com.bywin.apsarabrain.trafficgisb.common.util.StringUtil;
import org.codehaus.jackson.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;

/** * @Author: zyye * @Date: 2018/10/29 22:00 * @Description: 交通事件(一次性定时任务,每十年执行一次) */
@Component("TrafficEventTask")
public class TrafficEventTask { 
   

    private static final Logger LOGGER = LoggerFactory.getLogger("TrafficEventTask");

    @Autowired
    private IDwdTfcEvtSftAmapreportRtServ serv;

    @Value("${dataHub.accessId}")
    private String accessId;

    @Value("${dataHub.accessKey}")
    private String accessKey;

    @Value("${dataHub.endpoint}")
    private String endpoint;

    @Value("${dataHub.projectName}")
    private String projectName;

    @Value("${dataHub.topicName}")
    private String topicName;

    @Value("${dataHub.subId}")
    private String subId;

    DatahubConfiguration conf;

    private DatahubClient client;

    /** * 一个基于数组结构的有界阻塞队列,按FIFO原则进行排序 */
    static BlockingQueue blockingQueue = new ArrayBlockingQueue<>(10);

    /** * 线程池 */
    static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3, 10,
            1, TimeUnit.MINUTES, blockingQueue);

    /** * 一次性定时任务,每十年执行一次 */
    @Scheduled(fixedRate = 1000*60*60*24*365*10)
    public void run() { 
   
        LOGGER.info("--------------run trafficEventTask----------------");
        //多台服务器,只需要一台服务器执行定时任务,通过路径判断
        if (StringUtil.isRunTask()){ 
   
            LOGGER.info("++++++++++++++++++++runing trafficEventTask++++++++++++++++++++++++");
            AliyunAccount account = new AliyunAccount(accessId, accessKey);
            conf = new DatahubConfiguration(account, endpoint);
            client = new DatahubClient(conf);
            //查询有几个shard
            ListShardResult listShardResult = client.listShard(projectName, topicName);

            //根据shard的数量创建线程消费
            for(int i=0;i<listShardResult.getShards().size();i++)
            { 
   
                String shardId = listShardResult.getShards().get(i).getShardId();
                Runnable runnable=new TaskWithoutResult(shardId);
                threadPoolExecutor.submit(runnable);
            }
            //threadPoolExecutor.execute(()-> System.out.println(Thread.currentThread().getName()));
            //threadPoolExecutor.shutdown();//不会触发中断
            //threadPoolExecutor.shutdownNow();//会触发中断
        }
    }

    /** * 无返回值的多线程任务 * @author zyye * */
    class TaskWithoutResult implements Runnable
    { 
   
        /** * dataHub的shardId */
        private String shardId;

        public TaskWithoutResult(String shardId)
        { 
   
            this.shardId=shardId;
        }

        @Override
        public void run(){ 
   
            LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"开始运行");
            try { 
   
                task(shardId);
            } catch (Exception e) { 
   
                //捕捉中断异常
                LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"被中断");
            }
            LOGGER.info("线程_shardId="+shardId+"::"+Thread.currentThread()+"结束运行");
        }
    }

    /** * 根据dataHub的shardId从多线程里读取数据 * @param shardId */
    private void task(String shardId) { 
   
        LOGGER.info("开始执行dataHub任务!shardId="+shardId);

        //获得dataHub上对应的字段
        RecordSchema schema = new RecordSchema();
        schema.addField(new Field("src_evt_id", FieldType.STRING));
        schema.addField(new Field("gmt_create", FieldType.STRING));
        schema.addField(new Field("lng", FieldType.DOUBLE));
        schema.addField(new Field("lat", FieldType.DOUBLE));
        schema.addField(new Field("geohash", FieldType.STRING));
        schema.addField(new Field("evt_desc", FieldType.STRING));
        schema.addField(new Field("evt_start_time", FieldType.STRING));
        schema.addField(new Field("evt_end_time", FieldType.STRING));
        schema.addField(new Field("evt_update_time", FieldType.STRING));
        schema.addField(new Field("evt_detail", FieldType.STRING));
        schema.addField(new Field("evt_type_no", FieldType.BIGINT));
        schema.addField(new Field("sub_evt_type_no", FieldType.BIGINT));
        schema.addField(new Field("nick_name", FieldType.STRING));
        schema.addField(new Field("picture", FieldType.STRING));
        schema.addField(new Field("road_name", FieldType.STRING));
        schema.addField(new Field("region_shape", FieldType.STRING));
        schema.addField(new Field("dt", FieldType.STRING));
        schema.addField(new Field("adcode", FieldType.STRING));

        try { 
   
            boolean bExit = false;
            GetTopicResult topicResult = client.getTopic(projectName, topicName);
            // 首先初始化offset上下文
            OffsetContext offsetCtx = client.initOffsetContext(projectName, topicName, subId, shardId);
            // 开始消费的cursor
            String cursor = null;
            if (!offsetCtx.hasOffset()) { 
   
                // 之前没有存储过点位,先获取初始点位,比如这里获取当前该shard最早的数据
                GetCursorResult cursorResult = client.getCursor(projectName, topicName, shardId, GetCursorRequest.CursorType.OLDEST);
                cursor = cursorResult.getCursor();
            } else { 
   
                // 否则,获取当前已消费点位的下一个cursor
                cursor = client.getNextOffsetCursor(offsetCtx).getCursor();
            }
// System.out.println("Start consume records, begin offset context:" + offsetCtx.toObjectNode().toString()
// + ", cursor:" + cursor);
            long recordNum = 0L;
            while (!bExit) { 
   
                try { 
   
                    GetRecordsResult recordResult = client.getRecords(projectName, topicName, shardId, cursor, 10,
                            topicResult.getRecordSchema());
                    List<RecordEntry> records = recordResult.getRecords();
                    if (records.size() == 0) { 
   
                        // 将最后一次消费点位上报
                        client.commitOffset(offsetCtx);
// System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
                        // 可以先休眠(30秒)一会,再继续消费新记录
                        Thread.sleep(1000*30);
// System.out.println("sleep 1s and continue consume records! shard id:" + shardId);
                    } else { 
   
                        //将dataHub的数据序列化以后存到数据库
                        List<DwdTfcEvtSftAmapreportRtEntity> eventList = new ArrayList<>();
                        for (RecordEntry record : records) { 
   
                            // 处理记录逻辑
                            DwdTfcEvtSftAmapreportRtEntity dwdTfcEvtSftAmapreportRtEntity = new DwdTfcEvtSftAmapreportRtEntity();
                            JsonNode jsonNode = record.toJsonNode().get("Data");
// System.out.println(jsonNode.toString());
                            eventList.add(dwdTfcEvtSftAmapreportRtEntity.jsonNodeToEntity(jsonNode));
                            // 上报点位,该示例是每处理100条记录上报一次点位
                            offsetCtx.setOffset(record.getOffset());
                            recordNum++;
                            if (recordNum % 100 == 0) { 
   
                                client.commitOffset(offsetCtx);
// System.out.println("commit offset suc! offset context: " + offsetCtx.toObjectNode().toString());
                            }
                        }
                        serv.batchInsert(eventList);
                        cursor = recordResult.getNextCursor();
                    }
                } catch (SubscriptionOfflineException e) { 
   
                    // 订阅下线,退出
                    bExit = true;
                    e.printStackTrace();
                } catch (OffsetResetedException e) { 
   
                    // 点位被重置,更新offset上下文
                    client.updateOffsetContext(offsetCtx);
                    cursor = client.getNextOffsetCursor(offsetCtx).getCursor();
                    LOGGER.info("Restart consume shard:" + shardId + ", reset offset:"
                            + offsetCtx.toObjectNode().toString() + ", cursor:" + cursor);
                } catch (OffsetSessionChangedException e) { 
   
                    // 其他consumer同时消费了该订阅下的相同shard,退出
                    bExit = true;
                    e.printStackTrace();
                } catch (Exception e) { 
   
                    //bExit = true;
                    //当线程意外中断时,等待一段时间再继续运行,而不是退出
                    LOGGER.info("thread shardId="+shardId+"is Exception........................");
                    Thread.sleep(1000*30);
                    e.printStackTrace();
                }
            }
        } catch (Exception e) { 
   
            e.printStackTrace();
        }
    }
}

总结

一个人的未来永远不会取决于你目前所在的位置,而是取决你的心想要到达的地方。
我希望这是一个开始,我期待生活中的更多可能性。
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/234670.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • java实现万年历[通俗易懂]

    java实现万年历[通俗易懂]万年历代码实现packagecom.zll;​importjava.util.Scanner;​/***总结:遇到的bug把传入判断年份的日期都传成输入的year了,应该传入要计算的每一年

  • javaweb-青橙项目-2-77

    javaweb-青橙项目-2-77

  • 微软邮箱设置smtp_邮件服务器怎么设置

    微软邮箱设置smtp_邮件服务器怎么设置配置SMTP服务器和自定义警报和反馈请求电子邮件09/01/2016本文内容AzureDevOpsServer2020|AzureDevOpsServer2019|TFS2018-TFS2013备注AzureDevOpsServer以前名为VisualStudioTeamFoundationServer。若要使用反馈请求和警报,你必须为Azure…

  • 树莓派4b串口通信配置

    树莓派4b串口通信配置树莓派4b本身是两个串口,运行ls/dev-al如下:请注意:在默认状态下,serial0(就是GPIO14,15)是映射到ttyS0的(就是MINI串口:/dev/ttyS0),ttyS0的特点是其工作时钟来自于CPU,CPU的时钟呢又是从600MHZ到1.5Ghz动态变化的,所以这个串口经常会因为时钟频率发生变化而发生错误,因此我们不用这个串口。默认状态下,serial1(跟板载蓝牙…

  • delphi多线程[通俗易懂]

    delphi多线程[通俗易懂]   Delphi中有一个线程类TThread是用来实现多线程编程的,这个绝大多数Delphi书藉都有说到,但基本上都是对TThread类的几个成员作一简单介绍,再说明一下Execute的实现和Synchronize的用法就完了。然而这并不是多线程编程的全部,我写此文的目的在于对此作一个补充。  线程本质上是进程中一段并发运行的代码。一个进程至少有一个线程,即所谓的主线程。同时还可以有多个子线

    2022年10月22日
  • 数据结构PDF下载

    数据结构PDF下载数据结构算法实现及解析C语言[第二版]高一凡pdf文字版http://qunying.jb51.net:81/201303/books/sjjg_sfszjjx_jb51net.rar大话数据结构中文PDF清晰扫描版完整版[36M]http://qunying.jb51.net:81/201209/books/dhsjjg_jb51.rarC#语言描述数据结构pdf版ht…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号