kafka删除topic数据[通俗易懂]

kafka删除topic数据[通俗易懂]kafka删除topic数据一、概述生产环境中,有一个topic的数据量非常大。这些数据不是非常重要,需要定期清理。要求:默认保持24小时,某些topic需要保留2小时或者6小时二、清除方式主要有3个:1.基于时间2.基于日志大小3.基于日志起始偏移量详情,请参考链接:https://blog.csdn.net/u013256816/article/details/80418297接下来,主要介绍基于时间的清除!kafka版本为:2.11-..

大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。

Jetbrains全系列IDE稳定放心使用

kafka删除topic数据
一、概述
生产环境中,有一个topic的数据量非常大。这些数据不是非常重要,需要定期清理。

要求:默认保持24小时,某些topic 需要保留2小时或者6小时

二、清除方式
主要有3个:

1. 基于时间

2. 基于日志大小

3. 基于日志起始偏移量

详情,请参考链接:

https://blog.csdn.net/u013256816/article/details/80418297

接下来,主要介绍基于时间的清除!

kafka版本为:  2.11-1.1.0

zk版本为:  3.4.13

三、kafka配置
# 启用删除主题
delete.topic.enable=true
# 检查日志段文件的间隔时间,以确定是否文件属性是否到达删除要求。
log.retention.check.interval.ms=1000
 

注意:这2行配置必须存在,否则清除策略失效!

log.retention.check.interval.ms 参数的单位是微秒,这里表示间隔1秒钟

四、清除策略
全局topic
在 server.properties 文件中配置的是全局策略,针对每一个topic

单个topic
针对单个topic策略,需要使用脚本kafka-configs.sh

此脚本不需要重启kafka就会生效!

首先来查看一下,当前的topic策略,比如test

bin/kafka-configs.sh –zookeeper zookeeper-1.default.svc.cluster.local:2181 –describe –entity-type topics –entity-name test
 

参数解释:

–describe  详细信息

–entity-type 实体类型

–entity-name 指定topic名

输出:

Configs for topic ‘test’ are
 

这个表示为策略为空

删除topic数据
如果需要删除topic所有数据,使用命令

bin/kafka-topics.sh –delete –topic test –zookeeper zookeeper-1.default.svc.cluster.local:2181
 

这个命令,请谨慎执行!!!

如果想保留主题,只删除主题现有数据(log)。可以通过修改数据保留时间实现

bin/kafka-configs.sh –zookeeper zookeeper-1.default.svc.cluster.local:2181 –entity-type topics –entity-name test –alter –add-config retention.ms=10000
 

执行输出:

Completed Updating config for entity: topic ‘test’.
 

注意:修改保留时间为10秒,但不是修改后10秒就马上删掉,kafka是采用轮训的方式,轮训到这个topic发现10秒前的数据都是删掉。时间由server.properties里面的log.retention.check.interval.ms选项为主

再次查看topic策略

bin/kafka-configs.sh –zookeeper zookeeper-1.default.svc.cluster.local:2181 –describe –entity-type topics –entity-name test
 

输出:

Configs for topic ‘test’ are retention.ms=10000
 

发现目前的删除策略为 retention.ms=10000

删除策略
如果需要删除上面的10秒策略,使用以下命令:

bin/kafka-configs.sh –zookeeper zookeeper-1.default.svc.cluster.local:2181 –entity-type topics –entity-name test –alter –delete-config retention.ms
 

输出:

Completed Updating config for entity: topic ‘test’.
 

再次查看topic策略

bin/kafka-configs.sh –zookeeper zookeeper-1.default.svc.cluster.local:2181 –describe –entity-type topics –entity-name test
 

输出:

Configs for topic ‘test’ are
 

发现策略为空,说明删除成功了!

五、测试清除策略
测试思路

 

说明:
第一步,设置清除策略为保留10秒

第二步,进入生产者模式,输入消息 a

第三步,等待5秒,再次进入生产者模式,输入消息 b

第四部,进入消费者模式,看输出的消息是a还是b

判断标准:
在进行第三步时,a这条消息,应该已经被删除了。所以在第15秒进入消费者模式时,应该输出 b,这样的话,策略才是成功的!

设置策略
topic 为test的数据保留10秒

bin/kafka-configs.sh –zookeeper zookeeper-1.default.svc.cluster.local:2181 –entity-type topics –entity-name test –alter –add-config retention.ms=10000
 

生产模式
进入生产模式,输入a

bin/kafka-console-producer.sh –broker-list kafka-1.default.svc.cluster.local:9092 –topic test
> a
 

等待5秒后,再次进入生产模式,输入b

bin/kafka-console-producer.sh –broker-list kafka-1.default.svc.cluster.local:9092 –topic test
> b
 

消费者模式
等待5秒后,进入 消费者模式

bin/kafka-console-consumer.sh –bootstrap-server kafka-1.default.svc.cluster.local:9092 –topic test –from-beginning

b
 

如果消费者输出为b,表示策略成功!

备注:

如果生产环境中,正在不断的进行生产和消费,执行kafka-configs.sh 脚本,是否会有影响呢?

答案是不会的,它是动态策略!

本文参考链接:

https://blog.csdn.net/forrest_ou/article/details/78999983
 

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/181053.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • Android面试题之Service

    Android面试题之Service1.service是否在mainthread中执行,service里面是否能执行耗时的操作?默认情况,如果没有service所运行的进程,Service和Activity是运行在当前app所在进程中的mainthread里面service里面不能执行耗时的操作(网络请求,拷贝数据库,大文件)特殊情况,可以在清单文件中配置service所在的进程,让service在另外的进程中执行。

  • PHP中header头设置Cookie与内置setCookie的区别

    PHP中header头设置Cookie与内置setCookie的区别

    2021年10月26日
  • pycharm使用python_pytorch中文手册

    pycharm使用python_pytorch中文手册本小节只讲如何通过pycharm使用pytorch,pytorch的详细安装点击这里https://blog.csdn.net/huang_shao1/article/details/82958551anaconda的详细安装点击这里https://blog.csdn.net/huang_shao1/article/details/82958615如图所示,我们编辑好了自己pytorch项…

  • springmvc源码下载_web系统源码下载

    springmvc源码下载_web系统源码下载Spring源码下载、编译Spring源码下载Spring源码编译1、新增下载源地址2、修改依赖地址三级目录Spring源码下载注意:Spring源码使用的是Gradle,而不是Maven。因此下载Spring源码之前可以先安装Gradle,参考:Gradle的下载、安装和配置环境。Spring源码gitee地址:https://gitee.com/mirrors/Spring-Framework。Spring源码gitee仓库地址:https://gitee.com/mirrors/Spring

  • Navicat连接MySQL失败1251

    Navicat连接MySQL失败1251错误提示:1251-Clientdoesnotsupportauthenticationprotocolrequestedbyserver;considerupgradingMySQLclient原因:MySQL8版本以上采用新的加密方式,旧的不能用解决办法:更改MySQL的加密方式ALTERUSER’root’@’localhost’IDENTIFIE…

    2022年10月14日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号