1.在SpringBoot中整合Kafka
- 添加依赖 kafka-clients
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.0</version>
</dependency>

2.SpringBoot2.x整合Kafka客户端+adminApi单元测试
单元测试配置客户端+创建topic
package net.xdclass.xdclasskafka;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Tests that exercise the Kafka {@link AdminClient} API against the broker
 * configured in {@link #initAdminClient()}.
 */
public class KafkaAdminTest {

    private static final String TOPIC_NAME = "xdclass-sp-topic-test";

    /**
     * Creates an admin client pointing at the bootstrap server.
     *
     * <p>The caller owns the returned client and should close it when done.
     *
     * @return a newly created {@link AdminClient}
     */
    public static AdminClient initAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "81.71.147.62:9092");
        return AdminClient.create(properties);
    }

    /**
     * Creates {@code TOPIC_NAME} with 2 partitions and a replication factor of 1,
     * then waits for the request to complete.
     *
     * <p>Any failure is rethrown so the test fails instead of passing silently.
     *
     * @throws IllegalStateException if the creation fails or the wait is interrupted
     */
    @Test
    public void createTopicTest() {
        // try-with-resources closes the client even when the request fails.
        try (AdminClient adminClient = initAdminClient()) {
            // Arguments: topic name, number of partitions, replication factor.
            NewTopic newTopic = new NewTopic(TOPIC_NAME, 2, (short) 1);
            CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
            // Block on the future; a successful creation completes without an exception.
            createTopicsResult.all().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while creating topic " + TOPIC_NAME, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to create topic " + TOPIC_NAME, e);
        }
    }
}
查看topic
./kafka-topics.sh --list --zookeeper 81.71.147.62:2181
查看broker节点topic状态信息
./kafka-topics.sh --describe --zookeeper 81.71.147.62:2181 --topic xdclass-sp-topic-test
可以看到对应信息

3.Kafka使用JavaAPI-AdminClient删除和列举topic
/**
 * Lists the topic names known to the cluster and prints each one to stderr.
 *
 * @throws ExecutionException if the list request fails
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
@Test
public void listTopicTest() throws ExecutionException, InterruptedException {
    // try-with-resources closes the client once the listing is done.
    try (AdminClient adminClient = initAdminClient()) {
        // Optional: also include internal topics in the listing.
        ListTopicsOptions options = new ListTopicsOptions();
        options.listInternal(true);
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        Set<String> topics = listTopicsResult.names().get();
        for (String name : topics) {
            System.err.println(name);
        }
    }
}
控制台输出
xdclass-topic
xdclass-sp-topic-test
t1
__consumer_offsets
t2
/**
* 删除topic
*/
@Test
public void delTopicTest() throws ExecutionException, InterruptedException {
AdminClient adminClient = initAdminClient();
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("xdclass-sp11-topic","version1-topic","my-topic"));
deleteTopicsResult.all().get();
} 删除控制台输出为空




4.AdminClientApi查看Topic详情和增加分区数量
/**
 * Prints the description of the topic named by {@code TOPIC_NAME}.
 *
 * @throws ExecutionException if the describe request fails
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
@Test
public void detailTopicTest() throws ExecutionException, InterruptedException {
    // try-with-resources closes the client once the description has been printed.
    try (AdminClient adminClient = initAdminClient()) {
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
        Map<String, TopicDescription> descriptions = describeTopicsResult.all().get();
        descriptions.forEach((name, desc) -> System.out.println("name :" + name + " , desc: " + desc));
    }
}

/**
 * Increases the partition count of {@code TOPIC_NAME} to 5.
 *
 * <p>When the messages in the topic carry a key (i.e. the key is not null), the
 * partition is computed from the key, so changing the partition count affects
 * message ordering.
 *
 * <p>Note: the number of partitions in Kafka can only be increased, never
 * decreased; it would be unclear how to handle the existing data on a decrease.
 *
 * @throws ExecutionException if the create-partitions request fails
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
@Test
public void incrPartitionTopicTest() throws ExecutionException, InterruptedException {
    // try-with-resources closes the client even when the request fails.
    try (AdminClient adminClient = initAdminClient()) {
        // Maps each topic name to its new total partition count.
        Map<String, NewPartitions> infoMap = new HashMap<>(1);
        infoMap.put(TOPIC_NAME, NewPartitions.increaseTo(5));
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
        createPartitionsResult.all().get();
    }
}
本文作者为DBC,转载请注明。