天天看點

kafka——AdminClient API

一、Kafka 核心 API

下圖取自官方文檔,形象地描述了能與 Kafka 整合的用戶端類型:

kafka——AdminClient API

Kafka的五類用戶端API類型如下:

  • AdminClient API:允許管理和檢視 Topic、broker 以及其他 Kafka 物件,與 Kafka 自帶的腳本指令作用類似。
  • Producer API:釋出消息到1個或多個Topic,也就是生産者或者說釋出方需要用到的API。
  • Consumer API:訂閱1個或多個Topic,并處理産生的消息,也就是消費者或者說訂閱方需要用到的API。
  • Stream API:高效地将輸入流轉換到輸出流,通常應用在一些流處理場景。
  • Connector API:從一些源系統或應用程式拉取資料到Kafka,如上圖中的DB。

本文中,我們将主要介紹 AdminClient API。

二、Topic 建立與删除

2.1、建立 topic

建立 topic 的序列圖如下所示:

kafka——AdminClient API
  • 1、controller 在 ZooKeeper 的 /brokers/topics 節點上注冊 watcher,當 topic 被建立,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。
  • 2、controller 從 /brokers/ids 讀取目前所有可用的 broker 清單,對于該 topic 的每一個 partition(其集合記為 set_p):
    • 2.1、從分配給該 partition 的所有 replica(稱為 AR)中任選一個可用的 broker 作為新的 leader,并将 AR 設定為新的 ISR
    • 2.2、将新的 leader 和 ISR 寫入 /brokers/topics/[topic]/partitions/[partition]/state
  • 3、controller 通過 RPC 向相關的 broker 發送 LeaderAndISRRequest。

2.2、删除 topic

删除 topic 的序列圖如下所示:

kafka——AdminClient API
  • 1、controller 在 ZooKeeper 的 /brokers/topics 節點上注冊 watcher,當 topic 被删除,則 controller 會通過 watch 得到該 topic 的 partition/replica 分配。

三、AdminClient API

3.1、導入相關依賴

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>2.6.0</version>
</dependency>
           

3.2、建構AdminClient

/**
 * Builds an {@link AdminClient} configured with a single hard-coded
 * bootstrap server, {@code 192.168.174.128:9092}.
 *
 * @return the client produced by {@code AdminClient.create}; this method
 *         does not close it, so the caller is left holding the open client
 */
public static AdminClient adminClient(){
	Properties config = new Properties();
	config.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.174.128:9092");
	return AdminClient.create(config);
}
           

3.3、建立Topic執行個體

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Requests creation of the topic {@link #TOPIC_NAME} with one partition and
 * a replication factor of 1, then prints the returned
 * {@code CreateTopicsResult}.
 *
 * <p>The futures inside the result are not awaited here, so a failed
 * creation is not reported by this method.
 */
public static void createTopic(){
	// Replication factor; a primitive avoids the needless boxing of the original Short.
	final short replicationFactor = 1;
	// try-with-resources releases the client even if createTopics throws.
	try (AdminClient adminClient = AdminSample.adminClient()) {
		NewTopic newTopic = new NewTopic(TOPIC_NAME,1,replicationFactor);
		CreateTopicsResult createTopicsResult = adminClient.createTopics(Arrays.asList(newTopic));
		System.out.println("CreateTopicsResult : " + createTopicsResult);
	}
}
           

3.4、擷取Topic清單

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Fetches the topic list and prints first every topic name, then every
 * {@code TopicListing}, to standard output.
 *
 * @throws Exception if waiting on either result future fails
 */
public static void topicList() throws Exception {
	// try-with-resources guarantees the client is closed even when a get() below throws;
	// the original only closed it on the success path.
	try (AdminClient adminClient = adminClient()) {
		// Ask for internal topics to be listed as well.
		ListTopicsOptions options = new ListTopicsOptions();
		options.listInternal(true);

		ListTopicsResult listTopicsResult = adminClient.listTopics(options);

		// Print the topic names.
		Set<String> names = listTopicsResult.names().get();
		names.forEach(System.out::println);

		// Print each TopicListing.
		Collection<TopicListing> topicListings = listTopicsResult.listings().get();
		topicListings.forEach(listing -> System.out.println(listing.toString()));
	}
}
           

3.5、删除topic

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Deletes the topic {@link #TOPIC_NAME} and blocks until the delete
 * request's future completes.
 *
 * @throws Exception if waiting on the result future fails
 */
public static void delTopic() throws Exception {
	// The original never closed the client; try-with-resources releases it on every path.
	try (AdminClient adminClient = adminClient()) {
		DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
		deleteTopicsResult.all().get();
	}
}
           

3.6、描述topic

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Describes the topic {@link #TOPIC_NAME} and prints one
 * {@code name: ... desc: ...} line per returned entry.
 *
 * <p>Sample output:
 * name: yibo_topic
 * desc: (name=yibo_topic,
 *      internal=false,
 *      partitions=
 *          (partition=0,
 *          leader=192.168.174.128:9092 (id: 0 rack: null),
 *          replicas=192.168.174.128:9092 (id: 0 rack: null),
 *          isr=192.168.174.128:9092 (id: 0 rack: null)),
 *          authorizedOperations=null)
 *
 * @throws Exception if waiting on the result future fails
 */
public static void describeTopic() throws Exception {
	// The original never closed the client; try-with-resources releases it on every path.
	try (AdminClient adminClient = adminClient()) {
		DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));
		Map<String, TopicDescription> descriptionMap = describeTopicsResult.all().get();
		descriptionMap.forEach((key,value) ->
			System.out.println("name: " + key+" desc: " + value));
	}
}
           

3.7、查詢配置資訊

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Queries the configuration of the topic {@link #TOPIC_NAME} and prints
 * each returned {@code ConfigResource} followed by its {@code Config}.
 *
 * <p>Sample output:
 * ConfigResource(type=TOPIC, name='yibo_topic')
 * Config(
 *      entries=
 *          [ConfigEntry(name=compression.type, value=producer, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=leader.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=message.downconversion.enable, value=true, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=min.insync.replicas, value=1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=segment.jitter.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=cleanup.policy, value=delete, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=flush.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=follower.replication.throttled.replicas, value=, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=segment.bytes, value=1073741824, source=STATIC_BROKER_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=retention.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=flush.messages, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=message.format.version, value=2.6-IV0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=max.compaction.lag.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=file.delete.delay.ms, value=60000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=max.message.bytes, value=1048588, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=min.compaction.lag.ms, value=0, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=message.timestamp.type, value=CreateTime, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=preallocate, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=min.cleanable.dirty.ratio, value=0.5, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=index.interval.bytes, value=4096, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=unclean.leader.election.enable, value=false, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=retention.bytes, value=-1, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=delete.retention.ms, value=86400000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=segment.ms, value=604800000, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=message.timestamp.difference.max.ms, value=9223372036854775807, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[]),
 *          ConfigEntry(name=segment.index.bytes, value=10485760, source=DEFAULT_CONFIG, isSensitive=false, isReadOnly=false, synonyms=[])])
 *
 * @throws Exception if waiting on the result future fails
 */
public static void describeConfig() throws Exception {
	// The original never closed the client; try-with-resources releases it on every path.
	try (AdminClient adminClient = adminClient()) {
		//TODO placeholder kept for later: to be revisited when clusters are covered
		//ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER,TOPIC_NAME);

		ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
		DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
		Map<ConfigResource, Config> resourceConfigMap = describeConfigsResult.all().get();
		resourceConfigMap.forEach((key,value) ->
			System.out.println(key + " " + value));
	}
}
           

3.8、修改配置資訊 老版API

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Sets {@code preallocate=true} on the topic {@link #TOPIC_NAME} through
 * the older {@code alterConfigs} API and blocks until the request's future
 * completes.
 *
 * <p>NOTE(review): Kafka's Admin documentation marks {@code alterConfigs}
 * as deprecated in favour of {@code incrementalAlterConfigs}; this method
 * is kept as the legacy example.
 *
 * @throws Exception if waiting on the result future fails
 */
public static void alterConfig1() throws Exception {
	// The original never closed the client; try-with-resources releases it on every path.
	try (AdminClient adminClient = adminClient()) {
		ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
		Config config = new Config(Arrays.asList(new ConfigEntry("preallocate","true")));

		Map<ConfigResource,Config> configMap = new HashMap<>();
		configMap.put(configResource,config);

		AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
		alterConfigsResult.all().get();
	}
}
           

3.9、修改配置資訊 新版API

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Sets {@code preallocate=false} on the topic {@link #TOPIC_NAME} through
 * the newer {@code incrementalAlterConfigs} API, using a single
 * {@code SET} operation, and blocks until the request's future completes.
 *
 * @throws Exception if waiting on the result future fails
 */
public static void alterConfig2() throws Exception {
	// The original never closed the client; try-with-resources releases it on every path.
	try (AdminClient adminClient = adminClient()) {
		ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,TOPIC_NAME);
		AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);

		Map<ConfigResource, Collection<AlterConfigOp>> configMap = new HashMap<>();
		configMap.put(configResource,Arrays.asList(alterConfigOp));

		AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMap);
		alterConfigsResult.all().get();
	}
}
           

3.10、增加partitions數量

private static final String TOPIC_NAME = "yibo_topic";

/**
 * Increases the partition count of the topic {@link #TOPIC_NAME} and
 * blocks until the request's future completes.
 *
 * @param partitions value passed to {@code NewPartitions.increaseTo}, i.e.
 *                   the partition count the topic should be increased to
 * @throws Exception if waiting on the result future fails
 */
public static void incrPartitions(int partitions) throws Exception {
	// The original never closed the client; try-with-resources releases it on every path.
	try (AdminClient adminClient = adminClient()) {
		Map<String,NewPartitions> partitionsMap = new HashMap<>();
		partitionsMap.put(TOPIC_NAME,NewPartitions.increaseTo(partitions));

		CreatePartitionsResult partitionsResult = adminClient.createPartitions(partitionsMap);
		partitionsResult.all().get();
	}
}