kafka多线程消费[通俗易懂]

kafka多线程消费[通俗易懂]1、zookeeper集群搭建:https://blog.csdn.net/qq_31289187/article/details/809333652、kafka集群搭建:https://blog.csdn.net/qq_31289187/article/details/809552283、kafka生成消息:https://blog.csdn.net/qq_31289187/articl…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

1、zookeeper集群搭建:zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客_zookeeper

2、kafka集群搭建:kafka集群搭建以及遇到的异常_燕少༒江湖的博客-CSDN博客

3、kafka生成消息:kafka-producer生产者案例_燕少༒江湖的博客-CSDN博客_kafkaproducer单例

4、kafka多线程消费:offset从zookeeper中得到,一个线程对应一个partition,这样消费速度很快,而且消息的顺序可控,线程数量和partition一样,多了浪费资源,少了效率很低,也可以不通过zookeeper来消费,kafka0.9以后的版本就可以将offset记录到对应消费group到对应的broker上。

5、pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.cn.dl</groupId>
    <artifactId>kafka-consumer1</artifactId>
    <version>1.0</version>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.6</source>
                    <target>1.6</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.43</version>
        </dependency>
    </dependencies>

</project>

6、KafkaConsumterMain

package com.dl.cn;

import java.io.IOException;
import java.util.Properties;

/**
 * Created by tiger on 2018/8/20.
 */
public class KafkaConsumterMain {
    public static void main(String[] args) throws IOException {
        String topic = "user-info";
        int threadNum = 2;
        Properties properties = ReadPropertiesUtils.readConfig("config.properties");
        KafkaConsumterServer kafkaConsumterDemo = new KafkaConsumterServer(topic,threadNum,properties);
        kafkaConsumterDemo.consumer();
    }
}

7、KafkaConsumterServer

package com.dl.cn;

import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Created by tiger on 2018/8/20.
 */
public class KafkaConsumterServer {
    private String topic;
    private Properties properties;
    private int threadNum;

    public KafkaConsumterServer(String topic,int threadNum,Properties properties) {
        this.topic = topic;
        this.threadNum = threadNum;
        this.properties = properties;
    }
    /**
     * 创建固定线程池消费消息
     * 线程和partition一对一
     * */
    public void consumer() {
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, new Integer(threadNum));
        ConsumerConfig consumerConfig = new ConsumerConfig(properties);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
        //创建固定数量的线程池
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        for (KafkaStream stream : streams) {
            executor.submit(new KafkaConsumerThread(stream));
        }
    }
}

8、KafkaConsumerThread

package com.dl.cn;

import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;

/**
 * Created by tiger on 2018/8/20.
 */
public class KafkaConsumerThread implements Runnable{
    private KafkaStream<byte[], byte[]> stream;

    public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
        this.stream = stream;
    }

    @Override
    public void run() {
        ConsumerIterator<byte[], byte[]> it = stream.iterator();
        while (it.hasNext()) {
            MessageAndMetadata<byte[], byte[]> mam = it.next();
            System.out.println(Thread.currentThread().getName() + ">>>partition[" + mam.partition() + "],"
                    + "offset[" + mam.offset() + "], " + new String(mam.message()));
        }
    }
}

9、ReadPropertiesUtils

package com.dl.cn;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;

/**
 * Created by tiger on 2018/8/18.
 */
public class ReadPropertiesUtils {
    /**
     * 读取properties配置文件
     * @param configFileName
     * @exception
     * @return
     * */
    public static Properties readConfig(String configFileName) throws IOException {
        Map<String,String> config = new HashMap<String, String>();
        InputStream in = ReadPropertiesUtils.class.getClassLoader().getResourceAsStream(configFileName);
        Properties properties = new Properties();
        properties.load(in);
        return properties;
    }
}

10、config.properties

kafka多线程消费[通俗易懂]

11、测试结果:

线程1对应partition0,线程2对应partition1,两者互不干扰

kafka多线程消费[通俗易懂]

12、

auto.offset.reset=smallest,意思是从topic最早数据开始消费
auto.offset.reset=largest,是从topic最新数据开始消费

在zk中可以看到消费组

kafka多线程消费[通俗易懂]

比如在代码中用到tiger7777这个消费者组

在代码中看到线程2最后消费的消息offset=1755

kafka多线程消费[通俗易懂]

线程1最后消费的消息offset=2243

kafka多线程消费[通俗易懂]

zookeeper中记录的offset值

kafka多线程消费[通俗易懂]

生产者不断生产数据,消费者不断消费数据

将tiger7777,中partition对应的offset的值更新为200,然后重新启动

消费者,发现消息从offset=200开始重新消费,而且发现只有一个线程在继续消费

kafka多线程消费[通俗易懂]

kafka多线程消费[通俗易懂]

kafka多线程消费[通俗易懂]

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/182243.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 计算机系统新手入门,电脑初学者入门教程

    计算机系统新手入门,电脑初学者入门教程本篇主要从关机、任务管理器、电脑屏幕锁定这三个方面,帮助初次学习电脑的人尽快掌握一些基本操作,快一起来学习吧。工具/材料电脑(本篇以Windows7系统为例)电脑关机01方法一。首先,点击屏幕左下方的win(当点击时,会提示“开始”的字样)。02点击后,可以看见有关机选项,点击关机,后面出现几个选项,根据需要进行选择就可以。03方法二:使用快捷键。按下快捷键Alt+F4。(这里需要注意,是同时按…

  • Python量化交易学习笔记(50)——程序化交易1

    Python量化交易学习笔记(50)——程序化交易1easytrader安装pipinstalleasytrader下载安装e海通财PC独立交易版

  • 【Cubieboard2】配置编译内核支持SPI全双工通信驱动

    【Cubieboard2】配置编译内核支持SPI全双工通信驱动1,cubieboard2A20系列,无论是官方还是社区的系统,默认都是不支持SPI总线驱动的。需要重新编译配置内核,修改文件才能支持SPI全双工通信。本文以Cuieboard2Debain为例,进行讲解;2,重新编译配置内核(1)先去官网下载对应版本的linux内核源码,地址:https://github.com/linux-sunxi/linux-sunxi我下载的是sun-xi

  • 西瓜视频地址解析_西瓜去水印免费

    西瓜视频地址解析_西瓜去水印免费json解析工具:https://www.json.cn/base64解析工具:https://www.sojson.com/base64.html1.先获取videoid2.通过videoid

  • 【linux命令】 tree命令

    【linux命令】 tree命令文章目录Tree命令安装方法一,yum安装方法二,源码安装Tree命令安装方法一,yum安装命令:yuminstalltree方法二,源码安装1.下载安装包,地址:http://mama.indstate.edu/users/ice/tree/2.解压安装1)Linux环境(CentOS6.5)下安装a.解压tree-1.7.0.tgz文件,命令:tar-zxvftree-1.7.0.tgzb.进入解压目录中,命令:cdtree-1.7.0      c.安装文件,命令:

  • Navicat for MySQL 使用SSH方式链接远程数据库(二)

    Navicat for MySQL 使用SSH方式链接远程数据库(二)

    2021年10月19日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号