大家好,又见面了,我是你们的朋友全栈君。如果您正在找激活码,请点击查看最新教程,关注关注公众号 “全栈程序员社区” 获取激活教程,可能之前旧版本教程已经失效.最新Idea2022.1教程亲测有效,一键激活。
Jetbrains全系列IDE稳定放心使用
之前的csdn找不回来了,决定重新注册一个。望支持~~~
为了解决多台服务,共同消费kafka消息,目前采用的是随机partition。
画了个简图:
/**
* @ClassName: RiskPartitioner
* @author DHing
*
*/
public class RiskPartitioner implements Partitioner {
private Logger LOG = LoggerFactory.getLogger(getClass());
/* (非 Javadoc)
*
*
* @param topic
* @param key
* @param keyBytes
* @param value
* @param valueBytes
* @param cluster
* @return
* @see org.apache.kafka.clients.producer.Partitioner#partition(java.lang.String, java.lang.Object, byte[], java.lang.Object, byte[], org.apache.kafka.common.Cluster)
*这个方法就决定了消息往哪个分区里面发送
这个方法的返回值就是表示我们的数据要去哪个分区,如果返回值是0,表示我们的数据去0分区
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
LOG .info("Collecting Kafka data:[ topic : {} ], [ value : {} ] " , topic, value);
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
int num = partitions.size();
int partNum = 0;
try {
// partNum = Integer.parseInt((String) key);
partNum = new Random().nextInt(255);
} catch (Exception e) {
partNum = key.hashCode();
}
return Math.abs(partNum % num);
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
我们定定义分区过后,需要加入到Config进行生效:
@Configuration
public class KafkaProducerConfig {
@Value("${kafka.server.producer.urls}")
private String urls;
@Value("${kafka.server.producer.key}")
private String key;
@Value("${kafka.server.producer.value}")
private String value;
private String acks;
private String retries;
private String batchSize;
private String partitioner;
public Properties getProp(){
Properties props = new Properties();
props.put("bootstrap.servers", urls);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
//自定义分区类
props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
props.put("key.serializer", key);
props.put("value.serializer", value);
return props;
}
@Bean
@Qualifier("kafkaProducer")
public KafkaProducer<LongSerializer, StringSerializer> getKafka() {
Properties props = new Properties();
// props.put("bootstrap.servers", urls);
// props.put("key.serializer", key);
// props.put("value.serializer", value);
props.put("bootstrap.servers", urls);
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
//自定义分区类
props.put("partitioner.class", "这就是我们的定义分区类的包属性, eg: com.x.x.patition");
props.put("key.serializer", key);
props.put("value.serializer", value);
return new KafkaProducer<LongSerializer, StringSerializer>(props);
}
}
亲测没有问题,还有其他的方式进行自定义分区。这个就是算法的问题(个人理解)
之前的csdn找不回来了,决定重新注册一个。望支持~~~
发布者:全栈程序员-用户IM,转载请注明出处:https://javaforall.cn/184515.html原文链接:https://javaforall.cn
【正版授权,激活自己账号】: Jetbrains全家桶Ide使用,1年售后保障,每天仅需1毛
【官方授权 正版激活】: 官方授权 正版激活 支持Jetbrains家族下所有IDE 使用个人JB账号...