Kafka如何删除topic中的部分数据_kafka修改topic副本数

Kafka如何删除topic中的部分数据_kafka修改topic副本数概述  在平时对kafka…

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

概述

  在平时对kafka的运维工作中,我们经常会由于某些原因去删除一个topic,比如这个topic是测试用的,生产环境中需要删除。或者我想扩容topic的同时,这个topic中的数据我不想要了,这时候删除topic,增加broker,再重新创建topic就会是比较简单的方法。但是kafka删除topic时,有很多关键的点必须清楚,否则在删除topic的时候就会出现各种各样的问题。

  我测试环境使用的kafka版本是0.10.2.0,不同版本的kafka默认配置和bin目录下脚本使用的方式略有不同,以下讨论仅在0.10.2.0版本的kafka中实测过。使用的producer和consumer均为php客户端。

推荐的自动化的删除方法

  在kafka0.8.2.x之后的kafka都支持自动化删除topic,并且官方提供了把这个功能做到了bin/kafka-topics.sh中。只需要下面简单的一步,和这个topic相关的数据就会被删除:

    bin/kafka-topics.sh –zookeeper zk_host:port/chroot –delete –topic my_topic_name

  如果使用这种删除方法,需要注意以下几个问题:

    1. config文件中的delete.topic.enable需要设置为true

      在0.10.2.0版本中,这个参数默认是为false的。经过实测,如果这个参数不显式地指定为true,上面的命令和没执行一样,producer该生产生产,consumer该消费消费,consumer_group的logsize也正常显式。broker的删除原理就是用户在zookeeper/admin/delete_topics中创建一个节点(以topic命名),controller(其实就是某个选举出来的broker),监听zookeeper的/admin/delete_topics目录,如果发现有新的节点创建,则会启动删除topic的逻辑(可以参考这篇博文:https://www.cnblogs.com/huxi2b/p/4842695.html)。如果你ls一下这个目录,发现如果delete.topic.enable为false,这个临时节点也是不会创建的,也就是说,删除topic的逻辑压根不会启动。

    2. 停止producer和consumer

      不停producer和consumer不一定会发生问题,但是停掉了一定不会发生问题。要是不是什么实时的业务,还是停掉比较好一些。在接下来的讨论里面,你就会发现,在删除topic的同时,不停止producer和consumer会产生多么复杂的情况,而且搞不好还会漏消费数据,造成数据丢失的情况。
      在开始讨论之前,不得不提到一个config文件中的参数,auto.create.topics.enable。该参数在官方文档中的描述是这样的:Enable auto creation of topic on the server。在实践中的效果是这样的:如果你给一个不存在的topic中produce数据,或者你给一个不存
在 的topic发起consume请求,那么这个topic就会自动被创建。与这个配置相关的配置还有2个,num.partitions和default.replication.factor,分别控制自动创建的topic的partition数和副本数。在0.10.2.0版本kafka中,默认值是auto.create.topics.enable=true, num.partitions=1, default.replication.factor=1。

     第一种情况:auto.create.topics.enable关闭

      根据实测,当producer正在生产,或者consumer正在消费的时候,执行delete topic的命令行,producer会被卡住,consumer也会停止消费,topic被删除,logsize变为0。在不重启producer进程和consumer进程的情况下,如果手动重新创建topic,这时producer开始成功生产数据,consumer也开始消费数据,消费到的是topic重建后producer产生的数据,lag也显示正常,就和创建好topic,再开启producer和consumer是一样的。可以看到,如果auto.crete.topics.enable为false的情况下,即便是不关闭producer和consumer,直接调用delete topic命令行,也不会发生出现错误。

    第二种情况,auto.create.topics.enable开启

      在auto.create.topics.enable为true的场景下, 需要讨论的情况就有很多了:  

      (1)如果是只有1个partition,只有1个consumer消费,且num.partitions=1

       假设producer生产数据比consumer消费数快,而且已经运行了一段时间了,也就是说已经积压了一些数据了。
       此时,不关闭producer和consumer,直接调用delete topic的命令行,你会发现:原来的topic确实已经被删除了,但是因为producer和consumer正在运行,所以broker有重新创建了一个partition为1的topic,而且logsize为0。但是很快,因为producer并不会因为topic被重新创建了而停止,所以logsize会继续从0开始增长,增长的数量就是topic被重建后,producer生产成功的消息条数,producer的行为很好理解。但是consumer就会出现一些令人费解的行为,首先是consumer会继续消费topic被重建之前,producer生产的数据,直到把这些数据消费完毕。但是理论上delete topic命令行一执行,log文件被删除了(logsize也显示为0,也不是poll缓存下的,我已经把receive.message.max.bytes设置的非常小了),为什么还能消费得到。这个问题暂时没有找到答案,可能是因为page cache的原因。但是根据测试确实会发生这样的情况,这种情况就会造成consumer消费了本来应该是被删除的数据。而且在此期间,lag有一段时间会为负值,是因为logsize变为了从0开始增加,但是consumer给broker提交的offset仍然是topic重建之前的值,所以lag=logsize-consumer_offset,也就变成了负值。第二个异常行为是,consumer把topic重建前producer生产的数据消费完之后,不能继续消费topic重建之后producer生产的数据,会显示RD_KAFKA_RESP_ERR_PARTITION_EOF错误。除非重启,才有可能正确消费数据。为什么说可能呢?因为在auto.create.topics.enable=true且producer和consumer正在运行的情况下,topic是被删除了,但是consumer_group并没有被删除。也就是说,consumer重启之后,会从上次被杀掉时候的offset开始消费新的日志。分为以下2种情况:

       <1> topic重建后,producer新生产的数据的个数小于consumer被杀掉最后提交的offset

         

Kafka如何删除topic中的部分数据_kafka修改topic副本数

         Topic重建后,producer新生产的数据的个数小于consumer offset,所以lag=LogSize-ConsumerOffset为负。如果此时重启consumer,则consumer下次给broker发送的fetch请求是消费offset=26的数据,大于LogSize。根据实测,会从offset=0开始消费,也就是正常从头开始消费,不会漏掉数据,lag也会变为从12开始递减。

        <2> topic重建后,producer新生产的数据的个数大于consumer被杀掉最后提交的offset

          

Kafka如何删除topic中的部分数据_kafka修改topic副本数

          Topic重建后,由于producer生产数据快于consumer的消费速度,此时新生产的数据个数已经大于ConsumerOffset。如果此时重启consumer,则consumer下次给broker发送的fetch请求是消费offset为40的数据,那么broker理所当然地把新生产的数据中的offset为40的数据发送给consumer,consumer也从此在这个offset开始消费。同时新生产的数据中0~39的数据就永远都不会被消费了,造成了丢失数据的后果。

       (2)如果是只有1个partition,但是有多个consumer消费,且num.partitions=1       

         对,你没有看错。就是多个consumer消费一个partition。这种情况和上一种情况唯一的区别就是consumer的个数。但是topic重建后,发现logsize并不会成为0,producer继续生产数据,logsize增加相应的个数。不重启consumer,consumer会把topic重建之前生产的数据全部消费完,这造成消费了本来应该删除数据的错误,然后接着消费producer新生产的数据,并不会卡住。producer和consumer还有整个topic的各项参数就像什么都没有发生一样。

      上面举的两个例子是最简单的两种情况,但是已经可以感觉得到如果在删除topic时auto.create.topic.enable=true并且不关闭producer和consumer产生的结果有多复杂了。笔者还做过更多的测试,比如原始partition个数并不为1,但是num.partition为1(自动创建的topic的partition个数为1)。或者原始partition个数不为1,num.partition也不为1, 但是数值不一样。再加上consumer个数是否为1个这两种情况,结果分析起来会更加复杂。 所以,如果为了代码写起来方便,执意设置auto.create.topic.enable=true, 那么在删除topic时,还是关闭producer和consumer为妙。
      使用这个命令行还需要注意的一点是,如果某台broker挂掉了,也会影响到删除topic。具体的表现就是producer和consumer都被卡住了。topic显示删除不掉。但是如果把broker再拉起来,topic就会被成功删除了。
但是获过头来想,delete.topic.enable默认为false其实也是符合生产环境安全需求的。假设有多个部门在共享一个kafka集群。如果某天另外一个部门的同事delete topic的时候把topic粘贴错了,或者他根本不知道有这个topic。这样topic会被直接删除,这在线上环境是很严重的事情。

  

手动的删除方法

   1. 停止producer和consumer

   2. 停止kafka(不是停止zookeeper,因为第4步要用到zookeeper) 

   3. 删除config文件中log.dir下的topic相关文件

   4. 在zookeeper上删除 /config/topics/topic_name, /brokers/topics/topic_name, /admin/topic_name

  重启kafka之后,发现topic被删除。这时可以自行创建新的topic。

  关于是否一定要停止kafka才能手动删除topic,笔者做了一些测试。关闭了producer,关闭了consumer。然后做了第3步和第4步。然后重启producer和consumer。发现producer可以继续produce成功,但是不会生成物理文件,也不会在zookepe的/brokers/topics/和/config/topics/, /admin/delete_topics中生成任何数据。。开启多个consumer可以继续消费(消费到的是删除topic之前producer生产的数据,消费的可能是broker的page cache中的东西),但是去log_dir下看,没有物理文件。zookeper的/brokers/topics/和/config/topics/, /admin/delete_topics中没有任何数据。这造成了consumer消费了本该删除的数据,producer丢失了生产的数据的后果。所以手动删除topic还是停止kafka,producer,consumer比较好。

  使用这个方法的时候有一点要特别注意。这种方式会造成上面讲到的topic虽然删除了,但是consumer_group依然存在的问题。如果topic重建之后,producer先运行,且新生产的数据个数大于consumer被杀掉时的ConsumerOffset,那么就会造成开头一部分数据无法消费到。如果新生产的数据少于consumer被杀掉时的ConsumerOffset,那么从offset=0开始消费。删除ConsumerOffset,在0.10.2.0版本中没有提供,因为这些东西都是保存在__consumer_offset topic中的。没有很方便的脚本把某个consumer_group的位移信息从__consumer_offset中删除。 而且这个topic保存了所有consumer_group的位移信息,乱动这个topic需要承担其他consumer_group位移丢失的风险。如果某个consumer_group在一定时间内,如果再也没有consumer加入,那么这个consumer_group的位移信息将会被自动删除。这个时间由config中的offsets.retention.minutes参数控制,默认是1天。解决刚才说的consumer_group在topic删除后仍然存留的问题可以通过重置offset的方式实现。在kafka reset offset 0.11 版提供了命令行的方法。笔者已经使用0.11版的命令行去操作0.10版的kafka, 测试显示一切正常。

  使用方式可以参考这个文档:https://www.cnblogs.com/huxi2b/p/7284767.html

参考资料

  删除topic的逻辑:

    https://www.cnblogs.com/huxi2b/p/4842695.html

  几个topic相关的kafka配置参数:

    https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/broker-configurations/

  一些删除topic相关讨论:

    https://stackoverflow.com/questions/33537950/how-to-delete-a-topic-in-apache-kafka

    https://stackoverflow.com/questions/25568178/kafka-server-stop-sh-not-working-when-kafka-started-from-python-script

    http://wanwenli.com/kafka/2016/11/04/Kafka-Group-Coordinator.html
    https://github.com/yahoo/kafka-manager/issues/341
    https://stackoverflow.com/questions/29243896/removing-a-kafka-consumer-group-in-zookeeper 

       https://stackoverflow.com/questions/39529511/what-is-the-use-of-consumer-offsets-and-schema-topics-in-kafka

    https://stackoverflow.com/questions/32390265/what-determines-kafka-consumer-offset

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/181409.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 软件包管理(三)–编译安装

    软件包管理(三)–编译安装

  • xp 架设网站服务器软件,xp架设ftp服务器软件

    xp 架设网站服务器软件,xp架设ftp服务器软件xp架设ftp服务器软件内容精选换一换由于智能写Cache是华为自研闭源工具,仅支持华为鲲鹏处理器使用。需在所有云服务器上安装DataProvider软件,SAP技术支持人员通过该软件收集云服务器所在的平台信息,以便在SAP系统故障、性能下降时进行定位和分析。SAPNetWeaver所在的服务器上,在创建服务器的时候需要为其指定名为DataproviderAccess的Agency,同时也需…

  • sai2 常用快捷键 2020

    sai2 常用快捷键 2020Ctrl+A全选Ctrl+B从剪贴板创建画布Ctrl+D取消选择Ctrl+E合并图层Ctrl+H显示选区边缘Ctrl+Y还原Ctrl+T自由变换Ctrl+R显示尺子按Shift可调节比例Ctrl+U色相Ctrl+X剪贴Ctrl+W关闭视图Shift+PageUp逆时针旋转Shift+PageDown顺时针旋转[小一号画笔]大一号画笔Delete清除图层%0~9%更改画笔浓度(小键盘)A选区笔B喷枪C水彩笔E橡皮擦H水平翻

  • 会话Cookie中的IDOR导致批量帐户泄露

    会话Cookie中的IDOR导致批量帐户泄露如果你熟悉IDOR是什么,你将知道它可以在url,请求正文,GET或POST请求等任何地方,也可以在cookie中。当我注意到Cookie中有一个被称为shoppingID会话Cookie的事件时,In试图重现CSRF问题。在仔细查看了cookie的价值之后,我意识到一些很快引起我注意的事情:shoppingID=88ea39539e74fa67c09a4fc0bc8ebe6d00978392PEr9ySESSIONID3552522PXGLkC;你注意到了吗?如果没有,请不要再继续查看。如果你

  • pycharm database 没有_pycharm社区版使用教程

    pycharm database 没有_pycharm社区版使用教程网上教程都是直接打开右上角的database,但是我死活也没找到,后来发现应该是因为社区版的问题,需要自己安装,详细步骤如下图。1.打开File—》Settings—-》Plugins搜索database,选择DatabaseNavigator安装即可done~

  • windows下cmd查看端口占用情况[通俗易懂]

    windows下cmd查看端口占用情况[通俗易懂]查看端口占用情况进入cmd输入netstat-ano可以列出所有端口占用情况如果只是找特定端口号,输入netstat-ano|findstr“8082”,其中8082为端口号,对应PID为16040继续输入tasklist|findstr“16040”或者到任务管理器-进程查找…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号