1.在SpringBoot中整合Kafka
- 添加依赖 kafka-clients
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.4.0</version> </dependency>
2.SpringBoot2.x整合Kafka客户端+adminApi单元测试
单元测试配置客户端+创建topic
package net.xdclass.xdclasskafka;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.junit.jupiter.api.Test;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * Tests that exercise the Kafka {@link AdminClient} API against the broker
 * configured in {@link #initAdminClient()}.
 */
public class KafkaAdminTest {

    private static final String TOPIC_NAME = "xdclass-sp-topic-test";

    /**
     * Creates an admin client pointed at the bootstrap server used by these tests.
     *
     * <p>The caller owns the returned client and is responsible for closing it.
     *
     * @return a newly created {@link AdminClient}
     */
    public static AdminClient initAdminClient() {
        Properties properties = new Properties();
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "81.71.147.62:9092");
        return AdminClient.create(properties);
    }

    /**
     * Creates {@code TOPIC_NAME} with 2 partitions and a replication factor of 1,
     * then blocks until the broker has answered.
     *
     * @throws IllegalStateException if the creation fails or the waiting thread is
     *                               interrupted, so that the test is reported as failed
     *                               instead of silently passing
     */
    @Test
    public void createTopicTest() {
        // Arguments: topic name, partition count, replication factor.
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 2, (short) 1);

        // try-with-resources closes the client even when the request fails.
        try (AdminClient adminClient = initAdminClient()) {
            CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
            // Wait on the future; it completes without an exception when creation succeeds.
            createTopicsResult.all().get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag before surfacing the failure.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while creating topic " + TOPIC_NAME, e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to create topic " + TOPIC_NAME, e);
        }
    }
}
查看topic
./kafka-topics.sh --list --zookeeper 81.71.147.62:2181
查看broker节点topic状态信息
./kafka-topics.sh --describe --zookeeper 81.71.147.62:2181 --topic xdclass-sp-topic-test
可以看到对应信息
3.Kafka使用JavaAPI-AdminClient删除和列举topic
/** * 列举topic列表 * @throws ExecutionException * @throws InterruptedException */ @Test public void listTopicTest() throws ExecutionException, InterruptedException { AdminClient adminClient = initAdminClient(); //是否查看内部的topic,可以不用 ListTopicsOptions options = new ListTopicsOptions(); options.listInternal(true); ListTopicsResult listTopicsResult = adminClient.listTopics(options); Set<String> topics = listTopicsResult.names().get(); for(String name : topics){ System.err.println(name); } }
控制台输出
xdclass-topic
xdclass-sp-topic-test
t1
__consumer_offsets
t2
/**
 * Deletes a fixed list of topics and blocks until the broker has answered.
 *
 * @throws ExecutionException   if the delete request fails
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
@Test
public void delTopicTest() throws ExecutionException, InterruptedException {
    // try-with-resources closes the client even when the request fails.
    try (AdminClient adminClient = initAdminClient()) {
        DeleteTopicsResult deleteTopicsResult =
                adminClient.deleteTopics(Arrays.asList("xdclass-sp11-topic", "version1-topic", "my-topic"));
        deleteTopicsResult.all().get();
    }
}
删除成功时,控制台没有任何输出
4.AdminClientApi查看Topic详情和增加分区数量
/**
 * Describes {@code TOPIC_NAME} and prints the name and description of every
 * entry returned by the broker.
 *
 * @throws ExecutionException   if the describe request fails
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
@Test
public void detailTopicTest() throws ExecutionException, InterruptedException {
    // try-with-resources closes the client even when the request fails.
    try (AdminClient adminClient = initAdminClient()) {
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
        Map<String, TopicDescription> descriptionsByTopic = describeTopicsResult.all().get();
        for (Map.Entry<String, TopicDescription> entry : descriptionsByTopic.entrySet()) {
            System.out.println("name :" + entry.getKey() + " , desc: " + entry.getValue());
        }
    }
}
/**
 * Increases the partition count of {@code TOPIC_NAME} to 5.
 *
 * <p>When the messages of a topic carry a key (that is, the key is not null),
 * the partition is computed from the key, so changing the partition count
 * affects message ordering.
 *
 * <p>Note: the number of partitions in Kafka can only be increased, never decreased.
 *
 * @throws ExecutionException   if the create-partitions request fails
 * @throws InterruptedException if the thread is interrupted while waiting for the result
 */
@Test
public void incrPartitionTopicTest() throws ExecutionException, InterruptedException {
    Map<String, NewPartitions> infoMap = new HashMap<>(1);
    // Requested partition count for the topic after the change.
    infoMap.put(TOPIC_NAME, NewPartitions.increaseTo(5));

    // try-with-resources closes the client even when the request fails.
    try (AdminClient adminClient = initAdminClient()) {
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(infoMap);
        createPartitionsResult.all().get();
    }
}
本文作者为DBC,转载请注明。