Flume-Kafka-Flume对接Kafka以及Kafka数据分类传输

Flume-Kafka-Flume对接Kafka以及Kafka数据分类传输Flume对接KafkaFlume日志采集组件;Flume对接kafka主要是为了通过kafka的topic功能,动态的增加或者减少接收的节点,并且Flume要对接多个节点是需要多个channel和sink的会导致内存不够的情况。那么可以实现的场景就是Flume采集日志文件,通过kafka给多给业务线使用。1)配置flume(flume-kafka.conf)#definea1.sources=r1a1.sinks=k1a1.channels=c1#sourcea1

大家好,又见面了,我是你们的朋友全栈君。

Flume 对接 Kafka

Flume日志采集组件;Flume对接kafka主要是为了通过kafka的topic功能,动态的增加或者减少接收的节点,并且Flume要对接多个节点是需要多个channel和sink的会导致内存不够的情况。

那么可以实现的场景就是Flume采集日志文件,通过kafka给多给业务线使用。

1)配置 flume(flume-kafka.conf)

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop113:9092,hadoop114:9092,hadoop115:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  1. 启动消费者
kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
  1. 进入 flume 根目录下,启动 flume
bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

4)启动nc发送数据

[bd@hadoop113 ~]$ nc localhost 44444
hello
OK
word
OK

结果如下
[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
hello
word

Kafka数据分类

依据Kafka Sink的配置

Property Name Default Description
kafka.topic default-flume-topic The topic in Kafka to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic overriding the topic configured here.

在消息头中携带了topic字段的话,该消息就会被发送到topic字段对应的topic去。

那么在flume接收到消息之后,可以通过拦截器为topic加上header,即可将其进行分类。

Flume拦截器如下:

public class JudgeTestStringInterceptor implements Interceptor { 

// 声明一个存放事件的List
private List<Event> allEvents;
public void initialize () { 

// 初始化
allEvents = new ArrayList<Event>();
}
/** * 单个事件拦截 * @param event * @return */
public Event intercept (Event event) { 

// 1、获取事件中的头信息
Map<String, String> headers = event.getHeaders();
// 2、获取事件中的body信息
String body = new String(event.getBody());
// 3、根据body中是否有“test”来决定添加怎样的头信息
// 有的话添加<topic, first>没有则添加<topic, second>
if (body.contains("test")) { 

headers.put("topic", "first");
} else { 

headers.put("topic", "second");
}
return event;
// 如果返回null则认为该事件无用,将会被过滤
}
/** * 批量事件拦截 * @param list * @return */
public List<Event> intercept (List<Event> list) { 

// 1、清空集合
allEvents.clear();
// 2、遍历event
for (Event event : list) { 

// 3、给每个事件添加头信息
allEvents.add(intercept(event));
}
return allEvents;
}
public void close () { 

}
// 定义一个Builder对象
public static class Builder implements Interceptor.Builder { 

public Interceptor build () { 

return new JudgeTestStringInterceptor();
}
public void configure (Context context) { 

}
}
}

配置文件type-kafka.conf如下:

# define
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# interceptor
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.starnet.interceptor.JudgeTestStringInterceptor$Builder
# sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.bootstrap.servers = hadoop113:9092,hadoop114:9092,hadoop115:9092
a1.sinks.k1.kafka.topic = first
a1.sinks.k1.kafka.flumeBatchSize = 20
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1
# channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# bind
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

启动flume,两个消费者以及nc之后结果如下:

[bd@hadoop113 ~]$ nc localhost 44444
test
OK
hello
OK
word
OK
[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic first
test
[bd@hadoop113 ~]$ kafka-console-consumer.sh --zookeeper hadoop113:2181 --topic second
hello
word
版权声明:本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌侵权/违法违规的内容, 请发送邮件至 举报,一经查实,本站将立刻删除。

发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/152396.html原文链接:https://javaforall.cn

【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛

【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...

(0)


相关推荐

  • BatchShell软件—-Xshell与Ansible结合

    BatchShell软件—-Xshell与Ansible结合

  • 关于cBridge2.0,你不能错过的关键信息(二)!

    关于cBridge2.0,你不能错过的关键信息(二)!我们之前讨论了cBridge2.0的两种流动性模型,还深入探讨了「自管」流动性模型的设计挑战。今天,我们详细聊聊针对该模型设计挑战的解决方案。首先,我们从节点协调和操作问题开始。1/n上篇ELI5短文中我们提到,“cBridge2.0是第一个也是唯一一个允许流动性提供者(LP)在「自管」和「共管」流动性模型之间自由选择的跨链架构。在「自管」模式下,也就是「非托管」模式,LP可以100%地控制其流动性。为此,每个LP须要在服务器中运行一个cBridge节点「程序」…

  • fileinputstream java_Java FileInputStream close()方法

    fileinputstream java_Java FileInputStream close()方法JavaFileInputStreamclose()方法java.io.FilterInputStream.close()用于关闭流。1语法publicvoidclose()2参数无3返回值无4示例packagecom.yiidian;/***一点教程网:http://www.yiidian.com*//***java.io.FilterInputStream.close…

  • asp中的session使用方法

    asp中的session使用方法Session是什么呢?简单来说就是服务器给客户端的一个编号。当一台WWW服务器运行时,可能有若干个用户浏览正在运正在这台服务器上的网站。当每个用户首次与这台WWW服务器建立连接时,他就与这个服务器

  • PHP学习之一晚撸下W3chscool

    PHP学习之一晚撸下W3chscoolPHP多维数组其实简单的而言,多维数组就是由单个的数组组成的,两个数组嵌套组成一个二维数组,三个顾名思义就是三维数组。先来一个简单的数组。数字是key,引号里的是value<?php$array=array(‘1’=>”咋”,’2’=>”日”);echo$array[2];?>输出:日然后再来几个有难…

  • String类型转int,转long

    String类型转int,转longStringstr1="123";Stringstr2="123.0";不带小数:可直接可转为intinta=Integer.parseInt(str);带小数,直接转为int会报数字格式化异常,需要先转为double,后转为int转int: intb=(int)Double.parseDouble(str);转long:longc =(lon…

发表回复

您的电子邮箱地址不会被公开。

关注全栈程序员社区公众号