大数据——Flume+Kafka+Flume整合模式

大数据——Flume+Kafka+Flume整合模式创建kafka主题#启动kafka服务kafka-server-start.sh/opt/software/kafka280cala212/conf/kraft/server.properites#创建主题#topic主题名test01#partitions分区数1#replication-factor备份数量1kafka-topics.sh–create–topictest01–partitions1–replication-factor1…

大家好,又见面了,我是你们的朋友全栈君。

大数据——Flume+Kafka+Flume整合模式

创建kafka主题

#启动kafka服务
kafka-server-start.sh /opt/software/kafka280scala212/conf/kraft/server.properites

#创建主题
#topic主题名test01    
#partitions分区数1 
#replication-factor备份数量1
kafka-topics.sh --create --topic test01 --partitions 1 --replication-factor 1 --bootstrap-server 192.168.131.200:9092

#查看主题
kafka-topics.sh --list --bootstrap-server 192.168.131.200:9092

创建flume配置文件(采用KafkaSink作为kafka生产者)

#创建并编辑文件名为flume_kafka01.conf配置文件
vim /root/flume/flume_kafka01.conf

#创建flume 的三大组件sources channels sinks
a1.sources = s1
a1.channels = c1
a1.sinks = k1

#这里选用的是taildir类型的source,支持断点续采
a1.sources.s1.type = taildir

#需要侦听的文件,支持多目录侦听
a1.sources.s1.filegroups = f1
#侦听前缀为prolog的文件
a1.sources.s1.filegroups.f1 = /root/flume_log/prolog*
#断点记录保存文件路径
a1.sources.s1.positionFile = /opt/software/fluem190/data/taildir/tail_prolog_01.json
#设置采集批量
a1.sources.s1.batchSize = 10

a1.channels.c1.type = file
a1.channels.c1.file.checkpointDir = /opt/software/flume190/mydata/checkpoint04
a1.channels.c1.file.capacity = 1000
a1.channels.c1.file.transactionCapacity = 100
#transactionCapacity 默认值为100,且必须大于100
#transactionCapacity >= batchSize

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = 192.168.131.200:9092
a1.sinks.k1.kafka.topic = test01
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.linger.ms = 500
a1.sinks.k1.kafka.acks = 1

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

创建flume配置文件(采用KafkaSource作为kafka消费者)

vim /root/flume/kafka_flume01.conf

a1.sources = s1
a1.channels = c1
a1.sinks = k1

a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.batchSize = 10
a1.sources.si.batchDurationMillis = 2000
a1.sources.s1.kafka.bootstrap.server = 192.168.131.200:9092
a1.sources.s1.topics = test01
a1.sources.s1.kafka.consumer.groupid = first_test
a1.sources.s1.kafka.consumer.auto.offset.reset = earliest

a1.channels.c1.type = file 
a1.channels.c1.checkpointDir = /opt/software/flume190/mydata/checkpoint05
a1.channels.c1.file.dataDirs = /opt/software/flume190/mydata/data
a1.channels.c1.capaticy = 1000
a1.channels.c1.transactionCapacity = 10

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /kafka_flume/log/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = log-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
sinks.k1.hdfs.roundUnit = minute

a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

启动flume消费者

flume-ng agent -n a1 -c conf/ -f /root/flume/kafka_flume01.conf -Dflume.root.logger=INFO,console

启动flume生产者

flume-ng agent -n a1 -c conf/ -f /root/flume/flume_kafka02.conf -Dflume.root.logger=INFO,console

启动控制台kafka消费者

kafka-console-consumer.sh --bootstrap-server test:9092 --from-beginning --topic kb12_01 --property print.key=true --key-deserializer org.apache.kafka.common.serialization.LongDeserializer --value-deserializer org.apache.kafka.common.serialization.StringDeserializer

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152384.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • linux定时执行sql文件[通俗易懂]

    linux定时执行sql文件[通俗易懂]linux定时执行sql文件

  • 基于PS2手柄的Arduino遥控小车

    基于PS2手柄的Arduino遥控小车前言本文利用PS2手柄和Arduino开发板制作了一个简易的遥控小车,利用蓝牙进行通信,可以实现前后左右的移动。(原理掌握之后可以自己拓展相关功能)一、零件1.ArduinoUNO开发板:ArduinoUNO是ArduinoUSB接口系列的最新版本,作为Arduino平台的参考标准模板。UNO的处理器核心是ATmega328,同时具有14路数字输入/输出口(其中6路可作为PWM输出),6路模拟输入,一个16MHz晶体振荡器,一个USB口,一个电源插座,一个ICSPheader和一个复位按钮。2

  • Android App程序退出 黑屏问题

    Android App程序退出 黑屏问题在退出App的时候手机会闪动一下,出现像黑屏一样的效果,不是程序崩溃的效果就只是单单的黑一下,然后退出。。这个怎么破???记录下来:等解决了,回来更新。。。。。。。。。

  • vs2019配置opencv什么版本_vs配置opencv

    vs2019配置opencv什么版本_vs配置opencv一、环境vs2019社区版、win1064位操作系统二、opencv配置步骤(共5步)1、下载OpenCV4.0.1,官网为opencv.org2、安装opencv,我的安装目录是D:\opencv-4.0.1-vc14_vc153、添加环境变量,在PATH中添加:D:\opencv-4.0.1-vc14_vc15\opencv\build\x64\vc15…

    2022年10月20日
  • SPSS聚类分析「建议收藏」

    SPSS聚类分析「建议收藏」聚类分析是根据对象的特性对其进行定量分类的一种多元统计方法。比如:不同地区城镇居民收入和消费状况的分类研究;区域经济及社会发展水平的分析及全国区域经济综合评价…….通常聚类分析分为Q型聚类

  • 论计算机发展史及展望_策略单元培训心得

    论计算机发展史及展望_策略单元培训心得一种对计算机发展史展开研究的策略(3页)本资源提供全文预览,点击全文预览即可全文预览,如果喜欢文档就下载吧,查找使用更方便哦!9.9积分一种对计算机发展史展开研究的策略一种对计算机发展史展开研究的策略一种对计算机发展史展开研究的策略一、引言随着中国的开放,科学技术的国际交流日益深入,现代化意义上的计算机产品与技术被不断介绍并引入到国内,且在短时间内取得了迅.L.猛的发展。然而,作为…

    2022年10月18日

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号