Spring Boot 中使用@KafkaListener并发批量接收消息[通俗易懂]

Spring Boot 中使用@KafkaListener并发批量接收消息[通俗易懂]kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到Kafka消息队列拥堵的情况。碰到这种情况时,有不能直接清理整改消息队列,因为还有别的服务正在使用该队列。因此只能额外启动一个相同名称的consumer-group来加快消息消费(经测试,如果该topic只有一个分区,实际上再启动一个新的消费者作用不到)。具体代码在这里,欢迎加星号,fork。官方文档……

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

kakfa是我们在项目开发中经常使用的消息中间件。由于它的写性能非常高,因此,经常会碰到读取Kafka消息队列时拥堵的情况。遇到这种情况时,有时我们不能直接清理整个topic,因为还有别的服务正在使用该topic。因此只能额外启动一个相同名称的consumer-group来加快消息消费(如果该topic只有一个分区,再启动一个新的消费者,没有作用)。

完整的代码在这里,欢迎加星号、fork。

官方文档在https://docs.spring.io/spring-kafka/reference/html/_reference.html

###第一步,并发消费###
先看代码,重点是这我们使用的是ConcurrentKafkaListenerContainerFactory并且设置了factory.setConcurrency(4); (我的topic有4个分区,为了加快消费将并发设置为4,也就是有4个KafkaMessageListenerContainer)

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

注意也可以直接在application.properties中添加spring.kafka.listener.concurrency=3,然后使用@KafkaListener并发消费。

###第二步,批量消费###
然后是批量消费。重点是factory.setBatchListener(true);
以及 propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
一个设启用批量消费,一个设置批量消费每次最多消费多少条消息记录。

重点说明一下,我们设置的ConsumerConfig.MAX_POLL_RECORDS_CONFIG是50,并不是说如果没有达到50条消息,我们就一直等待。官方的解释是”The maximum number of records returned in a single call to poll().”, 也就是50表示的是一次poll最多返回的记录数。

从启动日志中可以看到还有个 max.poll.interval.ms = 300000, 也就说每间隔max.poll.interval.ms我们就调用一次poll。每次poll最多返回50条记录。

max.poll.interval.ms官方解释是”The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member. “;

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(4);
        factory.setBatchListener(true);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

   @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, propsConfig.getBroker());
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, propsConfig.getEnableAutoCommit());
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, propsConfig.getGroupId());
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, propsConfig.getAutoOffsetReset());
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 50);
        return propsMap;
    }

启动日志截图
这里写图片描述

关于max.poll.records和max.poll.interval.ms官方解释截图:
这里写图片描述

###第三步,分区消费###
对于只有一个分区的topic,不需要分区消费,因为没有意义。下面的例子是针对有2个分区的情况(我的完整代码中有4个listenPartitionX方法,我的topic设置了4个分区),读者可以根据自己的情况进行调整。

public class MyListener {
    private static final String TPOIC = "topic02";

    @KafkaListener(id = "id0", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "0" }) })
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p0 Received message={}",  message);
            }
        }
    }

    @KafkaListener(id = "id1", topicPartitions = { @TopicPartition(topic = TPOIC, partitions = { "1" }) })
    public void listenPartition1(List<ConsumerRecord<?, ?>> records) {
        log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id1 records size " +  records.size());

        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                String topic = record.topic();
                log.info("p1 Received message={}",  message);
            }
        }
}

关于分区和消费者关系,后面会补充,先摘录如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

最后,总结,如果我们的topic有多个分区,经过以上步骤可以很好的加快消息消费。如果只有一个分区,因为已经有一个同名group id在消费了,新启动的一个基本上没有作用(本人测试结果)。

具体代码在这里,欢迎加星号,fork。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/181878.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • 请简述什么是Vue组件化开发_vuecli和webpack

    请简述什么是Vue组件化开发_vuecli和webpack前言真实项目开发过程中,我们都是使用组件化的去开发vue的项目,但是组件化的思想又是如何来的呢?下面就从开始讲解演变过程演变过程1.0一般情况下vue都是单页面开发,所以项目中只会有一个inde

  • 普通用户nginx安装步骤,并能够启动mongo的配置

    普通用户nginx安装步骤,并能够启动mongo的配置普通用户nginx安装步骤,并能够启动mongo的配置

  • Shell脚本调用阿里云API实现DDNS动态域名解析[通俗易懂]

    Shell脚本调用阿里云API实现DDNS动态域名解析[通俗易懂]由于服务器的外网是动态拨号,每次获取的外网IP都不同。手头上刚好有阿里云的域名。为此,想通过编写一个Shell脚本,定期通过互联网服务获取当前机器所在网络的IP地址,并将新的IP地址通过阿里云提供的API,更新到对应的域名解析记录。申请AccessKey登陆阿里云官网,在控制台的右上角,将鼠标移动到头像上,会出现如下列表:选择AccessKey管理,会弹出如下提示:选择开始使用子用户Access

  • idea 激活码【中文破解版】

    (idea 激活码)2021最新分享一个能用的的激活码出来,希望能帮到需要激活的朋友。目前这个是能用的,但是用的人多了之后也会失效,会不定时更新的,大家持续关注此网站~IntelliJ2021最新激活注册码,破解教程可免费永久激活,亲测有效,下面是详细链接哦~https://javaforall.cn/100143.html…

  • 图标变成了一张白纸_电脑图标是白纸形式但能打开

    图标变成了一张白纸_电脑图标是白纸形式但能打开大家好,我是波导终结者。WIN10到现在也有些年头了,虽然好的不学学坏的,天天搞强制升级有些烦人,有时候新版本的BUG也比较致命,但是整体的性能,功能和稳定性上还是有飞跃性的提升。其实WIN10自带不少实用工具,而且兼容性和稳定性没得说,很多时候我们可以不必寻找第三方工具,省了很多钱,也提高了不少效率。今天就一起来看看吧。WIN10自带截图工具WIN10自带截图工具,虽然肯定不比专业的截图工具强,…

    2022年10月19日
  • 爬虫爬取妹子图片(入门篇)

    爬虫爬取妹子图片(入门篇)

    2021年11月11日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号