What the code does
Call the Scala API addPartitions from Java code.
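Use case
When you need custom management of Kafka topics, one feature you will very likely need is increasing the number of partitions.
In Kafka versions above 1.0.x, AdminUtils exposes an API for this, addPartitions. According to the annotation in the source, it is deprecated as of 1.1.0 and will be replaced by kafka.zk.AdminZkClient. Its implementation is shown in the following Scala source: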
/**
 * Add partitions to existing topic with optional replica assignment
 *
 * @param zkUtils Zookeeper utilities
 * @param topic Topic for adding partitions to
 * @param existingAssignment A map from partition id to its assigned replicas
 * @param allBrokers All brokers in the cluster
 * @param numPartitions Number of partitions to be set
 * @param replicaAssignment Manual replica assignment, or none
 * @param validateOnly If true, validate the parameters without actually adding the partitions
 * @return the updated replica assignment
 */
@deprecated("This method is deprecated and will be replaced by kafka.zk.AdminZkClient.", "1.1.0")
def addPartitions(zkUtils: ZkUtils,
                  topic: String,
                  existingAssignment: Map[Int, Seq[Int]],
                  allBrokers: Seq[BrokerMetadata],
                  numPartitions: Int = 1,
                  replicaAssignment: Option[Map[Int, Seq[Int]]] = None,
                  validateOnly: Boolean = false): Map[Int, Seq[Int]] = {
  val existingAssignmentPartition0 = existingAssignment.getOrElse(0,
    throw new AdminOperationException(
      s"Unexpected existing replica assignment for topic '$topic', partition id 0 is missing. " +
        s"Assignment: $existingAssignment"))

  val partitionsToAdd = numPartitions - existingAssignment.size
  if (partitionsToAdd <= 0)
    throw new InvalidPartitionsException(
      s"The number of partitions for a topic can only be increased. " +
        s"Topic $topic currently has ${existingAssignment.size} partitions, " +
        s"$numPartitions would not be an increase.")

  replicaAssignment.foreach { proposedReplicaAssignment =>
    validateReplicaAssignment(proposedReplicaAssignment, existingAssignmentPartition0,
      allBrokers.map(_.id).toSet)
  }

  val proposedAssignmentForNewPartitions = replicaAssignment.getOrElse {
    val startIndex = math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head))
    AdminUtils.assignReplicasToBrokers(allBrokers, partitionsToAdd, existingAssignmentPartition0.size,
      startIndex, existingAssignment.size)
  }
  val proposedAssignment = existingAssignment ++ proposedAssignmentForNewPartitions
  if (!validateOnly) {
    info(s"Creating $partitionsToAdd partitions for '$topic' with the following replica assignment: " +
      s"$proposedAssignmentForNewPartitions.")
    // add the combined new list
    AdminUtils.createOrUpdateTopicPartitionAssignmentPathInZK(zkUtils, topic, proposedAssignment, update = true)
  }
  proposedAssignment
}
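To make the checks in the Scala source easier to follow from the Java side, here is a minimal sketch of the same logic using plain Java collections only. It is not Kafka code: the class name AddPartitionsSketch and its method names are hypothetical, brokers are reduced to a list of integer ids, and the standard Java exceptions stand in for AdminOperationException and InvalidPartitionsException. Replica placement itself (assignReplicasToBrokers) and the ZooKeeper write are left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; it mirrors the validation steps of
// AdminUtils.addPartitions but does not talk to Kafka or ZooKeeper.
class AddPartitionsSketch {

    // Returns how many partitions would be added, rejecting anything that is not an increase.
    static int partitionsToAdd(Map<Integer, List<Integer>> existingAssignment, int numPartitions) {
        if (!existingAssignment.containsKey(0)) {
            // Stand-in for AdminOperationException in the Scala source.
            throw new IllegalStateException("partition id 0 is missing: " + existingAssignment);
        }
        int toAdd = numPartitions - existingAssignment.size();
        if (toAdd <= 0) {
            // Stand-in for InvalidPartitionsException in the Scala source.
            throw new IllegalArgumentException("The number of partitions can only be increased; current="
                    + existingAssignment.size() + ", requested=" + numPartitions);
        }
        return toAdd;
    }

    // Mirrors math.max(0, allBrokers.indexWhere(_.id >= existingAssignmentPartition0.head)).
    static int startIndex(List<Integer> brokerIds, int firstReplicaOfPartition0) {
        for (int i = 0; i < brokerIds.size(); i++) {
            if (brokerIds.get(i) >= firstReplicaOfPartition0) {
                return i;
            }
        }
        return 0; // indexWhere yields -1 when nothing matches, and max(0, -1) is 0
    }

    // Mirrors `existingAssignment ++ proposedAssignmentForNewPartitions`.
    static Map<Integer, List<Integer>> merge(Map<Integer, List<Integer>> existing,
                                             Map<Integer, List<Integer>> proposed) {
        Map<Integer, List<Integer>> merged = new TreeMap<>(existing);
        merged.putAll(proposed);
        return merged;
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> existing = new TreeMap<>();
        existing.put(0, Arrays.asList(1, 2));
        existing.put(1, Arrays.asList(2, 3));
        System.out.println("partitions to add: " + partitionsToAdd(existing, 4));
        System.out.println("start index: " + startIndex(Arrays.asList(1, 2, 3), 1));
    }
}
```

The point of the sketch is that numPartitions is the target total, not the number of partitions to add, so passing a value less than or equal to the current count is rejected before anything is written.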
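Although Java and Scala interoperate well, building this parameter list and calling the API from Java is still fairly involved, and it took me some time to get it working. The Scala collection types (Map, Seq) and the Option parameter have to be constructed on the Java side, and Scala default arguments are not available from Java, so every parameter must be passed explicitly.
Implementation
The following is the Java code that calls this API: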
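import java.util.Arrays;
import java.util.Map;
import kafka.admin.AdminUtils;
import kafka.admin.BrokerMetadata;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.I0Itec.zkclient.ZkClient;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.security.JaasUtils;
import scala.Option;
import scala.collection.JavaConverters;
import scala.collection.Seq;

public class addPartitions {
    @SuppressWarnings("unchecked")
    public static boolean addPartitions(String zookeeperUri, String topic, int partitionNum) {
        boolean succeed = true;
        // createZkClient is a helper that is not shown in this article
        ZkClient zkClient = createZkClient(zookeeperUri);
        ZkUtils zkUtils = ZkUtils.apply(zkClient, JaasUtils.isZkSecurityEnabled());
        // get the existing assignment
        Seq<String> names = JavaConverters.asScalaBufferConverter(Arrays.asList(topic)).asScala();
        scala.collection.mutable.Map<String, scala.collection.Map<Object, Seq<Object>>> assignment = (scala.collection.mutable.Map<String, scala.collection.Map<Object, Seq<Object>>>) zkUtils.getPartitionAssignmentForTopics(names);
        Map<String, scala.collection.Map<Object, Seq<Object>>> partitionAssignmentMap = JavaConverters.mutableMapAsJavaMapConverter(assignment).asJava();
        // get metadata for all brokers in the cluster
        Seq<BrokerMetadata> allBrokerMeta = AdminUtils.getBrokerMetadatas(zkUtils, RackAwareMode.Enforced$.MODULE$, Option.apply(null));
        try {
            // partitionNum is the target total; Option.empty() lets Kafka choose the replica assignment
            AdminUtils.addPartitions(zkUtils, topic, partitionAssignmentMap.get(topic), allBrokerMeta, partitionNum, Option.empty(), false);
        } catch (InvalidPartitionsException exception) {
            System.out.println("exception: " + exception);
            succeed = false;
        } finally {
            zkClient.close();
        }
        return succeed;
    }
}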
In addition, I recommend JavaConverters, a very handy class for converting between Java and Scala collection types.
Published by: 全栈程序员-用户IM. When reposting, please credit the source: https://javaforall.cn/153334.html Original link: https://javaforall.cn