大家好,又见面了,我是你们的朋友全栈君。
一、安装 JDK
rpm -qa | grep java
rpm -qa | grep jdk
rpm -qa | grep gcj
rpm -qa | grep java | xargs rpm -e --nodeps #卸载老版本
yum list java-1.8*
yum install java-1.8.0-openjdk* -y
java -version
#这里为了图方便使用了 yum 安装 JDK
二、安装 zookeeper
官方地址:https://archive.apache.org/dist/zookeeper/zookeeper-3.7.0/
mkdir /data
#wget http://101.34.22.188/zookeeper/apache-zookeeper-3.7.0-bin.tar.gz -P /data
wget https://mirrors.bfsu.edu.cn/apache/zookeeper/zookeeper-3.7.0/apache-zookeeper-3.7.0-bin.tar.gz -P /data --no-check-certificate
tar -zxvf /data/apache-zookeeper-3.7.0-bin.tar.gz
#修改配置文件
cd /data/apache-zookeeper-3.7.0-bin/conf
cp /data/apache-zookeeper-3.7.0-bin/conf/zoo_sample.cfg /data/apache-zookeeper-3.7.0-bin/conf/zoo.cfg
vim /data/apache-zookeeper-3.7.0-bin/conf/zoo.cfg
dataDir=/data/apache-zookeeper-3.7.0-bin/data #修改数据存放目录
dataLogDir=/data/apache-zookeeper-3.7.0-bin/log
pidfile=/var/run/zookeeper.pid
......
mkdir -p /data/apache-zookeeper-3.7.0-bin/data #创建 data 文件夹
mkdir -p /data/apache-zookeeper-3.7.0-bin/log #创建 log 文件夹
#启动 zookeeper
cd /data/apache-zookeeper-3.7.0-bin/bin
./zkServer.sh start
----------------------------------------------
nohup /data/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start &
tail -f /data/apache-zookeeper-3.7.0-bin/bin/nohup.log
./zkServer.sh start-foreground #查看启动失败错误信息,大概率是版本问题,要下带 bin 的版本
设置 zookeeper 开机自动启动
cat > /usr/lib/systemd/system/zookeeper.service <<EOF [Unit] Description=zookeeperservice After=network.target [Service] Type=forking PIDFile=/var/run/zookeeper.pid WorkingDirectory=/data/apache-zookeeper-3.7.0-bin ExecStart=/data/apache-zookeeper-3.7.0-bin/bin/zkServer.sh start User=root Group=root ExecReload=/bin/kill -s HUP $MAINPID ExecStop=/bin/kill -s QUIT $MAINPID PrivateTmp=true [Install] WantedBy=multi-user.target EOF
systemctl daemon-reload
systemctl enable zookeeper.service && systemctl restart zookeeper.service
systemctl status zookeeper.service
三、安装 kafka
官网:http://kafka.apache.org/downloads
#wget http://101.34.22.188/kafka/kafka_2.12-2.8.0.tgz -P /data
wget https://archive.apache.org/dist/kafka/2.8.0/kafka_2.12-2.8.0.tgz -P /data --no-check-certificate
tar xf /data/kafka_2.12-2.8.0.tgz
修改配置
cd /data/kafka_2.12-2.8.0
vim /data/kafka_2.12-2.8.0/config/server.properties
# broker 的编号,如果集群中有多个 broker,则每个 broker 的编号需要设置的不同
broker.id=0
# 31 行
listeners=PLAINTEXT://192.168.10.20:9092
# 123 行,修改 zookeeper.connect 为自己的 IP:PORT
zookeeper.connect=localhost:2181/kafka
#存放消息日志文件的地址
log.dirs=/tmp/kafka-logs
server.properties 配置详解
//当前机器在集群中的唯一标识,和zookeeper的myid性质一样(broker.id和host.name每个节点都不相同)
broker.id=0
//当前kafka对外提供服务的端口默认是9092listeners=PLAINTEXT://192.168.1.202:9092
//这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
host.name=hadoop1
//这个是borker进行网络处理的线程数
num.network.threads=3
//这个是borker进行I/O处理的线程数
num.io.threads=8
//发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.send.buffer.bytes=102400
//kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.receive.buffer.bytes=102400
//这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
socket.request.max.bytes=104857600
//消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,
//如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
log.dirs=/home/hadoop/log/kafka-logs
//默认的分区数,一个topic默认1个分区数
num.partitions=1
//每个数据目录用来日志恢复的线程数目
num.recovery.threads.per.data.dir=1
//默认消息的最大持久化时间,168小时,7天
log.retention.hours=168
//轮转时间,当需要删除指定小时之前的数据时,该设置项很重要
log.roll.hours=12
//这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.segment.bytes=1073741824
//每隔300000毫秒去检查上面配置的log失效时间
log.retention.check.interval.ms=300000
//是否启用log压缩,一般不用启用,启用的话可以提高性能
log.cleaner.enable=false
//设置zookeeper的连接端口
zookeeper.connect=192.168.123.102:2181,192.168.123.103:2181,192.168.123.104:2181
//设置zookeeper的连接超时时间
zookeeper.connection.timeout.ms=6000
启动 kafka
nohup /data/kafka_2.12-2.8.0/bin/kafka-server-start.sh /data/kafka_2.12-2.8.0/config/server.properties &
或
/data/kafka_2.12-2.8.0/bin/kafka-server-start.sh -daemon /data/kafka_2.12-2.8.0/config/server.properties
设置开机自动启动
cat > /usr/lib/systemd/system/kafka.service <<EOF [Unit] Description=kafkaservice After=network.target [Service] WorkingDirectory=/data/kafka_2.12-2.8.0 ExecStart=/data/kafka_2.12-2.8.0/bin/kafka-server-start.sh /data/kafka_2.12-2.8.0/config/server.properties ExecStop=/data/kafka_2.12-2.8.0/bin/kafka-server-stop.sh User=root Group=root Restart=always RestartSec=10 [Install] WantedBy=multi-user.target EOF
systemctl daemon-reload
systemctl enable kafka.service && systemctl restart kafka.service
systemctl status kafka.service
netstat -antp | grep 9092
ps aux | grep kafka
lsof -i:9092
jps -l
测试
# topic 是发布消息的 category,以单节点的配置创建了一个叫 dblab01 的 topic.可以用 list 列出所有创建的 topics,来查看刚才创建的主题是否存在
/data/kafka_2.12-2.8.0/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partition 1 --topic dblab01
/data/kafka_2.12-2.8.0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic dblab01
/data/kafka_2.12-2.8.0/bin/kafka-topics.sh --list --zookeeper localhost:2181
/data/kafka_2.12-2.8.0/bin/kafka-console-producer.sh --broker-list PLAINTEXT://192.168.10.20:9092 --topic dblab01
/data/kafka_2.12-2.8.0/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab01 --from-beginning #测试命令可能有些版本问题
参考
https://www.cnblogs.com/jiangcong/p/13961761.html
https://www.cnblogs.com/zyt-bg/p/10368786.html
https://www.cnblogs.com/jiangcong/p/14820403.html
https://ibyte.blog.csdn.net/article/details/113346565
Kafka 专栏
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/134565.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...