大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全系列IDE使用 1年只要46元 售后保障 童叟无欺
使用Docker(k8s)安装Kafka并使用宿主机连接
- 安装Docker及docker-compose
具体安装方法可以去官网看教程
检查docker-compose是否安装成功
- 创建 docker-compose.yml 文件
version: '2'
services:
zookeeper:
image: "zookeeper"
hostname: "zookeeper.local"
container_name: "zookeeper"
#设置网络别名 可随意取
networks:
local:
aliases:
- "zookeeper.local"
kafka:
image: "wurstmeister/kafka"
hostname: "kafka.local"
container_name: "kafka"
ports:
- "9092:9092"
networks:
local:
aliases:
- "kafka.local"
environment:
KAFKA_ADVERTISED_HOST_NAME: kafka.local
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
#设置网络,名为local模式
networks:
local:
driver: bridge
- 进入Kafka容器
docker exec -it kafka /bin/bash
- 创建Topic
/opt/kafka_2.13-2.7.0/bin/kafka-topics.sh --create --zookeeper zookeeper:2181 -replication-factor 1 --partitions 1 --topic test_topic
- 测试命令行生产消费
- 生产者
/opt/kafka_2.13-2.7.0/bin/kafka-console-producer.sh --broker-list kafka.local:9092 --topic test_topic
- 消费者
/opt/kafka_2.13-2.7.0/bin/kafka-console-consumer.sh --bootstrap-server kafka.local:9092 --topic test_top
ic --from-beginning
- 从宿主机使用代码连接Kafka
6.1 进入Zookeeper容器查看brokers注册信息# 进入容器 docker exec -it zookeeper /bin/bash # 进入zookeeper命令行 bin/zkCli.sh
6.2 查看brokers注册信息
get /brokers/ids/1001
6.3 配置宿主机hosts# 添加 127.0.0.1 kafka.local
6.4 使用Java代码连接Kafka
public class KafkaConsumerDemo { public static void main(String[] args) { //1.配置消费者连接属性 Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka.local:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.GROUP_ID_CONFIG, "test_demo"); //2.创建Kafka消费者 KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //3.订阅topics consumer.subscribe(Arrays.asList("test_topic")); // consumer.assign(Arrays.asList(new TopicPartition("test_topic", 0))); // consumer.seekToBeginning(Arrays.asList(new TopicPartition("test_topic", 0)));//不改变当前offset //4.死循环读取消息 for (; ; ) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1)); if (records != null && !records.isEmpty()) { records.forEach(r -> { System.out.println("key:" + r.key() + "----value:" + r.value()); }); } } } }
测试成功
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/171579.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...