大家好,又见面了,我是你们的朋友全栈君。
3.安装:kafka_2.10(scala) 0.10.0.1(kafka)
a.安装kafka_2.10-0.10.0.1.tgz
b.复制至{/home/crx/soft}
$>cp /mnt/hgfs/2.安装环境/download/apache-kafka/kafka_2.10-0.10.0.1.tgz /home/crx/soft/
c.解压
$>tar -zxvf kafka_2.10-0.10.0.1.tgz
$>rm kafka_2.10-0.10.0.1.tgz
$>ln -s kafka_2.10-0.10.0.1/ kafka
d.修改环境变量,追加
$>gedit ~/.bash_profile
#Kafka install
export KAFKA_HOME=/home/crx/soft/kafka
export PATH=$KAFKA_HOME/bin:$PATH
$>source ~/.bash_profile
e.安装完成! (最好重启虚拟机,否则每开启一个.sh前都需要source)
bin目录下:
kafka-topics.sh 分类 kafa-console-consumer.sh 消费者 kafka-console-producer.sh 生产者
单节点–单Broker集群
1.开启zookeeper
$>zookeeper-server-start.sh /home/crx/soft/kafka/config/zookeeper.properties
$>jps
4264 Jps
3178 QuorumPeerMain (在另一个窗口打开查看)
自动创建路径
dataDir=/tmp/zookeeper (原zookeeper在/home/crx/tmp/zookeeper)
clientPort=2181
maxClientCnxns=0
2.开启Broker
参照{KAFKA_HOME/config/server.properties}
broker.id=0 //必须为整数;brokerID理解为分区号
log.dirs=/home/crx/tmp/kafka-logs //【修改】消息存放位置
zookeeper.connect=localhost:2181 //注册zookeeper
$>kafka-server-start.sh /home/crx/soft/kafka/config/server.properties
$>jps
3424 Kafka -->【Broker守护进程】
4264 Jps
3178 QuorumPeerMain
3.创建主题
--zookeeper参数设置;
如果使用了--zookeeper参数,那么consumer的信息将会存放在zk之中
查看的方法是使用./zookeeper-client,ls /consumers/[group_id]/
--bootstrap-server参数设置
如果使用了--bootstrap-server参数,那么consumer的信息将会存放在kafka之中
创建主题 分区是1,副本是1:
$>kafka-topics.sh --create --topic test --zookeeper localhost:2181 --partitions 1 --replication-factor 1
reated topic "test".
list topic命令查看主题
$>kafka-topics.sh --list --zookeeper localhost:2181
test
zk客户端查看:
$>zkCli.sh
[zk: localhost:2181(CONNECTED) 1] ls /brokers/topics
[test]
作用:1.在Zookeeper中注册topic节点;
2.在【broker-->log.dirs属性】目录创建toptic主题
查看tmp下生成的日志信息,消息是按照主题分类的
[crx@master tmp]$ ll
total 28
drwxrwxr-x. 3 crx crx 4096 Jan 12 15:22 kafka-logs
[crx@master tmp]$ cd kafka-logs/
[crx@master kafka-logs]$ ls
recovery-point-offset-checkpoint replication-offset-checkpoint test-0
[crx@master kafka-logs]$ cd test-0/
[crx@master test-0]$ ls
00000000000000000000.index 00000000000000000000.log
Segment file组成:由2大部分组成,分别为index file和data file,
此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件
Segment文件命名规则:
partion全局的第一个segment从0开始,后续每个segment文件名为上一个segment文件最后一条消息
的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
4.开启生产者producer
$>kafka-console-producer.sh --topic test --broker-list localhost:9092
$>jps
3424 Kafka
4770 Jps
4531 ConsoleProducer
3178 QuorumPeerMain
作用:开启守护进程ConsoleProducer
5.开启消费者
$>kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning 注:此处的beginning不加的话会看不到开启此消费者之前的消息
$>jps
3424 Kafka
5521 Jps
4531 ConsoleProducer
5270 ConsoleConsumer
3178 QuorumPeerMain
作用:开启守护进程ConsoleConsumer
6.在producer(生产者一端)发送消息
producer>hello world
consumer>hello world
$>cat 00000000000000000000.log
总结:生产者可以按主题生产数据, 互不干扰 将数据存储到broker中
消费者没有时候,也可以生产产品,当有消费者时,如订阅号那么会将历史消息,
主题消息都能收到 可以重复消费
kafka消息是主动拉取消息,而不是broker主动发送消息,消费者把消息存在zookeeper中
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/134196.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...