kafka重复消费解决方案_kafka重复消费原因

kafka重复消费解决方案_kafka重复消费原因前面博客小编向大家分享了kafka如何保证消息不丢失?,基本是从producer和broker来分析的,producer要支持重试和acks,producer要做好副本和及时刷盘落地。这篇博客呢,就跟大家一起聊一下kafka消费者如何消费的?如何避免重复消费?消费流程:一般我们消费测试是不会变的,都使用默认的,也就是第一种,range策略。默认策略,保证基本是均衡的。计算公式:n=分区数/消费者数m=分区数%消费者数前m个消费者,消费n+1个,剩余的消费n个eg:12个par

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

一、前言

前面博客小编向大家分享了 kafka如何保证消息不丢失?,基本是从producer和broker来分析的,producer要支持重试和acks,producer要做好副本和及时刷盘落地。

这篇博客呢,就跟大家一起聊一下 kafka 消费者如何消费的?如何避免重复消费?

二、消费者消费流程

消费流程:

  1. 从zk获取要消费的partition 的leader的位置 以及 offset位置
  2. 拉数据,这里拉数据是直接从broker的pagecash拉取,零拷贝 ,所以很快。
  3. 如果pagecash数据不全,就会从磁盘中拉取,并发送
  4. 消费完成后,可以手动提交offset,也可以自动提交offset。
    在这里插入图片描述

消费策略有哪些?如何配置

一般我们消费测试是不会变的,都使用默认的,也就是第一种,range策略。

  • Range 范围分配策略(默认)

默认策略,保证基本是均衡的。
计算公式 :
n = 分区数/消费者数
m = 分区数%消费者数
前m个消费者,消费n+1个,剩余的消费n个
在这里插入图片描述
在这里插入图片描述
eg:12个partition,9个消费者
12/9 = 1
12%9 = 3
前3台 消费2个partition,后6台各消费1个partition。

  • RoundRobin 轮询

先根据topic 和 topic的partition的hashcode进行一个排序,然后以轮询的方式分配给各个消费者。

在这里插入图片描述

  • stricky粘性分配策略

在没有reblence的时候和轮询策略一样
当发生rebalence的时候,尽可能的保证与上一次分配一致

比如默认是
在这里插入图片描述
比如consumer2 挂了,topicA p1 和topicB p2就没有消费者了,这个时候要进行消费组的rebalence。
在这里插入图片描述

然后按照轮询策略分配一下。
在这里插入图片描述

可以在配置消费配置的时候,指定消费策略:

//Range
propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.RangeAssignor.class);

//RoundRobin
propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.RoundRobinAssignor.class);

//stricky
propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.StickyAssignor.class);

什么是零拷贝?

普通把文件发送到远程服务器的方法:
在这里插入图片描述
1.读磁盘内容,拷贝到内核缓冲区
2.cpu把内核缓冲区数据拷贝到用户空间缓冲区
3.调用write(),把用户空间缓冲区数据拷贝到内核的Socket Buffer中
4.把sb中的数据拷贝到网卡缓冲区 NIC Buffer ,网卡在传输

从上面的流程看, 1和3 其实是多余的,用户和内核相互转换,会带来cpu上下文切换,对cpu性能有影响。

零拷贝 就是对这两次的拷贝忽略掉,应用程序可以直接把磁盘中的数据从内核中,直接传输到socket,不用互相拷贝。其中用到了Direct Memory Access 技术,可以把数据直接从内核空间传递到网卡设备,kafka中把数据直接从磁盘复制到 pagecash,给消费者读取,如图:

在这里插入图片描述
在这里插入图片描述
零拷贝其实不是没有拷贝,只是减少了不必要的拷贝次数,比如内核到用户空间的拷贝。
linux 中使用sendfile()实现零拷贝
java中nio用到零拷贝,比如filechannel.transferTo()。

mmap 文件映射机制:把磁盘文件映射到内存,用户通过修改内存,就可以修改磁盘文件。提高io效率,减少了复制开销。

三、如何避免重复消费?

分析原因:

1.生产者重复提交
2.rebalence引起重复消费

超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance,提交offset失败。其他消费者会从没有提交的位置消费,从而导致重复消费。

解决方案:

1.提高消费速度

  • 增加消费者
  • 多线程消费
  • 异步消费
  • 调整消费处理时间

2.幂等处理

  • 消费者设置幂等校验

  • 开启kafka幂等配置,生产者开启幂等配置,将消息生成md5,然后保存到redis中,处理新消息的时候先校验。这个尽量不要开启,消耗性能。

props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

四、如何顺序消费?

我们知道kafka,整个topic有多个partition,每个partition内的消息是有顺序的。

五、如何延迟消费?

kafka是无状态的,没有延迟的功能。pulsar和rabbitmq实现更加方便。
在这里插入图片描述
开发延迟推送服务,定时检索延迟消息,发送给kafka。

六、频繁rebanlence怎么解决?

再均衡,保证所有消费者相对均衡消费。rebalence的时候,所有消费者,停止消费,直到rebanlence完成。

触发时机:
1.consumer个数变化
2.订阅topic个数变化
3.订阅的topic的partition变化

解决方案:

使用消息队列Kafka版时消费客户端频繁出现Rebalance

频繁出现rebalence,可能是消费者的消费时间过长,超过一定时间(max.poll.interval.ms设置的值,默认5分钟)未进行poll拉取消息,则会导致客户端主动离开队列,而引发Rebalance。

1.参数调整:
session.timeout.ms:v0.10.2之前的版本可适当提高该参数值,需要大于消费一批数据的时间,但不要超过30s,建议设置为25s;而v0.10.2及其之后的版本,保持默认值10s即可。
max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>的积。
max.poll.interval.ms: 该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。

2.尽量提高客户端的消费速度,消费逻辑另起线程进行处理。
3.减少Group订阅Topic的数量,一个Group订阅的Topic最好不要超过5个,建议一个Group只订阅一个Topic。

附:批量消费代码

import com.ctrip.framework.apollo.ConfigService;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class BehaviorConsumerConfig { 

public Map<String, Object> consumerConfigs() { 

Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecordsConfig);
propsMap.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, org.apache.kafka.clients.consumer.StickyAssignor.class);
propsMap.put("security.protocol", protocol);
propsMap.put("ssl.truststore.location", truststoreLocation.replaceAll("file://", ""));
propsMap.put("ssl.truststore.password", truststorePassword);
propsMap.put("login.config.location", loginConfigLocation);
propsMap.put("sasl.mechanism", mechanism);
return propsMap;
}
@Bean("batchContainerFactory")
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() { 

ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
// 并发创建的消费者数量
factory.setConcurrency(4);
factory.getContainerProperties().setPollTimeout(3000);
//设置为批量消费,每个批次数量在Kafka配置参数中设置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
factory.setBatchListener(true);
return factory;
}
}

七、小结

本篇我们基本上把消费者的消费梳理干净了,以及消费会遇到的 重复消费,顺序消费,延迟消费等问题都也解释了给出了解决方案。方案一通百通。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/183641.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • html5 sexteen,Sexy TV shows blamed for teen pregnancies

    html5 sexteen,Sexy TV shows blamed for teen pregnanciesExposuretosomeformsofentertainmentisacorruptinginfluenceonchildren,leadingteenswhowatchsexyprogramsintoearlypregnanciesandchildrenwhoplayviolentvideogamestoadoptaggressiv…

  • ConcurrentHashMap 1.7和1.8区别

    ConcurrentHashMap 1.7和1.8区别ConcurrentHashMap与HashMap和Hashtable最大的不同在于:put和get两次Hash到达指定的HashEntry,第一次hash到达Segment,第二次到达Segment里面的Entry,然后在遍历entry链表(1)从1.7到1.8版本,由于HashEntry从链表变成了红黑树所以concurrentHashMap的时间复杂度从O(n)到O…

  • linux tomcat宕机自动启动脚本,tomcat宕机自动重启脚本「建议收藏」

    linux tomcat宕机自动启动脚本,tomcat宕机自动重启脚本「建议收藏」#!/bin/bash#获取tomcat进程ID/usr/share/tomcatTomcatID=$(ps-ef|greptomcat|grep-w‘tomcat‘|grep-v‘grep‘|awk‘{print$2}‘)#tomcat启动程序(这里注意tomcat实际安装的路径)#StartTomcat=/usr/local/tomcat/bin/startup.s…

  • PHPExcel_把Excel数据导入数据库PHP

    PHPExcel_把Excel数据导入数据库PHPPHPExcel导出到Excel前提,准备工作1、PHP版本5.3以上2、官网下载稳定版本的PHPExcel官网地址:http://phpexcel.codeplex.com/以下均以PHPExcel_1.8.0稳定版为学习版本插曲:当我用在官网下载的1.8.0版本练习时,发现与PHP7不能兼容,经Goole后发现要下载Github上的最新版本,附地址:https://github.c

    2022年10月29日
  • linux定时器编程实验报告,Linux定时器实验.doc[通俗易懂]

    linux定时器编程实验报告,Linux定时器实验.doc[通俗易懂]Linux定时器实验Linux第六次实验及分析报告实验要求:1)在用户态编写一个程序,该程序设定一个定时器,在时间到期的时候做出某种可观察的响应(方法不限)2)分析你的程序的实际执行借助了内核的哪些机制3)提交实验与分析报告一:在用户态编写一个程序,该程序设定一个定时器,在时间到期的时候做出某种可观察的响应(方法不限)G++进行编译运行结果如下:可见调用间隔定时器定时10秒成功!二:分析你的程序的…

  • linux 内核态与用户态_linux内核态和用户态通信

    linux 内核态与用户态_linux内核态和用户态通信创建于2013-04-13迁移自本人的百度空间——————————–1/内核态-&gt;用户态      在kernel module中调用printk是最简单的传递信息到用户空间的方法。 2/用户态-&gt;内核态      在linux中,用户对设备的操作往往被抽象为对文件的操作。利用这一特性,可以通过注册和实现伪字符设备…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号