大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全系列IDE稳定放心使用
1、zookeeper集群搭建:zookeeper安装以及使用_燕少༒江湖的博客-CSDN博客_zookeeper
2、kafka集群搭建:kafka集群搭建以及遇到的异常_燕少༒江湖的博客-CSDN博客
3、kafka生成消息:kafka-producer生产者案例_燕少༒江湖的博客-CSDN博客_kafkaproducer单例
4、kafka多线程消费:offset从zookeeper中得到,一个线程对应一个partition,这样消费速度很快,而且消息的顺序可控,线程数量和partition一样,多了浪费资源,少了效率很低,也可以不通过zookeeper来消费,kafka0.9以后的版本就可以将offset记录到对应消费group到对应的broker上。
5、pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.cn.dl</groupId>
<artifactId>kafka-consumer1</artifactId>
<version>1.0</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.43</version>
</dependency>
</dependencies>
</project>
6、KafkaConsumterMain
package com.dl.cn;
import java.io.IOException;
import java.util.Properties;
/**
* Created by tiger on 2018/8/20.
*/
public class KafkaConsumterMain {
public static void main(String[] args) throws IOException {
String topic = "user-info";
int threadNum = 2;
Properties properties = ReadPropertiesUtils.readConfig("config.properties");
KafkaConsumterServer kafkaConsumterDemo = new KafkaConsumterServer(topic,threadNum,properties);
kafkaConsumterDemo.consumer();
}
}
7、KafkaConsumterServer
package com.dl.cn;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by tiger on 2018/8/20.
*/
public class KafkaConsumterServer {
private String topic;
private Properties properties;
private int threadNum;
public KafkaConsumterServer(String topic,int threadNum,Properties properties) {
this.topic = topic;
this.threadNum = threadNum;
this.properties = properties;
}
/**
* 创建固定线程池消费消息
* 线程和partition一对一
* */
public void consumer() {
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(threadNum));
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(consumerConfig);
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
//创建固定数量的线程池
ExecutorService executor = Executors.newFixedThreadPool(threadNum);
for (KafkaStream stream : streams) {
executor.submit(new KafkaConsumerThread(stream));
}
}
}
8、KafkaConsumerThread
package com.dl.cn;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
/**
* Created by tiger on 2018/8/20.
*/
public class KafkaConsumerThread implements Runnable{
private KafkaStream<byte[], byte[]> stream;
public KafkaConsumerThread(KafkaStream<byte[], byte[]> stream) {
this.stream = stream;
}
@Override
public void run() {
ConsumerIterator<byte[], byte[]> it = stream.iterator();
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> mam = it.next();
System.out.println(Thread.currentThread().getName() + ">>>partition[" + mam.partition() + "],"
+ "offset[" + mam.offset() + "], " + new String(mam.message()));
}
}
}
9、ReadPropertiesUtils
package com.dl.cn;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
/**
* Created by tiger on 2018/8/18.
*/
public class ReadPropertiesUtils {
/**
* 读取properties配置文件
* @param configFileName
* @exception
* @return
* */
public static Properties readConfig(String configFileName) throws IOException {
Map<String,String> config = new HashMap<String, String>();
InputStream in = ReadPropertiesUtils.class.getClassLoader().getResourceAsStream(configFileName);
Properties properties = new Properties();
properties.load(in);
return properties;
}
}
10、config.properties
11、测试结果:
线程1对应partition0,线程2对应partition1,两者互不干扰
12、
auto.offset.reset=smallest,意思是从topic最早数据开始消费
auto.offset.reset=largest,是从topic最新数据开始消费
在zk中可以看到消费组
比如在代码中用到tiger7777这个消费者组
在代码中看到线程2最后消费的消息offset=1755
线程1最后消费的消息offset=2243
zookeeper中记录的offset值
生产者不断生产数据,消费者不断消费数据
将tiger7777,中partition对应的offset的值更新为200,然后重新启动
消费者,发现消息从offset=200开始重新消费,而且发现只有一个线程在继续消费
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/182243.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...