Flume与Kafka对接「建议收藏」

Flume与Kafka对接「建议收藏」引言flume为什么要与kafka对接?我们都知道flume可以跨节点进行数据的传输,那么flume与sparkstreaming对接不好吗?主要是flume对接到kafka的topic,可以给多个consumergroup去生成多条业务线。虽然flume中的channelselector中的副本策略也可以做多给多个sink传输数据,但是每个channelselector都是很消耗资源的。文章目录一、flume采集的数据发往一个topic二、flume采集的数据发往多个topic总结.

大家好,又见面了,我是你们的朋友全栈君。

引言
flume为什么要与kafka对接?
我们都知道flume可以跨节点进行数据的传输,那么flume与spark streaming对接不好吗?主要是flume对接到kafka的topic,可以给多个consumer group去生成多条业务线。虽然flume中的channel selector中的副本策略也可以给多个sink传输数据,但是每个channel selector都是很消耗资源的。其次,kafka也可以起到一个消峰的作用


一、flume采集的数据发往一个topic

这里为了方便测试,我采用的是netcat source、memory channel、kafka sink,当然你也可以采用你自己想要的方式配置flume,只需要根据官方文档修改对应的source和channel即可。

necat-flume-kafka.conf的配置文件如下:

#Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1

#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = wjt
a1.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20 
a1.sinks.k1.kafka.producer.acks = 1 
a1.sinks.k1.kafka.producer.linger.ms = 1 

#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

其中你只需要修改sink中的topic和brokerList即可,当然你也可以增加其他的配置
1、启动kafka消费者
在这里插入图片描述
2、启动flume
在这里插入图片描述
3、启动netcat的客户端并发送几条数据
在这里插入图片描述
4、观察到kafka consumer很快就消费到了数据
在这里插入图片描述

二、flume采集的数据发往多个topic

如果数据有多种类型,比如点赞数据、评论数据、喜欢数据等等,是不是就要发往不同的topic去分析数据,这时候就需要用到flume的拦截器来做分类。
flume可以给event加上头信息,结合channel selector来发往不同的sink。
在flume官方文档可以看到:
在这里插入图片描述
意思是:如果你的event的头信息(k-v类型)包含一个topic字段,那么这个event将会被发送到对应的topic,并覆盖你配置的kafka.topic

拦截器的代码:

package wjt.demo;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
/** * @description: * @author: wanjintao * @time: 2020/8/29 11:45 */
public class myInterceptor implements Interceptor { 

//声明一个存放事件的集合
private List<Event> addHeaderEvents;
@Override
public void initialize() { 

//初始化存放事件的集合
addHeaderEvents = new ArrayList<>();
}
//单个事件拦截
@Override
public Event intercept(Event event) { 

//1. 获取事件中的头信息
Map<String, String> headers = event.getHeaders();
//2. 获取事件中的body信息
String body = new String(event.getBody());
//3. 根据body中是否有“Hello”来决定是否添加头信息
if (body.contains("hello")) { 

//4. 有hello添加“wan”头信息
headers.put("topic", "www1");
} else { 

//4. 没有hello添加“tao”头信息
headers.put("topic", "www2");
}
return event;
}
//批量事件拦截
@Override
public List<Event> intercept(List<Event> events) { 

//1. 清空集合
addHeaderEvents.clear();
//2. 遍历events
for (Event event : events) { 

//3. 给每一个事件添加头信息
addHeaderEvents.add(intercept(event));
}
//4. 返回结果
return addHeaderEvents;
}
@Override
public void close() { 

}
public static class Builder implements Interceptor.Builder { 

@Override
public Interceptor build() { 

return new myInterceptor();
}
@Override
public void configure(Context context) { 

}
}
}

你只需要修改单个事件拦截的代码即可,我这里是如果数据包含hello,将会给事件加上header(topic,www1),反之则给事件加上header(topic,www2),打包上传至flume/lib目录下

netcat-flume-typekafka.conf的配置文件:

#Name
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#Source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
#Interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = wjt.demo.myInterceptor$Builder
#Channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#Sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = wjt
a1.sinks.k1.brokerList = node01:9092,node02:9092,node03:9092
a1.sinks.k1.kafka.flumeBatchSize = 20 
a1.sinks.k1.kafka.producer.acks = 1 
a1.sinks.k1.kafka.producer.linger.ms = 1 
#Bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

你只需要将a1.sources.r1.interceptors.i1.type的值改为你上面的拦截器的全类名$Builder即可
1、先启动consumer1和consumer2(flume启动顺序都是先启动服务端在启动客户端)
在这里插入图片描述
在这里插入图片描述
2、启动flume
在这里插入图片描述
3、启动netcat客户端
在这里插入图片描述
4、观察consumer消费的topic可以看到,www1只接受到了包含hello的数据,www2只接受到了没有包含hello的数据
在这里插入图片描述
在这里插入图片描述

总结

很多时候flume官方文档可以帮助我们解决很多自己想要的业务场景,我们要更多地去查看官方文档

版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152405.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)
blank

相关推荐

  • python fabric实现远程操作和部署

    python fabric实现远程操作和部署

  • Tomcat 日志概述[通俗易懂]

    Tomcat 日志概述[通俗易懂]1   Tomcat日志概述Tomcat日志信息分为两类:一是运行中的日志,它主要记录运行的一些信息,尤其是一些异常错误日志信息。二是访问日志信息,它记录的访问的时间,IP,访问的资料等相关信息。2   Tomcat日志配置2.1 访问日志的配置默认tomcat不记录访问日志,如下方法可以使

  • linux查看进程下的线程_linux查看线程状态

    linux查看进程下的线程_linux查看线程状态鉴于linux下线程的广泛使用我们怎么查看某个进程拥有的线程id了现在很多服务的设计主进程->子进程->线程(比如mysql,varnish)主进程负责侦听网络上的连接并把连接发

  • RAID10磁盘阵列损坏修复操作

    RAID10磁盘阵列损坏修复操作-f模拟硬盘损坏mdadm/dev/md0-f/dev/sdb1、查看损坏磁盘阵列的情况2、将损坏的硬盘设备移除3、插上新的硬盘(在真机上操作,虚拟机之间将损坏的硬盘删除,然后在添加新的硬盘即可)4、卸载挂载操作5、将新的硬盘添加到RAID10磁盘阵列中6、查看修复成功后的磁盘阵列信息(因为新添加的需要等待一段时间等待系统重新创建)7、重新挂载1、查看损坏后的磁盘阵列信息2、将损坏的硬盘从磁盘阵列中移除mdadm/dev/md0-r损坏的硬盘设备名mdadm-D/

  • htmla标签下划线去除_html超链接的下划线怎么去掉?a标签去下划线的方法都在这里…

    htmla标签下划线去除_html超链接的下划线怎么去掉?a标签去下划线的方法都在这里…本篇文章就是关于html超链接取消下划线的用法,教你如何快速的去掉HTML超链接下划线的方法,最后还有相关代码解释,下面就让我们一起看看这篇文章吧首先我们使用css的基础样式来做一个最简单的去下划线的方法:htmla超链接标签,默认有的浏览器显示有下划线,有的没有下划线,大多锚文本超链接A标签内字体是有下划线的,怎么去除超链接下划线?html超链接去除下划线怎么做?去掉去除超链接锚文本的…

  • TinyXML用法小结[通俗易懂]

    TinyXML用法小结[通俗易懂]TinyXML用法小结1.     介绍Tinyxml的官方网址:http://www.grinninglizard.com官方介绍文档:http://www.grinninglizard.com/tinyxmldocs/tutorial0.html在TinyXML中,根据XML的各种元素来定义了一些类:TiXmlBase:整个TinyXML模型的基类。TiXmlAttr…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号