Flume+Kafka整合案例实现

Flume+Kafka整合案例实现 一、为什么要集成Flume和Kafka我们很多人在在使用Flume和kafka时,都会问一句为什么要将Flume和Kafka集成?那首先就应该明白业务需求,一般使用Flume+Kafka架构都是希望完成实时流式的日志处理,后面再连接上Flink/Storm/SparkStreaming等流式实时处理技术,从而完成日志实时解析的目标。第一、如果Flume直接对接实时计算框架,当数据采集速…

大家好,又见面了,我是你们的朋友全栈君。

 

一、为什么要集成Flume和Kafka

我们很多人在在使用Flume和kafka时,都会问一句为什么要将Flume和Kafka集成?那首先就应该明白业务需求,一般使用Flume+Kafka架构都是希望完成实时流式的日志处理,后面再连接上Flink/Storm/Spark Streaming等流式实时处理技术,从而完成日志实时解析的目标。第一、如果Flume直接对接实时计算框架,当数据采集速度大于数据处理速度,很容易发生数据堆积或者数据丢失,而kafka可以当做一个消息缓存队列,从广义上理解,把它当做一个数据库,可以存放一段时间的数据。第二、Kafka属于中间件,一个明显的优势就是使各层解耦,使得出错时不会干扰其他组件。

因此数据从数据源到flume再到Kafka时,数据一方面可以同步到HDFS做离线计算,另一方面可以做实时计算,可实现数据多分发。

二、概念剖析Flume+Kafka

Flume 是一个分布式、可靠、和高可用的海量日志聚合的系统,支持在系统中定制各类数据发送方,通过监控整个文件目录或者某一个特定文件,用于收集数据;同时Flume也 提供数据写到各种数据接受方(可定制)的能力,用于转发数据。Flume 的易用性在于通过读取配置文件,可以自动收集日志文件,在大数据处理及各种复杂的情况下,flume 经常被用来作为数据处理的工具。

  • Source:Flume 搜集日志的方式多种多样,比如可以检测文件夹的变化spool Source,可以监测端口信息 Netcat Source,可以监控某各文件新增的内容 Exec Source等等,通常使用检测文件夹变化的方式来实时收集信息,所以本例中我们也将使用Spool Source。
  • Channel:提供了一层缓冲机制,来实现数据的事务性传输,最大限度保证数据的安全传输。常用的有MemoryChannel:所有的events 被保存在内存中,优点是高吞吐,缺点是容量有限并且Agent 死掉时会丢失内存中的数据;FileChannel:所有的Events 被保存在文件中,优点是容量较大且死掉时数据可恢复,缺点是速度较慢。因此为了保证Event 在数据流点对点传输中是可靠地,要注意Channel 的选择。目前为了提高速度,我们暂时采用MemoryChannel,之后的目标是实现一个自定义channel—doubleChannel,解决上述的两个痛点问题。
  • Sink:将数据转发到目的地,或者继续将数据转发到另外一个source,实现接力传输,多层之间通过AVRO Sink来实现。本例中,我们的最终目标是实现日志实时处理,因此实时的采集数据流就把数据发送到Kafka 中。

那么小结一下,使用的是对文件夹中文件变化进行监测的Spooling DirectorySource,channel 是用的MemoryChannel,sink 是自定义的kafkasink,用于向kafka 发送数据。

Kafka 是由LinkedIn 开发的开源分布式消息系统,主要用于处理LinkedIn 的活跃数据,说白了也就是用户访问日志数据。这些数据主要包括PV、UV、用户行为(登陆、浏览、搜索、分享、点击)、系统运行日志(CPU、内存、磁盘、进程、网络)等方面的数据。这些数据通常以日志的形式进行存储,现有的消息队列系统可以很好的用于日志分析系统对于实时数据的处理,提高日志解析效率。那么说到Kafka,就必须掌握三个原理部分:Producer、Topic、Consumer:

  • Producer:消息和数据的生产者,向Kafka的一个topic发布消息的过程即为生产过程,在本例中Flume应该是Producer;
  • Topic:主题,Kafka处理的消息的不同分类(逻辑概念),可以根据Topic的不同,去区分处理不同的消息。说的更直白一些,Topic就是起到资源隔离的作用,Producer向指定Topic中产生消息,Consumer再从指定的Topic中消费消息。
  • Consumer:消息和数据的消费者,订阅topic并处理其发布的消息的过程即为消费过程。

三、Flume+Kafka实战(详细步骤)

3.1、Netcat Source + Kafka Sink

首先进行Netcat Source,这个source 我们可以用来进行测试,最简单也是最直观,在被监控的端口输入测试消息,连接kafka之后便可在consumer界面上看到提示的测试信息。

1、cd /usr/flume/conf  在conf目录下新建kafka_netcat.conf ,vi kafka_netcat.conf  。如果你之前已经写过了netcat.conf  你可以使用命令 cp netcat.conf kafka_netcat.conf  之后修改netcat里面的内容即可,不需要每次都进行全段全段的撰写.

#example.conf: A single-node flume configuration
#Test Kafka Sink in netcat Source

#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Describe/configue the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Describe the sink
#设置kafkaSink 注意大小写
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#设置kafka的主题topic
a1.sinks.k1.topic = kafka_netcat_test
#设置kafka 的 broker地址以及端口号
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave3:9092
#设置kafka序列化方式
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder

#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

整个配置文件内容分为三个大部分:1、从整体上描述代理agent中sources、sinks、channels所涉及到的组件;2、详细描述agent中每一个source、sink与channel的具体实现:Source使用了netcat,指定绑定的主机以及端口号;Sink按照Kafka的方式进行配置,务必要注意type中的KafkaSink是首字母大写,否则会报错3、通过channel将source与sink连接起来。

2、创建kafka topic kafka_netcat_test,保证测试的数据是发送到指定的topic中

[root@master bin]# ./kafka-topics.sh --create --zookeeper master:2181,slave1:2181,slave3:2181/kafka --replication-factor 1 --partitions 1 --topic kafka_netcat_test

Flume+Kafka整合案例实现

利用查询命令,查看kafka_netcat_test topic是否创建成功

Flume+Kafka整合案例实现

3、启动Kafka Consumer,指定topic是kafka_netcat_test

[root@master bin]# ./kafka-console-consumer.sh --bootstrap-server master:9092,slave1:9092,slave3:9092 --topic kafka_netcat_test --from-beginning

4、启动Flume

[root@master conf]# flume-ng agent -n a1 -c ../conf/ -f kafka_netcat.conf  -Dflume.root.logger=INFO,console

 正确启动之后,会打印出producer相关配置信息,如下图所示:

Flume+Kafka整合案例实现

5、至此,我们的配置就结束了,下面看看效果,kafka是不是能作为sink接收数据,监听44444端口,并输入一些测试信息

Flume+Kafka整合案例实现

6、去kafka consumer的界面,检查界面中已经出现刚刚在44444检测端口输入的测试信息,如下图所示:

Flume+Kafka整合案例实现

3.2、Spool Source + kafka Sink

1、将spool.conf 复制成为 kafka_spool.conf ;cp spool.conf  kafka_spool.conf,添加以下内容:

#example.conf: A single-node flume configuration
#test kafka sink with spooldir source

#Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Describe/configue the source
a1.sources.r1.type = spooldir
a1.sources.r1.channels = c1
a1.sources.r1.spoolDir = /usr/flume/logs
a1.sources.r1.fileHeader = true

#Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
#设置kafka的主题topic
a1.sinks.k1.topic = kafka_spooldir_test
#设置消费者编码为UTF-8
a1.sinks.k1.custom.encoding=UTF-8
#绑定kafka主机以及端口号
a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave3:9092
#设置kafka序列化方式
a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder


#use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、在kafka创建kafka_spooldir_test 的topic,具体命令与3.1节相似

Flume+Kafka整合案例实现

3、启动kafka consumer,指定topic是kafka_spooldir_test ,并启动flume,在这里不再写出执行的命令,如3.1节类似。此外添加测试文件,在监控的目录下添加文件测试vi /usr/flume/logs/kafka_spool_test.conf

Flume+Kafka整合案例实现

此时在kafka consumer界面中可以接收到我们在测试文件里面添加的测试内容,如上图所示。

四、总结

上述介绍了两种对日志监控的方式,还有一中是exec source,在这里不做过多赘述,基本的实现方式都和第三节相似。

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152349.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • 【Java SE】————标识符及命名规则和关键字

    【Java SE】————标识符及命名规则和关键字Java标识符:             Java是一种可以撰写跨平台应用软件的面向对象的程序设计语言,其中,对于变量,常量,函数,语句块也有名字,我们统统称之为Java标识符.。       标识符是用来给类、对象、方法、变量、接口和自定义数据类型命名的。      Java标识符由数字,字母和下划线(_),美元符号($)组成。在Java中是区分大小写的,而且

  • 雅虎十四条性能优化原则「建议收藏」

    雅虎十四条性能优化原则「建议收藏」雅虎十四条性能优化原则欢迎访问我的博客https://qqqww.com/,祝所有码农同胞们早日走上人生巅峰,迎娶白富美~~首先我去看了《雅虎十四条性能优化原则》,当然是看大佬博客翻译过来的,纯英文的我看不懂Web应用性能优化黄金法则:先优化前端程序(front-end)的性能,因为这是80%或以上的最终用户响应时间的花费所在减少HTTP请求使用CDN添加Expire…

  • fmincon函数源代码_fminbnd函数

    fmincon函数源代码_fminbnd函数输入参数:fun要求解的函数值;x0函数fun参数值的初始化;输出参数:X输出最优参数值原文链接:MATLAB优化函数fmincon解析

  • 手机号码归属地查询App

    手机号码归属地查询App结合MVP设计模式和解析Json数据,制作一款“手机号码归属地查询的App小程序(Android)”说明:实现的原理很简单,有多种设计方式和代码编写风格。本文主要是认识、理解MVP设计模式和Json数据的常见解析框架的使用。源码:请点击链接访问我的GitHub进行查看准备工作:AndroidStudio开发工具(谷爹的亲儿子)浏览器(进行测试淘宝开放平台返回给我们的Json数据并进行…

  • 概率论中 PDF,PMF,CDF的含义[通俗易懂]

    概率论中 PDF,PMF,CDF的含义[通俗易懂]概率论中PDF,PMF,CDF的含义在概率论中,我们经常能碰到这样几个概念PDF,PMF,CDF,这里就简单介绍一下PDF:概率密度函数(probabilitydensityfunction),在数学中,连续型随机变量的概率密度函数(在不至于混淆时可以简称为密度函数)是一个描述这个随机变量的输出值,在某个确定的取值点附近的可能性的函数。概率密度函数都是针对连续性随机变量的,对于连续性随机变量,都是针对某一段区间的取值,在一个点的取值都是几乎为0的,所以我们研究连续性随机变量时,都是取变量在一段

  • 聊聊LuaJIT「建议收藏」

    聊聊LuaJIT「建议收藏」JIT什么是JITJIT=JustInTime即时编译,是动态编译的一种形式,是一种优化虚拟机运行的技术。程序运行通常有两种方式,一种是静态编译,一种是动态解释,即时编译混合了这二者。Ja

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号