天天看點

kafka常用指令總結 kafka指令

版權聲明:本文為部落客原創文章,未經部落客允許不得轉載。 https://blog.csdn.net/qq_26654727/article/details/82752988

文章目錄

1.1 Adding and removing topics

add:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic my_topic_name
       --partitions 20 --replication-factor 3 --config x=y
           

remove:

1.2 Modifying topics

add partition:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name
       --partitions 40
           

add configes:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --config x=y
           

To remove a config:

> bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic my_topic_name --deleteConfig x
           

delete config (不使用):

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name
           

bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
           

前提是需要加上配置

auto.leader.rebalance.enable=true
           

bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zkconnect localhost:2181 --group test
Group           Topic                          Pid Offset          logSize         Lag             Owner
my-group        my-topic                       0   0               0               0               test_jkreps-mn-1394154511599-60744496-0
my-group        my-topic                       1   0               0               0               test_jkreps-mn-1394154521217-1a0be913-0
           

1.5 Expanding your cluster(水準擴充叢集)

  • –generate: 在此模式下,給定主題清單和代理清單,該工具會生成候選重新配置設定,以将指定主題的所有分區移動到新代理。此選項僅提供了一種友善的方法,可在給定主題和目标代理清單的情況下生成分區重新配置設定計劃。
  • –execute:在此模式下,該工具将根據使用者提供的重新配置設定計劃啟動分區的重新配置設定。 (使用–reassignment-json-file選項)。這可以是由管理者手工制作的自定義重新配置設定計劃,也可以使用–generate選項提供
  • –verify: 在此模式下,該工具将驗證最後一次–execute期間列出的所有分區的重新配置設定狀态。狀态可以是成功完成,失敗或正在進行中

> cat topics-to-move.json
{"topics": [{"topic": "foo1"},
            {"topic": "foo2"}],
 "version":1
}
           

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Proposed partition reassignment configuration

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}
           

3.該工具生成一個候選配置設定,将所有分區從主題foo1,foo2移動到代理5,6。 但請注意,此時分區移動尚未開始,它隻是告訴您目前的配置設定和建議的新配置設定。 應儲存目前配置設定,以防您想要復原它。 新的配置設定應儲存在json檔案中(例如expand-cluster-reassignment.json),以便使用–execute選項輸入工具,如下所示 -

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2]},
               {"topic":"foo1","partition":0,"replicas":[3,4]},
               {"topic":"foo2","partition":2,"replicas":[1,2]},
               {"topic":"foo2","partition":0,"replicas":[3,4]},
               {"topic":"foo1","partition":1,"replicas":[2,3]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":2,"replicas":[5,6]},
               {"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":2,"replicas":[5,6]},
               {"topic":"foo2","partition":0,"replicas":[5,6]},
               {"topic":"foo1","partition":1,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[5,6]}]
}
           

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition [foo1,0] completed successfully
Reassignment of partition [foo1,1] is in progress
Reassignment of partition [foo1,2] is in progress
Reassignment of partition [foo2,0] completed successfully
Reassignment of partition [foo2,1] completed successfully
Reassignment of partition [foo2,2] completed successfully
           

1.6 自定義分區配置設定和遷移

cat custom-reassignment.json
{
	"version": 1,
	"partitions": [{
		"topic": "foo1",
		"partition": 0,
		"replicas": [5, 6]
	}, {
		"topic": "foo2",
		"partition": 1,
		"replicas": [2, 3]
	}]
}
           

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}
           

> bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file custom-reassignment.json --execute
Current partition replica assignment

{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[1,2]},
               {"topic":"foo2","partition":1,"replicas":[3,4]}]
}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions
{"version":1,
 "partitions":[{"topic":"foo1","partition":0,"replicas":[5,6]},
               {"topic":"foo2","partition":1,"replicas":[2,3]}]
}
           

1.7 Increasing replication factor(新增副本數)

邏輯和上兩個一緻 通過json檔案 手動修改副本資料位置

提供json檔案:

> cat increase-replication-factor.json
{
	"version": 1,
	"partitions": [{
		"topic": "foo",
		"partition": 0,
		"replicas": [5, 6, 7]
	}]
}
           

bin/zookeeper-server-start.sh config/zookeeper.properties &
           

bin/kafka-server-start.sh config/server.properties &

bin/kafka-server-stop.sh

bin/zookeeper-server-stop.sh

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

bin/kafka-topics.sh --list --zookeeper localhost:2181

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

bin/kafka-console-producer.sh --broker-list 130.51.23.95:9092 --topic my-replicated-topic

bin/kafka-console-consumer.sh --zookeeper 130.51.23.95:2181 --topic test --from-beginnin