大家好,又见面了,我是你们的朋友全栈君。
当我实现三个消费者去同时消费一个Topic(默认没有分区)消息时,三个消费者同时消费到了同样的消息
现象如下:
2019-06-11 15:30:02.516 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO com.example.kafka.consumer.TestB - ==[cousumerC]==1560238202513====sendTest3===3
2019-06-11 15:30:02.517 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO com.example.kafka.consumer.TestB - ==[cousumerA]==1560238202513====sendTest3===3
2019-06-11 15:30:02.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO com.example.kafka.consumer.TestB - ==[cousumerB]==1560238202513====sendTest3===3
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#1-0-C-1] INFO com.example.kafka.consumer.TestB - ==[cousumerA]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] INFO com.example.kafka.consumer.TestB - ==[cousumerC]==1560238204513====sendTest3===4
2019-06-11 15:30:04.518 [org.springframework.kafka.KafkaListenerEndpointContainer#2-0-C-1] INFO com.example.kafka.consumer.TestB - ==[cousumerB]==1560238204513====sendTest3===4
消费者
@KafkaListener(groupId = "test2",topicPartitions = {
@TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerA(String msg) {
logger.info("==[cousumerA]==" + msg);
}
@KafkaListener(groupId = "test2",topicPartitions = {
@TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerB(String msg) {
logger.info("==[cousumerB]==" + msg);
}
@KafkaListener(groupId = "test2",topicPartitions = {
@TopicPartition(topic = "hzl.test.aaa",partitionOffsets = @PartitionOffset(partition = "0",initialOffset = "-1"))})
public void cousumerC(String msg) {
logger.info("==[cousumerC]==" + msg);
}
生产者
@Autowired
KafkaTemplate kafkaTemplate;
int i =0;
@Scheduled(fixedRate = 2000)
public void sendTest3() {
kafkaTemplate.send("hzl.test.aaa", System.currentTimeMillis() + "====sendTest3===" + i++);
}
如果采用如下方式,则只会被消费一次
@KafkaListener(topics = "hzl.test.aaa",groupId = "test2",)
public void cousumerD(String msg) {
logger.info("==[cousumerD]==" + msg);
}
@KafkaListener(topics = "hzl.test.aaa",groupId = "test2",)
public void cousumerE(String msg) {
logger.info("==[cousumerE]==" + msg);
}
@KafkaListener(topics = "hzl.test.aaa",groupId = "test2",)
public void cousumerF(String msg) {
logger.info("==[cousumerF]==" + msg);
}
另外,如何监听topic中最新的消息
auto-offset-reset: latest
这样设置好像也是从消费组中提交后的offset开始消费的,并不是最新的一条消息?
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/153317.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...